Create Kafka Producer And Consumer In Dotnet And Python

Oct 23rd, 2020 - written by Kimserey.

Last week we looked at how we could set up Kafka locally in Docker. Continuing along our Kafka series, we will look at how we can create a producer and a consumer using confluent-kafka-dotnet, and their Python equivalents using kafka-python.

Docker Setup

As a reminder of our post from last week, here is the docker compose file for our local setup:

version: '2'
services:

  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"

  kafka:
    build: .
    ports:
      - "9094:9094"
    environment:      
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://kafka:9094
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://localhost:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_CREATE_TOPICS: "kimtopic:2:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

We then start it with docker-compose up -d. This starts a broker reachable on localhost:9094 and creates a topic kimtopic with 2 partitions and a replication factor of 1 (the 2:1 in kimtopic:2:1), as we can verify below.
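To check that the broker is up and that the topic has been created with the expected partitions, we can query the cluster metadata with Kafkacat; the -L flag lists brokers, topics and partitions:

kafkacat -L -b localhost:9094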

Producer

To create a producer, we start by adding the Confluent.Kafka NuGet package, published by the confluent-kafka-dotnet project.
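Both this package and Bogus, which we use further down to generate fake data, can be added with the dotnet CLI:

dotnet add package Confluent.Kafka
dotnet add package Bogus

Then we can create a producer with the builder ProducerBuilder: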

using var producer = new ProducerBuilder<int, string>(new ProducerConfig { BootstrapServers = "localhost:9094" }).Build();

The producer takes two type parameters, the key type and the value type. Here we define the key type as int and the value type as string. For the configuration of the producer, we specify the bootstrap server as localhost:9094, as that's the address we advertised in our Docker setup.

Using this producer, we can then produce a message on the Kafka topic:

await producer.ProduceAsync("kimtopic", new Message<int, string> {Key = 1, Value = "hello world"});

Here is a producer with a while loop using Bogus to generate random content for the message:

using System;
using System.Threading.Tasks;
using Confluent.Kafka;
using Bogus;

namespace KafkaProducer
{
    class Program
    {
        public static async Task Main(string[] args)
        {
            var faker = new Faker();
            using var p = new ProducerBuilder<Null, string>(new ProducerConfig { BootstrapServers = "localhost:9094" }).Build();

            while (true)
            {
                Console.WriteLine("Press any key to generate a random value.");
                Console.ReadKey();

                try
                {
                    var dr = await p.ProduceAsync("kimtopic", new Message<Null, string> { Value = faker.Company.Bs() });
                    Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
                }
                catch (ProduceException<Null, string> e)
                {
                    Console.WriteLine($"Delivery failed: {e.Error.Reason}");
                }
            }
        }
    }
}

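Note that awaiting ProduceAsync for every message serializes the sends. For higher throughput, the library also provides a synchronous Produce overload that takes a delivery handler; here is a minimal sketch of the same loop body, assuming the producer p and faker from the example above:

p.Produce("kimtopic", new Message<Null, string> { Value = faker.Company.Bs() },
    dr => Console.WriteLine(dr.Error.IsError
        ? $"Delivery failed: {dr.Error.Reason}"
        : $"Delivered '{dr.Message.Value}' to '{dr.TopicPartitionOffset}'"));

// Flush before disposing the producer so buffered messages are delivered.
p.Flush(TimeSpan.FromSeconds(10));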
With this setup, we can use Kafkacat to listen to the messages this producer adds to the topic:

kafkacat -C -b localhost:9094 -t kimtopic

Consumer

Once we have the producer set up, we can move on to creating the consumer. Similarly to the producer, we use a builder, ConsumerBuilder:

var conf = new ConsumerConfig
{
    GroupId = "test-consumer-group-1",
    BootstrapServers = "localhost:9094",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoOffsetStore = false
};

using var consumer = new ConsumerBuilder<int, string>(conf).Build();

In the configuration of the consumer, we specify:

  • the consumer group id via GroupId which will identify the consumer group that this consumer joins,
  • the bootstrap server localhost:9094,
  • the offset reset strategy earliest, which indicates that when the consumer has no valid committed offset (for example, it restarts at a committed offset which no longer exists), it should restart from the earliest offset of the partition.

We also set the auto offset store to false. This is a special case where we do not want the offset to be stored for commit right away after the message is delivered to the consumer; instead, we want to mark it as ready for commit only once we have successfully processed the message. This is done by calling consumer.StoreOffset(consumerResult).

After having created the consumer, we can subscribe to the topic with:

consumer.Subscribe("kimtopic");

And once we subscribe we can then consume from it:

var consumerResult = consumer.Consume();
Console.WriteLine($"Consumed message '{consumerResult.Value}' at: '{consumerResult.TopicPartitionOffset}'.");
consumer.StoreOffset(consumerResult);

Here is a complete consumer with a while loop consuming the topic indefinitely, with a cancellation token wired to Ctrl+C so that the consumer can close cleanly and leave the group:

public static void Main(string[] args)
{
    var conf = new ConsumerConfig
    {
        GroupId = "test-consumer-group-1",
        BootstrapServers = "localhost:9094",
        AutoOffsetReset = AutoOffsetReset.Earliest,
        EnableAutoCommit = true,
        EnableAutoOffsetStore = false
    };

    var consumerBuilder = new ConsumerBuilder<Ignore, string>(conf);
    using var c = consumerBuilder.Build();
    c.Subscribe("kimtopic");

    // Cancellation token cancelled on Ctrl+C, so that Consume can be
    // interrupted and the consumer can leave the group cleanly.
    using var cts = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) =>
    {
        e.Cancel = true; // prevent the process from terminating immediately
        cts.Cancel();
    };

    try
    {
        while (true)
        {
            try
            {
                var cr = c.Consume(cts.Token);
                Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                c.StoreOffset(cr);
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Error occurred: {e.Error.Reason}");
            }
        }
    }
    catch (OperationCanceledException)
    {
        c.Close();
    }
}

Upon reception, we print the message value and the topic partition offset:

Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");

Using this, we can demonstrate how partitions are reassigned. If we start a single consumer, it receives messages from both partitions; when we start a second instance of the consumer, a reassignment occurs and each partition is assigned to its own consumer.
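To observe the rebalance, the consumer builder exposes handlers invoked when partitions are assigned to or revoked from an instance; here is a minimal sketch reusing the conf from the example above:

var consumerBuilder = new ConsumerBuilder<Ignore, string>(conf)
    // Log the partitions handed to this instance on each rebalance.
    .SetPartitionsAssignedHandler((c, partitions) =>
        Console.WriteLine($"Assigned: [{string.Join(", ", partitions)}]"))
    // Log the partitions taken away before they are reassigned.
    .SetPartitionsRevokedHandler((c, partitions) =>
        Console.WriteLine($"Revoked: [{string.Join(", ", partitions)}]"));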

Producer and Consumer in Python

In the same way as for C#, Python has an equivalent library called kafka-python, which we install with pip:

pip install kafka-python

which we can use to get started creating a producer and a consumer. We use KafkaProducer to create a producer:

from kafka import KafkaProducer
from kafka.errors import KafkaError
from faker import Faker
import json

producer = KafkaProducer(
    bootstrap_servers=["localhost:9094"],
    key_serializer=lambda m: json.dumps(m).encode("ascii"),
    value_serializer=lambda m: json.dumps(m).encode("ascii"),
)

fake = Faker()

for i in range(100):
    # Synchronously send to Kafka
    producer.send("kimtopic", value={"value": fake.address()}, key=i).get()

and KafkaConsumer to create a consumer:

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    "kimtopic", 
    group_id="my-group", 
    bootstrap_servers=["localhost:9094"],
    key_deserializer=lambda m: json.loads(m.decode('ascii')),
    value_deserializer=lambda m: json.loads(m.decode('ascii')),
)
for message in consumer:
    print(
        "%s:%d:%d: key=%s value=%s"
        % (message.topic, message.partition, message.offset, message.key, message.value)
    )

The same configuration options we used for the C# consumer and producer can be specified directly on the Python classes as keyword arguments; see the sketch after this paragraph. We then simply run the scripts with py consumer.py and py producer.py to start testing. This showcases how, even though Kafka is written in Java, the consumer and producer sides are entirely under the control of the user, and any language can be used provided a client library exists to interface with the broker.
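As a minimal sketch, here is the consumer creation again with the offset reset strategy and auto commit settings mirroring our C# configuration (auto_offset_reset and enable_auto_commit are the kafka-python counterparts of AutoOffsetReset and EnableAutoCommit):

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "kimtopic",
    group_id="my-group",
    bootstrap_servers=["localhost:9094"],
    auto_offset_reset="earliest",  # counterpart of AutoOffsetReset.Earliest
    enable_auto_commit=True,       # counterpart of EnableAutoCommit
)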

And that concludes today’s post!

Conclusion

Today we looked at how we could leverage the local Kafka setup we created in our previous post. We first looked at how to create a producer and the configuration involved, then did the same for a consumer, and finally built an equivalent producer and consumer in Python. I hope you liked this post and I'll see you in the next one!

Kafka Posts

  1. Introduction to Kafkacat CLI for Kafka
  2. Local Kafka Docker Setup
  3. Kafka Consumer and Producer in dotnet
  4. Kafka Schema Registry with Avro
  5. Kafka Topics, Partitions and Consumer Groups
  6. Kafka Offsets and Statistics
  7. Kafka Log Compaction

