Interactive Queries
Learn how to expose state stores to external queries.
The data stored in Kafka Streams state stores could potentially be used outside the context of the applications. We could save it in an external database to be queried using a different application or microservice, but Kafka Streams provides us with an easier and faster solution: interactive queries.
Interactive queries allow us to query state stores and expose the results, usually through a REST API. We will run a simple stateful topology with a reduce
operator, which concatenates received records to a single sentence by the record’s key.
Then, we will expose a REST endpoint that will retrieve values from the state store associated with the reduce
operator by key.
We will be using the following code to start:
package io.github.stavshamir; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import spark.Spark; import java.util.List; import java.util.Properties; public class StatefulApplication { public static void main(String[] args) { createTopics(); Properties props = buildConfiguration(); Topology topology = buildTopology(); var streams = new KafkaStreams(topology, props); Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); System.out.println("Starting"); streams.start(); } private static Topology buildTopology() { var builder = new StreamsBuilder(); KStream<String, String> stream = builder.stream("words"); stream.peek((k, v) -> System.out.printf("Incoming record - Key: %s, Value: %s%n", k, v)) .groupByKey() .reduce((a, b) -> a + " " + b) .toStream() .peek((k, v) -> System.out.printf("Reduced - Key: %s, Value: %s%n", k, v)); return builder.build(); } private static Properties buildConfiguration() { var properties = new Properties(); properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "stateful"); properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0"); return properties; } private static void createTopics() { var properties = new Properties(); properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); try (var adminClient = AdminClient.create(properties)) { adminClient.createTopics(List.of( new NewTopic("words", 1, (short) 1) )); } } }
Materializing a state store
Stateful operators ...