Felpfe Inc.
Search
Close this search box.
call 24/7

+484 237-1364‬

Search
Close this search box.

Interactive Queries for real-time data access

Introduction to Interactive Queries

In this section, we will explore interactive queries in Kafka Streams, which provide the ability to perform real-time data access and querying on the stateful data maintained by stream processing applications. Interactive queries enable applications to fetch and retrieve data from internal state stores, allowing for dynamic and interactive analysis of real-time data.

Topics covered in this section:

  1. Introduction to interactive queries and their significance in stream processing.
  2. Understanding state stores and their role in interactive queries.
  3. Queryable state stores and their benefits in real-time data access.
  4. Implementing interactive queries in Kafka Streams applications.
  5. Configuring and managing interactive queries for scalability and fault-tolerance.

Code Sample: Interactive Queries in Kafka Streams

Java
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.*;
import java.util.Properties;

public class KafkaStreamsInteractiveQueriesExample {

    public static void main(String[] args) {
        // Configure Kafka Streams application
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Create Kafka Streams builder
        StreamsBuilder builder = new StreamsBuilder();

        // Define the processing logic
        KStream<String, String> inputStream = builder.stream("input_topic");

        // Perform data processing and store the results in a state store
        KTable<String, Long> aggregatedTable = inputStream
                .groupBy((key, value) -> key)
                .count(Materialized.as("state_store_name"));

        // Enable querying of the state store
        ReadOnlyKeyValueStore<String, Long> keyValueStore = streams.store(
                StoreQueryParameters.fromNameAndType("state_store_name", QueryableStoreTypes.keyValueStore())
        );

        // Perform interactive queries on the state store
        String key = "sample_key";
        Long count = keyValueStore.get(key);
        System.out.println("Count for key " + key + ": " + count);

        // Build and start the Kafka Streams application
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Gracefully shutdown the application on termination
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Reference Link:

  • Kafka Streams documentation on interactive queries: link

Helpful Video:

  • “Kafka Streams Interactive Queries” by Confluent: link

Benefits and Use Cases of Interactive Queries

In this section, we will discuss the benefits and use cases of interactive queries in Kafka Streams. Interactive queries provide real-time data access, enabling applications to query and retrieve data from internal state stores. Understanding the advantages and potential use cases of interactive queries empowers developers to build interactive and dynamic applications.

Topics covered in this section:

  1. Advantages and significance of interactive queries in stream processing.
  2. Real-time data access and dynamic analysis of stateful data.
  3. Use cases for interactive queries: real-time analytics, dashboards, and more.
  4. Scalability and fault-tolerance considerations for interactive queries.
  5. Integration possibilities with external data stores for advanced querying.

Code Sample: Interactive Queries with Join Operation

Java
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.*;
import java.util.Properties;

public class KafkaStreamsInteractiveQueriesExample {

    public static void main(String[] args) {
        // Configure Kafka Streams application
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Create Kafka Streams builder
        StreamsBuilder builder = new StreamsBuilder();

        // Define the processing logic
        KStream<String, String> inputStream1 = builder.stream("input_topic_1");
        KStream<String, String> inputStream2 = builder.stream("input_topic_2");

        // Perform data processing and store the results in a state store
        KTable<String, String> joinedTable = inputStream1
                .join(
                        inputStream2,
                        (value1, value2) -> value1 + ", " + value2,
                        JoinWindows.of(Duration.ofMinutes(5)),
                        Joined.with(Serdes.String(), Serdes.String(), Serdes.String())
                )
                .groupByKey()
                .reduce((value1, value2) -> value2, Materialized.as("state_store_name"));

        // Enable querying of the state store
        ReadOnlyKeyValueStore<String, String> keyValueStore = streams.store(
                StoreQueryParameters.fromNameAndType("state_store_name", QueryableStoreTypes.keyValueStore())
        );

        // Perform interactive queries on the state store
        String key = "sample_key";
        String value = keyValueStore.get(key);
        System.out.println("Value for key " + key + ": " + value);

        // Build and start the Kafka Streams application
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Gracefully shutdown the application on termination
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Reference Link:

  • Kafka Streams documentation on interactive queries: link

Helpful Video:

  • “Kafka Streams Interactive Queries” by DataCumulus: link

Conclusion:
In this module, we explored interactive queries in Kafka Streams, which provide the capability for real-time data access and querying on stateful data. Interactive queries empower applications to dynamically analyze and retrieve data from internal state stores, enabling real-time insights and dynamic decision-making.

The provided code samples and reference links equip you to implement interactive queries in your Kafka Streams applications. By leveraging queryable state stores, you can perform interactive queries to fetch data in real-time. Interactive queries are valuable in various use cases, including real-time analytics, dynamic dashboards, and more.

With interactive queries, Kafka Streams enables developers to build interactive and dynamic applications that can access and analyze real-time data. The scalability and fault-tolerance features of Kafka Streams ensure the availability and reliability of interactive query functionality.

Unleashing The Tech Marvels

Discover a tech enthusiast’s dreamland as our blog takes you on a thrilling journey through the dynamic world of programming. 

More Post like this

About Author
Ozzie Feliciano CTO @ Felpfe Inc.

Ozzie Feliciano is a highly experienced technologist with a remarkable twenty-three years of expertise in the technology industry.

kafka-logo-tall-apache-kafka-fel
Stream Dream: Diving into Kafka Streams
In “Stream Dream: Diving into Kafka Streams,”...
ksql
Talking in Streams: KSQL for the SQL Lovers
“Talking in Streams: KSQL for the SQL Lovers”...
spring_cloud
Stream Symphony: Real-time Wizardry with Spring Cloud Stream Orchestration
Description: The blog post, “Stream Symphony:...
1_GVb-mYlEyq_L35dg7TEN2w
Kafka Chronicles: Saga of Resilient Microservices Communication with Spring Cloud Stream
“Kafka Chronicles: Saga of Resilient Microservices...
kafka-logo-tall-apache-kafka-fel
Tackling Security in Kafka: A Comprehensive Guide on Authentication and Authorization
As the usage of Apache Kafka continues to grow in organizations...
1 2 3 58
90's, 2000's and Today's Hits
Decades of Hits, One Station

Listen to the greatest hits of the 90s, 2000s and Today. Now on TuneIn. Listen while you code.