MQTTで通信しよう【Node.js, Python】
はじめに
以前の記事にて、HTTPリクエストを使ってWebサーバーと通信を行う方法を紹介しました。
今回はMQTTを使って通信を行う方法を紹介します。
MQTTとは
MQTT(Message Queue Telemetry Transport)とは、通信プロトコルの一種で、
HTTPと比較し、データ量を少なくでき、1対多の通信を非同期で行うことができるのが特徴です。
データ量が少ないことで、ネットワークやデバイスへの負荷が軽くなるためIoTの分野で注目されています。
MQTTで通信をするために、Publisher(送信者)、Subscriber(受信者)の他にMQTTブローカーが必要になります。
MQTTブローカーはPublisherからのメッセージをSubscriberに渡す役割の中継サーバーです。
Topicというメッセージの送受信先の設定ができる仕組みがあり、
Publisherはメッセージの送信時にTopicを指定して送ることができます。
Subscriberは受信したいTopicを指定し、MQTTブローカーに登録することで、MQTTブローカーに指定したTopicが送られてきた時にSubscriberにデータが送られます。
MQTTブローカーを準備
まずは、MQTTブローカーを準備します。
今回はNode上で動くMQTTブローカーを作成します。
最初にNodeサーバーを以下のコマンドで作成します。(参考)
express mqtt_test --view=pug
cd mqtt_test
npm install
次にMQTTブローカーを動かすためのソフトウェアを入れます。
npm install aedes
npm install net
MQTTブローカーのプログラムを作成します。
今回は「broker.js」という名前で作成し、ファイルを以下のように書き換えます。
// モジュールの定義
const aedes = require('aedes')()
const server = require('net').createServer(aedes.handle)
//MQTTブローカーのポート
const port = 1883
// クライアントエラーの場合
aedes.on('clientError', function (client, err) {
console.log('client error', client.id, err.message, err.stack)
})
// 接続エラーの場合
aedes.on('connectionError', function (client, err) {
console.log('client error', client, err.message, err.stack)
})
// publishされた場合
aedes.on('publish', function (packet, client) {
if (client) {
console.log('message from client', client.id)
}
})
// 新しいsubscriberが接続した場合
aedes.on('subscribe', function (subscriptions, client) {
if (client) {
console.log('subscribe from client', subscriptions, client.id)
}
})
// 新しいクライアントが接続した場合
aedes.on('client', function (client) {
console.log('new client', client.id)
})
// MQTTブローカー起動
server.listen(port, function () {
console.log('server listening on port', port)
})
Subscriber(受信者)を準備
次にSubscriberを準備します。
今回はSubscriberを
まず、MQTTを使って通信するためのソフトウェアを入れます。
npm install mqtt
次にSubscriberのプログラムを作成します。
「mqtt_test\routes\index.js」ファイルを以下のように書き換えます。
var express = require('express');
var router = express.Router();
/* GET home page. */
router.get('/', function(req, res, next) {
res.render('index', { title: 'Express' });
});
/* ---------- mqtt subscriber ---------- */
// mqttを使用するための定義
var mqtt = require('mqtt');
// MQTTブローカーの設定
var client = mqtt.connect({
host: 'localhost',
port: 1883
});
// MQTTブローカーへ接続成功時
client.on('connect', function(){
console.log('subscriber.connected.');
});
// subscribe設定
var topic_test = 'topic_test01';
client.subscribe(topic_test);
// MQTTブローカーからメッセージを受信した際
client.on('message', function(topic, message){
console.log('subscriber.on.message', 'topic:', topic, 'message:', message.toString());
});
module.exports = router;
メッセージを受信するたびにログに出力されるプログラムとなっています。
Publisher(送信者)を準備
最後にPublisherを準備します。
今回はPublisherをPythonで作成していきます。
まず、MQTTを使って通信するためのPythonモジュールを入れます。
pip install paho-mqtt
次にPublisherのプログラムを作成します。
今回は「mqtt_pub.py」という名前で作成し、ファイルを以下のように書き換えます。
#!usr/bin/env python
# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt # MQTTのライブラリをインポート
from time import sleep # ウェイトのために使う
import random # ランダム数作成
# ブローカーに接続できたときの処理
def on_connect(client, userdata, flag, rc):
print("Connected with result code " + str(rc))
# ブローカーが切断したときの処理
def on_disconnect(client, userdata, flag, rc):
if rc != 0:
print("Unexpected disconnection.")
# publishが完了したときの処理
def on_publish(client, userdata, mid):
print("publish: {0}".format(mid))
# メイン関数 この関数は末尾のif文から呼び出される
def main():
topic = "topic_test01"
cnt = 0
client = mqtt.Client() # クラスのインスタンス(実体)の作成
client.on_connect = on_connect # 接続時のコールバック関数を登録
client.on_disconnect = on_disconnect # 切断時のコールバックを登録
client.on_publish = on_publish # メッセージ送信時のコールバック
client.connect("localhost", 1883, 60) # 接続先はローカルのMQTTブローカー
# 通信処理スタート
client.loop_start() # subscriberはloop_forever()だが,publishはloop_start()で起動だけさせる
# 永久に繰り返す
while True:
send_msg = '{"No":%d,"Rand":%d}' % (cnt, random.randrange(500))
cnt += 1
if (cnt >= 1000):
cnt = 0
print(send_msg) # メッセージを表示
client.publish(topic,send_msg) # トピック名とメッセージを決めて送信
sleep(1) # 1秒待つ
if __name__ == '__main__': # importされないときだけmain()を呼ぶ
main() # メイン関数を呼び出す
1秒ごとにMQTTブローカーにメッセージを送信するプログラムとなっています。
MQTTで通信をする
はじめにMQTTブローカーを立ち上げます。
ターミナルを開き、サーバのプロジェクトフォルダに移動して、以下のコマンドでMQTTブローカーを動かします。
node broker.js
次に新しくターミナルを開き、以下のコマンドでSubscriberを立ち上げます。
npm start
次に新しくターミナルを開き、以下のコマンドでPublisherのpythonプログラムを実行します。
python mqtt_pub.py
Subscriberのターミナルに以下のようなメッセージが表示されていれば通信成功です。
\mqtt_test>npm start
> mqtt-test@0.0.0 start c:\Work\web\mqtt\mqtt_test
> node ./bin/www
subscriber.connected.
subscriber.on.message topic: topic_test01 message: {"No":0,"Rand":193}
subscriber.on.message topic: topic_test01 message: {"No":1,"Rand":146}
subscriber.on.message topic: topic_test01 message: {"No":2,"Rand":246}
subscriber.on.message topic: topic_test01 message: {"No":3,"Rand":15}
subscriber.on.message topic: topic_test01 message: {"No":4,"Rand":74}
subscriber.on.message topic: topic_test01 message: {"No":5,"Rand":423}
subscriber.on.message topic: topic_test01 message: {"No":6,"Rand":270}
subscriber.on.message topic: topic_test01 message: {"No":7,"Rand":238}
subscriber.on.message topic: topic_test01 message: {"No":8,"Rand":114}
subscriber.on.message topic: topic_test01 message: {"No":9,"Rand":345}
subscriber.on.message topic: topic_test01 message: {"No":10,"Rand":133}
おわりに
MQTTを使って通信を行う方法を紹介しました。
MQTTはIoTに向いている特徴を持っているため、今後も活用の機会が増えていくだろうと感じました。