Pipes, queues, and locks in multiprocessing in Python

The previous shot covered the Process and Pool classes. In this shot, we will learn about pipes, queues, and locks.

Pipes and Queues

The multiprocessing module provides two ways to communicate between processes: pipes and queues.

Pipes

Pipe() returns a pair of connection objects connected by a pipe. By default, the pipe is duplex, i.e., bidirectional.

Each connection object has send() and recv() methods to send and receive messages.

The following example illustrates how one process produces messages and another process listens for them.

import multiprocessing
def process1_send_function(conn, events):
    # Producer: send every event through this end of the pipe.
    for event in events:
        conn.send(event)
        print(f"Event Sent: {event}")
def process2_recv_function(conn):
    # Consumer: recv() blocks until a message arrives; "eod" ends the loop.
    while True:
        event = conn.recv()
        if event == "eod":
            print("Event Received: End of Day")
            return
        print(f"Event Received: {event}")
def run():
    events = ["get up", "brush your teeth", "shower", "work", "eod"]
    # Pipe() returns the two connected ends.
    conn1, conn2 = multiprocessing.Pipe()
    process_1 = multiprocessing.Process(target=process1_send_function, args=(conn1, events))
    process_2 = multiprocessing.Process(target=process2_recv_function, args=(conn2,))
    process_1.start()
    process_2.start()
    process_1.join()
    process_2.join()
if __name__ == "__main__":
    run()
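
Because the pipe is duplex by default, either connection object can both send and receive. The sketch below is only an illustration of that point: it keeps both ends in a single process so the round trip is easy to follow, and the helper names duplex_round_trip and one_way are hypothetical. It also shows Pipe(duplex=False), which returns a receive-only connection followed by a send-only one.

```python
import multiprocessing


def duplex_round_trip():
    # Hypothetical helper: both ends of a default (duplex) pipe send and receive.
    left, right = multiprocessing.Pipe()
    left.send("ping")        # left -> right
    request = right.recv()
    right.send("pong")       # right -> left, over the same pipe
    reply = left.recv()
    left.close()
    right.close()
    return request, reply


def one_way(message):
    # With duplex=False, the first connection can only receive
    # and the second can only send.
    receiver, sender = multiprocessing.Pipe(duplex=False)
    sender.send(message)
    received = receiver.recv()
    receiver.close()
    sender.close()
    return received


if __name__ == "__main__":
    print(duplex_round_trip())
    print(one_way({"event": "shower"}))
```

Any picklable object can travel through send(), which is why the second call passes a dictionary rather than a string.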

Queues

Queue() returns a queue shared between processes. Think of a queue as a data structure into which a producer puts messages and from which a consumer takes them. Queues are thread- and process-safe.

We can modify the pipe example above to use a queue instead.

import multiprocessing
def process1_send_function(queue, events):
    # Producer: put every event on the shared queue.
    for event in events:
        queue.put(event)
        print(f"Event Sent: {event}")
def process2_recv_function(queue):
    # Consumer: get() blocks until an item is available; "eod" ends the loop.
    while True:
        event = queue.get()
        if event == "eod":
            print("Event Received: End of Day")
            return
        print(f"Event Received: {event}")
def run():
    events = ["get up", "brush your teeth", "shower", "work", "eod"]
    queue = multiprocessing.Queue()
    process_1 = multiprocessing.Process(target=process1_send_function, args=(queue, events))
    process_2 = multiprocessing.Process(target=process2_recv_function, args=(queue,))
    process_1.start()
    process_2.start()
    process_1.join()
    process_2.join()
if __name__ == "__main__":
    run()
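
A queue hands items back in the order they were put in, and get() accepts an optional timeout so that a consumer does not have to block forever. The following sketch illustrates both behaviours under simplifying assumptions: it runs in a single process for brevity, and the helper names fill_and_drain and get_or_none are hypothetical.

```python
import multiprocessing
import queue  # only needed for the queue.Empty exception


def fill_and_drain(events):
    # Hypothetical helper: put every event, then read them back in order.
    q = multiprocessing.Queue()
    for event in events:
        q.put(event)
    # get() blocks until an item is available, so read exactly len(events) items.
    return [q.get() for _ in events]


def get_or_none(q, timeout=0.1):
    # Return the next item, or None if nothing arrives within `timeout` seconds.
    try:
        return q.get(timeout=timeout)
    except queue.Empty:
        return None


if __name__ == "__main__":
    print(fill_and_drain(["get up", "shower", "work"]))
    print(get_or_none(multiprocessing.Queue()))
```

A timeout like this is one alternative to a sentinel value such as "eod" when the consumer cannot be sure the producer will ever send a final message.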

Locks

Process synchronization makes sure that no two processes execute the same part of the program, called the critical section, at the same time.

To achieve this, a process has to acquire the lock before executing the critical section. Once the work in the critical section is over, the process has to release the lock.

Take a look at the following example.

import multiprocessing
def process_function(lock):
    lock.acquire()
    try:
        # CRITICAL SECTION
        print("CRITICAL SECTION")
        print("Only one process can access this at a given time")
    finally:
        # Release the lock even if the critical section raises.
        lock.release()
def run():
    lock = multiprocessing.Lock()
    process_1 = multiprocessing.Process(target=process_function, args=(lock,))
    process_2 = multiprocessing.Process(target=process_function, args=(lock,))
    process_1.start()
    process_2.start()
    process_1.join()
    process_2.join()
if __name__ == "__main__":
    run()
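
The acquire() and release() calls can also be written as a with block, which releases the lock automatically when the block exits. The sketch below is one possible illustration rather than part of the original example: it protects a shared counter, and the names add_many, workers, and times are hypothetical. The counter is created with multiprocessing.Value using lock=False, so the explicit lock is the only thing guarding the increment.

```python
import multiprocessing


def add_many(counter, lock, times):
    # Hypothetical worker: the read-modify-write on counter.value
    # is the critical section.
    for _ in range(times):
        with lock:  # acquired on entry, released on exit
            counter.value += 1


def run(workers=4, times=1000):
    lock = multiprocessing.Lock()
    # lock=False yields a raw shared integer with no built-in lock.
    counter = multiprocessing.Value("i", 0, lock=False)
    processes = [
        multiprocessing.Process(target=add_many, args=(counter, lock, times))
        for _ in range(workers)
    ]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
    return counter.value


if __name__ == "__main__":
    print(run())
```

With the lock in place, the final count should equal workers multiplied by times. Removing the with block would allow increments from different processes to interleave, so some updates could be lost.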