Как использовать MQTT в Flask
Колба представляет собой облегченную структуру веб-приложений, написанную на Python, которая называется «микрофреймворк», потому что она использует простое ядро для расширения других функций, таких как: ORM, инструменты проверки форм, загрузка файлов, различные методы открытой аутентификации и т. д.
MQTT — это облегченный протокол передачи сообщений Интернета вещей (IoT), основанный на режиме публикации/подписки. Он может обеспечить надежную службу сообщений в реальном времени для сетевых устройств с очень небольшим количеством кода и меньшей пропускной способностью. Он широко используется в IoT, мобильном Интернете, интеллектуальном оборудовании, IoV, энергетике и т. д.
В этой статье в основном рассказывается, как использовать MQTT в проекте Flask, а также реализовать подключение, подписку, обмен сообщениями, отмену подписки и другие функции между MQTT-клиент и MQTT-брокер.
Мы будем использовать Колба-MQTT клиентская библиотека, которая является расширением Flask и может рассматриваться как декоратор пахо-mqtt для упрощения интеграции MQTT в приложениях Flask.
Инициализация проекта
Этот проект разработан и протестирован с использованием Python 3.8, и пользователи могут использовать следующие команды для проверки версии Python.
$ python3 --version
Python 3.8.2
Используйте Pip для установки библиотеки Flask-MQTT.
pip3 install flask-mqtt
Используйте Flask-MQTT
Мы примем Бесплатный публичный MQTT-брокер предоставляется компанией EMQ, которая создана на основе Облачный сервис MQTT — Облако EMQX. Ниже представлена информация о доступе к серверу:
- Маклер:
broker.emqx.io
- TCP-порт: 1883
- Порт веб-сокета: 8083
Импорт Flask-MQTT
Импортируйте библиотеку Flask и расширение Flask-MQTT и создайте приложение Flask.
from flask import Flask, request, jsonify
from flask_mqtt import Mqtt
app = Flask( __name__ )
Настроить расширение Flask-MQTT
app.config['MQTT_BROKER_URL'] = 'broker.emqx.io'
app.config['MQTT_BROKER_PORT'] = 1883
app.config['MQTT_USERNAME'] = '' # Set this item when you need to verify username and password
app.config['MQTT_PASSWORD'] = '' # Set this item when you need to verify username and password
app.config['MQTT_KEEPALIVE'] = 5 # Set KeepAlive time in seconds
app.config['MQTT_TLS_ENABLED'] = False # If your server supports TLS, set it True
topic="/flask/mqtt"
mqtt_client = Mqtt(app)
Полные элементы конфигурации см. Документ конфигурации Flask-MQTT.
Написать функцию обратного вызова Connect
Мы можем обрабатывать успешные или неудачные соединения MQTT в этой функции обратного вызова, и этот пример подпишется на /flask/mqtt
тему после успешного подключения.
@mqtt_client.on_connect()
def handle_connect(client, userdata, flags, rc):
if rc == 0:
print('Connected successfully')
mqtt_client.subscribe(topic) # subscribe topic
else:
print('Bad connection. Code:', rc)
Функция обратного вызова записи сообщения
Эта функция распечатывает сообщения, полученные /flask/mqtt
тема.
@mqtt_client.on_message()
def handle_mqtt_message(client, userdata, message):
data = dict(
topic=message.topic,
payload=message.payload.decode()
)
print('Received message on topic: {topic} with payload: {payload}'.format(**data))
Создать API публикации сообщений
Мы создаем простой POST API для публикации сообщений MQTT.
На практике API может потребовать более сложной обработки бизнес-логики.
@app.route('/publish', methods=['POST'])
def publish_message():
request_data = request.get_json()
publish_result = mqtt_client.publish(request_data['topic'], request_data['msg'])
return jsonify({'code': publish_result[0]})
Запустить фляжное приложение
При запуске приложения Flask клиент MQTT подключится к серверу и подпишется на тему /flask/mqtt
.
if __name__ == ' __main__':
app.run(host="127.0.0.1", port=5000)
Тест
Теперь мы используем Клиент MQTT — MQTT X для подключения, подписки и публикации тестов.
Получить сообщение
Создайте соединение в MQTT X и подключитесь к серверу MQTT.
Публиковать Hello from MQTT X
к /flask/mqtt
тема в MQTT X.
Мы увидим сообщение, отправленное MQTT X, в окне запуска Flask.
Опубликовать сообщение
Подписаться на /flask/mqtt
тема в MQTT X.
Используйте Postman для вызова /publish
API: Отправить сообщение Hello from Flask
к /flask/mqtt
тема.
Мы можем увидеть сообщение, отправленное из Flask в MQTT X.
Полный код
from flask import Flask, request, jsonify
from flask_mqtt import Mqtt
app = Flask( __name__ )
app.config['MQTT_BROKER_URL'] = 'broker.emqx.io'
app.config['MQTT_BROKER_PORT'] = 1883
app.config['MQTT_USERNAME'] = '' # Set this item when you need to verify username and password
app.config['MQTT_PASSWORD'] = '' # Set this item when you need to verify username and password
app.config['MQTT_KEEPALIVE'] = 5 # Set KeepAlive time in seconds
app.config['MQTT_TLS_ENABLED'] = False # If your broker supports TLS, set it True
topic="/flask/mqtt"
mqtt_client = Mqtt(app)
@mqtt_client.on_connect()
def handle_connect(client, userdata, flags, rc):
if rc == 0:
print('Connected successfully')
mqtt_client.subscribe(topic) # subscribe topic
else:
print('Bad connection. Code:', rc)
@mqtt_client.on_message()
def handle_mqtt_message(client, userdata, message):
data = dict(
topic=message.topic,
payload=message.payload.decode()
)
print('Received message on topic: {topic} with payload: {payload}'.format(**data))
@app.route('/publish', methods=['POST'])
def publish_message():
request_data = request.get_json()
publish_result = mqtt_client.publish(request_data['topic'], request_data['msg'])
return jsonify({'code': publish_result[0]})
if __name__ == ' __main__':
app.run(host="127.0.0.1", port=5000)
Ограничения
Flask-MQTT в настоящее время не подходит для использования с несколькими рабочими экземплярами. Итак, если вы используете сервер WSGI, например вентиляция или оружейный рог убедитесь, что у вас есть только один рабочий экземпляр.
На данный момент мы завершили простой клиент MQTT с использованием Flask-MQTT и можем подписываться и публиковать сообщения в приложении Flask.
Первоначально опубликовано на emqx.com.