Building microservices through Event Driven Architecture part 12: Produce events to Apache Kafka

This tutorial is the 12th part of a series: Building microservices through Event Driven Architecture.

The previous step was about building microservices through Event Driven Architecture part 11: Continuous Integration.

In this tutorial, I will show how to publish events to Apache Kafka.

When a command happens on the client side, it produces an event (e.g., PlaceOrderCommand => OrderCreatedEvent). New events are registered as uncommitted events by the aggregate root and inserted into an append-only table (the event store).

Now I must produce these events to a service bus so that applications subscribed to it can pick up the events and process them.

In the next steps, I will have a consumer that picks up the events and indexes them into a high-performance NoSQL database, which will serve as the backend database for the query side of my application.
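To make this flow concrete, here is a minimal sketch in C#; the type names (PlaceOrderCommand, OrderCreatedEvent, OrderAggregate) are illustrative, since the real project defines its own contracts:

    using System;
    using System.Collections.Generic;

    // Hypothetical types: the actual aggregate and event contracts live in
    // the LogCorner.EduSync solution.
    public record PlaceOrderCommand(Guid OrderId);
    public record OrderCreatedEvent(Guid OrderId);

    public class OrderAggregate
    {
        private readonly List<object> _uncommittedEvents = new();

        // Handling a command produces an event, registered as uncommitted.
        public void Place(PlaceOrderCommand command)
        {
            _uncommittedEvents.Add(new OrderCreatedEvent(command.OrderId));
        }

        // The uncommitted events are what get appended to the event store.
        public IReadOnlyList<object> UncommittedEvents => _uncommittedEvents;
    }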

Introduction to Apache Kafka

Apache Kafka is a community distributed event streaming platform capable of handling trillions of events a day. Initially conceived as a messaging queue, Kafka is based on an abstraction of a distributed commit log. Since being created and open sourced by LinkedIn in 2011, Kafka has quickly evolved from a messaging queue to a full-fledged event streaming platform.

What is Apache Kafka®?

 

Perform advanced streaming data transformations with Apache Spark and Kafka in Azure HDInsight

https://docs.microsoft.com/en-us/learn/modules/perform-advanced-streaming-data-transformations-with-spark-kafka?WT.mc_id=AZ-MVP-5003013

Installation

Installation of Java SE Development Kit 

Go to the following URL, then download and install Java: https://www.oracle.com/fr/java/technologies/javase/javase-jdk8-downloads.html

Installation of Apache Kafka

Go to the following URL, then download and install Kafka: https://kafka.apache.org/downloads

Choose the latest stable release; in my case, I chose the Scala 2.13 build, kafka_2.13-2.6.0.

On the next screen, I chose the suggested mirror to download the binaries.

Download and extract the .tgz archive file to an installation folder (in my case, the C:\KAFKADEMO folder on my workstation).

On Windows, you should have the following:

To verify that the installation is working, go to the C:\KAFKADEMO\kafka_2.13-2.6.0\bin\windows folder and run the following command:

  kafka-topics.bat

Add an environment variable

This step is optional: you can edit your environment variables and add your Kafka installation's bin\windows folder to the PATH, so the commands below can be run from any location.

Add a working_dir folder and two subfolders, zookeeper-data and kafka-data, as shown in the following image.

Start ZooKeeper

To configure ZooKeeper, edit the zookeeper.properties file and update the dataDir directory as follows.

Edit C:\KAFKADEMO\kafka_2.13-2.6.0\config\zookeeper.properties

dataDir=C:/KAFKADEMO/kafka_2.13-2.6.0/working_dir/zookeeper-data

Run the following command to start ZooKeeper:

zookeeper-server-start.bat config\zookeeper.properties

Start Kafka

To configure Kafka, edit the server.properties file and update the log.dirs directory as follows.

Edit C:\KAFKADEMO\kafka_2.13-2.6.0\config\server.properties 

log.dirs=C:/KAFKADEMO/kafka_2.13-2.6.0/working_dir/kafka-data

Run the following command to start Kafka:

kafka-server-start.bat config\server.properties

Create a topic

Run the following command to create a topic:

kafka-topics --zookeeper 127.0.0.1:2181 --topic eventstream --create --partitions 3 --replication-factor 1

Run the following command to list topics:

kafka-topics --zookeeper 127.0.0.1:2181 --list

Run the following command to describe a topic:

kafka-topics --zookeeper 127.0.0.1:2181 --topic eventstream --describe

Run the following command to delete a topic:

kafka-topics --zookeeper 127.0.0.1:2181 --topic eventstream --delete

Producer

To create a producer that produces events to an Apache Kafka topic (eventstream), run the following command:

kafka-console-producer --broker-list 127.0.0.1:9092 --topic eventstream

Consumer

To start consuming events produced on the topic (eventstream), run the following command:

kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic eventstream

To consume all events produced on the topic (eventstream) from the first event, run the following command:

kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic eventstream --from-beginning

Introduction to ASP.NET Core SignalR

ASP.NET Core SignalR is an open-source library that simplifies adding real-time web functionality to apps. Real-time web functionality enables server-side code to push content to clients instantly.

Good candidates for SignalR:

  • Apps that require high frequency updates from the server. Examples are gaming, social networks, voting, auction, maps, and GPS apps.
  • Dashboards and monitoring apps. Examples include company dashboards, instant sales updates, or travel alerts.
  • Collaborative apps. Whiteboard apps and team meeting software are examples of collaborative apps.
  • Apps that require notifications. Social networks, email, chat, games, travel alerts, and many other apps use notifications.

Introduction to ASP.NET Core SignalR

Tutorial: Get started with ASP.NET Core SignalR

What is Azure SignalR Service?

Create a SignalR hub

To create a SignalR hub, I define the following interface so as to have a strongly typed hub.

Hub Interfaces

  • Task OnPublish(T payload);

    To get notified when a message is published to the hub

  • Task OnPublish(string topic, T payload);

    To get notified when a message is published to a specific topic

  • Task OnSubscribe(string connectionId, string topic);

    To get notified when a client joins a specific topic

  • Task OnUnSubscribe(string connectionId, string topic);

    To get notified when a client leaves a specific topic
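Putting these members together, a sketch of the strongly typed client interface could look like this (the name IHubNotifier is illustrative; only the four method signatures above come from the post):

    using System.Threading.Tasks;

    // Strongly typed SignalR client interface; T is the payload type.
    public interface IHubNotifier<T>
    {
        Task OnPublish(T payload);                             // message published to the hub
        Task OnPublish(string topic, T payload);               // message published to a topic
        Task OnSubscribe(string connectionId, string topic);   // a client joined a topic
        Task OnUnSubscribe(string connectionId, string topic); // a client left a topic
    }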

The following interface is used to subscribe and publish events:
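Here is a minimal sketch of what it might look like; the name ISignalRNotifier is used later in this post, but these member signatures are inferred rather than taken from the repository:

    using System;
    using System.Threading.Tasks;

    // Inferred publish/subscribe abstraction over the SignalR connection.
    public interface ISignalRNotifier
    {
        Task StartAsync();                                      // open the hub connection
        Task SubscribeAsync(string topic);                      // join a topic
        Task UnsubscribeAsync(string topic);                    // leave a topic
        Task PublishAsync<T>(string topic, T payload);          // publish an event to a topic
        void OnPublish<T>(string topic, Func<T, Task> handler); // handle events received on a topic
    }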

Here is the hub:
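Below is a minimal sketch of such a hub, assuming the IHubNotifier<T> client interface above; the class name NotifierHub and its method names are illustrative:

    using System.Threading.Tasks;
    using Microsoft.AspNetCore.SignalR;

    // Topics are modeled as SignalR groups.
    public class NotifierHub : Hub<IHubNotifier<string>>
    {
        // A client joins a topic and the group is notified.
        public async Task Subscribe(string topic)
        {
            await Groups.AddToGroupAsync(Context.ConnectionId, topic);
            await Clients.Group(topic).OnSubscribe(Context.ConnectionId, topic);
        }

        // A client leaves a topic.
        public async Task Unsubscribe(string topic)
        {
            await Groups.RemoveFromGroupAsync(Context.ConnectionId, topic);
            await Clients.Group(topic).OnUnSubscribe(Context.ConnectionId, topic);
        }

        // Publish a payload to every client subscribed to the topic.
        public async Task PublishToTopic(string topic, string payload)
        {
            await Clients.Group(topic).OnPublish(topic, payload);
        }
    }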

ISignalRNotifier is the interface that publishes and receives messages.

Publish events to SignalR hub

When a command happens, it is stored as an event in the event store; the producer could then pick the event from the event store and publish it to a service bus. I don't want it to work like that, because I would have to track which events have already been published (isPublished = true/false) and update them accordingly.

So, for more flexibility, I will introduce a SignalR hub. The scenario that I will implement is:

When a command happens, it is stored as an event in the event store and then published to a SignalR hub topic. Clients interested in that topic get notified and can then process the event. A client can be a service bus, a mobile app, a Single Page Application, etc.

Let us go ahead and publish events to the SignalR hub from the command side of our system.

So I have to update the Handle function in LogCorner.EduSync.Speech.Application.UseCases.EventSourcingHandler.cs and add the following:

_publisher.PublishAsync(Topics.Speech, eventStore);
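For context, the updated method could take a shape like the following; only the PublishAsync call above comes from the actual change, while the surrounding members (_eventStoreRepository, ToEventStore, the method signature) are illustrative:

    // Sketch: persist each uncommitted event, then notify the SignalR topic.
    public async Task Handle(AggregateRoot aggregate)
    {
        foreach (var domainEvent in aggregate.GetUncommittedEvents())
        {
            var eventStore = ToEventStore(domainEvent);          // map to an event store row
            await _eventStoreRepository.AppendAsync(eventStore); // append-only persistence
            await _publisher.PublishAsync(Topics.Speech, eventStore); // push to the hub topic
        }
    }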

Create a worker service

Let us create a worker service and add the following classes.

ProducerHostedService

ProducerHostedService is a background service that hosts ProducerService.

BackgroundService is a base class for implementing a long-running IHostedService: https://docs.microsoft.com/en-us/dotnet/api/microsoft.extensions.hosting.backgroundservice?view=dotnet-plat-ext-3.1&WT.mc_id=DOP-MVP-5003013
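A minimal sketch, assuming a hypothetical IProducerService abstraction over the producer logic:

    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Extensions.Hosting;

    // Assumed abstraction implemented by ProducerService below.
    public interface IProducerService
    {
        Task StartAsync(CancellationToken cancellationToken);
    }

    public class ProducerHostedService : BackgroundService
    {
        private readonly IProducerService _producerService;

        public ProducerHostedService(IProducerService producerService)
        {
            _producerService = producerService;
        }

        // Runs until the host signals shutdown via stoppingToken.
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            await _producerService.StartAsync(stoppingToken);
        }
    }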

ProducerService

ProducerService subscribes to a SignalR topic and handles events published on that topic.

It uses IServiceBus to send received events to a service bus topic.
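A hypothetical sketch, reusing the ISignalRNotifier and IProducerService shapes sketched earlier and the IServiceBus interface described below:

    using System.Threading;
    using System.Threading.Tasks;

    // Subscribe to the SignalR topic and forward each event to the service bus.
    public class ProducerService : IProducerService
    {
        private readonly ISignalRNotifier _notifier;
        private readonly IServiceBus _serviceBus;

        public ProducerService(ISignalRNotifier notifier, IServiceBus serviceBus)
        {
            _notifier = notifier;
            _serviceBus = serviceBus;
        }

        public async Task StartAsync(CancellationToken cancellationToken)
        {
            await _notifier.StartAsync();                  // connect to the hub
            await _notifier.SubscribeAsync(Topics.Speech); // join the topic

            // Forward every payload received on the topic to the service bus topic.
            _notifier.OnPublish<string>(Topics.Speech, payload =>
                _serviceBus.SendAsync("eventstream", payload));
        }
    }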

ServiceBus

ServiceBus uses the IServiceBusProvider interface to send messages to a service bus provider, so that I can switch to another provider (e.g., RabbitMQ) without changing the implementation.
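A sketch of this seam; the IServiceBus and IServiceBusProvider shapes are inferred from the description:

    using System.Threading.Tasks;

    public interface IServiceBus
    {
        Task SendAsync(string topic, string message);
    }

    // The seam that lets Kafka be swapped for RabbitMQ or another broker.
    public interface IServiceBusProvider
    {
        Task SendAsync(string topic, string message);
    }

    // ServiceBus delegates to whichever provider is registered.
    public class ServiceBus : IServiceBus
    {
        private readonly IServiceBusProvider _provider;

        public ServiceBus(IServiceBusProvider provider) => _provider = provider;

        public Task SendAsync(string topic, string message)
            => _provider.SendAsync(topic, message);
    }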

KafkaClient

KafkaClient sends messages to Kafka using Confluent.Kafka (https://www.nuget.org/packages/Confluent.Kafka/).
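A minimal sketch built on the Confluent.Kafka producer API; the constructor parameter and console logging are illustrative, and the class is wired as the IServiceBusProvider from the sketch above:

    using System;
    using System.Threading.Tasks;
    using Confluent.Kafka;

    public class KafkaClient : IServiceBusProvider, IDisposable
    {
        private readonly IProducer<Null, string> _producer;

        public KafkaClient(string bootstrapServers)
        {
            var config = new ProducerConfig { BootstrapServers = bootstrapServers };
            _producer = new ProducerBuilder<Null, string>(config).Build();
        }

        public async Task SendAsync(string topic, string message)
        {
            // ProduceAsync completes once the broker acknowledges the message.
            var result = await _producer.ProduceAsync(
                topic, new Message<Null, string> { Value = message });
            Console.WriteLine($"Delivered to {result.TopicPartitionOffset}");
        }

        public void Dispose() => _producer.Dispose();
    }

With ZooKeeper and Kafka running locally, registering a new KafkaClient("127.0.0.1:9092") as the IServiceBusProvider would send every forwarded event to the eventstream topic.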

Testing

Start ZooKeeper

zookeeper-server-start.bat config\zookeeper.properties

Start Kafka

kafka-server-start.bat config\server.properties

Start consuming

kafka-topics --zookeeper 127.0.0.1:2181 --topic eventstream --create --partitions 3 --replication-factor 1

kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic eventstream

Start the following projects:

  • LogCorner.EduSync.SignalR.Server
  • LogCorner.EduSync.Speech.Producer

Start the following project:

  • LogCorner.EduSync.Speech.Presentation

Start Postman and post a new command.

You should see the following output on the consumer console, consuming the command posted from Postman.

The source code is available here:

Thanks for reading. If you have any feedback, feel free to post it.

Regards

 

 

Gora LEYE

I'm a Microsoft Most Valuable Professional (MVP), .NET architect, and technical expert located in Paris (France). The purpose of this blog is mainly to post general .NET tips and tricks. www.masterconduite.com
