What is the Producer-Consumer problem?

Background and introduction

The Producer-Consumer problem is a classic synchronization problem in operating systems.

The problem is defined as follows: there is a fixed-size buffer and a Producer process, and a Consumer process.

The Producer process creates an item and adds it to the shared buffer. The Consumer process takes items out of the shared buffer and “consumes” them.

The tricky part

Certain conditions must be met by the Producer and the Consumer processes to have consistent data synchronization:

  1. The Producer process must not produce an item if the shared buffer is full.

  2. The Consumer process must not consume an item if the shared buffer is empty.

  3. Access to the shared buffer must be mutually exclusive; this means that at any given instance, only one process should be able to access the shared buffer and make changes to it.

Solution

The solution to the Producer-Consumer problem involves three semaphore variables.

  • semaphore Full: Tracks the space filled by the Producer process. It is initialized with a value of 00 as the buffer will have 00 filled spaces at the beginning
  • semaphore Empty: Tracks the empty space in the buffer. It is initially set to buffer_size as the whole buffer is empty at the beginning.
  • semaphore mutex: Used for mutual exclusion so that only one process can access the shared buffer at a time.

Using the signal() and wait() operations on these semaphores, we can arrive at a solution.

Let’s look at the code for the Producer and Consumer processes.

void Producer(){
    while(true){
        // Produce an item
        wait(Empty);
        wait(mutex);
        add();
        signal(mutex);
        signal(Full);
    }
}

In the code above, the Producer process waits for the Empty semaphore. This means that the Producer process is kept in busy-waiting if the Empty semaphore value is 00, indicating that there are 00 empty spaces available. The Producer will have to wait for the Consumer to consume some items from the buffer and make some space available for itself.

The Producer then waits for the mutex semaphore, which merely ensures that once a thread has entered the critical section of the code, the rest of the threads cannot access it and cause race conditions.

The add() function appends the item to the shared buffer. Once a Producer process reaches this point in the code, it is guaranteed that no other process is accessing the shared buffer concurrently, preventing data inconsistency.

After the Producer process adds the item to the shared buffer, it uses the signal() operation to increase the value of the mutex semaphore by one, thereby allowing any other threads which were busy-waiting in the mutex semaphore to access the critical section.

Lastly, the Producer process uses the signal() operation on the Full semaphore, increasing its value by 11, indicating that an item has been added to the shared buffer and the count for the filled spaces has increased by one.

The code for the Consumer process is as follows.

void Consumer(){
    while(true){
        wait(Full);
        wait(mutex);
        consume();
        signal(mutex);
        signal(Empty)
    }
}

The Consumer waits for the Full semaphore. If the Full semaphore value is 0, it indicates that there are no items to consume, and it must wait for the Producer process to produce an item and add it to the shared buffer for consumption.

As previously mentioned, the mutex semaphore ensures mutually exclusive access to the critical section of the code so that the shared buffer is only accessed by one thread at a time for data synchronization.

Once the Consumer process reaches the critical section of the code, i.e., the consume() function, it executes the function and takes one item from the shared buffer.

After taking an item from the buffer, the Consumer process first uses signal(mutex) to release the mutex semaphore, allowing other threads that may have been busy-waiting in the mutex to access the critical section.

Lastly, the Consumer uses signal(Empty) to increase the value of the Empty semaphore by one, indicating that a free slot has been made in the shared buffer. Any Producer processes that may have been waiting in the Empty semaphore are now allowed to add an item to the shared buffer.

Coding example

Now, let's enhance our understanding with an executable example. Click the "Run" to execute the below example.

#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h> // For sleep function
#include <cstdlib> // For rand function
#define BUFFER_SIZE 5 // Define the size of the buffer
#define NUM_PRODUCERS 2 // Number of producer threads
#define NUM_CONSUMERS 2 // Number of consumer threads
#define MAX_ITEMS 20 // Maximum items to produce/consume
sem_t full, empty, mutex; // Define semaphores
bool running = true; // Global flag to control thread termination
int buffer[BUFFER_SIZE]; // Shared buffer
int in = 0; // Index to add an item
int out = 0; // Index to remove an item
void add(int item) {
buffer[in] = item; // Adding an item to the buffer
in = (in + 1) % BUFFER_SIZE;
}
int consume() {
int item = buffer[out]; // Consuming an item from the buffer
buffer[out] = 0; // Reset buffer slot
out = (out + 1) % BUFFER_SIZE;
return item;
}
void *producer(void *arg) {
int producer_id = *((int*)arg);
while (running) {
int item = rand() % 100 + 1; // Produce an item (random number between 1 and 100)
sleep(1); // Simulate production time
sem_wait(&empty); // Wait for space in the buffer
sem_wait(&mutex); // Acquire the mutex to access the buffer
add(item); // Add item to the buffer
sem_post(&mutex); // Release the mutex
sem_post(&full); // Increment the full count
std::cout << "Producer " << producer_id << " produced item: " << item << std::endl;
}
pthread_exit(NULL);
}
void *consumer(void *arg) {
int consumer_id = *((int*)arg);
while (running) {
sleep(1); // Simulate consumption time
sem_wait(&full); // Wait for an item to be available
sem_wait(&mutex); // Acquire the mutex to access the buffer
int item = consume(); // Consume item from the buffer
sem_post(&mutex); // Release the mutex
sem_post(&empty); // Increment the empty count
std::cout << "Consumer " << consumer_id << " consumed item: " << item << std::endl;
}
pthread_exit(NULL);
}
int main() {
// Initialize semaphores
sem_init(&full, 0, 0);
sem_init(&empty, 0, BUFFER_SIZE);
sem_init(&mutex, 0, 1);
// Create producer threads
pthread_t producerThreads[NUM_PRODUCERS];
int producer_ids[NUM_PRODUCERS];
for (int i = 0; i < NUM_PRODUCERS; ++i) {
producer_ids[i] = i + 1;
pthread_create(&producerThreads[i], NULL, producer, &producer_ids[i]);
}
// Create consumer threads
pthread_t consumerThreads[NUM_CONSUMERS];
int consumer_ids[NUM_CONSUMERS];
for (int i = 0; i < NUM_CONSUMERS; ++i) {
consumer_ids[i] = i + 1;
pthread_create(&consumerThreads[i], NULL, consumer, &consumer_ids[i]);
}
// Let the program run for a while
sleep(10);
// Set running flag to false to terminate threads
running = false;
// Join producer threads
for (int i = 0; i < NUM_PRODUCERS; ++i) {
pthread_join(producerThreads[i], NULL);
}
// Join consumer threads
for (int i = 0; i < NUM_CONSUMERS; ++i) {
pthread_join(consumerThreads[i], NULL);
}
// Destroy semaphores
sem_destroy(&full);
sem_destroy(&empty);
sem_destroy(&mutex);
return 0;
}

Free Resources

Copyright ©2025 Educative, Inc. All rights reserved