Microservices - Message Broker
Microservices are decoupled from each other but they still can communicate with each other. There are three ways to set up communication in a microservices-oriented application:
Synchronous: Communication happens in real-time.
- RPC, gRPC
- REST APIS, graphQL
Asynchronous, Communication happens independent of time using a Mesage broker.
- RabbitMQ
- Kafka
- Redis
Hybrid, Microservice which supports both Synchronous and Asynchronous.
This is a quick start to play around with asynchronous communication using a message broker running as a container and integration with microservice applications. We will be using Docker Desktop for the lab. Before we jump start to exercise, let's first look at basic docker commands -
Useful docker commands
Below are commonly used docker commands when you start working on Docker Desktop (Environment cleanup).
docker context list
docker context show
docker image rm (docker image ls -q)
docker container ls -a
docker container ls -a -q
docker container rm (docker container ls -a -q)
docker volume ls
docker volume rm (docker volume ls -q)
docker network ls
docker network rm (docker network ls -q)
Complete List can be found here
Docker Compose
Compose is a tool for defining and running multi-container Docker applications. With Compose, you use a YAML file to configure your application’s services. Then, with a single command, you create and start all the services from your configuration. Compose works in all environments: production, staging, development, testing, as well as CI workflows. It also has commands for managing the whole lifecycle of your application:
- Start, stop, and rebuild services
- View the status of running services
- Stream the log output of running services
- Run a one-off command on a service
Redis
The open-source, in-memory data store is used by millions of developers as a database, cache, streaming engine, and message broker. Commonly used for distributed caching. Redis is not technically a message queue software, but through some client libraries, it can be used for that purpose.
Create a file called redis-docker-compose.yml in your project directory and paste the following:
version: '3.2'
services:
redis:
image: redis:6.2-alpine
ports:
- 6379:6379
command: redis-server --save 60 1 --requirepass MDNcVb924a --loglevel warning
From your project directory, start up your application by running docker-compose up.
docker compose -f redis-docker-compose.yml up
You can supply multiple -f configuration files. When you supply multiple files, Compose combines them into a single configuration. Compose builds the configuration in the order you supply the files. Subsequent files override and add to their predecessors.
using StackExchange.Redis;
internal class Program
{
private static void Main(string[] args)
{
ConnectionMultiplexer redis = ConnectionMultiplexer.Connect(
new ConfigurationOptions
{
EndPoints = { "localhost:6379" }
});
var db = redis.GetDatabase(1);
// Add key to the redis
db.StringSet("key-name", "Dhananjay Kumar");
// Get the value of the key.
var bar = db.StringGet("key-name");
Console.WriteLine(bar);
}
}
Apache Kafka
It's a project developed and maintained by the Apache Software Foundation, written in Scala and Java. The Apache Kafka team chooses to define its own protocol, instead of adopting AMQP or STOMP for example. Apache Kafka with partitioning and replication makes it easy to scale.
Create a file called kafka-docker-compose.yml in your project directory and paste the following:
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
kafka-2:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29093:29092
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9093,PLAINTEXT_HOST://localhost:29093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
How do clients connect to a Kafka Cluster (bootstrap server)?
A client that wants to send or receive messages from the Kafka cluster may connect to any broker in the cluster. Every broker in the cluster has metadata about all the other brokers and will help the client connect to them as well, and therefore any broker in the cluster is also called a bootstrap server.
The bootstrap server will return metadata to the client that consists of a list of all the brokers in the cluster. Then, when required, the client will know which exact broker to connect to send or receive data, and accurately find which brokers contain the relevant topic partition.
using Confluent.Kafka;
var config = new ProducerConfig { BootstrapServers = "localhost:29092" };
using var producer = new ProducerBuilder<Null, string>(config).Build();
try
{
for (int i = 0; i < 10000000; i++)
{
await producer.ProduceAsync("demo-topic", new Message<Null, string> { Value = $"message- {i}" });
Console.WriteLine($"message- {i} sent");
Thread.Sleep(100);
}
}
catch (Exception ex)
{
throw;
}
using Confluent.Kafka;
var config = new ConsumerConfig
{
GroupId = "gpid-1",
BootstrapServers = "localhost:29092",
AutoOffsetReset = AutoOffsetReset.Earliest
};
using var consumer = new ConsumerBuilder<Null, string>(config).Build();
consumer.Subscribe("demo-topic");
CancellationTokenSource token = new();
try
{
while (true)
{
var res = consumer.Consume(token.Token);
Console.WriteLine($"{res.Message.Value} received");
}
}
catch (Exception ex)
{
throw;
}
Console.ReadLine();
Docker compose for kafka stacks
RabbitMQ
RabbitMQ speaks multiple protocols. This tutorial uses AMQP 0-9-1, which is an open, general-purpose protocol for messaging. There are many clients for RabbitMQ in many different languages. We'll use the .NET client provided by RabbitMQ.
Create a file called rabbitmq-docker-compose.yml in your project directory and paste the following:
version: '3.2'
services:
my_rabbit:
container_name: my_rabbit
image: rabbitmq:3-management
ports:
- "5672:5672"
- "8080:15672"
environment:
RABBITMQ_DEFAULT_USER: "user"
RABBITMQ_DEFAULT_PASS: "password"
using RabbitMQ.Client;
using System;
namespace RabbitMQ.Producer
{
static class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory
{
Uri = new Uri("amqp://user:password@localhost:5672")
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
QueueProducer.Publish(channel);
}
}
}
using System;
using Newtonsoft.Json;
using RabbitMQ.Client;
using System.Text;
using System.Threading;
namespace RabbitMQ.Producer
{
public static class QueueProducer
{
public static void Publish(IModel channel)
{
channel.QueueDeclare("demo-queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
var count = 0;
while (true)
{
var message = new { Name = "Producer", Message = $"Hello! Count: {count}" };
var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));
channel.BasicPublish("", "demo-queue", null, body);
Console.WriteLine(message);
count++;
Thread.Sleep(1000);
}
}
}
}
using RabbitMQ.Client;
using System;
namespace RabbitMQ.Consumer
{
static class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory
{
Uri = new Uri("amqp://user:password@localhost:5672")
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
QueueConsumer.Consume(channel);
}
}
}
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
namespace RabbitMQ.Consumer
{
public static class QueueConsumer
{
public static void Consume(IModel channel)
{
channel.QueueDeclare("demo-queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, e) => {
var body = e.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(message);
};
channel.BasicConsume("demo-queue", true, consumer);
Console.WriteLine("Consumer started");
Console.ReadLine();
}
}
}