Stopping a Consumer
This lesson explains how the pool loop of a Kafka consumer can be cleanly exited.
We'll cover the following
In the previous lessons, we showed code for the Kafka consumer that ran an infinite poll loop. This brings up the question of how we can gracefully stop a consumer. The KafkaConsumer
object exposes a method wakeup()
that can be invoked from a different thread to stop the consumer. Note that wakeup()
is the only method of the consumer object that can be invoked safely from another thread. When wakeup()
is invoked on the consumer object, the consumer throws a WakeupException
if the consumer is already waiting on the poll()
method. If not, the exception is thrown the next time the consumer invokes the poll()
method. As a developer, you don’t need to handle the WakeupException
, but you must invoke close()
on the consumer object in the finally
block. Closing the consumer commits offsets and informs the broker that the consumer is leaving. The broker can then trigger a rebalance immediately rather than wait for a session timeout to assign the partitions owned by the exiting consumer to other consumers in the group.
The code widget below demonstrates how to write a consumer to gracefully exit the infinite poll loop.
Get hands-on with 1400+ tech skills courses.