Микросервисы — Брокер сообщений |
Микросервисы отделены друг от друга, но они по-прежнему могут взаимодействовать друг с другом. Существует три способа настройки связи в приложении, ориентированном на микросервисы:
Синхронный: Общение происходит в режиме реального времени.
- RPC, gRPC
- REST APIS, граф QL
АсинхронныйОбщение происходит независимо от времени с помощью брокера сообщений.
Гибридныймикросервис, который поддерживает как синхронный, так и асинхронный режим.
Это быстрый способ поэкспериментировать с асинхронной связью с использованием брокера сообщений, работающего как контейнер, и интеграции с микросервисными приложениями. Мы будем использовать Docker Desktop для лаборатории. Прежде чем мы приступим к упражнениям, давайте сначала рассмотрим основные команды докера:
Ниже приведены часто используемые команды Docker, когда вы начинаете работать с Docker Desktop (очистка среды).
docker context list
docker context show
docker image rm (docker image ls -q)
docker container ls -a
docker container ls -a -q
docker container rm (docker container ls -a -q)
docker volume ls
docker volume rm (docker volume ls -q)
docker network ls
docker network rm (docker network ls -q)
Полный список можно найти здесь
Compose — это инструмент для определения и запуска многоконтейнерных приложений Docker. С Compose вы используете файл YAML для настройки служб вашего приложения. Затем с помощью одной команды вы создаете и запускаете все службы из вашей конфигурации. Compose работает во всех средах: производстве, подготовке, разработке, тестировании, а также рабочих процессах непрерывной интеграции. Он также имеет команды для управления всем жизненным циклом вашего приложения:
- Запускать, останавливать и перестраивать службы
- Просмотр состояния запущенных служб
- Потоковая передача журнала запущенных служб
- Выполнение одноразовой команды для службы
Хранилище данных в памяти с открытым исходным кодом используется миллионами разработчиков в качестве базы данных, кэша, механизма потоковой передачи и брокера сообщений. Обычно используется для распределенного кэширования. Технически Redis не является программным обеспечением очереди сообщений, но с помощью некоторых клиентских библиотек его можно использовать для этой цели.
Создайте файл с именем redis-докер-compose.yml в каталоге вашего проекта и вставьте следующее:
version: '3.2'
services:
redis:
image: redis:6.2-alpine
ports:
- 6379:6379
command: redis-server --save 60 1 --requirepass MDNcVb924a --loglevel warning
Из каталога проекта запустите приложение, запустив docker-compose up.
docker compose -f redis-docker-compose.yml up
Вы можете указать несколько файлов конфигурации -f. Когда вы предоставляете несколько файлов, Compose объединяет их в одну конфигурацию. Compose создает конфигурацию в том порядке, в котором вы предоставляете файлы. Последующие файлы переопределяют и добавляют к своим предшественникам.
using StackExchange.Redis;
internal class Program
{
private static void Main(string[] args)
{
ConnectionMultiplexer redis = ConnectionMultiplexer.Connect(
new ConfigurationOptions
{
EndPoints = { "localhost:6379" }
});
var db = redis.GetDatabase(1);
// Add key to the redis
db.StringSet("key-name", "Dhananjay Kumar");
// Get the value of the key.
var bar = db.StringGet("key-name");
Console.WriteLine(bar);
}
}
Это проект, разработанный и поддерживаемый Apache Software Foundation, написанный на Scala и Java. Команда Apache Kafka решила определить свой собственный протокол, вместо того, чтобы, например, использовать AMQP или STOMP. Apache Kafka с секционированием и репликацией упрощает масштабирование.
Создайте файл с именем кафка-докер-compose.yml в каталоге вашего проекта и вставьте следующее:
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
kafka-2:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29093:29092
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9093,PLAINTEXT_HOST://localhost:29093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Как клиенты подключаются к кластеру Kafka (загрузочный сервер)?
Клиент, который хочет отправлять или получать сообщения из кластера Kafka, может подключиться к любому брокеру в кластере. Каждый брокер в кластере имеет метаданные обо всех других брокерах и также помогает клиенту подключиться к ним, поэтому любой брокер в кластере также называется загрузочным сервером.
Сервер начальной загрузки вернет клиенту метаданные, состоящие из списка всех брокеров в кластере. Затем, при необходимости, клиент будет знать, к какому именно брокеру подключиться для отправки или получения данных, и точно определить, какие брокеры содержат соответствующий тематический раздел.
using Confluent.Kafka;
var config = new ProducerConfig { BootstrapServers = "localhost:29092" };
using var producer = new ProducerBuilder<Null, string>(config).Build();
try
{
for (int i = 0; i < 10000000; i++)
{
await producer.ProduceAsync("demo-topic", new Message<Null, string> { Value = $"message- {i}" });
Console.WriteLine($"message- {i} sent");
Thread.Sleep(100);
}
}
catch (Exception ex)
{
throw;
}
using Confluent.Kafka;
var config = new ConsumerConfig
{
GroupId = "gpid-1",
BootstrapServers = "localhost:29092",
AutoOffsetReset = AutoOffsetReset.Earliest
};
using var consumer = new ConsumerBuilder<Null, string>(config).Build();
consumer.Subscribe("demo-topic");
CancellationTokenSource token = new();
try
{
while (true)
{
var res = consumer.Consume(token.Token);
Console.WriteLine($"{res.Message.Value} received");
}
}
catch (Exception ex)
{
throw;
}
Console.ReadLine();
Docker сочиняет для стеков kafka
RabbitMQ поддерживает несколько протоколов. В этом руководстве используется AMQP 0-9-1, открытый протокол общего назначения для обмена сообщениями. Существует много клиентов для RabbitMQ на разных языках. Мы будем использовать клиент .NET, предоставленный RabbitMQ.
Создайте файл с именем RabbitMQ-докер-compose.yml в каталоге вашего проекта и вставьте следующее:
version: '3.2'
services:
my_rabbit:
container_name: my_rabbit
image: rabbitmq:3-management
ports:
- "5672:5672"
- "8080:15672"
environment:
RABBITMQ_DEFAULT_USER: "user"
RABBITMQ_DEFAULT_PASS: "password"
using RabbitMQ.Client;
using System;
namespace RabbitMQ.Producer
{
static class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory
{
Uri = new Uri("amqp://user:password@localhost:5672")
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
QueueProducer.Publish(channel);
}
}
}
using System;
using Newtonsoft.Json;
using RabbitMQ.Client;
using System.Text;
using System.Threading;
namespace RabbitMQ.Producer
{
public static class QueueProducer
{
public static void Publish(IModel channel)
{
channel.QueueDeclare("demo-queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
var count = 0;
while (true)
{
var message = new { Name = "Producer", Message = $"Hello! Count: {count}" };
var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));
channel.BasicPublish("", "demo-queue", null, body);
Console.WriteLine(message);
count++;
Thread.Sleep(1000);
}
}
}
}
using RabbitMQ.Client;
using System;
namespace RabbitMQ.Consumer
{
static class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory
{
Uri = new Uri("amqp://user:password@localhost:5672")
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
QueueConsumer.Consume(channel);
}
}
}
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
namespace RabbitMQ.Consumer
{
public static class QueueConsumer
{
public static void Consume(IModel channel)
{
channel.QueueDeclare("demo-queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, e) => {
var body = e.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(message);
};
channel.BasicConsume("demo-queue", true, consumer);
Console.WriteLine("Consumer started");
Console.ReadLine();
}
}
}