MQTT для Эликсира |
Недавно я прочитал прекрасную книгу Постройте метеостанцию с эликсиром и нервами. Он представляет Elixir как инструмент для создания встраиваемых приложений.
С нервымы можем запускать код Elixir на сетевых устройствах, взаимодействующих с некоторым управляющим программным обеспечением и друг с другом.
Упомянутая выше книга в основном посвящена части «Нервы» и использует протокол HTTP для сетевых взаимодействий. Хотя это разумный выбор во многих ситуациях, я хочу представить еще один широко используемый вариант для производственных установок IoT: MQTT.
Протокол MQTT
MQTT — это протокол обмена сообщениями, разработанный специально для связи устройств (IoT). Он используется во многих областях, таких как банковское дело, нефтегазовая промышленность, производство и т. д.
Протокол MQTT имеет много преимуществ, но здесь я хочу упомянуть некоторые из них:
- Это легкий двоичный протокол, обычно работающий по протоколу TCP/IP.
- Он разработан для ненадежной сети, что делает его хорошим выбором для наружной установки.
- Следует паб/саб модель, упрощающая клиентскую логику.
Мы продемонстрируем некоторые преимущества MQTT в нашей настройке.
MQTT-брокеры
Важной особенностью MQTT является то, что он упрощает логику клиента, что критично для встраиваемых устройств. Это достигается с помощью модели pubsub: в MQTT нет понятия «сервер». Вместо этого все участвующие объекты являются клиентами, подключающимися к так называемому маклер. Клиенты подписываться к темы, публиковать сообщения им, а брокер выполняет маршрутизацию (и многое другое).
Хороший готовый к работе брокер, например EMQX обычно предоставляет не только возможности маршрутизации MQTT, но и многие другие интересные функции, такие как
- другие способы подключения, такие как WebSockets;
- различные модели аутентификации и авторизации;
- потоковая передача данных в базы данных;
- настраиваемые правила маршрутизации на основе самоанализа сообщений;
- и так далее.
Настройка датчика
Для простоты наше устройство будет представлено обычным приложением Mix: его можно легко преобразовать в приложение Nerves.
Сначала мы создаем проект Mix:
mix new --sup weather_sensor
cd weather_sensor
Для взаимодействия с MQTT-брокером нам нужен MQTT-клиент. Мы принимаем emqtt. Добавьте его в mix.exs как зависимость:
defp deps do
[
{:emqtt, github: "emqx/emqtt", tag: "1.4.4", system_env: [{"BUILD_WITHOUT_QUIC", "1"}]}
]
end
Весь код нашего «датчика» мы поместим в основной модуль WeatherSensor, поэтому нам нужно добавить его в lib/weather_sensor/application.ex супервизора приложения:
defmodule WeatherSensor.Application do
@moduledoc false
use Application
@impl true
def start(_type, _args) do
children = [
WeatherSensor
]
opts = [strategy: :one_for_one, name: WeatherSensor.Supervisor]
Supervisor.start_link(children, opts)
end
end
Теперь давайте реализуем основной модуль в lib/weather_sensor.ex:
defmodule WeatherSensor do
@moduledoc false
use GenServer
def start_link([]) do
GenServer.start_link( __MODULE__ , [])
end
def init([]) do
interval = Application.get_env(:weather_sensor, :interval)
emqtt_opts = Application.get_env(:weather_sensor, :emqtt)
report_topic = "reports/#{emqtt_opts[:clientid]}/temperature"
{:ok, pid} = :emqtt.start_link(emqtt_opts)
st = %{
interval: interval,
timer: nil,
report_topic: report_topic,
pid: pid
}
{:ok, set_timer(st), {:continue, :start_emqtt}}
end
def handle_continue(:start_emqtt, %{pid: pid} = st) do
{:ok, _} = :emqtt.connect(pid)
emqtt_opts = Application.get_env(:weather_sensor, :emqtt)
clientid = emqtt_opts[:clientid]
{:ok, _, _} = :emqtt.subscribe(pid, {"commands/#{clientid}/set_interval", 1})
{:noreply, st}
end
def handle_info(:tick, %{report_topic: topic, pid: pid} = st) do
report_temperature(pid, topic)
{:noreply, set_timer(st)}
end
def handle_info({:publish, publish}, st) do
handle_publish(parse_topic(publish), publish, st)
end
defp handle_publish(["commands", _, "set_interval"], %{payload: payload}, st) do
new_st = %{st | interval: String.to_integer(payload)}
{:noreply, set_timer(new_st)}
end
defp handle_publish(_, _, st) do
{:noreply, st}
end
defp parse_topic(%{topic: topic}) do
String.split(topic, "/", trim: true)
end
defp set_timer(st) do
if st.timer do
Process.cancel_timer(st.timer)
end
timer = Process.send_after(self(), :tick, st.interval)
%{st | timer: timer}
end
defp report_temperature(pid, topic) do
temperature = 10.0 + 2.0 * :rand.normal()
message = {System.system_time(:millisecond), temperature}
payload = :erlang.term_to_binary(message)
:emqtt.publish(pid, topic, payload)
end
end
И добавьте некоторые параметры в config/config.exs:
import Config
config :weather_sensor, :emqtt,
host: '127.0.0.1',
port: 1883,
clientid: "weather_sensor",
clean_start: false,
name: :emqtt
config :weather_sensor, :interval, 1000
Подытожим немного, что происходит в WeatherSensor:
- Он реализует поведение GenServer.
- При запуске он
- открывает соединение MQTT;
- подписывается на тему commands/weather_sensor/set_interval для получения команд, полученные данные будут отправлены в процесс с помощью :emqtt в виде сообщений {:publish, publish}.
- таймер расписания с заданным интервалом.
- По истечении времени ожидания он публикует кортеж {Timestamp, Temperature} в reports/weather_sensor/temperaturetopic.
- При получении сообщения из темы commands/weather_sensor/set_interval он обновляет интервал таймера.
Поскольку наше приложение не является настоящим приложением Nerves с подключенным датчиком, таким как BMP280, мы генерируем данные о температуре.
Здесь мы уже видим одно преимущество перед HTTP-взаимодействием: мы можем не только отправлять данные, но и получать некоторые команды в режиме реального времени.
Нам нужен работающий брокер для успешного запуска узла; мы начнем его позже.
Настройка приборной панели
Поскольку в MQTT нет «серверов», наша управляющая панель также будет клиентом MQTT. Но это будет подписываться в тему reports/weather_sensor/temperature и публиковать команды tocommands/weather_sensor/set_interval.
Для приборной панели мы настроим приложение Phoenix LiveView.
Давайте создадим его:
mix phx.new --version
Phoenix installer v1.6.2
mix phx.new weather_dashboard --no-ecto --no-gettext --no-dashboard --live
cd weather_dashboard
Добавьте зависимости в mix.exs
defp deps do
[
...
{:jason, "~> 1.2"},
{:plug_cowboy, "~> 2.5"},
{:emqtt, github: "emqx/emqtt", tag: "1.4.4", system_env: [{"BUILD_WITHOUT_QUIC", "1"}]},
{:contex, github: "mindok/contex"} # We will need this for SVG charts
]
end
Добавьте некоторые настройки в config/dev.exs:
config :weather_dashboard, :emqtt,
host: '127.0.0.1',
port: 1883
config :weather_dashboard, :sensor_id, "weather_sensor"
# Period for chart
config :weather_dashboard, :timespan, 60
Теперь генерируем контроллер LiveView:
mix phx.gen.live Measurements Temperature temperatures --no-schema --no-context
Это создаст много файлов, но нам понадобятся только некоторые из них, так как нам нужно одностраничное приложение с диаграммой.
rm lib/weather_dashboard_web/live/temperature_live/form_component.*
rm lib/weather_dashboard_web/live/temperature_live/show.*
rm lib/weather_dashboard_web/live/live_helpers.ex
Также удалите импорт WeatherDashboardWeb.LiveHelpers из lib/weather_dashboard_web.ex.
Шаблон обновления для нашей страницы (lib/weather_dashboard_web/live/temperature_live/index.html.heex):
<div>
<%= if @plot do %>
<%= @plot %>
<% end %>
</div>
<div>
<form phx-submit="set-interval">
<label for="interval">Interval</label>
<input type="text" name="interval" value={@interval}/>
<input type="submit" value="Set interval"/>
</form>
</div>
На этой странице у нас есть диаграмма и элемент управления вводом для отправки команд на наше «устройство».
Теперь обновите основную часть, контроллер LiveView (lib/weather_dashboard_web/live/temperature_live/index.ex):
defmodule WeatherDashboardWeb.TemperatureLive.Index do
use WeatherDashboardWeb, :live_view
require Logger
@impl true
def mount(_params, _session, socket) do
reports = []
emqtt_opts = Application.get_env(:weather_dashboard, :emqtt)
{:ok, pid} = :emqtt.start_link(emqtt_opts)
{:ok, _} = :emqtt.connect(pid)
# Listen reports
{:ok, _, _} = :emqtt.subscribe(pid, "reports/#")
{:ok, assign(socket,
reports: reports,
pid: pid,
plot: nil,
interval: nil
)}
end
@impl true
def handle_params(_params, _url, socket) do
{:noreply, socket}
end
@impl true
def handle_event("set-interval", %{"interval" => interval_s}, socket) do
case Integer.parse(interval_s) do
{interval, ""} ->
id = Application.get_env(:weather_dashboard, :sensor_id)
# Send command to device
topic = "commands/#{id}/set_interval"
:ok = :emqtt.publish(
socket.assigns[:pid],
topic,
interval_s,
retain: true
)
{:noreply, assign(socket, interval: interval)}
_ ->
{:noreply, socket}
end
end
def handle_event(name, data, socket) do
Logger.info("handle_event: #{inspect([name, data])}")
{:noreply, socket}
end
@impl true
def handle_info({:publish, packet}, socket) do
handle_publish(parse_topic(packet), packet, socket)
end
defp handle_publish(["reports", id, "temperature"], %{payload: payload}, socket) do
if id == Application.get_env(:weather_dashboard, :sensor_id) do
report = :erlang.binary_to_term(payload)
{reports, plot} = update_reports(report, socket)
{:noreply, assign(socket, reports: reports, plot: plot)}
else
{:noreply, socket}
end
end
defp update_reports({ts, val}, socket) do
new_report = {DateTime.from_unix!(ts, :millisecond), val}
now = DateTime.utc_now()
deadline = DateTime.add(DateTime.utc_now(), - 2 * Application.get_env(:weather_dashboard, :timespan), :second)
reports =
[new_report | socket.assigns[:reports]]
|> Enum.filter(fn {dt, _} -> DateTime.compare(dt, deadline) == :gt end)
|> Enum.sort()
{reports, plot(reports, deadline, now)}
end
defp parse_topic(%{topic: topic}) do
String.split(topic, "/", trim: true)
end
defp plot(reports, deadline, now) do
x_scale =
Contex.TimeScale.new()
|> Contex.TimeScale.domain(deadline, now)
|> Contex.TimeScale.interval_count(10)
y_scale =
Contex.ContinuousLinearScale.new()
|> Contex.ContinuousLinearScale.domain(0, 30)
options = [
smoothed: false,
custom_x_scale: x_scale,
custom_y_scale: y_scale,
custom_x_formatter: &x_formatter/1,
axis_label_rotation: 45
]
reports
|> Enum.map(fn {dt, val} -> [dt, val] end)
|> Contex.Dataset.new()
|> Contex.Plot.new(Contex.LinePlot, 600, 250, options)
|> Contex.Plot.to_svg()
end
defp x_formatter(datetime) do
datetime
|> Calendar.strftime("%H:%M:%S")
end
end
Есть некоторые вещи, которые следует отметить.
- Мы создали обработчик LiveView для обслуживания главной страницы нашего приложения.
- Обычно Phoenix.PubSub используется для обновления состояния процесса LiveView. Но вместо этого мы делаем трюк: поскольку MQTT-брокер уже предоставляет модель pubsub, мы подключаемся к ней напрямую из нашего процесса LiveView.
- При получении новых данных о температуре сервер обновляет температурный график.
- При получении обновления формы от пользователя мы отправляем обновленный интервал в тему команды.
Наконец, настройте маршрутизацию в lib/weather_dashboard_web/router.ex, чтобы наш контроллер обрабатывал корневую страницу:
scope "/", WeatherDashboardWeb do
pipe_through :browser
live "/", TemperatureLive.Index
end
Связывание частей вместе
Теперь мы готовы все настроить и запустить.
Мы запускаем брокера MQTT. Поскольку нам не нужны какие-либо конкретные настройки, самый простой способ — запустить брокер с докером.
docker run -d --name emqx -p 1883:1883 emqx/emqx:4.3.10
Теперь запускаем наше «устройство»:
cd weather_sensor
export BUILD_WITHOUT_QUIC=1
iex -S mix
Erlang/OTP 24 [erts-12.1.2] [source] [64-bit] [smp:16:16] [ds:16:16:10] [async-threads:1] [jit] [dtrace]
....
13:17:24.461 [debug] emqtt(weather_sensor): SEND Data: {:mqtt_packet, {:mqtt_packet_header, 8, false, 1, false}, {:mqtt_packet_subscribe, 2, %{}, [{"/commands/weather_sensor/set_interval", %{nl: 0, qos: 1, rap: 0, rh: 0}}]}, :undefined}
13:17:24.463 [debug] emqtt(weather_sensor): RECV Data: <<144, 3, 0, 2, 1>>
13:17:25.427 [debug] emqtt(weather_sensor): SEND Data: {:mqtt_packet, {:mqtt_packet_header, 3, false, 0, false}, {:mqtt_packet_publish, "/reports/weather_sensor/temperature", :undefined, :undefined}, <<131, 104, 2, 110, 6, 0, 179, 156, 178, 158, 125, 1, 70, 64, 38, 106, 91, 64, 234, 212, 185>>}
13:17:26.428 [debug] emqtt(weather_sensor): SEND Data: {:mqtt_packet, {:mqtt_packet_header, 3, false, 0, false}, {:mqtt_packet_publish, "/reports/weather_sensor/temperature", :undefined, :undefined}, <<131, 104, 2, 110, 6, 0, 156, 160, 178, 158, 125, 1, 70, 64, 39, 115, 221, 187, 144, 192, 31>>}
...
Мы видим, что наш датчик сразу начал отправлять отчеты.
Теперь запустим наш дашборд:
cd weather_dashboard
export BUILD_WITHOUT_QUIC=1
iex -S mix phx.server
Erlang/OTP 24 [erts-12.1.2] [source] [64-bit] [smp:16:16] [ds:16:16:10] [async-threads:1] [jit] [dtrace]
[info] Running WeatherDashboardWeb.Endpoint with cowboy 2.9.0 at 127.0.0.1:4000 (http)
[info] Access WeatherDashboardWeb.Endpoint at
Interactive Elixir (1.12.3) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> [watch] build finished, watching for changes...
Мы видим, что соответствующий процесс LiveView смонтировался, подключился к брокеру и начал получать данные о температуре:
[info] GET /
[info] Sent 200 in 145ms
[info] CONNECTED TO Phoenix.LiveView.Socket in 129µs
Transport: :websocket
Serializer: Phoenix.Socket.V2.JSONSerializer
Parameters: %{"_csrf_token" => "cwoROxAwKFo7NEcSdgMwFlgaZ1AlBxUa6FIRhAbjHA6XORIF-EUiIRqU", "_mounts" => "0", "_track_static" => %{"0" => "/assets/app.css", "1" => "/assets/app.js"}, "vsn" => "2.0.0"}
[debug] emqtt(emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130): SEND Data: {:mqtt_packet, {:mqtt_packet_header, 1, false, 0, false}, {:mqtt_packet_connect, "MQTT", 4, false, true, false, 0, false, 60, %{}, "emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130", :undefined, :undefined, :undefined, :undefined, :undefined}, :undefined}
[debug] emqtt(emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130): RECV Data: <<32, 2, 0, 0>>
[debug] emqtt(emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130): SEND Data: {:mqtt_packet, {:mqtt_packet_header, 8, false, 1, false}, {:mqtt_packet_subscribe, 2, %{}, [{"/reports/#", %{nl: 0, qos: 0, rap: 0, rh: 0}}]}, :undefined}
[debug] emqtt(emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130): RECV Data: <<144, 3, 0, 2, 0>>
[debug] emqtt(emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130): RECV Data: <<48, 58, 0, 35, 47, 114, 101, 112, 111, 114, 116, 115, 47, 119, 101, 97, 116,
104, 101, 114, 95, 115, 101, 110, 115, 111, 114, 47, 116, 101, 109, 112, 101,
114, 97, 116, 117, 114, 101, 131, 104, 2, 110, 6, 0, 180, 251, 188, 158, 125,
...
Также страница сразу начала обновляться:
Если мы обновим интервал, то увидим, что узел устройства получает команду сразу и начинает обновляться чаще:
Теперь продемонстрируем одну важную вещь: давайте остановим наш узел «устройство», немного подождем и запустим его снова. Мы видим, что нода продолжала отправлять данные с обновленной периодичностью.
Как это могло случиться? Секрет прост: флаг сохранения командных сообщений мы отправляем в командную тему.
:ok = :emqtt.publish(
socket.assigns[:pid],
topic,
interval_s,
retain: true
)
Когда мы отправляем сообщение с флагом сохранения в тему, это сообщение также становится сообщением «по умолчанию». Брокер хранит его, и каждый подписчик темы получает это сообщение при подписке.
Эта функция имеет важное значение для встроенных устройств, которые могут часто отключаться от сети и не имеют простого в использовании локального хранилища для сохранения своего состояния. Это способ правильно настроить их при подключении.
Вывод
В этой статье мы
- продемонстрировал популярный способ взаимодействия со встроенными устройствами — протокол MQTT;
- мы представили его использование в Эликсире;
- мы также продемонстрировали некоторые преимущества MQTT, такие как модель pubsub и сохранение сообщений.
Другие мощные функции, которые мы могли бы захотеть использовать даже в простой настройке:
- потоковая передача данных темы в базу данных, чтобы мы могли отображать историю при подключении без «ручного» сохранения;
- с использованием MQTT.js подключаться к брокеру напрямую из фронтенда через WebSockets.
Первоначально опубликовано на