The Producer-Consumer problem is a classic synchronization problem in operating systems.
The problem is defined as follows: a fixed-size buffer is shared by a Producer process and a Consumer process.
The Producer process creates an item and adds it to the shared buffer. The Consumer process takes items out of the shared buffer and “consumes” them.
Certain conditions must be met by the Producer and the Consumer processes to have consistent data synchronization:
The Producer process must not produce an item if the shared buffer is full.
The Consumer process must not consume an item if the shared buffer is empty.
Access to the shared buffer must be mutually exclusive; this means that at any given instant, only one process should be able to access the shared buffer and make changes to it.
The solution to the Producer-Consumer problem involves three semaphore variables.
Full: Tracks the number of slots filled by the Producer process. It is initialized with a value of 0, as the buffer has 0 filled slots at the beginning.
Empty: Tracks the empty space in the buffer. It is initially set to buffer_size, as the whole buffer is empty at the beginning.
mutex: Used for mutual exclusion so that only one process can access the shared buffer at a time.
Using the signal() and wait() operations on these semaphores, we can arrive at a solution.
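As a minimal sketch of the starting state described above, the three semaphores could be set up with POSIX unnamed semaphores as follows. The capacity of 5 and the names BufferSemaphores, init_semaphores, and value_of are assumptions made for illustration; they are not part of the solution itself.

```cpp
#include <cassert>
#include <semaphore.h>

constexpr int BUFFER_SIZE = 5;  // Assumed capacity, chosen only for illustration

// Hypothetical bundle of the three semaphores described above
struct BufferSemaphores {
    sem_t full;   // Counts filled slots
    sem_t empty;  // Counts free slots
    sem_t mutex;  // Binary semaphore guarding the buffer
};

// Put the semaphores into the state that matches an empty buffer
void init_semaphores(BufferSemaphores &s) {
    sem_init(&s.full, 0, 0);             // No filled slots yet
    sem_init(&s.empty, 0, BUFFER_SIZE);  // Every slot is free
    sem_init(&s.mutex, 0, 1);            // Buffer is unlocked
}

// Read the current value of a semaphore without changing it
int value_of(sem_t &s) {
    int v = 0;
    int rc = sem_getvalue(&s, &v);
    assert(rc == 0);  // sem_getvalue reports failure through its return value
    (void)rc;
    return v;
}
```

Calling init_semaphores on a BufferSemaphores object and then reading each semaphore with value_of should report 0 filled slots, BUFFER_SIZE free slots, and an unlocked mutex.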
Let’s look at the code for the Producer and Consumer processes.
void Producer(){
    while(true){
        // Produce an item
        wait(Empty);
        wait(mutex);
        add();
        signal(mutex);
        signal(Full);
    }
}
In the code above, the Producer process waits for the Empty semaphore. This means that the Producer process is kept waiting if the Empty semaphore value is 0, indicating that there are no empty spaces available. The Producer will have to wait for the Consumer to consume some items from the buffer and free up space.
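The waiting behavior on the Empty semaphore can be observed without actually blocking. The sketch below assumes POSIX semaphores and uses sem_trywait, which fails with EAGAIN at exactly the point where a regular wait would block. The function name claim_until_full is hypothetical.

```cpp
#include <cassert>
#include <cerrno>
#include <semaphore.h>

// Counts how many free slots a producer can claim before a wait would block.
int claim_until_full(int buffer_size) {
    sem_t empty;
    sem_init(&empty, 0, buffer_size);  // All slots start out free

    int claimed = 0;
    while (sem_trywait(&empty) == 0) {
        ++claimed;  // One more slot reserved for an item
    }
    // The semaphore value is now 0: a blocking sem_wait would wait here
    // until a consumer posts to the semaphore.
    assert(errno == EAGAIN);

    sem_destroy(&empty);
    return claimed;
}
```

For a buffer of size 3, claim_until_full(3) claims three slots and then stops, which corresponds to the Producer waiting for the Consumer once the buffer is full.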
The Producer then waits for the mutex semaphore, which ensures that once a thread has entered the critical section of the code, no other thread can enter it and cause race conditions.
The add() function appends the item to the shared buffer. Once a Producer process reaches this point in the code, it is guaranteed that no other process is accessing the shared buffer concurrently, preventing data inconsistency.
After the Producer process adds the item to the shared buffer, it uses the signal() operation to increase the value of the mutex semaphore by one, thereby allowing any other thread that was waiting on the mutex semaphore to enter the critical section.
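To illustrate the effect of the wait(mutex) and signal(mutex) pair in isolation, here is a small sketch in which two threads increment a shared counter inside the critical section. It assumes POSIX threads and semaphores; the names mutex_sem, counter, increment, and run_two_threads are hypothetical and stand in for the shared buffer logic.

```cpp
#include <pthread.h>
#include <semaphore.h>

static sem_t mutex_sem;   // Binary semaphore, plays the role of mutex
static long counter = 0;  // Shared data, stands in for the buffer

// Each thread performs n increments, one per critical-section entry
void *increment(void *arg) {
    long n = *static_cast<long *>(arg);
    for (long i = 0; i < n; ++i) {
        sem_wait(&mutex_sem);  // wait(mutex): enter the critical section
        ++counter;             // Only one thread executes this at a time
        sem_post(&mutex_sem);  // signal(mutex): let another thread in
    }
    return nullptr;
}

// Runs two threads concurrently and returns the final counter value
long run_two_threads(long per_thread) {
    counter = 0;
    sem_init(&mutex_sem, 0, 1);  // 1 means the critical section is free

    pthread_t a, b;
    pthread_create(&a, nullptr, increment, &per_thread);
    pthread_create(&b, nullptr, increment, &per_thread);
    pthread_join(a, nullptr);
    pthread_join(b, nullptr);

    sem_destroy(&mutex_sem);
    return counter;
}
```

Because every increment happens between sem_wait and sem_post, no update is lost, so the result is always twice the per-thread count. Without the semaphore, the two threads could overwrite each other's updates.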
Lastly, the Producer process uses the signal() operation on the Full semaphore, increasing its value by 1, indicating that an item has been added to the shared buffer and the count for the filled spaces has increased by one.
The code for the Consumer process is as follows.
void Consumer(){
    while(true){
        wait(Full);
        wait(mutex);
        consume();
        signal(mutex);
        signal(Empty);
    }
}
The Consumer waits for the Full semaphore. If the Full semaphore value is 0, it indicates that there are no items to consume, and the Consumer must wait for the Producer process to produce an item and add it to the shared buffer for consumption.
As previously mentioned, the mutex semaphore ensures mutually exclusive access to the critical section of the code so that the shared buffer is only accessed by one thread at a time.
Once the Consumer process reaches the critical section of the code, i.e., the consume() function, it executes the function and takes one item from the shared buffer.
After taking an item from the buffer, the Consumer process first uses signal(mutex) to release the mutex semaphore, allowing other threads that may have been waiting on the mutex to enter the critical section.
Lastly, the Consumer uses signal(Empty) to increase the value of the Empty semaphore by one, indicating that a slot has been freed in the shared buffer. Any Producer processes that may have been waiting on the Empty semaphore are now allowed to add an item to the shared buffer.
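One way to check the bookkeeping described above is to trace only the two counting semaphores, with no real buffer and no threads. The sketch below assumes POSIX semaphores and a hypothetical capacity of 4; SlotCounts and trace are illustrative names. It uses sem_trywait so that a step which would block simply stops the loop instead.

```cpp
#include <cassert>
#include <semaphore.h>

constexpr int BUFFER_SIZE = 4;  // Assumed capacity, chosen only for illustration

struct SlotCounts {
    int full;   // Value of the Full semaphore
    int empty;  // Value of the Empty semaphore
};

// Applies up to `produced` producer steps, then up to `consumed` consumer
// steps, and reports the resulting semaphore values.
SlotCounts trace(int produced, int consumed) {
    sem_t full, empty;
    sem_init(&full, 0, 0);
    sem_init(&empty, 0, BUFFER_SIZE);

    for (int i = 0; i < produced; ++i) {
        if (sem_trywait(&empty) != 0) break;  // Buffer full: a Producer would wait
        sem_post(&full);                      // signal(Full)
    }
    for (int i = 0; i < consumed; ++i) {
        if (sem_trywait(&full) != 0) break;   // Buffer empty: a Consumer would wait
        sem_post(&empty);                     // signal(Empty)
    }

    SlotCounts c{};
    sem_getvalue(&full, &c.full);
    sem_getvalue(&empty, &c.empty);
    assert(c.full + c.empty == BUFFER_SIZE);  // Every slot is either filled or free

    sem_destroy(&full);
    sem_destroy(&empty);
    return c;
}
```

For example, trace(3, 1) leaves 2 filled and 2 free slots, and trace(10, 0) stops producing once all 4 slots are filled. Whenever no step is in progress, the two values add up to the buffer size.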
Now, let's enhance our understanding with an executable example.
#include <atomic>
#include <cstdlib>      // For rand function
#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>     // For sleep function

#define BUFFER_SIZE 5    // Define the size of the buffer
#define NUM_PRODUCERS 2  // Number of producer threads
#define NUM_CONSUMERS 2  // Number of consumer threads
#define MAX_ITEMS 20     // Maximum items to produce/consume (not used by this time-limited run)

sem_t full, empty, mutex;         // Define semaphores
std::atomic<bool> running(true);  // Global flag to control thread termination (atomic, since several threads read it)

int buffer[BUFFER_SIZE];  // Shared buffer
int in = 0;               // Index to add an item
int out = 0;              // Index to remove an item

void add(int item) {
    buffer[in] = item;  // Adding an item to the buffer
    in = (in + 1) % BUFFER_SIZE;
}

int consume() {
    int item = buffer[out];  // Consuming an item from the buffer
    buffer[out] = 0;         // Reset buffer slot
    out = (out + 1) % BUFFER_SIZE;
    return item;
}

void *producer(void *arg) {
    int producer_id = *((int*)arg);
    while (running) {
        int item = rand() % 100 + 1;  // Produce an item (random number between 1 and 100)
        sleep(1);                     // Simulate production time
        sem_wait(&empty);             // Wait for space in the buffer
        if (!running) break;          // Shutdown was requested while waiting
        sem_wait(&mutex);             // Acquire the mutex to access the buffer
        add(item);                    // Add item to the buffer
        sem_post(&mutex);             // Release the mutex
        sem_post(&full);              // Increment the full count
        std::cout << "Producer " << producer_id << " produced item: " << item << std::endl;
    }
    pthread_exit(NULL);
}

void *consumer(void *arg) {
    int consumer_id = *((int*)arg);
    while (running) {
        sleep(1);              // Simulate consumption time
        sem_wait(&full);       // Wait for an item to be available
        if (!running) break;   // Shutdown was requested while waiting
        sem_wait(&mutex);      // Acquire the mutex to access the buffer
        int item = consume();  // Consume item from the buffer
        sem_post(&mutex);      // Release the mutex
        sem_post(&empty);      // Increment the empty count
        std::cout << "Consumer " << consumer_id << " consumed item: " << item << std::endl;
    }
    pthread_exit(NULL);
}

int main() {
    // Initialize semaphores
    sem_init(&full, 0, 0);
    sem_init(&empty, 0, BUFFER_SIZE);
    sem_init(&mutex, 0, 1);

    // Create producer threads
    pthread_t producerThreads[NUM_PRODUCERS];
    int producer_ids[NUM_PRODUCERS];
    for (int i = 0; i < NUM_PRODUCERS; ++i) {
        producer_ids[i] = i + 1;
        pthread_create(&producerThreads[i], NULL, producer, &producer_ids[i]);
    }

    // Create consumer threads
    pthread_t consumerThreads[NUM_CONSUMERS];
    int consumer_ids[NUM_CONSUMERS];
    for (int i = 0; i < NUM_CONSUMERS; ++i) {
        consumer_ids[i] = i + 1;
        pthread_create(&consumerThreads[i], NULL, consumer, &consumer_ids[i]);
    }

    // Let the program run for a while
    sleep(10);

    // Set running flag to false to terminate threads
    running = false;

    // Wake any thread still blocked on a semaphore so it can see the flag and exit
    for (int i = 0; i < NUM_PRODUCERS; ++i) sem_post(&empty);
    for (int i = 0; i < NUM_CONSUMERS; ++i) sem_post(&full);

    // Join producer threads
    for (int i = 0; i < NUM_PRODUCERS; ++i) {
        pthread_join(producerThreads[i], NULL);
    }

    // Join consumer threads
    for (int i = 0; i < NUM_CONSUMERS; ++i) {
        pthread_join(consumerThreads[i], NULL);
    }

    // Destroy semaphores
    sem_destroy(&full);
    sem_destroy(&empty);
    sem_destroy(&mutex);
    return 0;
}