Building Reliable Kafka Producers and Consumers: A Walkthrough with Java

Introduction

Apache Kafka is a powerful distributed streaming platform known for high-throughput, low-latency, fault-tolerant, and durable real-time data processing. It is often the backbone of choice for data integration pipelines, event-driven architectures, and real-time analytics. In this tutorial, we take a deep dive into building reliable Kafka producers and consumers using Java, complete with unit tests to verify that our Kafka applications behave as expected.

Setting Up the Project

To start, we’ll need to set up a Java project with Maven to manage our dependencies. Below is a sample pom.xml file that includes the dependencies needed for this tutorial:

XML
<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
  </dependency>
</dependencies>

The kafka-clients library provides the necessary APIs to interact with Kafka, while junit allows us to write and run our unit tests.

Writing a Reliable Kafka Producer

Let’s begin by creating a Kafka producer. Producers are responsible for publishing data to Kafka topics. We start by defining our producer’s properties:

Java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");
props.put("retries", 3);

These properties specify the Kafka broker’s address, the serializer classes for keys and values (which convert our data into bytes), the acknowledgement setting, and the number of times a failed send is retried. Setting acks to “all” makes the broker acknowledge a write only after all in-sync replicas have persisted it, the strongest durability guarantee the producer can request.
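For stricter delivery guarantees, the producer offers a few more knobs. The sketch below is one reasonable hardening of the same configuration, written with the type-safe ProducerConfig constants; the timeout value is an illustrative assumption, not a requirement:

Java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
// Idempotence ensures a retried send cannot write the same record twice.
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Upper bound on the total time a send may spend retrying (2 minutes here, purely illustrative).
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);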

To instantiate the Kafka producer and send a message, we use the following lines of code:

Java
Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "value");
producer.send(record);
producer.close();

With these simple steps, we’ve published a record to our “test-topic”. Note that send is asynchronous: it returns a Future<RecordMetadata> immediately, and a broker-side failure only surfaces when that future (or a callback) is inspected. Calling close() flushes any records still in flight before shutting the producer down.
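A reliable producer should therefore check each send’s outcome. Below is a minimal sketch of the two usual patterns, reusing the producer and record from above: block on the returned Future, or attach a Callback so the send path stays non-blocking:

Java
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.RecordMetadata;

// Option 1: block until the broker responds (simple, but serializes sends).
try {
    RecordMetadata metadata = producer.send(record).get();
    System.out.printf("Acknowledged at partition %d, offset %d%n",
            metadata.partition(), metadata.offset());
} catch (InterruptedException | ExecutionException e) {
    // The send failed even after retries; log it or route the record elsewhere.
    e.printStackTrace();
}

// Option 2: attach a callback and keep sending (better throughput).
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // Handle the failure, e.g. forward the record to a dead-letter store.
        exception.printStackTrace();
    }
});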

Writing a Reliable Kafka Consumer

Next, we’ll craft a Kafka consumer to subscribe to Kafka topics and consume the published data. As we did with the producer, we start by defining the consumer’s properties:

Java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");

Here, we define properties such as the Kafka broker’s address, the consumer group ID, the deserializer classes for keys and values, and settings for committing the consumer’s offset.

We then instantiate the Kafka consumer and subscribe to a topic:

Java
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}

In this loop, our consumer continuously polls the topic for new records and processes them; with enable.auto.commit turned on, the client commits the consumed offsets in the background, roughly once per auto.commit.interval.ms (one second here).
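Auto-commit is convenient, but it acknowledges offsets on a timer, so a crash between a commit and the completion of processing can lose records for this group. When at-least-once processing matters, a common alternative is to commit manually after each batch. The sketch below assumes enable.auto.commit has been set to “false” in the properties above, and handleRecord is a hypothetical placeholder for your own processing logic:

Java
// Reuses the consumer properties from above, with enable.auto.commit set to "false".
try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList("test-topic"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            handleRecord(record); // hypothetical placeholder for your processing logic
        }
        // Commit only after the whole batch has been processed: at-least-once semantics.
        if (!records.isEmpty()) {
            consumer.commitSync();
        }
    }
}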

Writing Unit Tests for Kafka Producers and Consumers

Testing Kafka producers and consumers can be challenging due to their asynchronous nature and their dependency on a Kafka broker. However, Kafka provides a MockProducer and a MockConsumer that can simulate a real Kafka producer and consumer, making unit testing easier.

Let’s look at how to write a unit test for our Kafka producer:

Java
@Test
public void testKafkaProducer() {
    // No broker properties are needed: MockProducer simulates the client entirely in memory.

    MockProducer<String, String> mockProducer = new MockProducer<>(true, new StringSerializer(), new StringSerializer());
    ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "value");
    mockProducer.send(record);

    List<ProducerRecord<String, String>> records = mockProducer.history();
    assertEquals(1, records.size());
    assertEquals("key", records.get(0).key());
    assertEquals("value", records.get(0).value());
}

Similarly, we can write a unit test for our Kafka consumer:

Java
@Test
public void testKafkaConsumer() {
    MockConsumer<String, String> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);

    // MockConsumer only accepts records for partitions that are explicitly assigned,
    // so we assign the partition and seed its beginning offset instead of subscribing.
    TopicPartition partition = new TopicPartition("test-topic", 0);
    mockConsumer.assign(Arrays.asList(partition));
    mockConsumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));

    mockConsumer.addRecord(new ConsumerRecord<>("test-topic", 0, 0L, "key", "value"));

    ConsumerRecords<String, String> records = mockConsumer.poll(Duration.ofMillis(100));
    assertEquals(1, records.count());

    ConsumerRecord<String, String> record = records.iterator().next();
    assertEquals("key", record.key());
    assertEquals("value", record.value());
}

In both test cases, we simulate the behavior of real Kafka producers and consumers using the MockProducer and MockConsumer classes.
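MockProducer can also simulate failures, which lets us test the error-handling path that makes a producer reliable in the first place. Below is a sketch under the same JUnit 4 setup as the tests above: constructing the mock with autoComplete set to false holds each send pending until the test completes it or fails it explicitly via errorNext.

Java
@Test
public void testKafkaProducerFailure() throws Exception {
    // autoComplete=false: each send stays pending until completeNext() or errorNext() is called.
    MockProducer<String, String> mockProducer =
            new MockProducer<>(false, new StringSerializer(), new StringSerializer());

    Future<RecordMetadata> future =
            mockProducer.send(new ProducerRecord<>("test-topic", "key", "value"));

    // Fail the pending send with a simulated broker error.
    mockProducer.errorNext(new RuntimeException("simulated broker failure"));

    try {
        future.get();
        fail("Expected the send to fail");
    } catch (ExecutionException e) {
        assertEquals("simulated broker failure", e.getCause().getMessage());
    }
}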

Conclusion

In this tutorial, we dove deep into building reliable Kafka producers and consumers using Java. We also looked at how to write unit tests for these components, ensuring the reliability and correctness of our Kafka applications.

We’ve only scratched the surface of what you can do with Apache Kafka. The platform also provides advanced features such as exactly-once processing semantics, compacted topics, and stream processing APIs. As you dive deeper into Kafka, you’ll discover that it provides all the tools necessary to build powerful, reliable, and scalable real-time applications.

Remember that the key to mastering Kafka is practice and experimentation. So, don’t be afraid to get your hands dirty and try out different configurations and settings.
