家庭内IoT機材 MQTTシステム設計実践例

IoTシステムを構築する上で、デバイス間の効率的なデータ交換は非常に重要です。その中でもMQTT (Message Queuing Telemetry Transport) は、HTTPと比較して軽量なため多くのIoTプロジェクトで採用されています。しかし、MQTTに関する情報は理論的なものが多く、実践的なシステム設計に関する記事が少なかったので、今回は実際に構築したMQTTシステムを例にその設計思想やポイントついて解説します。これからMQTTを使ったシステムを設計・構築しようとしている方々にとって、少しでも参考になれば幸いです。

以前からIoT灌水システムを家に設置していましたが、問題の早期検出のために死活監視用のステータス情報(ハートビート)を送るようにし、問題検知をした際にすぐにわかるよう、物理的な情報パネルを作成し問題発生時はランプを点灯するようなシステムが必要でした。 そのため、今回はMQTTブローカーに常時接続し、IOT機材を管理できるようなシステムを組みました。具体的には以下のような構成になっています

システム概要

全体システム構成図

  • Raspberry Pi
    • MQTTブローカーとシステム全体を制御するPythonスクリプト(以後コントローラー)が動作しています。また、IoT機器から送られたテレメトリ情報をInfluxDBに保存し、Grafanaでグラフ表示させるサービスも提供しています。
  • 灌水システム
    • 灌水処理を行う独立したIoT機器です。指令は受けず、定期的に水位・電圧・ヘルスチェック情報の送信と、灌水の実行イベント時にMQTTで情報をパブリッシュします。
  • 情報パネル
    • システムの状態や収集したデータを表示する役割を担います。サブスクライブのみで、コントローラーから来た情報を単純に表示するための装置となります。

MQTTトピック設計

MQTTのトピック設計は、システムの柔軟性や拡張性に大きく影響します。今回のシステムでは、「疎結合」と「中央制御」を意識した設計を行いました。 MQTTの具体的なトピックパスは以下のようにしています。

MQTTトピック設計

トピック名の冒頭にデバイス名を置くことで、以下のメリットが得られます。

  • 直感的な理解
    • トピック名を見ただけで、その情報がどのデバイスに関連するものなのかがすぐにわかります。
  • 権限管理の容易さ
    • デバイスごとにPublish/Subscribeの権限を設定する際に、トピック名に基づいて柔軟なアクセス制御が可能になります。
  • 拡張性
    • 新しいデバイスを追加する際にも、一貫した命名規則によりトピック設計が容易になります。

また、Publish/Subscribeにおけるデバイスの役割を明確にすることが重要です。

  • 情報をPublishする役割
    • 「どのデバイスがこのデータを送信するのか」という視点で、送信元デバイスをトピック名の冒頭に置きます
  • 実行コマンドなどをSubscribeする役割
    • 「どのデバイスがこのコマンドを受信するのか」という視点で、受信側デバイスをトピック名の冒頭に置きます。

Publish系トピック:デバイスが「データを発信する」場合

デバイスが自身の状態や計測値、発生したイベントなどを他のデバイスに送信する場合、トピック名の冒頭にはデータを送信するデバイスの識別子を置きます。これにより、「このデバイスがこの情報をPublishする」という関係性が明確になります。 灌水システムがその稼働状況やセンサーデータ、イベントなどをコントローラーに送信(Publish)するケースを例に挙げます。

  • irrigation_system/0001/status
    • 灌水システム「0001」が現在の稼働状況をPublishします。
  • irrigation_system/0001/telemetry/voltage
    • 灌水システム「0001」が電圧に関するテレメトリデータをPublishします。
  • irrigation_system/0001/events/watering
    • 灌水システム「0001」が灌水イベントの発生をPublishします。

これらの例では、すべて「irrigation_system/0001」という灌水システムが情報の発信源であり、その後に情報の種類(status, telemetry, events)が続いています。

Subscribe系トピック:デバイスが「コマンドを受信する」場合

デバイスが他のデバイスからの指示やコマンドを受け取り、それに基づいて特定の動作を実行する場合、トピック名の冒頭にはコマンドを受信するデバイスの識別子を置きます。これにより、「このデバイスがこのコマンドをSubscribeする」という関係性が明確になります。 情報パネルがコントローラーからの警告や注意、情報メッセージを受信(Subscribe)するケースは以下のようになります。

  • information_panel/0001/command/warning
    • 情報パネル「0001」がコントローラーからの警告コマンドをSubscribeします。
  • information_panel/0001/command/caution
    • 情報パネル「0001」がコントローラーからの注意コマンドをSubscribeします。
  • information_panel/0001/command/info
    • 情報パネル「0001」がコントローラーからの情報コマンドをSubscribeします。

これらの例では、すべて「information_panel/0001」というデバイスがコマンドの受信側であり、その後にコマンドの種類(command/warning, command/watering)が続いています。 なお、ペイロードについては必要な情報をJSONL形式にしています。これは手軽さを優先したためです。大規模な運用の場合は、トピック名の省略のほか、ペイロードをMessagePackでシリアライズするなどの対応も必要になるでしょう。

疎結合の実現

IoTデバイスは、他の特定のデバイスや情報システムに直接トピックを送信するのではなく、コントローラーを介して通信しています。これにより、各コンポーネントは互いに独立して機能変更や追加が可能となり、システム全体の柔軟性が向上します。 例えば、新しいセンサーデバイスを追加する場合でも、既存のデバイスやコントローラーのロジックに大きな変更を加える必要はありません。新しいデバイスが定義されたトピックに従ってメッセージを送受信するように設定するだけで、システムに統合できます。中央制御のメリットとして、以下が挙げられます。

  • 一元的なデータ管理・監視
    • システム全体のデータの流れを把握しやすくなります。
  • 障害時の切り分け
    • 問題が発生した場合、コントローラーを中心に原因を特定しやすくなります。
  • ロジックの集約
    • デバイスごとの複雑な判断ロジックをコントローラーに集約することで、個々のデバイスはシンプルな機能に集中できます

Will Message(遺言メッセージ)の必要性

MQTTには、クライアントが予期せず切断された場合に、ブローカーが特定のメッセージを他のクライアントに送信する「Will Message」という機能があります。これはデバイスの死活監視に利用できます。 今回のシステム構成では、コントローラーが各IoTデバイスの接続状態やデータの受信状況を常時監視する設計としています。コントローラー側で定期的なハートビートチェックや、データ受信のタイムアウト監視を行うことで、デバイスの異常を検知できます。そのため、今回作ったシステムではWill Messageを利用していません。 コントローラーによる最後の稼働状況情報受信からの経過時間による監視で十分と判断しました。ただし、システムの要件やデバイスの特性によっては、Will Messageの活用が有効な場合もあります。例えば、ネットワークが不安定な環境や、より即時性の高い異常検知が求められるケースなどです。

IoT機器を作る場合

今回利用している灌水システムは独立した機器として設計していますが、IoT機器として常にネットワークに繋がるなら、機器側ではネットワークと機械制御のみ行い、コントローラー側でロジックの制御を行った方がシンプルで柔軟なシステムが組めそうだなと思いました。というより、一般的なIoT機材はそのような設計になっているはず……

まとめ

本記事では、私が家庭内IoT機器向けに設計・構築したMQTTシステムについて、その物理構成、論理構成、トピック設計の考え方を解説しました。 MQTTはHTTPに比べて軽量で汎用性の高いプロトコルですが、そのメリットを最大限に引き出すためには、システム全体のアーキテクチャ設計が重要となります。特に、トピック設計における疎結合の意識と、中央集権的なコントローラーによる制御は、拡張性や保守性の高いシステムを構築する上で有効なアプローチだと思います

Appendix テレメトリデータの可視化

InfluxDBでテレメトリ情報を保管し、Grafanaを使って可視化するようにしてみました。データの傾向分析が容易になります。

Grafanaでのテレメトリ表示

コントローラーでInfluxDBに書き込むように以下のようなプログラムを追加しました。(以下は水量の例、他のデータを入れる際は、fieldの指定とデータを変えるだけ)

point = Point("irrigation_system").tag("device", “0001”).field("water_volume", water_volume).time(datetime.now(timezone.utc))
influxdb.write(bucket=BUCKET, org=ORG, record=point)

Grafanaでの表示は以下のFluxクエリで対応 (時間範囲指定)

from(bucket: "iothomebucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r._measurement == "irrigation_system")
|> filter(fn: (r) => r["_field"] == "water_volume")
|> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)

Appendix Raspberry Piでの環境構築

InfluxDB Grafana Mosquitto Pythonなど複数のプログラムを動かす必要があるので、Docker Composeにまとめました。 https://github.com/ayumu-bekki/iot_home restart:always を定義することで、Raspberry Piのシステムが再起動したとしても自動で立ち上がるようにしています。

この記事へのコメント