MQTTで通信しよう【Node.js, Python】

はじめに

以前の記事にて、HTTPリクエストを使ってWebサーバーと通信を行う方法を紹介しました。

今回はMQTTを使って通信を行う方法を紹介します。

 

MQTTとは

MQTT(Message Queue Telemetry Transport)とは、通信プロトコルの一種で、

HTTPと比較し、データ量を少なくでき、1対多の通信を非同期で行うことができるのが特徴です。

データ量が少ないことで、ネットワークやデバイスへの負荷が軽くなるためIoTの分野で注目されています。

 

MQTTで通信をするために、Publisher(送信者)、Subscriber(受信者)の他にMQTTブローカーが必要になります。

MQTTブローカーはPublisherからのメッセージをSubscriberに渡す役割の中継サーバーです。

Topicというメッセージの送受信先の設定ができる仕組みがあり、

Publisherはメッセージの送信時にTopicを指定して送ることができます。

Subscriberは受信したいTopicを指定し、MQTTブローカーに登録することで、

MQTTブローカーに指定したTopicが送られてきた時にSubscriberにデータが送られます。

 

MQTT通信

 

MQTTブローカーを準備

まずは、MQTTブローカーを準備します。

今回はNode上で動くMQTTブローカーを作成します。

最初にNodeサーバーを以下のコマンドで作成します。(参考)

express mqtt_test --view=pug
cd mqtt_test
npm install

 

次にMQTTブローカーを動かすためのソフトウェアを入れます。

npm install aedes
npm install net

MQTTブローカーのプログラムを作成します。

今回は「broker.js」という名前で作成し、ファイルを以下のように書き換えます。

// モジュールの定義
const aedes = require('aedes')()
const server = require('net').createServer(aedes.handle)

//MQTTブローカーのポート
const port = 1883

// クライアントエラーの場合
aedes.on('clientError', function (client, err) {
	console.log('client error', client.id, err.message, err.stack)
})

// 接続エラーの場合
aedes.on('connectionError', function (client, err) {
	console.log('client error', client, err.message, err.stack)
})

// publishされた場合
aedes.on('publish', function (packet, client) {
	if (client) {
		console.log('message from client', client.id)
	}
})

// 新しいsubscriberが接続した場合
aedes.on('subscribe', function (subscriptions, client) {
	if (client) {
		console.log('subscribe from client', subscriptions, client.id)
	}
})

// 新しいクライアントが接続した場合
aedes.on('client', function (client) {
	console.log('new client', client.id)
})

// MQTTブローカー起動
server.listen(port, function () {
	console.log('server listening on port', port)
})

 

Subscriber(受信者)を準備

次にSubscriberを準備します。

今回はSubscriberを

まず、MQTTを使って通信するためのソフトウェアを入れます。

npm install mqtt

 

次にSubscriberのプログラムを作成します。

「mqtt_test\routes\index.js」ファイルを以下のように書き換えます。

var express = require('express');
var router = express.Router();

/* GET home page. */
router.get('/', function(req, res, next) {
	res.render('index', { title: 'Express' });
});

/* ---------- mqtt subscriber ---------- */
// mqttを使用するための定義
var mqtt = require('mqtt');

// MQTTブローカーの設定
var client = mqtt.connect({
	host: 'localhost',
	port: 1883
});

// MQTTブローカーへ接続成功時
client.on('connect', function(){
		console.log('subscriber.connected.');
});

// subscribe設定
var topic_test = 'topic_test01';
client.subscribe(topic_test);

// MQTTブローカーからメッセージを受信した際
client.on('message', function(topic, message){
	console.log('subscriber.on.message', 'topic:', topic, 'message:', message.toString());
});

module.exports = router;

メッセージを受信するたびにログに出力されるプログラムとなっています。

 

Publisher(送信者)を準備

最後にPublisherを準備します。

今回はPublisherをPythonで作成していきます。

まず、MQTTを使って通信するためのPythonモジュールを入れます。

pip install paho-mqtt

 

次にPublisherのプログラムを作成します。

今回は「mqtt_pub.py」という名前で作成し、ファイルを以下のように書き換えます。

#!usr/bin/env python
# -*- coding: utf-8 -*-

import paho.mqtt.client as mqtt		# MQTTのライブラリをインポート
from time import sleep			# ウェイトのために使う
import random				# ランダム数作成


# ブローカーに接続できたときの処理
def on_connect(client, userdata, flag, rc):
	print("Connected with result code " + str(rc))

# ブローカーが切断したときの処理
def on_disconnect(client, userdata, flag, rc):
	if rc != 0:
		print("Unexpected disconnection.")

# publishが完了したときの処理
def on_publish(client, userdata, mid):
	print("publish: {0}".format(mid))

# メイン関数	 この関数は末尾のif文から呼び出される
def main():
	topic = "topic_test01"
	cnt = 0
	client = mqtt.Client()					# クラスのインスタンス(実体)の作成
	client.on_connect = on_connect				# 接続時のコールバック関数を登録
	client.on_disconnect = on_disconnect	 		# 切断時のコールバックを登録
	client.on_publish = on_publish				# メッセージ送信時のコールバック

	client.connect("localhost", 1883, 60)			# 接続先はローカルのMQTTブローカー

	# 通信処理スタート
	client.loop_start()		# subscriberはloop_forever()だが,publishはloop_start()で起動だけさせる

	# 永久に繰り返す
	while True:
		send_msg = '{"No":%d,"Rand":%d}' % (cnt, random.randrange(500))
		cnt += 1
		if (cnt >= 1000):
			cnt = 0

		print(send_msg)					# メッセージを表示

		client.publish(topic,send_msg)			# トピック名とメッセージを決めて送信
		sleep(1)	 				# 1秒待つ

if __name__ == '__main__':					# importされないときだけmain()を呼ぶ
	main()							# メイン関数を呼び出す

1秒ごとにMQTTブローカーにメッセージを送信するプログラムとなっています。

 

MQTTで通信をする

はじめにMQTTブローカーを立ち上げます。

ターミナルを開き、サーバのプロジェクトフォルダに移動して、以下のコマンドでMQTTブローカーを動かします。

node broker.js

次に新しくターミナルを開き、以下のコマンドでSubscriberを立ち上げます。

npm start

次に新しくターミナルを開き、以下のコマンドでPublisherのpythonプログラムを実行します。

python mqtt_pub.py

 

Subscriberのターミナルに以下のようなメッセージが表示されていれば通信成功です。

\mqtt_test>npm start

> mqtt-test@0.0.0 start c:\Work\web\mqtt\mqtt_test
> node ./bin/www

subscriber.connected.
subscriber.on.message topic: topic_test01 message: {"No":0,"Rand":193}
subscriber.on.message topic: topic_test01 message: {"No":1,"Rand":146}
subscriber.on.message topic: topic_test01 message: {"No":2,"Rand":246}
subscriber.on.message topic: topic_test01 message: {"No":3,"Rand":15}
subscriber.on.message topic: topic_test01 message: {"No":4,"Rand":74}
subscriber.on.message topic: topic_test01 message: {"No":5,"Rand":423}
subscriber.on.message topic: topic_test01 message: {"No":6,"Rand":270}
subscriber.on.message topic: topic_test01 message: {"No":7,"Rand":238}
subscriber.on.message topic: topic_test01 message: {"No":8,"Rand":114}
subscriber.on.message topic: topic_test01 message: {"No":9,"Rand":345}
subscriber.on.message topic: topic_test01 message: {"No":10,"Rand":133}

 

おわりに

MQTTを使って通信を行う方法を紹介しました。

MQTTはIoTに向いている特徴を持っているため、今後も活用の機会が増えていくだろうと感じました。