Codementor Events

Microservices - Message Broker

Published Jan 21, 2023Last updated Jan 27, 2023
Microservices - Message Broker

Microservices are decoupled from each other but they still can communicate with each other. There are three ways to set up communication in a microservices-oriented application:

Synchronous: Communication happens in real-time.

  • RPC, gRPC
  • REST APIS, graphQL

Asynchronous, Communication happens independent of time using a Mesage broker.

  • RabbitMQ
  • Kafka
  • Redis

Hybrid, Microservice which supports both Synchronous and Asynchronous.

This is a quick start to play around with asynchronous communication using a message broker running as a container and integration with microservice applications. We will be using Docker Desktop for the lab. Before we jump start to exercise, let's first look at basic docker commands -

Useful docker commands

Below are commonly used docker commands when you start working on Docker Desktop (Environment cleanup).

docker context list
docker context show
docker image rm (docker image ls -q)
docker container ls -a
docker container ls -a -q
docker container rm (docker container ls -a -q)
docker volume ls
docker volume rm (docker volume ls -q)
docker network ls
docker network rm (docker network ls -q)

Complete List can be found here

Docker Compose

Compose is a tool for defining and running multi-container Docker applications. With Compose, you use a YAML file to configure your application’s services. Then, with a single command, you create and start all the services from your configuration. Compose works in all environments: production, staging, development, testing, as well as CI workflows. It also has commands for managing the whole lifecycle of your application:

  • Start, stop, and rebuild services
  • View the status of running services
  • Stream the log output of running services
  • Run a one-off command on a service

Redis

The open-source, in-memory data store is used by millions of developers as a database, cache, streaming engine, and message broker. Commonly used for distributed caching. Redis is not technically a message queue software, but through some client libraries, it can be used for that purpose.

Create a file called redis-docker-compose.yml in your project directory and paste the following:

version: '3.2'
services:
  redis:
    image: redis:6.2-alpine
    ports:
      - 6379:6379
    command: redis-server --save 60 1 --requirepass MDNcVb924a --loglevel warning

From your project directory, start up your application by running docker-compose up.

docker compose -f redis-docker-compose.yml up

You can supply multiple -f configuration files. When you supply multiple files, Compose combines them into a single configuration. Compose builds the configuration in the order you supply the files. Subsequent files override and add to their predecessors.

redis-1.JPG

redis-2.JPG

using StackExchange.Redis;

internal class Program
{
    private static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect(
            new ConfigurationOptions
            {
                EndPoints = { "localhost:6379" }
            });

        var db = redis.GetDatabase(1);
        // Add key to the redis
        db.StringSet("key-name", "Dhananjay Kumar");
        // Get the value of the key.
        var bar = db.StringGet("key-name");
        Console.WriteLine(bar);
    }
}

redis-3.JPG

Apache Kafka

It's a project developed and maintained by the Apache Software Foundation, written in Scala and Java. The Apache Kafka team chooses to define its own protocol, instead of adopting AMQP or STOMP for example. Apache Kafka with partitioning and replication makes it easy to scale.

Kafka Fundamentals

Create a file called kafka-docker-compose.yml in your project directory and paste the following:

version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
  
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  kafka-2:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29093:29092
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9093,PLAINTEXT_HOST://localhost:29093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

How do clients connect to a Kafka Cluster (bootstrap server)?

A client that wants to send or receive messages from the Kafka cluster may connect to any broker in the cluster. Every broker in the cluster has metadata about all the other brokers and will help the client connect to them as well, and therefore any broker in the cluster is also called a bootstrap server.

The bootstrap server will return metadata to the client that consists of a list of all the brokers in the cluster. Then, when required, the client will know which exact broker to connect to send or receive data, and accurately find which brokers contain the relevant topic partition.

kafka-1.JPG

using Confluent.Kafka;

var config = new ProducerConfig { BootstrapServers = "localhost:29092" };
using var producer = new ProducerBuilder<Null, string>(config).Build();
try
{
  for (int i = 0; i < 10000000; i++)
  {
    await producer.ProduceAsync("demo-topic", new Message<Null, string> { Value = $"message- {i}" });
    Console.WriteLine($"message- {i} sent");
    Thread.Sleep(100);
  }
}
catch (Exception ex)
{
  throw;
}
using Confluent.Kafka;

var config = new ConsumerConfig
{
    GroupId = "gpid-1",
    BootstrapServers = "localhost:29092",
    AutoOffsetReset = AutoOffsetReset.Earliest
};
using var consumer = new ConsumerBuilder<Null, string>(config).Build();
consumer.Subscribe("demo-topic");
CancellationTokenSource token = new();
try
{
    while (true)
    {
        var res = consumer.Consume(token.Token);
        Console.WriteLine($"{res.Message.Value} received");
    }
}
catch (Exception ex)
{
    throw;
}

Console.ReadLine();

kafka-2.JPG

Docker compose for kafka stacks

RabbitMQ

RabbitMQ speaks multiple protocols. This tutorial uses AMQP 0-9-1, which is an open, general-purpose protocol for messaging. There are many clients for RabbitMQ in many different languages. We'll use the .NET client provided by RabbitMQ.

Create a file called rabbitmq-docker-compose.yml in your project directory and paste the following:

version: '3.2'
services:

  my_rabbit:
    container_name: my_rabbit
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "8080:15672"
    environment:
      RABBITMQ_DEFAULT_USER: "user"
      RABBITMQ_DEFAULT_PASS: "password"

rabbitMQ-1.JPG

using RabbitMQ.Client;
using System;

namespace RabbitMQ.Producer
{
    static class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory
            {
                Uri = new Uri("amqp://user:password@localhost:5672")
            };
            using var connection = factory.CreateConnection();
            using var channel = connection.CreateModel();
            QueueProducer.Publish(channel);
        }
    }
}
using System;
using Newtonsoft.Json;
using RabbitMQ.Client;
using System.Text;
using System.Threading;

namespace RabbitMQ.Producer
{
    public static class QueueProducer
    {
        public static void Publish(IModel channel)
        {
            channel.QueueDeclare("demo-queue",
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null);
            var count = 0;

            while (true)
            {
                var message = new { Name = "Producer", Message = $"Hello! Count: {count}" };
                var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));

                channel.BasicPublish("", "demo-queue", null, body);
                Console.WriteLine(message);
                count++;
                Thread.Sleep(1000);
            }
        }
    }
}
using RabbitMQ.Client;
using System;

namespace RabbitMQ.Consumer
{
    static class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory
            {
                Uri = new Uri("amqp://user:password@localhost:5672")
            };
            using var connection = factory.CreateConnection();
            using var channel = connection.CreateModel();
            QueueConsumer.Consume(channel);
        }
    }
}
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;

namespace RabbitMQ.Consumer
{
    public static class QueueConsumer
    {

        public static void Consume(IModel channel)
        {
            channel.QueueDeclare("demo-queue",
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, e) => {
                var body = e.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(message);
            };

            channel.BasicConsume("demo-queue", true, consumer);
            Console.WriteLine("Consumer started");
            Console.ReadLine();
        }
    }
}

rabbitMQ-3.JPG

rabbitMQ-2.JPG

Discover and read more posts from DhananjayKumar
get started