MQTT для Эликсира |

Недавно я прочитал прекрасную книгу Постройте метеостанцию ​​с эликсиром и нервами. Он представляет Elixir как инструмент для создания встраиваемых приложений.

С нервымы можем запускать код Elixir на сетевых устройствах, взаимодействующих с некоторым управляющим программным обеспечением и друг с другом.

Упомянутая выше книга в основном посвящена части «Нервы» и использует протокол HTTP для сетевых взаимодействий. Хотя это разумный выбор во многих ситуациях, я хочу представить еще один широко используемый вариант для производственных установок IoT: MQTT.

Протокол MQTT

MQTT — это протокол обмена сообщениями, разработанный специально для связи устройств (IoT). Он используется во многих областях, таких как банковское дело, нефтегазовая промышленность, производство и т. д.

Протокол MQTT имеет много преимуществ, но здесь я хочу упомянуть некоторые из них:

  • Это легкий двоичный протокол, обычно работающий по протоколу TCP/IP.
  • Он разработан для ненадежной сети, что делает его хорошим выбором для наружной установки.
  • Следует паб/саб модель, упрощающая клиентскую логику.

Мы продемонстрируем некоторые преимущества MQTT в нашей настройке.

MQTT-брокеры

Важной особенностью MQTT является то, что он упрощает логику клиента, что критично для встраиваемых устройств. Это достигается с помощью модели pubsub: в MQTT нет понятия «сервер». Вместо этого все участвующие объекты являются клиентами, подключающимися к так называемому маклер. Клиенты подписываться к темы, публиковать сообщения им, а брокер выполняет маршрутизацию (и многое другое).

Хороший готовый к работе брокер, например EMQX обычно предоставляет не только возможности маршрутизации MQTT, но и многие другие интересные функции, такие как

  • другие способы подключения, такие как WebSockets;
  • различные модели аутентификации и авторизации;
  • потоковая передача данных в базы данных;
  • настраиваемые правила маршрутизации на основе самоанализа сообщений;
  • и так далее.

Настройка датчика

Для простоты наше устройство будет представлено обычным приложением Mix: его можно легко преобразовать в приложение Nerves.

Сначала мы создаем проект Mix:

mix new --sup weather_sensor
cd weather_sensor

Для взаимодействия с MQTT-брокером нам нужен MQTT-клиент. Мы принимаем emqtt. Добавьте его в mix.exs как зависимость:

defp deps do
  [
    {:emqtt, github: "emqx/emqtt", tag: "1.4.4", system_env: [{"BUILD_WITHOUT_QUIC", "1"}]}
  ]
end

Весь код нашего «датчика» мы поместим в основной модуль WeatherSensor, поэтому нам нужно добавить его в lib/weather_sensor/application.ex супервизора приложения:

defmodule WeatherSensor.Application do
  @moduledoc false

  use Application

  @impl true
  def start(_type, _args) do
    children = [
      WeatherSensor
    ]

    opts = [strategy: :one_for_one, name: WeatherSensor.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Теперь давайте реализуем основной модуль в lib/weather_sensor.ex:

defmodule WeatherSensor do
  @moduledoc false

  use GenServer

  def start_link([]) do
    GenServer.start_link( __MODULE__ , [])
  end

  def init([]) do
    interval = Application.get_env(:weather_sensor, :interval)
    emqtt_opts = Application.get_env(:weather_sensor, :emqtt)
    report_topic = "reports/#{emqtt_opts[:clientid]}/temperature"
    {:ok, pid} = :emqtt.start_link(emqtt_opts)
    st = %{
      interval: interval,
      timer: nil,
      report_topic: report_topic,
      pid: pid
    }

    {:ok, set_timer(st), {:continue, :start_emqtt}}
  end

  def handle_continue(:start_emqtt, %{pid: pid} = st) do
    {:ok, _} = :emqtt.connect(pid)

    emqtt_opts = Application.get_env(:weather_sensor, :emqtt)
    clientid = emqtt_opts[:clientid]
    {:ok, _, _} = :emqtt.subscribe(pid, {"commands/#{clientid}/set_interval", 1})
    {:noreply, st}
  end

  def handle_info(:tick, %{report_topic: topic, pid: pid} = st) do
    report_temperature(pid, topic)
    {:noreply, set_timer(st)}
  end

  def handle_info({:publish, publish}, st) do
    handle_publish(parse_topic(publish), publish, st)
  end

  defp handle_publish(["commands", _, "set_interval"], %{payload: payload}, st) do
    new_st = %{st | interval: String.to_integer(payload)}
    {:noreply, set_timer(new_st)}
  end

  defp handle_publish(_, _, st) do
    {:noreply, st}
  end

  defp parse_topic(%{topic: topic}) do
    String.split(topic, "/", trim: true)
  end

  defp set_timer(st) do
    if st.timer do
      Process.cancel_timer(st.timer)
    end
    timer = Process.send_after(self(), :tick, st.interval)
    %{st | timer: timer}
  end

  defp report_temperature(pid, topic) do
    temperature = 10.0 + 2.0 * :rand.normal()
    message = {System.system_time(:millisecond), temperature}
    payload = :erlang.term_to_binary(message)
    :emqtt.publish(pid, topic, payload)
  end
end

И добавьте некоторые параметры в config/config.exs:

import Config

config :weather_sensor, :emqtt,
  host: '127.0.0.1',
  port: 1883,
  clientid: "weather_sensor",
  clean_start: false,
  name: :emqtt

config :weather_sensor, :interval, 1000

Подытожим немного, что происходит в WeatherSensor:

  • Он реализует поведение GenServer.
  • При запуске он
    • открывает соединение MQTT;
    • подписывается на тему commands/weather_sensor/set_interval для получения команд, полученные данные будут отправлены в процесс с помощью :emqtt в виде сообщений {:publish, publish}.
    • таймер расписания с заданным интервалом.
  • По истечении времени ожидания он публикует кортеж {Timestamp, Temperature} в reports/weather_sensor/temperaturetopic.
  • При получении сообщения из темы commands/weather_sensor/set_interval он обновляет интервал таймера.

Поскольку наше приложение не является настоящим приложением Nerves с подключенным датчиком, таким как BMP280, мы генерируем данные о температуре.

Здесь мы уже видим одно преимущество перед HTTP-взаимодействием: мы можем не только отправлять данные, но и получать некоторые команды в режиме реального времени.

Нам нужен работающий брокер для успешного запуска узла; мы начнем его позже.

Настройка приборной панели

Поскольку в MQTT нет «серверов», наша управляющая панель также будет клиентом MQTT. Но это будет подписываться в тему reports/weather_sensor/temperature и публиковать команды tocommands/weather_sensor/set_interval.

Для приборной панели мы настроим приложение Phoenix LiveView.

Давайте создадим его:

mix phx.new --version
Phoenix installer v1.6.2
mix phx.new weather_dashboard --no-ecto --no-gettext --no-dashboard --live
cd weather_dashboard

Добавьте зависимости в mix.exs

defp deps do
    [
      ...
      {:jason, "~> 1.2"},
      {:plug_cowboy, "~> 2.5"},

      {:emqtt, github: "emqx/emqtt", tag: "1.4.4", system_env: [{"BUILD_WITHOUT_QUIC", "1"}]},
      {:contex, github: "mindok/contex"} # We will need this for SVG charts
    ]
  end

Добавьте некоторые настройки в config/dev.exs:

config :weather_dashboard, :emqtt,
  host: '127.0.0.1',
  port: 1883

config :weather_dashboard, :sensor_id, "weather_sensor"

# Period for chart
config :weather_dashboard, :timespan, 60

Теперь генерируем контроллер LiveView:

mix phx.gen.live Measurements Temperature temperatures --no-schema --no-context

Это создаст много файлов, но нам понадобятся только некоторые из них, так как нам нужно одностраничное приложение с диаграммой.

rm lib/weather_dashboard_web/live/temperature_live/form_component.*
rm lib/weather_dashboard_web/live/temperature_live/show.*
rm lib/weather_dashboard_web/live/live_helpers.ex

Также удалите импорт WeatherDashboardWeb.LiveHelpers из lib/weather_dashboard_web.ex.

Шаблон обновления для нашей страницы (lib/weather_dashboard_web/live/temperature_live/index.html.heex):

<div>
  <%= if @plot do %>
    <%= @plot %>
  <% end %>
</div>

<div>
  <form phx-submit="set-interval">
    <label for="interval">Interval</label>
    <input type="text" name="interval" value={@interval}/>
    <input type="submit" value="Set interval"/>
  </form>
</div>

На этой странице у нас есть диаграмма и элемент управления вводом для отправки команд на наше «устройство».

Теперь обновите основную часть, контроллер LiveView (lib/weather_dashboard_web/live/temperature_live/index.ex):

defmodule WeatherDashboardWeb.TemperatureLive.Index do
  use WeatherDashboardWeb, :live_view

  require Logger

  @impl true
  def mount(_params, _session, socket) do
    reports = []
    emqtt_opts = Application.get_env(:weather_dashboard, :emqtt)
    {:ok, pid} = :emqtt.start_link(emqtt_opts)
    {:ok, _} = :emqtt.connect(pid)
    # Listen reports
    {:ok, _, _} = :emqtt.subscribe(pid, "reports/#")
    {:ok, assign(socket,
      reports: reports,
      pid: pid,
      plot: nil,
      interval: nil
    )}
  end

  @impl true
  def handle_params(_params, _url, socket) do
    {:noreply, socket}
  end

  @impl true
  def handle_event("set-interval", %{"interval" => interval_s}, socket) do
    case Integer.parse(interval_s) do
      {interval, ""} ->
        id = Application.get_env(:weather_dashboard, :sensor_id)
        # Send command to device
        topic = "commands/#{id}/set_interval"
        :ok = :emqtt.publish(
          socket.assigns[:pid],
          topic,
          interval_s,
          retain: true
        )
        {:noreply, assign(socket, interval: interval)}
      _ ->
        {:noreply, socket}
    end
  end

  def handle_event(name, data, socket) do
    Logger.info("handle_event: #{inspect([name, data])}")
    {:noreply, socket}
  end

  @impl true
  def handle_info({:publish, packet}, socket) do
    handle_publish(parse_topic(packet), packet, socket)
  end

  defp handle_publish(["reports", id, "temperature"], %{payload: payload}, socket) do
    if id == Application.get_env(:weather_dashboard, :sensor_id) do
      report = :erlang.binary_to_term(payload)
      {reports, plot} = update_reports(report, socket)
      {:noreply, assign(socket, reports: reports, plot: plot)}
    else
      {:noreply, socket}
    end
  end

  defp update_reports({ts, val}, socket) do
    new_report = {DateTime.from_unix!(ts, :millisecond), val}
    now = DateTime.utc_now()
    deadline = DateTime.add(DateTime.utc_now(), - 2 * Application.get_env(:weather_dashboard, :timespan), :second)
    reports =
      [new_report | socket.assigns[:reports]]
      |> Enum.filter(fn {dt, _} -> DateTime.compare(dt, deadline) == :gt end)
      |> Enum.sort()

    {reports, plot(reports, deadline, now)}
  end

  defp parse_topic(%{topic: topic}) do
    String.split(topic, "/", trim: true)
  end

  defp plot(reports, deadline, now) do
    x_scale =
      Contex.TimeScale.new()
      |> Contex.TimeScale.domain(deadline, now)
      |> Contex.TimeScale.interval_count(10)

    y_scale =
      Contex.ContinuousLinearScale.new()
      |> Contex.ContinuousLinearScale.domain(0, 30)

    options = [
      smoothed: false,
      custom_x_scale: x_scale,
      custom_y_scale: y_scale,
      custom_x_formatter: &x_formatter/1,
      axis_label_rotation: 45
    ]

    reports
    |> Enum.map(fn {dt, val} -> [dt, val] end)
    |> Contex.Dataset.new()
    |> Contex.Plot.new(Contex.LinePlot, 600, 250, options)
    |> Contex.Plot.to_svg()
  end

  defp x_formatter(datetime) do
    datetime
    |> Calendar.strftime("%H:%M:%S")
  end

end

Есть некоторые вещи, которые следует отметить.

  • Мы создали обработчик LiveView для обслуживания главной страницы нашего приложения.
  • Обычно Phoenix.PubSub используется для обновления состояния процесса LiveView. Но вместо этого мы делаем трюк: поскольку MQTT-брокер уже предоставляет модель pubsub, мы подключаемся к ней напрямую из нашего процесса LiveView.
  • При получении новых данных о температуре сервер обновляет температурный график.
  • При получении обновления формы от пользователя мы отправляем обновленный интервал в тему команды.

Наконец, настройте маршрутизацию в lib/weather_dashboard_web/router.ex, чтобы наш контроллер обрабатывал корневую страницу:

scope "/", WeatherDashboardWeb do
    pipe_through :browser

    live "/", TemperatureLive.Index
  end

Связывание частей вместе

Теперь мы готовы все настроить и запустить.

Мы запускаем брокера MQTT. Поскольку нам не нужны какие-либо конкретные настройки, самый простой способ — запустить брокер с докером.

docker run -d --name emqx -p 1883:1883 emqx/emqx:4.3.10

Теперь запускаем наше «устройство»:

cd weather_sensor
export BUILD_WITHOUT_QUIC=1
iex -S mix
Erlang/OTP 24 [erts-12.1.2] [source] [64-bit] [smp:16:16] [ds:16:16:10] [async-threads:1] [jit] [dtrace]

....

13:17:24.461 [debug] emqtt(weather_sensor): SEND Data: {:mqtt_packet, {:mqtt_packet_header, 8, false, 1, false}, {:mqtt_packet_subscribe, 2, %{}, [{"/commands/weather_sensor/set_interval", %{nl: 0, qos: 1, rap: 0, rh: 0}}]}, :undefined}

13:17:24.463 [debug] emqtt(weather_sensor): RECV Data: <<144, 3, 0, 2, 1>>

13:17:25.427 [debug] emqtt(weather_sensor): SEND Data: {:mqtt_packet, {:mqtt_packet_header, 3, false, 0, false}, {:mqtt_packet_publish, "/reports/weather_sensor/temperature", :undefined, :undefined}, <<131, 104, 2, 110, 6, 0, 179, 156, 178, 158, 125, 1, 70, 64, 38, 106, 91, 64, 234, 212, 185>>}

13:17:26.428 [debug] emqtt(weather_sensor): SEND Data: {:mqtt_packet, {:mqtt_packet_header, 3, false, 0, false}, {:mqtt_packet_publish, "/reports/weather_sensor/temperature", :undefined, :undefined}, <<131, 104, 2, 110, 6, 0, 156, 160, 178, 158, 125, 1, 70, 64, 39, 115, 221, 187, 144, 192, 31>>}
...

Мы видим, что наш датчик сразу начал отправлять отчеты.

Теперь запустим наш дашборд:

cd weather_dashboard
export BUILD_WITHOUT_QUIC=1
iex -S mix phx.server
Erlang/OTP 24 [erts-12.1.2] [source] [64-bit] [smp:16:16] [ds:16:16:10] [async-threads:1] [jit] [dtrace]

[info] Running WeatherDashboardWeb.Endpoint with cowboy 2.9.0 at 127.0.0.1:4000 (http)
[info] Access WeatherDashboardWeb.Endpoint at 
Interactive Elixir (1.12.3) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> [watch] build finished, watching for changes...

Давайте перейдем к .

Мы видим, что соответствующий процесс LiveView смонтировался, подключился к брокеру и начал получать данные о температуре:

[info] GET /
[info] Sent 200 in 145ms
[info] CONNECTED TO Phoenix.LiveView.Socket in 129µs
  Transport: :websocket
  Serializer: Phoenix.Socket.V2.JSONSerializer
  Parameters: %{"_csrf_token" => "cwoROxAwKFo7NEcSdgMwFlgaZ1AlBxUa6FIRhAbjHA6XORIF-EUiIRqU", "_mounts" => "0", "_track_static" => %{"0" => "/assets/app.css", "1" => "/assets/app.js"}, "vsn" => "2.0.0"}
[debug] emqtt(emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130): SEND Data: {:mqtt_packet, {:mqtt_packet_header, 1, false, 0, false}, {:mqtt_packet_connect, "MQTT", 4, false, true, false, 0, false, 60, %{}, "emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130", :undefined, :undefined, :undefined, :undefined, :undefined}, :undefined}
[debug] emqtt(emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130): RECV Data: <<32, 2, 0, 0>>
[debug] emqtt(emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130): SEND Data: {:mqtt_packet, {:mqtt_packet_header, 8, false, 1, false}, {:mqtt_packet_subscribe, 2, %{}, [{"/reports/#", %{nl: 0, qos: 0, rap: 0, rh: 0}}]}, :undefined}
[debug] emqtt(emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130): RECV Data: <<144, 3, 0, 2, 0>>
[debug] emqtt(emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130): RECV Data: <<48, 58, 0, 35, 47, 114, 101, 112, 111, 114, 116, 115, 47, 119, 101, 97, 116,
  104, 101, 114, 95, 115, 101, 110, 115, 111, 114, 47, 116, 101, 109, 112, 101,
  114, 97, 116, 117, 114, 101, 131, 104, 2, 110, 6, 0, 180, 251, 188, 158, 125,
...

Также страница сразу начала обновляться:

Феникс 1

Если мы обновим интервал, то увидим, что узел устройства получает команду сразу и начинает обновляться чаще:

Феникс 2

Теперь продемонстрируем одну важную вещь: давайте остановим наш узел «устройство», немного подождем и запустим его снова. Мы видим, что нода продолжала отправлять данные с обновленной периодичностью.

Феникс 3

Как это могло случиться? Секрет прост: флаг сохранения командных сообщений мы отправляем в командную тему.

:ok = :emqtt.publish(
  socket.assigns[:pid],
  topic,
  interval_s,
  retain: true
)

Когда мы отправляем сообщение с флагом сохранения в тему, это сообщение также становится сообщением «по умолчанию». Брокер хранит его, и каждый подписчик темы получает это сообщение при подписке.

Эта функция имеет важное значение для встроенных устройств, которые могут часто отключаться от сети и не имеют простого в использовании локального хранилища для сохранения своего состояния. Это способ правильно настроить их при подключении.

Вывод

В этой статье мы

  • продемонстрировал популярный способ взаимодействия со встроенными устройствами — протокол MQTT;
  • мы представили его использование в Эликсире;
  • мы также продемонстрировали некоторые преимущества MQTT, такие как модель pubsub и сохранение сообщений.

Другие мощные функции, которые мы могли бы захотеть использовать даже в простой настройке:

  • потоковая передача данных темы в базу данных, чтобы мы могли отображать историю при подключении без «ручного» сохранения;
  • с использованием MQTT.js подключаться к брокеру напрямую из фронтенда через WebSockets.

Весь код доступен на .

Первоначально опубликовано на

Похожие записи

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *