Reliable Producers and Consumers

This lesson explores the producer and consumer configurations that matter for reliable message delivery in Kafka.

Even if we configure Kafka brokers with the most reliable settings, we can still experience data loss across the system because of the settings used on the producers or consumers. Consider the following scenarios.

  • Say we have three replicas running, and unclean leader election is turned off. The producer uses acks=1 when sending messages to the partition leader. The producer sends a message, which the leader writes successfully, but the leader crashes before the in-sync followers can replicate it. One of the in-sync followers that doesn’t have this latest message now becomes the leader (the election is clean since the replica was in-sync). In this scenario, the producer thinks the message was written successfully, while in reality it was never committed, since none of the in-sync replicas apart from the crashed leader copied it. The consumers will never see the message because it was never committed; remember, consumers only read committed messages. The system is still consistent because the consumers never see the message, but from the producer’s perspective, the message is lost.

  • One response to the above scenario is to change the producer setting to acks=all. Now the producer receives a “message written successfully” notification only when the message has been replicated by all the in-sync replicas, including the partition leader. But what if the producer sends a message in the midst of an election, after a partition leader crashes? The producer will receive a LeaderNotAvailableException. The onus of handling that error and retrying the send is on the producer. The system remains consistent since the consumers never see this message. However, if the producer doesn’t retry, the message is lost.

For producers, correctly configuring the acks parameter and handling errors are both important for meeting reliability expectations.

  • When acks=0 the producer doesn’t wait for an acknowledgement from the broker. This setting results in the highest bandwidth utilization and throughput but is prone to losing messages (e.g. when messages are sent while a leader is being elected).

  • When acks=1 the producer waits to receive an acknowledgement from the leader but the message can still be lost if the leader crashes before followers get a chance to replicate the message.

  • When acks=all the producer receives an acknowledgement only after all the in-sync replicas have replicated the message. The broker-side parameter min.insync.replicas specifies how many in-sync replicas must have the message before it is considered committed. This is the slowest but safest of the options since the producer has to wait for all the in-sync replicas to replicate the message. Sending messages asynchronously with this option can hide some of the added latency from the application; a configuration sketch follows this list.
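
As a rough illustration of the safest option, a producer could be configured along the lines of the sketch below. The broker address, serializers, and class name are placeholder assumptions; note that min.insync.replicas itself is a broker or topic setting, not a producer setting.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ReliableProducerConfig {
        public static KafkaProducer<String, String> create() {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // Wait until all in-sync replicas have the record before treating the send as successful.
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            return new KafkaProducer<>(props);
        }
    }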

Even when using acks=all, the producer can still receive errors. Some errors are retriable, such as the partition leader not being available or a temporary network glitch, both of which may resolve within a few seconds. Other errors aren’t retriable, such as an invalid configuration, an authorization exception, or an oversized message. For retriable errors, the producer can be configured to retry sending the message. A consequence of this approach is that the broker may receive the same message twice. This can happen if the broker’s acknowledgement is lost in the network and never reaches the producer: the producer retries and considers the message committed when it receives the acknowledgement for the retry. This situation is generally handled by including a unique identifier with each message; the consumer can use the identifier to detect that a message was recorded twice and discard the duplicate. Another approach is to make the processing of each message idempotent, i.e. reprocessing the same message has no consequences for correctness.
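
As a sketch of this error handling, assuming a producer like the one configured above: the send callback distinguishes retriable from non-retriable failures, and a UUID header acts as the unique message identifier that consumers can use to spot duplicates. The topic and helper names are illustrative, not prescribed by Kafka.

    import java.nio.charset.StandardCharsets;
    import java.util.UUID;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.RetriableException;

    public class IdentifiedSend {
        public static void send(KafkaProducer<String, String> producer, String topic,
                                String key, String value) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
            // Attach a unique identifier so consumers can detect and discard duplicates.
            record.headers().add("message-id",
                    UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    return; // acknowledged by the broker
                }
                if (exception instanceof RetriableException) {
                    // Transient failure (e.g. leader not available); the client retries these
                    // automatically when retries are enabled, so log and monitor here.
                    System.err.println("Retriable send failure: " + exception.getMessage());
                } else {
                    // Non-retriable failure (bad config, authorization, message too large, ...).
                    System.err.println("Fatal send failure: " + exception.getMessage());
                }
            });
        }
    }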

Consumers

As mentioned earlier, consumers only read committed messages. However, a message that has been read but not yet processed can still be lost: if the offset at or ahead of that message has already been committed and the consumer crashes before processing it, the message is skipped. Recall that when a consumer crashes and another one starts (it may very well be the same consumer that just crashed), the new consumer needs to know what offset to start reading messages from. A consumer reading a partition therefore stores (commits) its current position, or offset, within the partition so that a replacement can pick up where it left off.

We’ll go over some of the important consumer configuration properties that affect reliability behavior (a configuration sketch follows the list):

  • group.id: Recall that the consumers within a consumer group collectively read all the messages from a topic; individually, though, each consumer reads only a subset of the topic’s partitions. If we want a single consumer to read all the messages in a topic, we’ll need to assign it a group.id of its own.

  • auto.offset.reset: This configuration defines the consumer’s behavior when it starts and finds no committed offset, or when the committed offset is invalid. If set to earliest, the consumer starts processing messages from the very beginning of the partition, which may mean processing some messages twice but guarantees minimal data loss. The other setting we can use for this parameter is latest, which has the consumer read messages from the end of the partition. With this option, duplicate processing of messages is minimized at the cost of an increased chance of data loss.

  • enable.auto.commit: Setting this property to true makes the consumer commit offsets automatically as part of the poll loop, instead of the application committing them explicitly. This can have major implications if records are processed outside of the polling loop that we discussed earlier (e.g. by handing records off to another thread for processing): if the consumer crashes before the thread has finished processing the handed-off records but their offsets have already been automatically committed, those messages are lost. Conversely, if you process records entirely within the poll loop, you will never miss a message, since offsets are committed only on the next poll() invocation. You may still end up processing messages twice if the consumer crashes midway through a batch and then restarts.

  • auto.commit.interval.ms: This configuration parameter controls how frequently offsets are committed automatically. By default, it is set to five seconds. A lower value means offsets are committed more frequently, which adds a slight overhead but reduces the number of duplicate messages that have to be reprocessed after a crash.
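
Putting these properties together, a reliability-minded consumer could be configured roughly as follows. The broker address and group id are placeholders, and auto-commit is disabled here in favor of the explicit commits discussed next.

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class ReliableConsumerConfig {
        public static KafkaConsumer<String, String> create() {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");            // placeholder group id
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // Start from the beginning of the partition when no committed offset exists,
            // trading possible duplicates for minimal data loss.
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            // Commit offsets explicitly after processing rather than automatically.
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            return new KafkaConsumer<>(props);
        }
    }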

Explicit Offset Committing

You should only commit offsets after the messages have been processed. If you don’t maintain any state within the poll loop and all the processing is contained within it, automatic offset committing may be sufficient. Keep in mind that committing offsets always carries some overhead, so committing more frequently takes a toll on performance. Also make sure to commit the offsets of messages that have actually been processed, not simply the latest offset read from the last poll() invocation; committing offsets for messages that were read but not processed can result in the consumer missing messages.
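
A minimal poll loop illustrating this, reusing a consumer like the one sketched above; processRecord() is a stand-in for application-specific work, and commitSync() runs only after the whole batch has been processed.

    import java.time.Duration;
    import java.util.Collections;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class CommitAfterProcessing {
        public static void run(KafkaConsumer<String, String> consumer, String topic) {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    processRecord(record); // application-specific processing
                }
                // Commit only after the whole batch has been processed; a crash before this
                // line means the batch is re-read and re-processed, but never lost.
                consumer.commitSync();
            }
        }

        private static void processRecord(ConsumerRecord<String, String> record) {
            System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
        }
    }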

Be cognizant of rebalances; you should commit offsets before partitions are revoked and clean up any state you maintain before new partitions are assigned.
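
Kafka’s consumer API exposes these hooks through a ConsumerRebalanceListener passed to subscribe(). A minimal sketch, assuming the application tracks the offsets of processed records in a currentOffsets map:

    import java.time.Duration;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;

    public class RebalanceAwareConsumer {
        private final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

        public void run(KafkaConsumer<String, String> consumer, String topic) {
            consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // Commit what we have processed so far before losing ownership of partitions.
                    consumer.commitSync(currentOffsets);
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // Reset any in-memory state before starting on the newly assigned partitions.
                    currentOffsets.clear();
                }
            });
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // ... process the record, then remember the next offset to commit.
                    currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1));
                }
                consumer.commitSync(currentOffsets);
            }
        }
    }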

Consider the scenario of a consumer that polls for records and is unable to process some of them, for example because it writes the records to a database and the database is temporarily unavailable. Unlike some other messaging systems, Kafka consumers don’t acknowledge each individual message. Rather, when we commit the offset for, say, message #91, all the records up to offset #91 are considered read and processed, even though we might not have been able to write message #89 to our database. We can handle this situation in one of the following two ways:

  • Store the messages for which a retriable error was received in a buffer for later processing. Recall that we can’t stop calling poll(); otherwise, the broker will think that the consumer is dead and trigger a rebalance. We can instead use the pause() API to stop fetching records from the paused partitions and take our time processing the failed messages: once pause() is invoked, subsequent poll() calls return no records from those partitions until resume() is called (see the sketch after this list).

  • Another strategy is to write the messages for which a retriable error was encountered to a separate retry topic and continue. A separate consumer group can handle messages written to the retry topic, or the same consumer can subscribe to the retry topic in addition to the main topic.
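
A sketch of the buffer-and-pause approach; tryProcess() is a hypothetical helper that returns false when a retriable error (such as an unavailable database) occurs, and offset committing is omitted for brevity.

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class PauseAndRetry {
        private final List<ConsumerRecord<String, String>> buffer = new ArrayList<>();

        public void run(KafkaConsumer<String, String> consumer, String topic) {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    if (!tryProcess(record)) {   // hypothetical: false on a retriable failure
                        buffer.add(record);
                    }
                }
                if (!buffer.isEmpty()) {
                    // Stop fetching new records while we work through the backlog; we keep
                    // calling poll() so the consumer is not considered dead and no rebalance occurs.
                    consumer.pause(consumer.assignment());
                }
                // Retry the backlog between polls; drop the records that now succeed.
                buffer.removeIf(this::tryProcess);
                if (buffer.isEmpty() && !consumer.paused().isEmpty()) {
                    consumer.resume(consumer.paused());
                }
            }
        }

        // Placeholder for application-specific processing; returns false on a retriable failure.
        private boolean tryProcess(ConsumerRecord<String, String> record) {
            return true;
        }
    }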

We can use Kafka for use cases that require us to maintain a result or state computed so far, such as calculating moving averages. We can store the result of a computation in a results topic, but it may happen that we store the result and the consumer crashes before the offset is committed, or vice versa. Getting this right isn’t trivial with the plain producer and consumer APIs, since writing the result and committing the offset aren’t atomic. It is recommended to look at a library like Kafka Streams, which provides high-level, DSL-like APIs for aggregations, joins, windowing, and other complex analytics.

Given that in some versions of Kafka we must keep polling to send out heartbeats to the broker, we can’t take too long inside the poll loop to process records. If we expect processing to take significant time, we can hand records off to a thread pool. While the worker threads complete the computation, we pause the consumer’s partitions and continue to poll, so that no new data is fetched but the consumer is still considered alive.
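
A sketch of this hand-off, assuming a single worker thread and a hypothetical processRecord() helper for the slow work; the partitions stay paused until the background batch finishes, after which the offsets are committed.

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class HandOffToWorker {
        private final ExecutorService worker = Executors.newSingleThreadExecutor();

        public void run(KafkaConsumer<String, String> consumer, String topic) {
            consumer.subscribe(Collections.singletonList(topic));
            Future<?> inFlight = null;
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                if (!records.isEmpty()) {
                    // Hand the batch to a worker thread and stop fetching until it is done.
                    List<ConsumerRecord<String, String>> batch = new ArrayList<>();
                    records.forEach(batch::add);
                    inFlight = worker.submit(() -> batch.forEach(HandOffToWorker::processRecord));
                    consumer.pause(consumer.assignment());
                }
                if (inFlight != null && inFlight.isDone()) {
                    // The slow work finished: commit its offsets and start fetching again.
                    consumer.commitSync();
                    consumer.resume(consumer.paused());
                    inFlight = null;
                }
            }
        }

        private static void processRecord(ConsumerRecord<String, String> record) {
            System.out.println(record.value()); // placeholder for slow, application-specific work
        }
    }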

Exactly Once Delivery

Kafka doesn’t support exactly-once delivery of messages out of the box, but there are ways developers can achieve it, which we’ll discuss next. One approach is for the consumer to write its results to a system that supports unique keys; any store with unique keys can be used for this purpose (e.g. a relational database or Elasticsearch). The records can come with their own unique keys, and if they don’t, we can create one from the combination of topic + partition + offset, which uniquely identifies every message. Thereafter, even if a record has been duplicated, we simply overwrite the same value for the same key. This pattern is known as an idempotent write.
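
A small sketch of the idempotent write, with a hypothetical KeyValueStore interface standing in for the external system:

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    public class IdempotentWrite {
        // Hypothetical external store client (relational table, Elasticsearch index, ...).
        interface KeyValueStore {
            void put(String key, String value); // upsert: writing the same key twice overwrites it
        }

        public static void write(KeyValueStore store, ConsumerRecord<String, String> record) {
            // topic + partition + offset uniquely identifies the message, so a duplicate
            // delivery simply overwrites the existing row instead of creating a new one.
            String key = record.topic() + "-" + record.partition() + "-" + record.offset();
            store.put(key, record.value());
        }
    }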

The other approach is to rely on an external system that offers transactions: we store the message’s processing result as well as its offset in a single transaction. After a crash, or when the consumer starts up for the first time, it can query the external store, retrieve the offset of the last record it processed, and start consuming from the next offset onwards.
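
A sketch of the startup path, assuming a hypothetical OffsetStore that looks up, per partition, the offset recorded in the last external transaction:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;

    public class SeekToStoredOffsets {
        // Hypothetical lookup into the external, transactional store.
        interface OffsetStore {
            long lastProcessedOffset(TopicPartition partition);
        }

        public static void start(KafkaConsumer<String, String> consumer,
                                 OffsetStore offsetStore, String topic) {
            // Assign partitions explicitly so the external store, not Kafka, decides where to start.
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo info : consumer.partitionsFor(topic)) {
                partitions.add(new TopicPartition(topic, info.partition()));
            }
            consumer.assign(partitions);
            for (TopicPartition partition : partitions) {
                // Resume right after the last record whose result made it into the store.
                consumer.seek(partition, offsetStore.lastProcessedOffset(partition) + 1);
            }
            // ... the normal poll loop follows, storing each result and offset in one transaction.
        }
    }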
