The DSL API’s Stateless Operations: Application
Let’s continue our exploration of Kafka Streams’ stateless operations with a hands-on example.
Understand stateless operations with an example
Let’s understand stateless operations with an application that filters records on a length criterion and then transforms the values that pass by converting them to uppercase.
Sample code
Click the “Run” button in the widget below. This will initiate the build process and start the Kafka Streams application. Once the application has started, you should see the “started kafka streams app” message.
After that, follow the steps outlined below:
package com.example;

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

public class KafkaStreamsDSLExample {

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "dsl-api-demo");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("input-topic");

        // filter - keep only records whose key is longer than 5 characters
        // (this calls k.length(), so every record is assumed to have a non-null key)
        KStream<String, String> filtered = input.filter((k, v) -> k.length() > 5);

        // mapValues - convert each value to upper case; keys are left unchanged
        KStream<String, String> upperCased = filtered.mapValues(value -> value.toUpperCase());

        // peek - inspect each key and value as a side effect
        // (not a terminal operation: it returns the stream unchanged)
        upperCased.peek((k, v) -> System.out.println("[peeked] k-v " + k + ", " + v));

        // terminal operation - to (write the records to the output topic)
        upperCased.to("output-topic");

        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, config);

        // close the streams instance cleanly when the JVM shuts down
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

        System.out.println("started kafka streams app.....");
        streams.start();
    }
}
The Kafka Streams DSL API’s stateless operations
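To make the effect of the filter and mapValues steps concrete, here is a minimal sketch that applies the same two rules to an in-memory list using plain Java streams. It does not use Kafka at all. The class name StatelessPipelineSketch, the process helper, and the sample records are hypothetical and chosen only for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class StatelessPipelineSketch {

    // Applies the same two rules as the topology above to an in-memory list:
    // keep records whose key is longer than 5 characters, then upper-case the value.
    public static List<Map.Entry<String, String>> process(List<Map.Entry<String, String>> records) {
        return records.stream()
                .filter(r -> r.getKey().length() > 5)
                .map(r -> Map.entry(r.getKey(), r.getValue().toUpperCase()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical sample records (not taken from the lesson)
        List<Map.Entry<String, String>> input = List.of(
                Map.entry("kafka", "streams"),      // key length 5: dropped by the filter
                Map.entry("stateless", "filter"),   // key length 9: kept
                Map.entry("topics", "partition"));  // key length 6: kept

        // Print each surviving record as key:value
        for (Map.Entry<String, String> r : process(input)) {
            System.out.println(r.getKey() + ":" + r.getValue());
        }
    }
}
```

With these sample records, the sketch prints stateless:FILTER and topics:PARTITION. Each record is handled on its own, without reference to any earlier record, which is what makes these operations stateless.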
Click the “+” button to open a new terminal tab, and enter the command below to start the Kafka consumer CLI. The consumer will wait for messages from output-topic:
/app/confluent-7.3.1/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic output-topic --from-beginning --property "print.key=true" --property "key.separator=:"
Click the “+” button to open a new terminal tab, ...