Hands-On with Kafka Streams: Building a Real-Time Analytics Application

Introduction

Real-time analytics has become a crucial component of modern software architectures, especially when dealing with the vast amounts of data generated from various sources. Apache Kafka, a robust distributed event streaming platform, provides Kafka Streams, a client library for processing and analyzing this data in real time.

Today, we are diving into Kafka Streams, creating a real-time analytics application that will consume a stream of sales data and compute the total sales per product category. Strap in and enjoy the ride.

Part 1: Kafka Streams 101

Before diving into the code, it’s crucial to understand the Kafka Streams architecture and its core concepts. In a nutshell, Kafka Streams allows you to define a “topology” of processing nodes. Each node represents a computational step (e.g., mapping, filtering, aggregation) in your data pipeline. Kafka Streams processes data in real-time, record by record, which enables highly responsive and context-aware applications.
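To make this concrete, here is a minimal sketch of a three-node topology (the topic names are illustrative): a source node, one stateless processing node, and a sink node, printed via Topology#describe() so you can see the nodes Kafka Streams generates.

Java
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic")                   // source node
       .filter((key, value) -> value != null)   // stateless processing node
       .to("output-topic");                     // sink node

Topology topology = builder.build();
System.out.println(topology.describe());        // prints the sources, processors, and sinks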

Part 2: Setting Up the Kafka Streams Application

Our real-time analytics application will use Maven for dependency management. Below are the necessary dependencies:

XML
<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.8.0</version>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
  </dependency>
</dependencies>

After setting up our project, we need to configure our Streams application:

Java
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "sales-analytics-app");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

Our configuration includes the application ID, which uniquely identifies the application across the Kafka cluster. We also specify the address of our Kafka broker and default SerDes (Serializer/Deserializer) classes, which determine how to convert between Java objects and bytes for network transmission.
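When we build the source stream in the next step, we could also pin the SerDes per operation instead of relying on these defaults. A small sketch, assuming string keys and values on the input topic:

Java
// Explicit serdes for this source only; other operations keep the configured defaults
KStream<String, String> sourceStream = builder.stream(
    "sales-topic",
    Consumed.with(Serdes.String(), Serdes.String())
);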

Next, we define our processing topology. The source stream (“sales-topic”) receives sales records keyed by product category, with the sale amount as the value. We then aggregate this stream into a table of total sales per product category and write the result to an output topic (“sales-per-category-topic”):

Java
final StreamsBuilder builder = new StreamsBuilder();

// Sales records arrive keyed by product category, with the sale amount as a string value
KStream<String, String> sourceStream = builder.stream("sales-topic");
KTable<String, Long> salesPerCategory = sourceStream
  .mapValues(Long::parseLong)                                 // parse the sale amount
  .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))   // value type changed, so supply serdes
  .reduce(Long::sum);                                         // running total per category

salesPerCategory.toStream().to("sales-per-category-topic", Produced.with(Serdes.String(), Serdes.Long()));

final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();

Let’s dissect the steps:

  1. We create a new StreamsBuilder, which provides the high-level Streams DSL.
  2. We specify the source topic of our stream.
  3. We transform the data. Each record already carries the product category as its key, so we parse the sale amount from its string value into a Long.
  4. We group the data by key (i.e., product category) and sum the sales amounts.
  5. We output the result to a new topic.
  6. We create a KafkaStreams instance and start processing (see the shutdown sketch just after this list).
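
One detail the snippet above leaves out: in a real deployment you typically want the Streams instance to shut down cleanly when the JVM exits. A minimal sketch using a JVM shutdown hook:

Java
// Close the Streams instance (flushing state stores) when the application is stopped
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));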

Part 3: Deep Dive into Data Processing

With the basic setup out of the way, let’s delve deeper into Kafka Streams’ data processing capabilities. Kafka Streams offers two types of data manipulation operations: Stateless and Stateful.

Stateless operations, such as map and filter, process each record independently. In contrast, stateful operations, like groupByKey followed by reduce, depend on previously processed records and are backed by state stores.
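
To make the distinction concrete, here is a small sketch (variable names are illustrative): a stateless mapValues that handles each record in isolation, next to a stateful count that must remember what it has already seen.

Java
// Stateless: each record is transformed on its own, no state store required
KStream<String, String> upperCased = sourceStream.mapValues(value -> value.toUpperCase());

// Stateful: counting per key needs a state store that accumulates across records
KTable<String, Long> recordsPerKey = sourceStream
    .groupByKey()
    .count();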

In our application, we use the map operation to transform the sales records and a combination of groupByKey and reduce to aggregate the sales amounts. Let’s add a filter operation to exclude any sales transactions below a certain threshold:

Java
KStream<String, String> filteredStream = sourceStream.filter((key, value) -> Long.parseLong(value) > 100);
KTable<String, Long> salesPerCategory = filteredStream
  .mapValues(Long::parseLong)
  .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
  .reduce(Long::sum);

This additional operation ensures that only transactions above $100 contribute to the sales totals.
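
If you also want visibility into the records you are dropping, the complementary filterNot operation can route them elsewhere instead of discarding them silently. A sketch, assuming a hypothetical low-value-sales-topic used for auditing:

Java
// Records at or below the threshold are published to a separate topic for auditing
sourceStream
    .filterNot((key, value) -> Long.parseLong(value) > 100)
    .to("low-value-sales-topic", Produced.with(Serdes.String(), Serdes.String()));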

Part 4: Enhancing the Application: Windowed Aggregations

Now, let’s say we want to analyze sales not only in total but also within specific time windows. Kafka Streams allows us to do this through windowed aggregations. We’ll modify our application to compute hourly sales totals:

Java
TimeWindows windows = TimeWindows.of(Duration.ofHours(1)).advanceBy(Duration.ofMinutes(1));

KTable<Windowed<String>, Long> windowedSalesPerCategory = filteredStream
  .mapValues(Long::parseLong)
  .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
  .windowedBy(windows)
  .reduce(Long::sum);

windowedSalesPerCategory.toStream().to("windowed-sales-per-category-topic",
  Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String.class), Serdes.Long()));

Here, we create a TimeWindows object that defines the window size (1 hour) and the advance interval (1 minute). Because the advance is smaller than the window size, these are overlapping “hopping” windows, so each record contributes to every window that covers its timestamp. We apply the windowed aggregation through the windowedBy operation.
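
By default, every incoming record triggers an updated result for each window it falls into. If downstream consumers only want one final figure per window, you can combine a grace period for late records with suppression of intermediate updates. A sketch of that variant (the ten-minute grace period is an arbitrary choice):

Java
TimeWindows windowsWithGrace = TimeWindows.of(Duration.ofHours(1))
    .advanceBy(Duration.ofMinutes(1))
    .grace(Duration.ofMinutes(10));               // accept records up to 10 minutes late

KTable<Windowed<String>, Long> finalSalesPerCategory = filteredStream
    .mapValues(Long::parseLong)
    .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
    .windowedBy(windowsWithGrace)
    .reduce(Long::sum)
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));  // emit only the final result per window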

Part 5: Testing the Kafka Streams Application

Now that we have our streaming application, it’s time to test it. Kafka provides the TopologyTestDriver for this purpose, a powerful tool for testing your Streams applications without needing a running Kafka cluster.

We’ll write a JUnit test that verifies our application correctly calculates sales per category and excludes transactions below $100. We’ll also verify that the windowed aggregation works as expected:

Java
// Setup test driver and test data here...
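// A minimal setup sketch (assumed here, not shown in the original): reuse the topology
// and configuration built above, and drive them with the pre-2.4 test helpers used below.
TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), streamsConfiguration);
ConsumerRecordFactory<String, String> factory =
    new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());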

// Feeding input data
testDriver.pipeInput(factory.create("sales-topic", "category1", "50"));
testDriver.pipeInput(factory.create("sales-topic", "category1", "150"));
testDriver.pipeInput(factory.create("sales-topic", "category2", "200"));
testDriver.advanceWallClockTime(Duration.ofHours(1));
testDriver.pipeInput(factory.create("sales-topic", "category1", "200"));

// Verifying the output data
ProducerRecord<String, Long> outputRecord1 = testDriver.readOutput("sales-per-category-topic", new StringDeserializer(), new LongDeserializer());
assertEquals("category1", outputRecord1.key());
assertEquals(Long.valueOf(150L), outputRecord1.value());

ProducerRecord<String, Long> outputRecord2 = testDriver.readOutput("sales-per-category-topic", new StringDeserializer(), new LongDeserializer());
assertEquals("category2", outputRecord2.key());
assertEquals(Long.valueOf(200L), outputRecord2.value());

ProducerRecord<Windowed<String>, Long> windowedOutputRecord = testDriver.readOutput("windowed-sales-per-category-topic", new TimeWindowedDeserializer<>(new StringDeserializer()), new LongDeserializer());
assertEquals("category1", windowedOutputRecord.key().key());
assertEquals(Long.valueOf(200L), windowedOutputRecord.value());

In this test, we first feed some test data into our application through the pipeInput method. We also advance the wall clock time to simulate the passage of time. After processing the input data, we verify the output records from both the total sales topic and the windowed sales topic.
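
Note that pipeInput and readOutput belong to the older test API and are deprecated in recent Kafka versions in favor of TestInputTopic and TestOutputTopic. A rough sketch of the first assertion rewritten in the newer style:

Java
TestInputTopic<String, String> salesInput =
    testDriver.createInputTopic("sales-topic", new StringSerializer(), new StringSerializer());
TestOutputTopic<String, Long> salesOutput =
    testDriver.createOutputTopic("sales-per-category-topic", new StringDeserializer(), new LongDeserializer());

salesInput.pipeInput("category1", "150");                                    // key = category, value = sale amount
assertEquals(new KeyValue<>("category1", 150L), salesOutput.readKeyValue());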

Conclusion

Congratulations! You’ve built a real-time analytics application with Kafka Streams from scratch. Along the way, you learned the basic principles of Kafka Streams, how to create a processing topology, and how to perform stateless and stateful operations. You also dove into windowed aggregations and into testing your Kafka Streams applications.

This tutorial only scratches the surface of what you can do with Kafka Streams. With its powerful DSL and intuitive API, Kafka Streams opens up a world of possibilities for real-time data processing. As you continue to explore Kafka Streams, you may find yourself tackling more complex use-cases such as joins, interactive queries, and exactly-once processing.

Remember that Kafka Streams is not a standalone service, but a part of your application. This means you can leverage all the tools and libraries from your ecosystem, leading to a simplified operational model and increased flexibility.

About Author
Ozzie Feliciano CTO @ Felpfe Inc.

Ozzie Feliciano is a highly experienced technologist with a remarkable twenty-three years of expertise in the technology industry.
