Микросервисы — Брокер сообщений |

Микросервисы отделены друг от друга, но они по-прежнему могут взаимодействовать друг с другом. Существует три способа настройки связи в приложении, ориентированном на микросервисы:

Синхронный: Общение происходит в режиме реального времени.

  • RPC, gRPC
  • REST APIS, граф QL

АсинхронныйОбщение происходит независимо от времени с помощью брокера сообщений.

Гибридныймикросервис, который поддерживает как синхронный, так и асинхронный режим.

Это быстрый способ поэкспериментировать с асинхронной связью с использованием брокера сообщений, работающего как контейнер, и интеграции с микросервисными приложениями. Мы будем использовать Docker Desktop для лаборатории. Прежде чем мы приступим к упражнениям, давайте сначала рассмотрим основные команды докера:

Ниже приведены часто используемые команды Docker, когда вы начинаете работать с Docker Desktop (очистка среды).

docker context list
docker context show
docker image rm (docker image ls -q)
docker container ls -a
docker container ls -a -q
docker container rm (docker container ls -a -q)
docker volume ls
docker volume rm (docker volume ls -q)
docker network ls
docker network rm (docker network ls -q)

Полный список можно найти здесь

Compose — это инструмент для определения и запуска многоконтейнерных приложений Docker. С Compose вы используете файл YAML для настройки служб вашего приложения. Затем с помощью одной команды вы создаете и запускаете все службы из вашей конфигурации. Compose работает во всех средах: производстве, подготовке, разработке, тестировании, а также рабочих процессах непрерывной интеграции. Он также имеет команды для управления всем жизненным циклом вашего приложения:

  • Запускать, останавливать и перестраивать службы
  • Просмотр состояния запущенных служб
  • Потоковая передача журнала запущенных служб
  • Выполнение одноразовой команды для службы

Хранилище данных в памяти с открытым исходным кодом используется миллионами разработчиков в качестве базы данных, кэша, механизма потоковой передачи и брокера сообщений. Обычно используется для распределенного кэширования. Технически Redis не является программным обеспечением очереди сообщений, но с помощью некоторых клиентских библиотек его можно использовать для этой цели.

Создайте файл с именем redis-докер-compose.yml в каталоге вашего проекта и вставьте следующее:

version: '3.2'
services:
  redis:
    image: redis:6.2-alpine
    ports:
      - 6379:6379
    command: redis-server --save 60 1 --requirepass MDNcVb924a --loglevel warning

Из каталога проекта запустите приложение, запустив docker-compose up.

docker compose -f redis-docker-compose.yml up

Вы можете указать несколько файлов конфигурации -f. Когда вы предоставляете несколько файлов, Compose объединяет их в одну конфигурацию. Compose создает конфигурацию в том порядке, в котором вы предоставляете файлы. Последующие файлы переопределяют и добавляют к своим предшественникам.

редис-1.JPG

редис-2.JPG

using StackExchange.Redis;

internal class Program
{
    private static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect(
            new ConfigurationOptions
            {
                EndPoints = { "localhost:6379" }
            });

        var db = redis.GetDatabase(1);
        // Add key to the redis
        db.StringSet("key-name", "Dhananjay Kumar");
        // Get the value of the key.
        var bar = db.StringGet("key-name");
        Console.WriteLine(bar);
    }
}

редис-3.JPG

Это проект, разработанный и поддерживаемый Apache Software Foundation, написанный на Scala и Java. Команда Apache Kafka решила определить свой собственный протокол, вместо того, чтобы, например, использовать AMQP или STOMP. Apache Kafka с секционированием и репликацией упрощает масштабирование.

Основы Кафки

Создайте файл с именем кафка-докер-compose.yml в каталоге вашего проекта и вставьте следующее:

version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
  
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  kafka-2:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29093:29092
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9093,PLAINTEXT_HOST://localhost:29093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Как клиенты подключаются к кластеру Kafka (загрузочный сервер)?

Клиент, который хочет отправлять или получать сообщения из кластера Kafka, может подключиться к любому брокеру в кластере. Каждый брокер в кластере имеет метаданные обо всех других брокерах и также помогает клиенту подключиться к ним, поэтому любой брокер в кластере также называется загрузочным сервером.

Сервер начальной загрузки вернет клиенту метаданные, состоящие из списка всех брокеров в кластере. Затем, при необходимости, клиент будет знать, к какому именно брокеру подключиться для отправки или получения данных, и точно определить, какие брокеры содержат соответствующий тематический раздел.

кафка-1.JPG

using Confluent.Kafka;

var config = new ProducerConfig { BootstrapServers = "localhost:29092" };
using var producer = new ProducerBuilder<Null, string>(config).Build();
try
{
  for (int i = 0; i < 10000000; i++)
  {
    await producer.ProduceAsync("demo-topic", new Message<Null, string> { Value = $"message- {i}" });
    Console.WriteLine($"message- {i} sent");
    Thread.Sleep(100);
  }
}
catch (Exception ex)
{
  throw;
}
using Confluent.Kafka;

var config = new ConsumerConfig
{
    GroupId = "gpid-1",
    BootstrapServers = "localhost:29092",
    AutoOffsetReset = AutoOffsetReset.Earliest
};
using var consumer = new ConsumerBuilder<Null, string>(config).Build();
consumer.Subscribe("demo-topic");
CancellationTokenSource token = new();
try
{
    while (true)
    {
        var res = consumer.Consume(token.Token);
        Console.WriteLine($"{res.Message.Value} received");
    }
}
catch (Exception ex)
{
    throw;
}

Console.ReadLine();

кафка-2.JPG

Docker сочиняет для стеков kafka

RabbitMQ поддерживает несколько протоколов. В этом руководстве используется AMQP 0-9-1, открытый протокол общего назначения для обмена сообщениями. Существует много клиентов для RabbitMQ на разных языках. Мы будем использовать клиент .NET, предоставленный RabbitMQ.

Создайте файл с именем RabbitMQ-докер-compose.yml в каталоге вашего проекта и вставьте следующее:

version: '3.2'
services:

  my_rabbit:
    container_name: my_rabbit
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "8080:15672"
    environment:
      RABBITMQ_DEFAULT_USER: "user"
      RABBITMQ_DEFAULT_PASS: "password"

кроликMQ-1.JPG

using RabbitMQ.Client;
using System;

namespace RabbitMQ.Producer
{
    static class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory
            {
                Uri = new Uri("amqp://user:password@localhost:5672")
            };
            using var connection = factory.CreateConnection();
            using var channel = connection.CreateModel();
            QueueProducer.Publish(channel);
        }
    }
}
using System;
using Newtonsoft.Json;
using RabbitMQ.Client;
using System.Text;
using System.Threading;

namespace RabbitMQ.Producer
{
    public static class QueueProducer
    {
        public static void Publish(IModel channel)
        {
            channel.QueueDeclare("demo-queue",
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null);
            var count = 0;

            while (true)
            {
                var message = new { Name = "Producer", Message = $"Hello! Count: {count}" };
                var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));

                channel.BasicPublish("", "demo-queue", null, body);
                Console.WriteLine(message);
                count++;
                Thread.Sleep(1000);
            }
        }
    }
}
using RabbitMQ.Client;
using System;

namespace RabbitMQ.Consumer
{
    static class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory
            {
                Uri = new Uri("amqp://user:password@localhost:5672")
            };
            using var connection = factory.CreateConnection();
            using var channel = connection.CreateModel();
            QueueConsumer.Consume(channel);
        }
    }
}
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;

namespace RabbitMQ.Consumer
{
    public static class QueueConsumer
    {

        public static void Consume(IModel channel)
        {
            channel.QueueDeclare("demo-queue",
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, e) => {
                var body = e.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(message);
            };

            channel.BasicConsume("demo-queue", true, consumer);
            Console.WriteLine("Consumer started");
            Console.ReadLine();
        }
    }
}

кроликMQ-3.JPG

кроликMQ-2.JPG

Похожие записи

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *