Non-Blocking Queue

This lesson is a follow-up to the blocking queue question and explores the various ways in which we can make a blocking queue non-blocking.

Problem

In the previous question we saw a blocking queue, which blocks a producer when the queue is full and a consumer when the queue is empty. In this problem, you are asked to implement a queue that is non-blocking. What non-blocking means is intentionally left open to interpretation to see how you think through the problem.

This question is inspired by one of David Beazley's Python talks.

Solution

Let's first define the notion of non-blocking. A call to enqueue() or dequeue() is non-blocking if it returns right away whether or not it succeeds. If the queue is full, a producer does not wait until there is room to add an item; if the queue is empty, a consumer does not wait until there is an item to take.
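As a point of comparison, Python's standard library queue.Queue already offers one interpretation of this definition: put_nowait() and get_nowait() raise queue.Full and queue.Empty instead of waiting. The sketch below is only an illustration of the definition, not the solution this lesson builds; the variable names are made up for the example.

```python
import queue

q = queue.Queue(maxsize=1)
q.put_nowait("a")  # succeeds: there is room for one item

full_signalled = False
try:
    q.put_nowait("b")  # queue is full, so this raises instead of waiting
except queue.Full:
    full_signalled = True

first = q.get_nowait()  # returns "a" immediately

empty_signalled = False
try:
    q.get_nowait()  # queue is empty, so this raises instead of waiting
except queue.Empty:
    empty_signalled = True

print(full_signalled, first, empty_signalled)
```

In both failure cases the caller regains control immediately and decides what to do next, which is the behavior the definition above asks for.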

First Cut

The trivial solution is to have each operation report whether it succeeded. enqueue() returns True or False, and dequeue() returns the dequeued item or False. If the invoker of either enqueue() or dequeue() receives False, then it is the responsibility of the invoker to retry the operation at a later time. This trivial solution appears in the code below.

from threading import Thread, Lock, current_thread
import time
import random


class NonBlockingQueue:
    def __init__(self, max_size):
        self.max_size = max_size
        self.q = []
        self.lock = Lock()

    def dequeue(self):
        # Returns the item at the head of the queue, or False if the queue
        # is empty. False doubles as the failure signal, so the queue must
        # not be used to store False itself.
        with self.lock:
            if len(self.q) != 0:
                return self.q.pop(0)
            return False

    def enqueue(self, item):
        # Returns True if the item was added, or False if the queue is full.
        with self.lock:
            if len(self.q) == self.max_size:
                return False
            self.q.append(item)
            return True


def consumer_thread(q):
    while True:
        item = q.dequeue()

        # Compare by identity so that falsy items such as 0 are not
        # mistaken for the failure signal.
        if item is False:
            print("Consumer couldn't dequeue an item")
        else:
            print("\n{0} consumed item {1}".format(current_thread().name, item), flush=True)

        time.sleep(random.randint(1, 3))


def producer_thread(q):
    item = 1

    while True:
        # On failure the producer simply retries the same item right away.
        result = q.enqueue(item)
        if result is True:
            print("\n {0} produced item".format(current_thread().name), flush=True)
            item += 1


if __name__ == "__main__":
    no_block_q = NonBlockingQueue(5)

    consumerThread1 = Thread(target=consumer_thread, name="consumer", args=(no_block_q,), daemon=True)
    producerThread1 = Thread(target=producer_thread, name="producer", args=(no_block_q,), daemon=True)

    consumerThread1.start()
    producerThread1.start()

    # Both threads are daemons, so they end when the main thread exits.
    time.sleep(15)
    print("Main thread exiting")

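In the above example, note that we acquire a Lock before we attempt to enqueue or dequeue an item in our internal queue. We must ensure serial access to shared data structures: the size check and the modification of the list have to happen as one step, so that no other thread can change the queue in between.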
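Since the first cut leaves retrying to the invoker, here is a minimal sketch of what a caller-side retry might look like. The helper enqueue_with_retry and its attempts and delay parameters are hypothetical and not part of the lesson's code; the queue class is a compact copy of the first cut so that the example runs on its own.

```python
import time
from threading import Lock


class NonBlockingQueue:
    """Compact copy of the first-cut queue so this sketch is self-contained."""

    def __init__(self, max_size):
        self.max_size = max_size
        self.q = []
        self.lock = Lock()

    def enqueue(self, item):
        with self.lock:
            if len(self.q) == self.max_size:
                return False
            self.q.append(item)
            return True

    def dequeue(self):
        with self.lock:
            if self.q:
                return self.q.pop(0)
            return False


def enqueue_with_retry(q, item, attempts=5, delay=0.01):
    """Hypothetical helper: retry a failed enqueue a bounded number of times."""
    for _ in range(attempts):
        if q.enqueue(item):
            return True
        time.sleep(delay)  # pause between attempts instead of spinning on the lock
    return False


q = NonBlockingQueue(2)
print(enqueue_with_retry(q, "a"))  # True
print(enqueue_with_retry(q, "b"))  # True
print(enqueue_with_retry(q, "c"))  # False: the queue is still full after every attempt
```

Bounding the number of attempts keeps the caller from waiting forever, which would otherwise reintroduce blocking behavior through the back door.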

Second Cut

If we want to get more sophisticated in our approach we can return an object of ...