Non-Blocking Queue
This lesson is a follow-up to the blocking queue question and explores the various ways in which we can make a blocking queue non-blocking.
Problem
In the previous question we saw the blocking version of a queue, which blocks a producer when the queue is full and a consumer when the queue is empty. In this problem, you are asked to implement a queue that is non-blocking. What non-blocking means is intentionally left open to interpretation, to see how you think through the problem.
This question is inspired by one of David Beazley's Python talks.
Solution
Let's first define the notion of non-blocking. An enqueue or dequeue call that can complete right away is trivially non-blocking. The interesting case is a full or an empty queue: a producer or a consumer (respectively) must not be made to wait until the queue can be added to or taken from. The call should return immediately instead.
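For comparison, Python's standard library already offers one interpretation of this definition: queue.Queue has put_nowait() and get_nowait(), which raise queue.Full and queue.Empty instead of waiting. The short sketch below only illustrates the behavior we are after; the variable names are placeholders, and the solutions in this lesson build their own queue rather than use this class.

```python
import queue

# A bounded queue with room for a single item.
q = queue.Queue(maxsize=1)
q.put_nowait("a")

# The queue is now full: put_nowait() raises instead of waiting.
try:
    q.put_nowait("b")
    put_ok = True
except queue.Full:
    put_ok = False

first = q.get_nowait()

# The queue is now empty: get_nowait() raises instead of waiting.
try:
    q.get_nowait()
    get_ok = True
except queue.Empty:
    get_ok = False

print(put_ok, first, get_ok)
```

In both failure cases the caller gets control back immediately and decides what to do next, which is exactly the property we want from our own implementation.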
First Cut
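The trivial solution is to return a value indicating the success of an operation. enqueue() returns True on success and False when the queue is full; dequeue() returns the item on success and False when the queue is empty. If the invoker of either enqueue() or dequeue() receives False, then it is the responsibility of the invoker to retry the operation at a later time. One consequence is that False itself cannot be stored as an item, since the consumer could not tell it apart from a failed dequeue. This trivial solution appears in the code below.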
from threading import Thread
from threading import Lock
from threading import current_thread
import time
import random


class NonBlockingQueue:
    def __init__(self, max_size):
        self.max_size = max_size
        self.q = []
        # Guards every access to self.q
        self.lock = Lock()

    def dequeue(self):
        with self.lock:
            curr_size = len(self.q)
            if curr_size != 0:
                return self.q.pop(0)
            else:
                # Queue is empty: report failure instead of waiting
                return False

    def enqueue(self, item):
        with self.lock:
            curr_size = len(self.q)
            if curr_size == self.max_size:
                # Queue is full: report failure instead of waiting
                return False
            else:
                self.q.append(item)
                return True


def consumer_thread(q):
    while 1:
        item = q.dequeue()

        # Compare by identity so that falsy items such as 0 are not mistaken for failure
        if item is False:
            print("Consumer couldn't dequeue an item")
        else:
            print("\n{0} consumed item {1}".format(current_thread().name, item), flush=True)

        time.sleep(random.randint(1, 3))


def producer_thread(q):
    item = 1
    while 1:
        # Retries immediately (busy-waits) whenever the queue is full
        result = q.enqueue(item)
        if result is True:
            print("\n {0} produced item".format(current_thread().name), flush=True)
            item += 1


if __name__ == "__main__":
    no_block_q = NonBlockingQueue(5)

    consumerThread1 = Thread(target=consumer_thread, name="consumer", args=(no_block_q,), daemon=True)
    producerThread1 = Thread(target=producer_thread, name="producer", args=(no_block_q,), daemon=True)

    consumerThread1.start()
    producerThread1.start()

    # Daemon threads are terminated when the main thread exits
    time.sleep(15)
    print("Main thread exiting")
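In the above example, note that we acquire a Lock object before we attempt to enqueue or dequeue an item in our internal queue. We must ensure serial access to shared data structures: checking the size and then appending or popping has to happen as one atomic step, otherwise two threads could both pass the size check and overfill the queue or pop from an empty list.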
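Since the first cut pushes the retry onto the invoker, it may help to see what a caller-side retry could look like. The sketch below is only one possible approach: enqueue_with_retry, its attempts and delay parameters, and the doubling pause are all hypothetical and not part of the lesson's code. A compact copy of the queue is included so the example runs on its own.

```python
import time
from threading import Lock


class NonBlockingQueue:
    """Compact copy of the first-cut queue so this sketch is self-contained."""

    def __init__(self, max_size):
        self.max_size = max_size
        self.q = []
        self.lock = Lock()

    def enqueue(self, item):
        with self.lock:
            if len(self.q) == self.max_size:
                return False
            self.q.append(item)
            return True

    def dequeue(self):
        with self.lock:
            if self.q:
                return self.q.pop(0)
            return False


def enqueue_with_retry(q, item, attempts=5, delay=0.01):
    """Hypothetical helper: retry a failed enqueue, doubling the pause after each failure."""
    for _ in range(attempts):
        if q.enqueue(item):
            return True
        time.sleep(delay)
        delay *= 2
    # Give up after the configured number of attempts.
    return False


q = NonBlockingQueue(1)
first = enqueue_with_retry(q, "a")               # room available, succeeds at once
second = enqueue_with_retry(q, "b", attempts=3)  # queue stays full, gives up
q.dequeue()                                      # frees the single slot
third = enqueue_with_retry(q, "b")               # succeeds now
```

Sleeping between attempts keeps the caller from spinning on the lock the way the producer in the example above does, at the cost of some added latency before a freed slot is noticed.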
Second Cut
If we want to get more sophisticated in our approach we can return an object of ...