Processor Error Handling
Understand how Kafka Streams handles processor errors and learn to implement the StreamsUncaughtExceptionHandler interface to manage exceptions effectively. This lesson guides you through controlling thread behavior when errors occur, preventing application shutdowns, and ensuring reliable message processing in your stream topologies.
We'll cover the following...
Processor errors are exceptions thrown from the code running in a stream processor, which include operations such as filter, map, and flatMap. That means that the source of these errors is the code containing our application’s logic.
Default behavior
When Kafka Streams encounters a processor exception, its default behavior is to shut down the application. Let’s simulate this—below, we have a stateless topology. It has a mapValues operator which throws an exception randomly:
package io.github.stavshamir.api;
import io.github.stavshamir.types.feelings.*;
import java.util.List;
import java.util.UUID;
public class Api {
public static String getLyrics(UUID trackId) {
return "mock lyrics for track id " + trackId;
}
public static List<Feeling> getFeelingsFromLyrics(String lyrics) {
return List.of(new Anger(), new Sadness(), new Happiness(), new Love(), new Fear());
}
public static List<Feeling> getFeelingsById(UUID trackId) {
return List.of(new Anger(), new Sadness(), new Happiness(), new Love(), new Fear());
}
}
If we run the application and publish a message to the tracks topic, the application will eventually encounter the random error and shut down. Click ...