【Iot開発ノウハウ】MQTT通信を実装してみよう
どうもこんにちは、InomaCreateです。
今回は、Iot開発でよく使われる通信プロトコル 「MQTT」通信について解説していこうと思います。
まずはMQTTの簡単な概要を説明し、その後、MQTT通信を使った簡単なシステムを実装していきたいと思います。
Androidアプリ(MQTT Broker&Publisher)
lightsensor_matt_app
Pythonプログラム(MQTT Subscriber)
mqtt_python
1.MQTTの概要
MQTTは、Message Queueing Telemetry Transportの略で以下のような特徴があります。
Iot(Inter net of Things)は、モノのインターネットを呼ばれ、様々なモノやセンサー等がお互いに通信します。
ですので、上記の様な特徴を備えたMQTT通信は、Iot時代に適した通信方式です。
MQTTの仕組み
続いて、簡単にMQTTの仕組みを説明します。
MQTTには、
MQTTでやり取りするメッセージには、
MQTTの仕組み 例1
以下に簡単に仕組みを説明していきます。
まず、受信側デバイスが受け取りたいメッセージ(トピックA)をブローカーへサブスクライブします。
送信側デバイスは、メッセージ(トピックA)をブローカーへパブリッシュします。
ブローカーは、トピックAをサブスクライブしているデバイスへメッセージを送信する流れとなります。
MQTTの仕組み 例2
以下の例では、上のサブスクライバーは、トピックAをサブスクライブ、下のサブスクライバーはトピックBをサブスクライブしています。
このようなパターンでは、
トピックAがパブリッシュされたとき、トピックAをサブスクライブしているデバイスのみに送信されます。
次にトピックBがパブリッシュされたとき、トピックBをサブスクライブしているデバイスのみに送信される仕組みになります。
MQTTの仕組みを出版業界に例えると
この仕組を出版業界に例えると、
読者が本を書店に購買予約(サブスクライブ)し、作者(パブリッシャー)が本を出版(パブリッシュ)すると、書店(ブローカー)は購買予約している読者(サブスクライバー)に本を届けるイメージです。
MQTT通信も同じように受信デバイスが受信したいトピックをサブスクライブし、該当のトピックが発行されると、そのメッセージを受信できるイメージです。
2.今回作るシステム
次に今回作成する簡易的なMQTT通信システムについて説明します。
今回はAndroid端末をパブリッシャー兼、ブローカー、
PC側をサブスクライバーとします。
作成するシステムとしては、
3.Android端末でパブリッシャー(ブローカーも含む)を作る
照度センサー値を取得する
まずは、Android端末で照度センサーの値を取得する処理を追加してみましょう。
AndroidStudioで新規プロジェクトを追加します。
MainActivity.javaに以下の処理を実装しましょう。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
public class MainActivity extends AppCompatActivity implements SensorEventListener { private SensorManager mSensorManager; private Sensor mLight; private String TAG = "MainActivity"; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); // センサーオブジェクトを取得 mSensorManager = (SensorManager) getSystemService(Context.SENSOR_SERVICE); // 照度センサーのオブジェクトを取得 mLight = mSensorManager.getDefaultSensor(Sensor.TYPE_LIGHT); } @Override protected void onResume(){ super.onResume(); // センサーのリスナー登録 mSensorManager.registerListener(this,mLight,SensorManager.SENSOR_DELAY_NORMAL); } @Override protected void onPause() { super.onPause(); // センサーを無効 mSensorManager.unregisterListener(this); } @Override public void onSensorChanged(SensorEvent event) { if(event.sensor.getType() == Sensor.TYPE_LIGHT){ Log.i(TAG,"Light!! value:"+event.values[0]); TextView textView = (TextView)findViewById(R.id.lightValueText); textView.setText(String.valueOf(event.values[0])); } } @Override public void onAccuracyChanged(Sensor sensor, int accuracy) { // センサーの精度を変更するときに使う } } |
センサーイベントを受信するため、MainActivityクラスにSensorEventListenerインターフェイスを実装します。(1行目)
onCreateで、センサーオブジェクト、照度センサーのオブジェクトを取得します。(12行目〜16行目)
Activityが開始された時に呼ばれるonResumeメソッドを実装します。
onResume内でセンサーリスナーを登録します。(21行目〜28行目)
また、Activityが一時停止した場合に呼ばれるonPauseメソッドを実装します。
onPause内でセンサーリスナーの登録を解除します。(29行目〜34行目)
最後に、onSensorChangedメソッドとonAccuracyChangedメソッドを実装します。
onSensorChangedメソッドはセンサー値の値が変化したときに呼ばれます。
event.values[0]内にセンサー値が入っていますので、その値をtextViewに表示させます。(36行目〜42行目)
onAccuracyChangedメソッドはセンサーの精度を変更するときに使います。今回は使いませんので中身は空にしておきます。(45行目〜48行目)
なお、照度センサー値は、textViewに表示させていますので、activity_main.xmlは以下のように変更しています。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
<?xml version="1.0" encoding="utf-8"?> <androidx.constraintlayout.widget.ConstraintLayout xmlns:android="http://schemas.android.com/apk/res/android" xmlns:app="http://schemas.android.com/apk/res-auto" xmlns:tools="http://schemas.android.com/tools" android:layout_width="match_parent" android:layout_height="match_parent" tools:context=".MainActivity"> <TextView android:id="@+id/lightValueText" android:layout_width="wrap_content" android:layout_height="wrap_content" android:text="" app:layout_constraintBottom_toBottomOf="parent" app:layout_constraintLeft_toLeftOf="parent" app:layout_constraintRight_toRightOf="parent" app:layout_constraintTop_toTopOf="parent" /> </androidx.constraintlayout.widget.ConstraintLayout> |
ここで一旦ビルドをかけると、以下の様なエラーが発生しました。(同じようなエラーが出た方は以下参考までに)
BuildToolのバージョンが31.0.0では問題があるようですので、以下のようにbuildToolsVersionを”30.0.2″に変更することでエラー回避できました。
1 2 3 4 5 6 7 |
plugins { id 'com.android.application' } android { compileSdkVersion 31 buildToolsVersion "30.0.2" |
MQTT処理を実装する
今回、MQTTブローカーは、
※Android端末側でブローカーも兼ねるので、Mosquittoも使います。
ライブラリを使用するにあたり、gradleバージョン7.2では、ビルドエラーになりました。
もし、ビルドエラーになった方は、私が作成したgradleバージョンに合わせてみてください。(以下)
AndroidStudio File -> Project Structure -> Projectタブを開き、バージョンを確認。
なお、プロジェクトのbuild.gradleファイルは以下です。(参考までに)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
// Top-level build file where you can add configuration options common to all sub-projects/modules. buildscript { repositories { google() jcenter() } dependencies { classpath "com.android.tools.build:gradle:4.1.1" // NOTE: Do not place your application dependencies here; they belong // in the individual module build.gradle files } } allprojects { repositories { google() jcenter() } } task clean(type: Delete) { delete rootProject.buildDir } |
build.gradele内にライブラリを追加します。(44行目〜49行目)
複数ライブラリの衝突を避けるため、packagingOptionsを追加します。(29行目〜32行目)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
plugins { id 'com.android.application' } android { compileSdkVersion 31 buildToolsVersion "30.0.2" defaultConfig { applicationId "com.example.lightsensor_mqtt_app" minSdkVersion 26 targetSdkVersion 31 versionCode 1 versionName "1.0" testInstrumentationRunner "androidx.test.runner.AndroidJUnitRunner" } buildTypes { release { minifyEnabled false proguardFiles getDefaultProguardFile('proguard-android-optimize.txt'), 'proguard-rules.pro' } } compileOptions { sourceCompatibility JavaVersion.VERSION_1_8 targetCompatibility JavaVersion.VERSION_1_8 } packagingOptions { pickFirst "META-INF/INDEX.LIST" pickFirst "META-INF/io.netty.versions.properties" } } dependencies { implementation 'androidx.appcompat:appcompat:1.4.1' implementation 'com.google.android.material:material:1.5.0' implementation 'androidx.constraintlayout:constraintlayout:2.1.3' testImplementation 'junit:junit:4.+' androidTestImplementation 'androidx.test.ext:junit:1.1.3' androidTestImplementation 'androidx.test.espresso:espresso-core:3.4.0' implementation 'io.moquette:moquette-netty-parser:0.9' implementation 'io.moquette:moquette-broker:0.12.1' implementation 'io.moquette:moquette-parser-commons:0.8.1' implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0' implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1' } |
AndroidManifest.xml(マニフェストファイル)に権限関係の処理を追加します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
<?xml version="1.0" encoding="utf-8"?> <manifest xmlns:android="http://schemas.android.com/apk/res/android" package="com.example.lightsensor_mqtt_app"> <uses-permission android:name="android.permission.INTERNET" /> <uses-permission android:name="android.permission.WAKE_LOCK"/> <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE"/> <application android:allowBackup="true" android:icon="@mipmap/ic_launcher" android:label="@string/app_name" android:roundIcon="@mipmap/ic_launcher_round" android:supportsRtl="true" android:theme="@style/Theme.Lightsensor_mqtt_app"> <activity android:name=".MainActivity"> <intent-filter> <action android:name="android.intent.action.MAIN" /> <category android:name="android.intent.category.LAUNCHER" /> </intent-filter> </activity> <service android:name="org.eclipse.paho.android.service.MqttService" > </service> </application> </manifest> |
続いて、MainActivity.java のonCreate内にBroker起動処理を追加します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
public class MainActivity extends AppCompatActivity implements SensorEventListener { private SensorManager mSensorManager; private Sensor mLight; private String TAG = "MainActivity"; io.moquette.broker.Server broker; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); // Broker起動処理 try { broker = new io.moquette.broker.Server(); MemoryConfig memoryConfig = new MemoryConfig(new Properties()); memoryConfig.setProperty(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME, this.getFilesDir() + BrokerConstants.DEFAULT_MOQUETTE_STORE_H2_DB_FILENAME); broker.startServer(memoryConfig); Log.i(TAG,"startServer"); } catch (IOException e){ e.printStackTrace(); } |
起動したBrokerと接続する処理を追加します。
onCreateに以下を追加しましょう。
まずは、Android端末自体(自分自身)がブローカーなので、自分のIPアドレスを取得します。(29行目〜44行目)
その後、ブローカーとの接続処理を実行します。(46-71行目)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
public class MainActivity extends AppCompatActivity implements SensorEventListener { private SensorManager mSensorManager; private Sensor mLight; private String TAG = "MainActivity"; io.moquette.broker.Server broker; String IPAddress=null; private MqttAndroidClient mqttAndroidClient; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); // Broker起動処理 try { broker = new io.moquette.broker.Server(); MemoryConfig memoryConfig = new MemoryConfig(new Properties()); memoryConfig.setProperty(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME, this.getFilesDir() + BrokerConstants.DEFAULT_MOQUETTE_STORE_H2_DB_FILENAME); broker.startServer(memoryConfig); Log.i(TAG,"startServer"); } catch (IOException e){ e.printStackTrace(); } // 自分のIPアドレスを取得する try { for (NetworkInterface n : Collections.list(NetworkInterface.getNetworkInterfaces())) { for (InetAddress addr : Collections.list(n.getInetAddresses())) { if(addr instanceof Inet4Address && !addr.isLoopbackAddress()) { Log.i(TAG,"IP ADDRESS:"+addr.getHostAddress()); IPAddress = addr.getHostAddress(); TextView textView = (TextView)findViewById(R.id.textIP); textView.setText(IPAddress); } } } } catch (SocketException e){ Log.e(TAG,"IP address not get!!"); e.printStackTrace(); } // クライアントブローカー接続 if(IPAddress!=null) { mqttAndroidClient = new MqttAndroidClient(this, "tcp://" + IPAddress + ":1883", "test-android") { @Override public void onReceive(Context context, Intent intent) { super.onReceive(context, intent); } }; try { MqttConnectOptions options = new MqttConnectOptions(); mqttAndroidClient.connect(options, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken iMqttToken) { Log.d(TAG, "onSuccess"); } @Override public void onFailure(IMqttToken iMqttToken, Throwable throwable) { Log.d(TAG, "onFailure"); } }); } catch (MqttException e) { Log.d(TAG, e.toString()); } } |
あとからPC側でブローカーと接続するために、IPアドレスをアプリ画面に表示しておきます。(上記36-37行目)
activity_main.xmlに以下のようにTextViewを追加します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
<?xml version="1.0" encoding="utf-8"?> <androidx.constraintlayout.widget.ConstraintLayout xmlns:android="http://schemas.android.com/apk/res/android" xmlns:app="http://schemas.android.com/apk/res-auto" xmlns:tools="http://schemas.android.com/tools" android:layout_width="match_parent" android:layout_height="match_parent" tools:context=".MainActivity"> <TextView android:id="@+id/lightValueText" android:layout_width="wrap_content" android:layout_height="wrap_content" android:text="" app:layout_constraintBottom_toBottomOf="parent" app:layout_constraintLeft_toLeftOf="parent" app:layout_constraintRight_toRightOf="parent" app:layout_constraintTop_toTopOf="parent" /> <TextView android:id="@+id/textView" android:layout_width="wrap_content" android:layout_height="wrap_content" android:layout_marginStart="40dp" android:text="IP Address" app:layout_constraintBottom_toBottomOf="parent" app:layout_constraintStart_toStartOf="parent" app:layout_constraintTop_toTopOf="parent" app:layout_constraintVertical_bias="0.091" /> <TextView android:id="@+id/textIP" android:layout_width="wrap_content" android:layout_height="wrap_content" android:text="" app:layout_constraintBottom_toBottomOf="parent" app:layout_constraintEnd_toEndOf="parent" app:layout_constraintHorizontal_bias="0.065" app:layout_constraintStart_toEndOf="@+id/textView" app:layout_constraintTop_toTopOf="parent" app:layout_constraintVertical_bias="0.091" /> </androidx.constraintlayout.widget.ConstraintLayout> |
最後に、照度センサー値をパブリッシュする処理を追加します。
まずは、センサー値を保存しておくための変数:LightValueを追加します。
1 2 3 4 5 6 7 8 9 |
public class MainActivity extends AppCompatActivity implements SensorEventListener { private SensorManager mSensorManager; private Sensor mLight; private String TAG = "MainActivity"; io.moquette.broker.Server broker; String IPAddress=null; private MqttAndroidClient mqttAndroidClient; private int LightValue; |
LightValueに取得したセンサー値を保存しましょう。
1 2 3 4 5 6 7 8 9 10 |
@Override public void onSensorChanged(SensorEvent event) { if(event.sensor.getType() == Sensor.TYPE_LIGHT){ Log.i(TAG,"Light!! value:"+event.values[0]); TextView textView = (TextView)findViewById(R.id.lightValueText); textView.setText(String.valueOf(event.values[0])); LightValue = (int)event.values[0]; } } |
今回センサー値は、1秒毎にパブリッシュするようにしたいと思いますので、timerを追加します。
TimerTask()クラスのrunメソッド内で、トピック「sensor/light」で照度センサー値をbyte型配列に格納してパブリッシュしています。(25行目)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
public class MainActivity extends AppCompatActivity implements SensorEventListener { private SensorManager mSensorManager; private Sensor mLight; private String TAG = "MainActivity"; io.moquette.broker.Server broker; String IPAddress=null; private MqttAndroidClient mqttAndroidClient; private int LightValue; private Timer timer = new Timer(); @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); timer.schedule(new TimerTask() { @Override public void run() { if(mqttAndroidClient == null) return; try { Log.i(TAG,"publish!!"); mqttAndroidClient.publish("sensor/light", String.valueOf(LightValue).getBytes(), 0, true); } catch (MqttPersistenceException e) { Log.d(TAG,e.toString()); } catch (MqttException e) { Log.d(TAG,e.toString()); } } },1000,1000); |
これでAndroid端末側の処理ができました。
4.PC側でサブスクライバーを作る
次に、Android端末でパブリッシュしている照度センサー値をPC側(サブスクライバー)で受信するための処理を追加します。
言語はPythonで記載します。(※pythonのバージョンは、Python 3.8.5です)
まずは、MQTTクライアントのライブラリPahoをインストールしましょう。
Python3をインストールしていれば、MACであれば、以下を実行するとインストールできます。
続いて、サブスクライブ処理をPythonで実装します。
以下のように実装しましょう。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
import paho.mqtt.client as mqtt def on_connect(client, userdata, flag, rc): print("Connected with result code " + str(rc)) client.subscribe("sensor/light") def on_disconnect(client, userdata, flag, rc): if rc != 0: print("Unexpected disconnection.") def on_message(client, userdata, msg): msg.payload = msg.payload.decode("utf-8") print("Receive message ="+str(msg.payload)+" on topic = "+ msg.topic+ " with Qos "+str(msg.qos)) client = mqtt.Client() client.on_connect = on_connect client.on_disconnect = on_disconnect client.on_message = on_message try: client.connect("192.168.11.8",1883,300) except: print("broker host is not started!! waiting!!") client.loop_forever() |
まず、client = mqtt.Client()でMQTTクライアントを作成します。(16行目)
それぞれブローカーと接続された時、切断時、メッセージを受けた時のコールバックメソッドを追加します。(17-19行目)
client.connect(IPアドレス、ポート番号、待受時間)でクライントをブローカーと接続します。
※IPアドレスは、Android端末側のIPアドレスを設定します。
client.loop_forever()は接続されるまでループして待ち続ける処理です。
def on_connectが、ブローカーと接続した際に呼ばれる処理です。
「sensor/light」トピックをサブスクライブしています。※Android端末側で決めたトピック名です。
def on_disconnectは、ブローカーと切断された時に呼ばれる処理です。
def on_messageがメッセージを受信した際の処理です。
msg.payloadに受信したデータが入っています。
実際に受信したデータは、byte型配列なので、utf-8でデコードします。
受信したメッセージをプリントで表示させています。
5.動かしてみる
では、Android端末でアプリを起動し、PC側でPythonスクリプトを実行してみましょう。
以下のように、Android端末で取得した照度センサー値が、PC側で受信できていればOKです!
動画解説
YouTubeに解説動画もアップしていますので、よろしければどうぞ。
【Iot開発ノウハウ】MQTT通信を実装してみよう(Part1:MQTT通信の概要)
【Iot開発ノウハウ】MQTT通信を実装してみよう(Part2:Android端末でセンサー値を取得する)
【Iot開発ノウハウ】MQTT通信を実装してみよう(Part3: Android端末でMQTT通信処理を追加する)
【Iot開発ノウハウ】MQTT通信を実装してみよう(Part4: MQTT受信側処理の実装)
6.最後に
以上、今回はMQTT通信の簡単なシステムを実装してみました。
今回の実装を応用していけば、色々なIotシステムが作れそうですね。
少しでも参考になれば幸いです。
それでは!
スポンサーリンク