The previous shot covered the Process and Pool classes. In this shot, we will learn about Pipes, Queues, and Locks.
The multiprocessing module provides two ways for processes to communicate: pipes and queues.
Pipe(), by default, returns a pair of connection objects connected by a duplex (two-way) pipe. Each connection object has send() and recv() methods to send and receive messages.
The following example illustrates how one process produces messages and another process listens to the messages.

    import multiprocessing


    def process1_send_function(conn, events):
        for event in events:
            conn.send(event)
            print(f"Event Sent: {event}")


    def process2_recv_function(conn):
        while True:
            event = conn.recv()
            if event == "eod":
                print("Event Received: End of Day")
                return
            print(f"Event Received: {event}")


    def run():
        events = ["get up", "brush your teeth", "shower", "work", "eod"]
        conn1, conn2 = multiprocessing.Pipe()
        process_1 = multiprocessing.Process(target=process1_send_function, args=(conn1, events))
        process_2 = multiprocessing.Process(target=process2_recv_function, args=(conn2,))
        process_1.start()
        process_2.start()
        process_1.join()
        process_2.join()


    if __name__ == "__main__":
        run()

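The example above sends in one direction only. Because the pipe is duplex by default, either end can both send and receive, while passing duplex=False gives a one-way pipe whose first connection can only receive and whose second can only send. Here is a minimal sketch of both cases. It runs in a single process to stay short, and the helper names echo_once, round_trip, and one_way are made up for illustration; in practice echo_once would be the target of a separate Process.

```python
import multiprocessing


def echo_once(conn):
    """Receive one message and reply on the same connection (hypothetical helper)."""
    message = conn.recv()
    conn.send(message.upper())


def round_trip(message):
    """Send a message through a duplex pipe and return the reply."""
    conn1, conn2 = multiprocessing.Pipe()  # duplex=True is the default
    try:
        conn1.send(message)
        echo_once(conn2)     # normally this would run in another process
        return conn1.recv()  # the reply travels back over the same pipe
    finally:
        conn1.close()
        conn2.close()


def one_way(message):
    """Send a message through a one-way pipe and return what arrives."""
    # With duplex=False the first connection is receive-only
    # and the second connection is send-only.
    recv_conn, send_conn = multiprocessing.Pipe(duplex=False)
    try:
        send_conn.send(message)
        return recv_conn.recv()
    finally:
        recv_conn.close()
        send_conn.close()


if __name__ == "__main__":
    print(round_trip("get up"))
    print(one_way("shower"))
```

A one-way pipe is worth considering when only one side ever produces messages, as in the producer and listener example above.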
Queue() returns a queue shared between processes. Think of a queue as a data structure where the producer produces messages and the consumer consumes them. Queues are thread and process safe.
We can modify the pipe example above to use a queue instead.

    import multiprocessing


    def process1_send_function(queue, events):
        for event in events:
            queue.put(event)
            print(f"Event Sent: {event}")


    def process2_recv_function(queue):
        while True:
            event = queue.get()
            if event == "eod":
                print("Event Received: End of Day")
                return
            print(f"Event Received: {event}")


    def run():
        events = ["get up", "brush your teeth", "shower", "work", "eod"]
        queue = multiprocessing.Queue()
        process_1 = multiprocessing.Process(target=process1_send_function, args=(queue, events))
        process_2 = multiprocessing.Process(target=process2_recv_function, args=(queue,))
        process_1.start()
        process_2.start()
        process_1.join()
        process_2.join()


    if __name__ == "__main__":
        run()

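In the example above, queue.get() blocks until a message arrives, so the consumer would wait forever if the producer exited without sending "eod". One possible safeguard is to pass a timeout: get(timeout=...) raises queue.Empty (from the standard queue module) when nothing arrives in time. The sketch below assumes a helper named drain and a one-second default timeout, both chosen only for illustration.

```python
import multiprocessing
import queue as queue_module  # standard library module that defines the Empty exception


def drain(shared_queue, sentinel="eod", timeout=1.0):
    """Collect events until the sentinel arrives or nothing shows up within `timeout` seconds."""
    received = []
    while True:
        try:
            event = shared_queue.get(timeout=timeout)
        except queue_module.Empty:
            # Nothing arrived in time: stop instead of blocking forever.
            break
        if event == sentinel:
            break
        received.append(event)
    return received


if __name__ == "__main__":
    shared_queue = multiprocessing.Queue()
    for event in ["get up", "work", "eod"]:
        shared_queue.put(event)
    print(drain(shared_queue))
```

The same drain function could serve as the body of a consumer process. The right timeout depends on how long the producer may legitimately stay silent.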
Process synchronization makes sure that no two processes execute the same part of the program, called the critical section, at the same time.
To achieve this, a process has to acquire a lock before executing the critical section. Once the work in the critical section is done, the process has to release the lock.
Take a look at the following example.

    import multiprocessing


    def process_function(lock):
        lock.acquire()
        # CRITICAL SECTION
        print("CRITICAL SECTION")
        print("Only One Process has to access at a given time")
        lock.release()


    def run():
        lock = multiprocessing.Lock()
        process_1 = multiprocessing.Process(target=process_function, args=(lock,))
        process_2 = multiprocessing.Process(target=process_function, args=(lock,))
        process_1.start()
        process_2.start()
        process_1.join()
        process_2.join()


    if __name__ == "__main__":
        run()

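With the acquire/release pair above, the lock stays held if the critical section raises an exception before release() runs. A multiprocessing.Lock can also be used as a context manager, so a with block releases it on every exit path. The sketch below runs in a single process. critical_section is a hypothetical helper, and the non-blocking acquire(block=False) calls are there only to show when the lock is held and when it is free.

```python
import multiprocessing


def critical_section(lock):
    """Run under the lock and report what a second, non-blocking acquire sees."""
    with lock:  # acquires on entry, releases on exit, even if an exception is raised
        # CRITICAL SECTION
        # The lock is not recursive, so a non-blocking attempt fails while it is held.
        acquired_while_held = lock.acquire(block=False)
    # After the with block the lock is free again.
    acquired_after_release = lock.acquire(block=False)
    if acquired_after_release:
        lock.release()
    return acquired_while_held, acquired_after_release


if __name__ == "__main__":
    lock = multiprocessing.Lock()
    print(critical_section(lock))
```

In the two-process example above, the body of process_function could be wrapped in with lock: in the same way.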