Building microservices through Event Driven Architecture part12 : Produce events to Apache KAFKA.

In this tutorial, I will show how to publish events to apache KAFKA.

When a command happen on the client side , then it will produce an event ( ex : PlaceOrderCommand => OrderCreatedEvent).  New events are registered as uncommitted events by the aggregateroot  and inserted to a append only table (eventstore). 

Now I must produce this events to a service bus so that applications subscribed to the service bus can pick the events in order to process them.

On the next steps, I will have a consumer that will pick the events , and index them to a high performance no-sql database that will be used by the Query side of my application as backend database.

Introduction to Apache KAFKA

Apache Kafka is a community distributed event streaming platform capable of handling trillions of events a day. Initially conceived as a messaging queue, Kafka is based on an abstraction of a distributed commit log. Since being created and open sourced by LinkedIn in 2011, Kafka has quickly evolved from messaging queue to a full-fledged event streaming platform.

Installation of Java SE Development Kit 

Go to the following url, download and install java  :

Installation of Apache Kafka

Got to the following url and dowload then install  Kafka :

Choose the latest stable release, in my case I choose the scala 2.13 kafka_2.13-2.6.0 version

On the next screen, I choose the suggested mirror to download the binaries.

Download and extract the .tgz archive file to a installation folder  ( in my case  C:\KAFKADEMO folder on my workstation) .

You should have the following on windows

To verify that the installation is working , go under C:\KAFKADEMO\kafka_2.13-2.6.0\bin\windows location and run the following command :


add an environment variables

this step is optionnal, you can edit your environnement variables and add your kafka installation folder to the path

Add a folder working_dir and to 2 sub folders zookeeper-data and kafka-data  as on the following image

Start Zookeeper

To configure  zookeeper, edit the file and update the dataDir directory as the following.

Edit C:\KAFKADEMO\kafka_2.13-2.6.0\config\


Run the following command to start zookeeper :

zookeeper-server-start.bat config\

Start Kafka

To configure  Kafka, edit the file and update the log.dirs directory as the following.

Edit C:\KAFKADEMO\kafka_2.13-2.6.0\config\ 


Run the following command to start kafka :

kafka-server-start.bat config\

Create topic

Run the following command to create a topic :

kafka-topics –zookeeper –topic eventstream –create –partitions 3 –replication-factor 1

Run the following command to list topics:

kafka-topics –zookeeper –list

Run the following command to describe topics :

kafka-topics –zookeeper –topic eventstream –describe

Run the following command to delete a topic :

kafka-topics –zookeeper –topic eventstream –delete


To create a  producer that produce events to an apache kafka topic (eventstream), run the following command :

kafka-console-producer –broker-list –topic eventstream


To start consuming  events produced on topic (eventstream), run the following command :

kafka-console-consumer –bootstrap-server –topic eventstream

To start consuming  all events produced on topic (eventstream)  from the first event, run the following command :

kafka-console-consumer –bootstrap-server –topic eventstream –from-beginning

Introduction to Asp.Net Core SignalR

ASP.NET Core SignalR is an open-source library that simplifies adding real-time web functionality to apps. Real-time web functionality enables server-side code to push content to clients instantly.

Good candidates for SignalR:

  • Apps that require high frequency updates from the server. Examples are gaming, social networks, voting, auction, maps, and GPS apps.
  • Dashboards and monitoring apps. Examples include company dashboards, instant sales updates, or travel alerts.
  • Collaborative apps. Whiteboard apps and team meeting software are examples of collaborative apps.
  • Apps that require notifications. Social networks, email, chat, games, travel alerts, and many other apps use notifications.

Create a SignalR Hub :

To create a SignalR hub, I define the following interface so as to have a strongly typed hub

Hub Interfaces

  • Task OnPublish(T payload);

          To get notified when a message is published to the hub

  • Task OnPublish(string topic, T payload);

         To get notified when a message is published to a specific topic

  • Task OnSubscribe(string connectionId, string topic);

          To get notified when a client join a  specific topic

  • Task OnUnSubscribe(string connectionId, string topic);

          To get notified when a client leaves a specific topic

The following interface is used to subscribe and publish events

Here is the hub

ISignalRNotifier is the interface that publish and receive messages

Publish events to SignalR hub

When a command happen, it is stored as an event to an eventstore, then the producer could pick the event from the event store and publish it to a service bus. I don’t want it to work like that because I will wonder what event are yet published or not (isPublihed = true/false)  and update it accordindly.

So  for more flexibility I will introduce a SignalR Hub. So the scenario that I will implement is :

When a command happen, it is stored as an event to a eventstore and then published to a SignalR hub topic.   So clients interrested to that topic will get notified and then can process the event.  The client can be a service bus, a mobile app, a Single Page Application , etc…

Let us go ahead and publish events to the SignalR hub from the command side of our system.

So I have to update the handle function of LogCorner.EduSync.Speech.Application.UseCases.EventSourcingHandler.cs file and add the following :

_publisher.PublishAsync(Topics.Speech, eventStore);

Create a worker service

Lut us create a worker service and add the following classes


ProducerHostedService is a background service that host ProducerService 

A backgroundService is a base class for implementing a long running IHostedService


ProducerService subscribe to a signalR topic and handle events published on that topic.

It uses  IServiceBus to send received events to a service bus topic


ServiceBus use IServiceBusProvider interface to send messages to a service bus provider. So that I can switch to another service bus provider ( ex : RabbitMq, ect…) without changing implementation.


KafkaClient send messages to kafka using Confluent.Kafka ( )


Start zookeeper

zookeeper-server-start.bat config\

Start Kafka

kafka-server-start.bat config\

Start consuming

kafka-topics –zookeeper –topic eventstream –create –partitions 3 –replication-factor 1

kafka-console-consumer –bootstrap-server –topic eventstream

Start the following  projects :

  • LogCorner.EduSync.SignalR.Server
  • LogCorner.EduSync.Speech.Producer

Start the following  project :

  • LogCorner.EduSync.Speech.Presentation

Start postman and post a new command

You should see the following output on the comsumer console, consuming the command posted on postman

Code source is available here : 

Thanks for reading, if you have any feedback, feel free to post it





