Как использовать MQTT в Rust
Rust — мультипарадигмальный язык программирования, разработанный для повышения производительности и безопасности, особенно для безопасного параллелизма. Rust синтаксически похож на C++, но может гарантировать безопасность памяти, используя проверку заимствования для проверки ссылок. Rust обеспечивает безопасность памяти без сборки мусора, а подсчет ссылок является необязательным.
MQTT является своего рода облегченный протокол обмена сообщениями IoT на основе модели публикации/подписки. Он может использовать очень мало кода и полосы пропускания для обеспечения надежной службы сообщений в реальном времени для сетевого оборудования. Кроме того, он широко используется в IoT, мобильном Интернете, интеллектуальном оборудовании, IoV, электроэнергетике.
Эта статья в основном знакомит с тем, как использовать пахо-mqtt клиентскую библиотеку в проекте Rust, а также как реализовать подключение, подписку, обмен сообщениями и отмену подписки и т. д. между клиентом и MQTT-брокером.
Инициализация проекта
Этот проект использует Rust 1.44.0 для разработки и тестирования и управляется с помощью инструмента управления пакетами Cargo 1.44.0, и читатель может проверить текущую версию Rust с помощью следующей команды.
~ rustc --version
rustc 1.44.0 (49cae5576 2020-06-01)
Выбор клиентской библиотеки MQTT
paho-mqtt — самый универсальный и широко используемый клиент MQTT в текущей версии Rust. Последняя версия 0.7.1
поддерживает MQTT v5, 3.1.1, 3.1, а также поддерживает передачу данных по стандартным протоколам TCP, SSL/TLS, WebSockets и поддерживает QoS 0, 1, 2 и т. д.
Инициализация проекта
Выполните следующую команду, чтобы создать новый проект Rust с именем mqtt-example
.
~ cargo new mqtt-example
Created binary (application) `mqtt-example` package
Изменить Cargo.toml
файл в проекте и добавить адрес paho-mqtt
библиотека в dependencies
и укажите двоичный файл, соответствующий файлу кода подписки, публикации.
[dependencies]
paho-mqtt = { git = " branch = "master" }
[[bin]]
name = "sub"
path = "src/sub/main.rs"
[[bin]]
name = "pub"
path = "src/pub/main.rs"
Использование Rust MQTT
Создать клиентское соединение
В этой статье будет использоваться бесплатный публичный MQTT-брокер который предоставляется EMQX в качестве брокера MQTT тестового соединения. Эта услуга основана на EMQX Облачная платформа Интернета вещей MQTT создавать. Информация о доступе к серверу выглядит следующим образом:
- Маклер: Broker.emqx.io
- TCP-порт: 1883 г.
- Порт веб-сокета: 8083
Настройте параметры подключения MQTT Broker
Настройте адрес подключения MQTT Broker (включая порт), тему (здесь мы настроили две темы) и идентификатор клиента.
const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_publish";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
Написать код подключения MQTT
Напишите код подключения MQTT, а адрес подключения можно передать в качестве аргумента командной строки при выполнении двоичного файла, чтобы улучшить взаимодействие с пользователем. Обычно нам нужно создать клиента, а затем подключить его к broker.emqx.io
.
let host = env::args().nth(1).unwrap_or_else(||
DFLT_BROKER.to_string()
);
// Define the set of options for the create.
// Use an ID for a persistent session.
let create_opts = mqtt::CreateOptionsBuilder::new()
.server_uri(host)
.client_id(DFLT_CLIENT.to_string())
.finalize();
// Create a client.
let cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {
println!("Error creating the client: {:?}", err);
process::exit(1);
});
// Define the set of options for the connection.
let conn_opts = mqtt::ConnectOptionsBuilder::new()
.keep_alive_interval(Duration::from_secs(20))
.clean_session(true)
.finalize();
// Connect and wait for it to complete or fail.
if let Err(e) = cli.connect(conn_opts) {
println!("Unable to connect:\n\t{:?}", e);
process::exit(1);
}
Публикация сообщений
Здесь мы публикуем всего пять сообщений в двух темах rust/mqtt
а также rust/test
в зависимости от четности цикла.
for num in 0..5 {
let content = "Hello world! ".to_string() + &num.to_string();
let mut msg = mqtt::Message::new(DFLT_TOPICS[0], content.clone(), QOS);
if num % 2 == 0 {
println!("Publishing messages on the {:?} topic", DFLT_TOPICS[1]);
msg = mqtt::Message::new(DFLT_TOPICS[1], content.clone(), QOS);
} else {
println!("Publishing messages on the {:?} topic", DFLT_TOPICS[0]);
}
let tok = cli.publish(msg);
if let Err(e) = tok {
println!("Error sending message: {:?}", e);
break;
}
}
Перед подключением клиента потребитель должен быть инициализирован. Здесь мы зацикливаем обработку очереди сообщений в потребителе и распечатываем подписанное имя темы и содержимое полученных сообщений.
fn subscribe_topics(cli: &mqtt::Client) {
if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS) {
println!("Error subscribes topics: {:?}", e);
process::exit(1);
}
}
fn main() {
...
// Initialize the consumer before connecting.
let rx = cli.start_consuming();
...
// Subscribe topics.
subscribe_topics(&cli);
println!("Processing requests...");
for msg in rx.iter() {
if let Some(msg) = msg {
println!("{}", msg);
}
else if !cli.is_connected() {
if try_reconnect(&cli) {
println!("Resubscribe topics...");
subscribe_topics(&cli);
} else {
break;
}
}
}
...
}
Полный код
Код для публикации сообщений
use std::{
env,
process,
time::Duration
};
extern crate paho_mqtt as mqtt;
const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_publish";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
// Define the qos.
const QOS:i32 = 1;
fn main() {
let host = env::args().nth(1).unwrap_or_else(||
DFLT_BROKER.to_string()
);
// Define the set of options for the create.
// Use an ID for a persistent session.
let create_opts = mqtt::CreateOptionsBuilder::new()
.server_uri(host)
.client_id(DFLT_CLIENT.to_string())
.finalize();
// Create a client.
let cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {
println!("Error creating the client: {:?}", err);
process::exit(1);
});
// Define the set of options for the connection.
let conn_opts = mqtt::ConnectOptionsBuilder::new()
.keep_alive_interval(Duration::from_secs(20))
.clean_session(true)
.finalize();
// Connect and wait for it to complete or fail.
if let Err(e) = cli.connect(conn_opts) {
println!("Unable to connect:\n\t{:?}", e);
process::exit(1);
}
// Create a message and publish it.
// Publish message to 'test' and 'hello' topics.
for num in 0..5 {
let content = "Hello world! ".to_string() + &num.to_string();
let mut msg = mqtt::Message::new(DFLT_TOPICS[0], content.clone(), QOS);
if num % 2 == 0 {
println!("Publishing messages on the {:?} topic", DFLT_TOPICS[1]);
msg = mqtt::Message::new(DFLT_TOPICS[1], content.clone(), QOS);
} else {
println!("Publishing messages on the {:?} topic", DFLT_TOPICS[0]);
}
let tok = cli.publish(msg);
if let Err(e) = tok {
println!("Error sending message: {:?}", e);
break;
}
}
// Disconnect from the broker.
let tok = cli.disconnect(None);
println!("Disconnect from the broker");
tok.unwrap();
}
Чтобы улучшить взаимодействие с пользователем, подписки на сообщения отключаются, а темы повторно подписываются после повторного установления соединения.
use std::{
env,
process,
thread,
time::Duration
};
extern crate paho_mqtt as mqtt;
const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_subscribe";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
// The qos list that match topics above.
const DFLT_QOS:&[i32] = &[0, 1];
// Reconnect to the broker when connection is lost.
fn try_reconnect(cli: &mqtt::Client) -> bool
{
println!("Connection lost. Waiting to retry connection");
for _ in 0..12 {
thread::sleep(Duration::from_millis(5000));
if cli.reconnect().is_ok() {
println!("Successfully reconnected");
return true;
}
}
println!("Unable to reconnect after several attempts.");
false
}
// Subscribes to multiple topics.
fn subscribe_topics(cli: &mqtt::Client) {
if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS) {
println!("Error subscribes topics: {:?}", e);
process::exit(1);
}
}
fn main() {
let host = env::args().nth(1).unwrap_or_else(||
DFLT_BROKER.to_string()
);
// Define the set of options for the create.
// Use an ID for a persistent session.
let create_opts = mqtt::CreateOptionsBuilder::new()
.server_uri(host)
.client_id(DFLT_CLIENT.to_string())
.finalize();
// Create a client.
let mut cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {
println!("Error creating the client: {:?}", err);
process::exit(1);
});
// Initialize the consumer before connecting.
let rx = cli.start_consuming();
// Define the set of options for the connection.
let lwt = mqtt::MessageBuilder::new()
.topic("test")
.payload("Consumer lost connection")
.finalize();
let conn_opts = mqtt::ConnectOptionsBuilder::new()
.keep_alive_interval(Duration::from_secs(20))
.clean_session(false)
.will_message(lwt)
.finalize();
// Connect and wait for it to complete or fail.
if let Err(e) = cli.connect(conn_opts) {
println!("Unable to connect:\n\t{:?}", e);
process::exit(1);
}
// Subscribe topics.
subscribe_topics(&cli);
println!("Processing requests...");
for msg in rx.iter() {
if let Some(msg) = msg {
println!("{}", msg);
}
else if !cli.is_connected() {
if try_reconnect(&cli) {
println!("Resubscribe topics...");
subscribe_topics(&cli);
} else {
break;
}
}
}
// If still connected, then disconnect now.
if cli.is_connected() {
println!("Disconnecting");
cli.unsubscribe_many(DFLT_TOPICS).unwrap();
cli.disconnect(None).unwrap();
}
println!("Exiting");
}
Запуск и тестирование
Скомпилируйте бинарные файлы
Следующая команда генерирует sub
, pub
бинарный файл в mqtt-example/target/debug
каталог.
cargo build
Выполнить sub
бинарный файл и дождитесь публикации сообщения.
Публикация сообщений
Выполнение pub
бинарный файл, вы можете видеть, что сообщения были опубликованы в темах rust/test
а также rust/mqtt
соответственно.
При этом опубликованные сообщения также отображаются в подписке на сообщения.
На данный момент мы завершили использование пахо-mqtt клиент для подключения к публичный MQTT-брокера также реализовано соединение, публикация сообщений и подписка между тестовым клиентом и MQTT-брокером.
Первоначально опубликовано на