Consuming events

We now have a way to generate events, but how do we consume them?

There are various ways in which 2 processes can communicate, but we're mainly interested in a way that allows the producer and the consumer to be decoupled from each other. A typical way to achieve that is to have another component in between them, that can persist the producer's data and make it available to the consumer.

Although there are a lot of possibilities for implementing such a requirement, the solution I'm currently interested in learning is Apache Kafka. I plan to write a separate series of articles about Kafka as I learn more about it, but in this article, I'll focus only on the practical matters of using it.

When I first started googling about Kafka, I expected there would be a ton of examples on how to get started with it, but I had some surprising initial struggles. There is a lot of marketing info, free e-books, and long articles explaining more in-depth concepts, a bunch of them from some company called Confluent, but I couldn't find a clear starting point.

"Ok, whatever, I'll just do it all myself" was the thought as I went to Docker Hub, confidently typed "kafka" and got this.

Docker Hub results for kafka search term

10,000 results, a rapidly decreasing number of downloads, no Apache, no Confluent, no clue what's going on. Ok, back to googling.

A few queries later, I learned that Confluent was founded by the creators of Kafka and that the good (best?) image to start with is confluentinc/cp-kafka. I also stumbled upon this nice article that shows how to set up Kafka and Zookeeper and provides simple Producer and Consumer applications.

I decided to use the default .NET client for both the Producer and the Consumer instead of the kafka-sharp package mentioned in the article, because kafka-sharp is no longer maintained and it felt simpler to use the same package on both sides.

Changes to the Producer

First, I had to add the Kafka client package.

cd Producer
dotnet add package Confluent.Kafka

I did a quick rename of the Program.cs files in both projects so I could distinguish them more easily. After that, the Producer code had to be modified so it sends the events to a Kafka topic.

// Producer.cs 
using Confluent.Kafka;

var _mousePosition = new MousePosition();   // source of new mouse positions, returned as strings
var topicName = "mouse-position";           // Kafka topic the events are sent to
var kafkaConfig = new ProducerConfig { BootstrapServers = "localhost:9092" };  // the local Kafka broker

using (var producer = new ProducerBuilder<Null, string>(kafkaConfig).Build())
{
    while (true)
    {
        var newPosition = _mousePosition.GetNewPosition();
        if (newPosition != string.Empty)  // only produce when there is a new position
        {
            var message = new Message<Null, string> { Value = newPosition };
            await producer.ProduceAsync(topicName, message);  // send the event and wait for the broker to acknowledge it

            Console.WriteLine(newPosition);
        }
        await Task.Delay(100);
    }
}
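One small note on the code above: ProduceAsync returns a delivery report, so the partition and offset the broker assigned to each message can be printed as well. A minimal sketch, reusing the names from the loop above:

// Optional: inspect the delivery report instead of ignoring it
var delivery = await producer.ProduceAsync(topicName, message);
Console.WriteLine($"Delivered '{delivery.Value}' to {delivery.TopicPartitionOffset}");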

After running the project, there's an error because nothing is listening on port 9092 yet.

[thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Unknown error (after 2041ms in state CONNECT)

To fix that, we need to bring up Zookeeper and Kafka. Following the tutorial I found, I ran the following commands.

docker pull confluentinc/cp-zookeeper
docker pull confluentinc/cp-kafka
docker network create kafka

docker run --name zookeeper --network=kafka -e ZOOKEEPER_CLIENT_PORT=2181 -e ZOOKEEPER_TICK_TIME=2000 -p 2181:2181 confluentinc/cp-zookeeper

docker run --name kafka --network=kafka -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -p 9092:9092 confluentinc/cp-kafka

These commands pulled the respective images, created a network, and used it when running both containers to ensure they can communicate with each other. The two docker run commands need to be executed in separate windows, or separate panes in the case of Windows Terminal (check out the Cheatsheets page for some shortcuts related to panes). The Kafka command also exposes port 9092, so hopefully our producer will function correctly. And it does, or at least it looks like it does, as I don't see any exceptions.
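If you want a quick sanity check that both containers are actually up before blaming the producer, something like this should do (assuming the container names used above):

docker ps --filter name=zookeeper --filter name=kafka
docker logs kafka 2>&1 | grep -i "started"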

The real verification will be the successful consumption of those events, but for that, we need to create the consumer. For the first version, the consumer will only retrieve the events and output them to the console. Later, we can think of different ways to use them. The consumer will use the same client package as the producer.
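Assuming the Consumer project lives in its own Consumer folder, the package is added the same way as for the Producer:

cd Consumer
dotnet add package Confluent.Kafka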

// Consumer.cs
using Confluent.Kafka;

var topicName = "mouse-position";
var kafkaConfig = new ConsumerConfig
{
    GroupId = "group1",
    BootstrapServers = "localhost:9092",
    AutoOffsetReset = AutoOffsetReset.Earliest,
};

using (var consumer = new ConsumerBuilder<Null, string>(kafkaConfig).Build())
{
    consumer.Subscribe(topicName);
    while (true)
    {
        var result = consumer.Consume();
        var message = result.Message.Value;
        var offset = result.Offset;
        Console.WriteLine($"Consumed message '{message}' at: '{offset}'.");
    }
}
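One thing the version above doesn't do is shut down cleanly: killing the process mid-Consume means the consumer never leaves its group. A minimal sketch of a more graceful variant, assuming the same config and topic name, that hooks Ctrl+C into a CancellationToken and calls Close() at the end:

// Sketch: stop consuming on Ctrl+C and leave the consumer group cleanly
var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

using (var consumer = new ConsumerBuilder<Null, string>(kafkaConfig).Build())
{
    consumer.Subscribe(topicName);
    try
    {
        while (true)
        {
            var result = consumer.Consume(cts.Token); // throws OperationCanceledException once cancelled
            Console.WriteLine($"Consumed message '{result.Message.Value}' at: '{result.Offset}'.");
        }
    }
    catch (OperationCanceledException) { }
    finally
    {
        consumer.Close(); // commits final offsets and leaves the group
    }
}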

After running the consumer, the terminal pane with Kafka started to output errors.

[2022-12-04 18:38:27,718] INFO [Admin Manager on Broker 1001]: Error processing create topic request CreatableTopic(name='__consumer_offsets', numPartitions=50, replicationFactor=3, assignments=[], configs=[CreateableTopicConfig(name='compression.type', value='producer'), CreateableTopicConfig(name='cleanup.policy', value='compact'), CreateableTopicConfig(name='segment.bytes', value='104857600')]) (kafka.server.ZkAdminManager)
org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.

I thought I had messed up some config on the consumer side, so I tried playing with the options in ConsumerConfig, but I didn't see anything useful. Then I learned about some utilities available inside the Kafka container, so I tried to consume the messages directly from the container.

docker exec -it kafka /bin/bash
kafka-console-consumer --bootstrap-server localhost:9092 --topic mouse-position --from-beginning

That resulted in a bunch of the same errors in the Kafka terminal pane, so I had to google a bit more for help. Looking at the error message above, so nicely highlighted in this code block, the cause is embarrassingly obvious, but at that moment I had a different mindset - I was trying to get the simplest possible example running, so any obstacle felt frustrating, and that hampered my reasoning. After a bit of searching, I understood that the problem had to do with replication, a concept used to ensure redundancy and high availability.
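The error itself is about the internal __consumer_offsets topic: the broker tries to create it with a replication factor of 3 as soon as a consumer shows up, but there is only one broker. The kafka-topics utility inside the container can confirm which topics actually exist (the mouse-position topic should be there, the offsets topic should not):

docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --list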

At this moment, I don't care much about any of that, I just want simplicity, so I looked for a way to tell Kafka that I don't want any replication. Sure enough, there is an environment variable that sets the replication factor of the internal offsets topic to 1, i.e. a single copy and no replication.

KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1

Because the containers have explicit names, the docker run commands can't be executed again until the previous containers are stopped and removed, so I often had to run docker rm -f kafka and docker rm -f zookeeper.

I included the new environment variable in the new version of the docker run command.

docker run --name kafka --network=kafka -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -p 9092:9092 confluentinc/cp-kafka

And that was it, the consumer started to work as expected! The first version of senseless events is in place! :)
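To double-check the fix, describing the offsets topic from inside the container should now show a replication factor of 1:

docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --describe --topic __consumer_offsets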
