Представляем ReplayQ, буферный уровень EMQX для надежной интеграции данных IoT
EMQX является наиболее масштабируемым MQTT брокер для IoT, IIoT и подключенные транспортные средства. Он также обладает широкими возможностями расширения для интеграции с различными базами данных и потоковыми платформами. EMQX ПредприятиеRule-Engine предоставляет богатый набор интеграций данных из коробки. Вот некоторые из них: PostgreSQL, MySQL, InfluxDB и Kafka.
Вы можете найти полный список в EMQX Корпоративные документы.
Начиная с версии 4.3, EMQX Enterprise поддерживает локальный буфер отправки для интеграции Kafka.
Этот буферный слой, названный replayq
делает интеграцию данных EMQX более устойчивой к сбоям в работе сети и простоям служб.
В этом посте мы сначала объясним, зачем нужен этот буфер, затем возьмем в качестве примера интеграцию Kafka, чтобы представить высокоуровневый дизайн, а затем углубимся в подробности, чтобы лучше понять, как replayq
работает.
Зачем нам буфер?
В каждом узле EMQX производитель Kafka представляет собой пул рабочих процессов, по одному на каждый раздел Kafka. Как показано ниже, для темы Кафки, имеющей M
разделы, будет пул M
рабочие процессы, работающие на каждом узле EMQX.
Как вы, возможно, уже знаете, EMQX написан на языке Эрланг язык программирования. Если мы используем жаргон Erlang, «работник» в этом контексте является «процессом» Erlang. В отличие от процессов Linux, процессы Erlang чрезвычайно легковесны, работают в изоляции памяти и планируются виртуальной машиной Erlang (VM).
В EMQX для каждого клиента MQTT существует отдельный процесс, и производители Kafka не являются исключением, они тоже процессы.
В идеальном мире этот поток данных должен быть плавным, все сетевые/программные компоненты имеют достаточную мощность для обработки потока данных практически без откатов к клиенту. Однако во многих случаях могут быть перегрузки и даже потери соединения. Учитывая ограниченную вычислительную и сетевую среду, на самом деле не существует хорошего решения для полного устранения перегрузок и сетевых нарушений, вместо этого мы пытаемся смягчить их влияние.
Небольшая перегруженность может привести лишь к еще большему отталкиванию клиентов. Например, в таких случаях они могут столкнуться с более высокой задержкой при получении PUBACK от брокера для сообщений QoS 1.
В некоторых случаях все может быть немного сложнее, чем просто увеличение задержки, например:
- Сетевые нарушения от EMQX до Kafka
- Kafka (временно) не работает (да, Kafka очень доступна, но всегда есть исключения)
- Резкое увеличение трафика MQTT выше запланированной мощности сети или ЦП на стороне Kafka.
Это когда клиенты MQTT могут начать испытывать тайм-ауты.
И вот тогда мы начинаем желать, чтобы EMQX мог сглаживать пики и смягчать (временные) ошибки — буфер с именем replayq
был введен именно для этой цели.
Мы можем рассматривать рабочих-производителей как очереди. Помня о концепции «очереди», вы сможете понять остальную часть этого документа.
Что такое ReplayQ
replayq
— это библиотека очередей, изначально использовавшаяся в EMQX Kafka, а затем преобразованная в универсальное решение. Он предлагает общий механизм очередей для размещения сообщений в ОЗУ или на диске в зависимости от того, как он настроен.
Название «воспроизведение» происходит от его способности буферизовать сообщения на диске и потенциально воспроизводить старые сообщения.
Другой режим буферизации
Для очередей буферизации сообщений EMQX предоставил два разных режима буферизации.
Режим «Только память» используется, когда мост данных не настроен с replayq
каталог данных. Буферизованные сообщения в этом режиме нестабильны, если узел перезагружается, все буферизованные сообщения теряются. По сравнению с полным отсутствием буфера преимущество явного буфера «Только память» заключается в его защите от перегрузки (о чем мы поговорим позже в этом посте).
Режим диска
Чтобы было проще понять, как работает дисковый режим, давайте немного увеличим масштаб, взглянув только на одну конкретную очередь буфера.
Варианты
Есть три разных варианта режима dis
- нормальный (
offload=false
)- Каждая операция постановки в очередь вызывает запись на диск. Это может быть интенсивным IOPS, если скорость постановки в очередь высока.
- разгрузить (
offload=true
)- Записывать на диск только после накопления определенного количества байтов в оперативной памяти. Это означает, что сообщения, хранящиеся в ОЗУ, являются энергозависимыми, но сообщения, хранящиеся на диске, сохранятся после перезагрузки.
- изменчивая разгрузка (
offload={true, volatile}
)- То же, что и режим разгрузки, но все элементы очереди являются энергозависимыми, включая элементы, хранящиеся на диске.
Сегментированный
В режиме «Диск» сообщения хранятся в файлах, для хранения сообщений на диске есть две крайние схемы
- Один файл для каждого сообщения.
Это практично, когда есть только несколько сообщений для буферизации, потому что их очень много. иноды файловая система может выделить, в какой-то момент будет слишком дорого даже просто перечислить все файлы на диске. - Все сообщения в одном файле.
Это практично, когда имеется только ограниченное количество байтов для буферизации. Почти все файловые системы имеют предельный размер для одного файла некоторые из них, возможно, достаточно велики для большинства случаев использования, но недостатком являются накладные расходы на сканирование или поиск (чтобы пропустить уже использованные данные).
Вот почему мы выбираем промежуточный баланс: буфер состоит из ряда файлов, называемых «сегментами».
Сегментация файлов может повысить производительность и управляемость данных. Разделение на более мелкие сегменты упрощает доступ к определенным частям потока и поиск конкретной информации. Кроме того, сегментация упрощает управление данными, например архивирование или удаление старых сегментов для освобождения места.
Как показано на диаграмме ниже. Новые сообщения, поступающие в очередь, будут присоединены к заднему (последнему) файлу сегмента.
Когда размер сегмента превышает настраиваемый предел «байтов сегмента», будет создан новый файл сегмента.
При извлечении полный головной (самый старый) сегмент загружается в память, которая затем работает как обычная очередь в памяти. Отличие от очереди в памяти состоит в том, что нам нужно записать курсор (т. COMMIT
) на диске после успешной обработки извлеченных элементов.
СОВЕРШИТЬ
В режиме памяти очередь является изменяемой, удаление элементов из начала очереди уменьшает размер очереди (т. е. оставляя позади хвостовую часть). Однако, когда очередь представляет собой файл на диске, изменение файла станет слишком дорогим для выполнения дискового ввода-вывода.
Поэтому нам нужен способ записи курсора (логической позиции), чтобы помочь нам быстро найти место, где он был остановлен после перезапуска, чтобы избежать повторной обработки старых сообщений. Вот почему был введен маркерный файл COMMIT.
Файл фиксации по существу записывает две части информации: 1) номер сегмента; 2) идентификатор элемента очереди (порядковый номер внутри сегмента).
Например, если файл маркера COMMIT записан segno = 1, id = 10
это означает, что все сообщения с идентификатором от 1 до 10 потребляются, поэтому после перезапуска он должен начать читать сообщения из сегмента-1, но пропустить первые 10 сообщений.
Коррупция
По разным причинам данные, записанные на диск, могут быть повреждены, что чаще всего происходит в таких сценариях, как отключение питания, сетевые помехи для подключенных к сети блочных устройств или в самой операционной системе не хватает оперативной памяти и т. д.
Обычно повреждения происходят в конце файла сегмента (например, исключение до того, как операционная система успела синхронизировать изменения с уровней кэшей на жесткий диск).
Чтобы обнаружить повреждение, элементы очереди записываются на диск вместе с 4-байтовым магическим числом и контрольной суммой (CRC32). При повторном открытии replayq
файлов сегментов (например, после перезапуска) выполняется полное сканирование для обнаружения повреждений.
В случае, если магическое число не найдено (в байтах 2-5) или не совпадает контрольная сумма полезной нагрузки, обнаруживается повреждение. Все сообщения, начиная с поврежденного, отбрасываются (файл обрезается).
Защита от перегрузки
Как и в любом другом решении для организации очередей, если входящая скорость выше исходящей, она в конечном итоге переполнится, независимо от того, насколько велика емкость очереди.
Чтобы защитить узлы EMQX от нехватки выделенной оперативной памяти или дискового пространства, существуют механизмы защиты системы путем отбрасывания самых старых сообщений.
Максимальное количество байтов на буфер
Для каждого буфера существует настраиваемый предел общего количества байтов, которые можно накапливать. По умолчанию 2 ГБ.
Стоит отметить одну вещь: размер каждого сообщения, подлежащего буферизации, является лишь оценкой, а также, чтобы не получить пустых сегментов, проверка ограничения размера выполняется только после того, как будет добавлено хотя бы одно сообщение.
После каждого добавления выполняется проверка общего количества байтов, и переполненная часть будет извлечена и удалена. Опять же, он всегда выдает хотя бы одно сообщение, так что байтов может быть больше, чем байтов переполнения.
Нам может понадобиться немного посчитать при настройке емкости каждого буфера, для этого есть простая формула.
TotalBytesLimit = TotalAllocatableBytes / NumberOfQueues
Например, при планировании производителя Kafka, использующего буферизацию в дисковом режиме для соединения данных с темой Kafka, имеющей k
разделов, если общее дисковое пространство, которое может быть выделено для буферизации, v
то пропускная способность каждой очереди должна быть v/k
.
Падение при высокой нагрузке
Конфигурация ограничения общего количества байтов — хороший вариант для защиты системы от чрезмерного потребления ОЗУ при работе в режиме «Только память». Кроме того, это хороший способ защитить его от слишком большого места на диске при работе в «Режиме диска». Однако ограничение распространяется на раздел Kafka, а это означает, что он требует тщательного расчета и планирования.
Более простой способ обеспечить защиту от перегрузки ОЗУ — автоматическая настройка ограничения.
EMQX предоставляет drop_if_highmem
вариант в 4.4, и memory_overload_protection
в конфигурации 5.0 для интеграции данных Kafka (по умолчанию отключено).
Если этот параметр включен, то после того, как система достигнет порога «высокой нагрузки», при добавлении в очередь из заголовка очереди будет извлечено и удалено как минимум такое же количество байтов. Точное количество байтов зависит от выравнивания границ элементов очереди.
Резюме
Внедрение ReplayQ в качестве буферного уровня в интеграцию данных EMQX Rule Engine может помочь повысить надежность EMQX, сделать конвейер данных более устойчивым к сбоям в сети и простоям целевого сервиса.
Первоначально опубликовано на https://www.emqx.com.