Felpfe Inc.

Creating Kafka Producers and Consumers in Different Programming Languages

Apache Kafka has emerged as a leading distributed streaming platform known for its high-throughput, fault-tolerant, and scalable nature. It enables the development of real-time data pipelines and applications that process and react to streams of data. In this comprehensive guide, we will explore how to create Kafka producers and consumers using different programming languages. We will provide step-by-step instructions, along with code samples and testing examples, to help you get started on your Kafka journey.

Table of Contents:

  1. Understanding Apache Kafka
  2. Setting up Apache Kafka
  3. Creating a Kafka Producer
    3.1. Python
    3.2. Java
    3.3. Node.js
    3.4. Go
  4. Creating a Kafka Consumer
    4.1. Python
    4.2. Java
    4.3. Node.js
    4.4. Go
  5. Testing the Kafka Producers and Consumers
  6. Conclusion

  1. Understanding Apache Kafka:
    Apache Kafka is a distributed streaming platform that allows you to publish and subscribe to streams of records in real time. It provides fault tolerance, scalability, and durability, making it a good fit for event-driven architectures, real-time analytics, and data integration pipelines. Kafka’s core abstraction is the topic: a named category or feed to which records are published and from which they are consumed.
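
To make the topic abstraction more concrete, here is a minimal in-memory sketch. It is not how Kafka is implemented; the class name ToyTopic and its methods are hypothetical. It assumes a single append-only log in which each record gets an increasing position (an offset) and reading does not remove records, so several subscribers can read the same data independently.

```python
class ToyTopic:
    """Hypothetical in-memory stand-in for a topic: an append-only list of records."""

    def __init__(self, name):
        self.name = name
        self._records = []  # the index in this list plays the role of an offset

    def publish(self, value):
        """Append a record and return the offset it was stored at."""
        self._records.append(value)
        return len(self._records) - 1

    def read_from(self, offset):
        """Return all records at or after the given offset without removing them."""
        return self._records[offset:]


topic = ToyTopic("my_topic")
topic.publish(b"first")
topic.publish(b"second")

# Two independent subscribers read the same records from different positions
print(topic.read_from(0))  # subscriber starting at the beginning
print(topic.read_from(1))  # subscriber that has already seen the first record
```

A real topic is spread across brokers and persisted to disk, but the publish-then-read-by-position idea is the same one the producer and consumer examples in this guide rely on.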

  2. Setting up Apache Kafka:
    Before we create Kafka producers and consumers, set up Apache Kafka on your local machine or a remote server. You can download Apache Kafka from the official website (https://kafka.apache.org/downloads) and follow the installation instructions for your operating system. Once installed, start the Kafka server. On older, ZooKeeper-based releases you must start the ZooKeeper server first; newer releases can run in KRaft mode, which does not use ZooKeeper, and Kafka 4.0 removed ZooKeeper support entirely. The examples in this guide assume a broker listening on localhost:9092.
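
Before running any of the examples, it can help to confirm that something is listening on the broker port. The sketch below uses only the Python standard library; the function name broker_reachable is hypothetical, and it assumes the broker address localhost:9092 used throughout this guide. It only checks that a TCP connection can be opened, not that the process on the other end is a healthy Kafka broker.

```python
import socket


def broker_reachable(host="localhost", port=9092, timeout=2.0):
    """Return True if a TCP connection to host:port opens within timeout seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable host names
        return False
```

Calling broker_reachable() with no arguments checks the default address; a False result usually means the Kafka server has not been started or is listening on a different port.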

  3. Creating a Kafka Producer:
    A Kafka producer is responsible for publishing messages to Kafka topics. Let’s explore how to create Kafka producers using different programming languages.

3.1. Python:

Python
from kafka import KafkaProducer

# Create a Kafka producer instance
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# Publish a message to a Kafka topic
producer.send('my_topic', b'Hello, Kafka!')

# Close the Kafka producer
producer.close()
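
Real applications usually send structured data rather than a fixed byte string, and often want confirmation that the broker accepted the record. The following is a sketch under a few assumptions: values are JSON-encodable dicts, the kafka-python package from the example above is installed, and the helper names serialize_event and send_event are hypothetical.

```python
import json


def serialize_event(event):
    """Encode a dict as UTF-8 JSON bytes, the form stored as the record value."""
    return json.dumps(event, sort_keys=True).encode("utf-8")


def send_event(event, topic="my_topic", servers="localhost:9092"):
    """Send one event and block until the broker acknowledges it."""
    # Imported here so serialize_event can be used without kafka-python installed
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=servers,
                             value_serializer=serialize_event)
    try:
        # send() is asynchronous and returns a future; get() waits for the result
        metadata = producer.send(topic, event).get(timeout=10)
        return metadata.partition, metadata.offset
    finally:
        producer.close()
```

Waiting on every send keeps the example easy to follow but limits throughput; a producer that sends many records would normally let them batch and call flush() before closing.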

3.2. Java:

Java
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(properties);

        ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", "Hello, Kafka!");

        producer.send(record);

        producer.close();
    }
}

3.3. Node.js:

JavaScript
const { Kafka } = require('kafkajs');

async function run() {
  const kafka = new Kafka({
    clientId: 'my-app',
    brokers: ['localhost:9092']
  });

  const producer = kafka.producer();

  await producer.connect();

  await producer.send({
    topic: 'my_topic',
    messages: [{ value: 'Hello, Kafka!' }]
  });

  await producer.disconnect();
}

run().catch(console.error);

3.4. Go:

Go
package main

import (
    "context"
    "log"

    "github.com/segmentio/kafka-go"
)

func main() {
    // Create a writer that publishes to my_topic on the local broker
    writer := kafka.NewWriter(kafka.WriterConfig{
        Brokers: []string{"localhost:9092"},
        Topic:   "my_topic",
    })

    defer writer.Close()

    // Publish a single message to the topic
    err := writer.WriteMessages(context.Background(),
        kafka.Message{Value: []byte("Hello, Kafka!")})

    if err != nil {
        log.Fatal("Failed to write messages:", err)
    }
}

  4. Creating a Kafka Consumer:
    A Kafka consumer reads messages from Kafka topics and processes them. Let’s explore how to create Kafka consumers using different programming languages.

4.1. Python:

Python
from kafka import KafkaConsumer

# Create a Kafka consumer instance subscribed to my_topic
consumer = KafkaConsumer('my_topic', bootstrap_servers='localhost:9092')

try:
    # Consume messages from the Kafka topic; the loop blocks until interrupted (Ctrl+C)
    for message in consumer:
        print(message.value)
except KeyboardInterrupt:
    pass
finally:
    # Close the Kafka consumer
    consumer.close()
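
The consumer above uses default settings, so it reads raw bytes and starts at the end of the topic. The sketch below shows a few commonly adjusted options. It assumes record values are UTF-8 JSON (so it would not decode the plain "Hello, Kafka!" string), that kafka-python is installed, and the names deserialize_event, read_events, and example_group are hypothetical.

```python
import json


def deserialize_event(raw):
    """Decode UTF-8 JSON bytes back into a Python object."""
    return json.loads(raw.decode("utf-8"))


def read_events(topic="my_topic", servers="localhost:9092", idle_ms=5000):
    """Collect events until no new message arrives for idle_ms milliseconds."""
    # Imported here so deserialize_event can be used without kafka-python installed
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=servers,
        group_id="example_group",      # hypothetical consumer group name
        auto_offset_reset="earliest",  # start at the oldest record if the group has no committed offset
        value_deserializer=deserialize_event,
        consumer_timeout_ms=idle_ms,   # stop iterating after this much idle time
    )
    try:
        return [message.value for message in consumer]
    finally:
        consumer.close()
```

Setting consumer_timeout_ms makes the iteration finite, which suits scripts and tests; a long-running service would normally leave it unset and loop until shut down.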

4.2. Java:

Java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", StringDeserializer.class.getName());
        properties.put("group.id", "my_consumer_group");

        Consumer<String, String> consumer = new KafkaConsumer<>(properties);

        consumer.subscribe(Collections.singletonList("my_topic"));

        while (true) {
            // poll(Duration) replaces the deprecated poll(long) overload
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}

4.3. Node.js:

JavaScript
const { Kafka } = require('kafkajs');

async function run() {
  const kafka = new Kafka({
    clientId: 'my-app',
    brokers: ['localhost:9092']
  });

  const consumer = kafka.consumer({ groupId: 'my-group' });

  await consumer.connect();
  await consumer.subscribe({ topic: 'my_topic', fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log(message.value.toString());
    },
  });
}

run().catch(console.error);

4.4. Go:

Go
package main

import (
    "context"
    "log"

    "github.com/segmentio/kafka-go"
)

func main() {
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:   []string{"localhost:9092"},
        Topic:     "my_topic",
        Partition: 0,
        MinBytes:  10e3,
        MaxBytes:  10e6,
    })

    defer reader.Close()

    for {
        msg, err := reader.ReadMessage(context.Background())
        if err != nil {
            log.Fatal("Failed to read message:", err)
        }
        log.Println(string(msg.Value))
    }
}

  5. Testing the Kafka Producers and Consumers:
    To test the Kafka producers and consumers, ensure that the Kafka server is running and create a Kafka topic named “my_topic” using the Kafka command-line tools.
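
The topic can be created with the kafka-topics.sh tool that ships with Kafka. The sketch below builds and runs that command from Python. It assumes it is run from the Kafka installation directory (so the script is at bin/kafka-topics.sh), a single local broker, and a Kafka version whose tool accepts --bootstrap-server; the function names are hypothetical.

```python
import subprocess


def create_topic_command(topic, bootstrap="localhost:9092",
                         script="bin/kafka-topics.sh"):
    """Build the argument list for creating a single-partition topic."""
    return [
        script, "--create",
        "--topic", topic,
        "--bootstrap-server", bootstrap,
        "--partitions", "1",
        "--replication-factor", "1",
    ]


def create_topic(topic):
    """Run the command; raises CalledProcessError if the tool exits with an error."""
    subprocess.run(create_topic_command(topic), check=True)
```

Calling create_topic("my_topic") is equivalent to typing the same command in a terminal. One partition and a replication factor of 1 are enough for a single-broker test setup.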

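For Python, start the consumer script in one terminal window and then run the producer script in another. Start the consumer first: with its default settings, a new consumer begins reading at the end of the topic and would miss a message published before it started. The consumer should print the message “Hello, Kafka!” (as a bytes value, b'Hello, Kafka!').

For Java, compile the producer and consumer classes, start the consumer, and then run the producer. The same default applies here, so the consumer should be running before the message is sent. You should observe the message “Hello, Kafka!” being consumed by the consumer.

For Node.js, run the producer script and the consumer script in separate terminal windows. Because the consumer subscribes with fromBeginning: true, a consumer group with no committed offsets also reads messages published before it started. You should observe the message “Hello, Kafka!” being consumed by the consumer.

For Go, compile and run the producer and consumer executables separately. You should observe the message “Hello, Kafka!” being consumed by the consumer.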
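
The manual check above can also be automated. The sketch below is one possible way to do it, with a hypothetical helper saw_message that scans an iterable of records for an expected value. It assumes each record exposes a value attribute, as the records yielded by the Python consumer do; the FakeRecord type exists only so the helper can be tried without a broker.

```python
from collections import namedtuple


def saw_message(messages, expected, max_messages=100):
    """Return True if a record whose value equals expected is among the first max_messages."""
    for count, message in enumerate(messages, start=1):
        if message.value == expected:
            return True
        if count >= max_messages:
            break
    return False


# Stand-in records so the check can be tried without a broker
FakeRecord = namedtuple("FakeRecord", "value")
sample = [FakeRecord(b"other"), FakeRecord(b"Hello, Kafka!")]
print(saw_message(sample, b"Hello, Kafka!"))
```

Against a real broker, pass a KafkaConsumer created with consumer_timeout_ms set, so that iteration ends if the expected message never arrives.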

  6. Conclusion:
    In this guide, we created Kafka producers and consumers in four programming languages: Python, Java, Node.js, and Go. The same topic and broker were used in every example, which shows how clients written in different languages can work against one Kafka cluster.

With these code samples as a starting point, you can begin building real-time data pipelines, event-driven architectures, and scalable systems on Kafka, which offers high throughput, fault tolerance, and durability for real-time data streams.

Remember to test your Kafka producers and consumers thoroughly to confirm that messages are published and consumed as expected. As you continue working with Kafka, you will discover further capabilities and use cases across various industries.

About Author
Ozzie Feliciano CTO @ Felpfe Inc.

Ozzie Feliciano is a highly experienced technologist with a remarkable twenty-three years of expertise in the technology industry.
