Processor Error Handling

Learn how to handle processor errors in Kafka Streams applications.

Processor errors are exceptions thrown by code running inside a stream processor, for example in operators such as filter, map, and flatMap. The source of these errors is therefore the code containing our application’s logic.
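To make this concrete, here is a minimal sketch of application logic that can throw. It is plain Java with no Kafka Streams dependency, and `TrackIdParser` and `parseTrackId` are hypothetical names chosen for illustration, not part of the application described here.

```java
import java.util.UUID;

// Hypothetical application logic of the kind that might run inside an
// operator such as map or mapValues. Names are illustrative only.
final class TrackIdParser {

    private TrackIdParser() {}

    // UUID.fromString throws IllegalArgumentException on malformed input.
    // If this method were called from within a stream processor, that
    // exception would surface as a processor error.
    static UUID parseTrackId(String rawValue) {
        return UUID.fromString(rawValue.trim());
    }
}
```

A well-formed value is parsed normally, while a value such as "not-a-uuid" makes the call throw. Nothing in the method itself handles the failure, which is exactly the situation this page is concerned with.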

Default behavior

When Kafka Streams encounters a processor exception, its default behavior is to shut down the application. Let’s simulate this with the stateless topology below, which has a mapValues operator that throws an exception at random:

package io.github.stavshamir.api;

import io.github.stavshamir.types.feelings.*;

import java.util.List;
import java.util.UUID;

public class Api {

    public static String getLyrics(UUID trackId) {
        return "mock lyrics for track id " + trackId;
    }

    public static List<Feeling> getFeelingsFromLyrics(String lyrics) {
        return List.of(new Anger(), new Sadness(), new Happiness(), new Love(), new Fear());
    }

    public static List<Feeling> getFeelingsById(UUID trackId) {
        return List.of(new Anger(), new Sadness(), new Happiness(), new Love(), new Fear());
    }

}
Simplified stateless topology application
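The randomly failing step can be sketched in plain Java, without Kafka Streams on the classpath. This is an illustration under stated assumptions rather than the application's actual code: `FlakyMapper`, `flaky`, and the `failureRate` parameter are hypothetical names, and the random source is injected so that the behavior can be made deterministic in a test.

```java
import java.util.function.DoubleSupplier;
import java.util.function.Function;

// Hypothetical helper, not part of the original application: wraps a value
// mapper so that it throws at random, imitating a buggy processor.
final class FlakyMapper {

    private FlakyMapper() {}

    // Returns a function that throws IllegalStateException whenever the next
    // draw from `random` (expected to be in [0, 1)) is below `failureRate`.
    // Otherwise it delegates to the wrapped mapper unchanged.
    static <V, R> Function<V, R> flaky(Function<V, R> delegate,
                                       double failureRate,
                                       DoubleSupplier random) {
        return value -> {
            if (random.getAsDouble() < failureRate) {
                throw new IllegalStateException(
                        "Simulated processor error for value " + value);
            }
            return delegate.apply(value);
        };
    }
}
```

Assuming a stream whose values are track IDs, a mapper built with `FlakyMapper.flaky(Api::getLyrics, 0.1, Math::random)` could be invoked from the lambda passed to `mapValues`, for example `id -> mapper.apply(id)`. With a failure rate of 0.1, roughly one record in ten would throw.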

If we run the application and publish a message to the tracks topic, the application will eventually encounter the random error and shut ...