フューチャー技術ブログ

GoでMQTT!! ~温湿度マイスターbotの作成~(前編)

Louis Reed on unsplash.com Unsplash

はじめに

こんにちは。TIG/DXユニット所属の宮永です。

今回はAWSサービスのうちの一つAWS IoTを使用してRaspberryPiとのMQTTによる通信を行います。

AWS IoTを使用したMQTTのチュートリアルはAWS公式からも詳細なハンズオン記事が出ています。

本記事はこちらのハンズオンを基にGo言語を使用してMQTTによる通信を行いました。(公式の記事はPythonで実装されています。)

最終的には室内の快適な温湿度を教えてくれる「温湿度マイスターbot」を作成します。
image.png

なお、本記事で作成したコードは

にて公開しています。

MQTTとは

MQTTはメッセージングプロトコルです。
以下 mqtt.orgより引用です。

MQTTは、モノのインターネット(IoT)用のOASIS標準メッセージングプロトコルです。これは、非常に軽量なパブリッシュ/サブスクライブメッセージングトランスポートとして設計されており、コードフットプリントが小さくネットワーク帯域幅が最小のリモートデバイスを接続するのに理想的です。今日のMQTTは、自動車、製造、電気通信、石油およびガスなど、さまざまな業界で使用されています。
MQTT - The Standard for IoT Messaging

MQTTはHTTPリクエストのようなリクエスト/レスポンスといったプロトコルとは異なり、イベント駆動型のパブリッシュ/サブスクライブプロトコルです。

下図にパブリッシュ/サブスクライブの概要を示します。

Publisherはセンシングの情報(温度や湿度、速度など)をBrokerに配信します。SubscriberはBrokerをSubscribeし、一定間隔で情報を受け取ります。このような構成から、PublisherとSubscriberは疎な結合となっています。拡張性が高く、軽量であるという点でIoTデバイスを使用した通信プロトコルとして注目されているとのことです。

image.png

AWS IoTとは

AWS IoT は、IoT デバイスを他のデバイスおよび AWS クラウドサービスに接続するクラウドサービスを提供します。AWS IoT は、IoT デバイスを AWS IoT ベースのソリューションに統合するのに役立つデバイスソフトウェアを提供します。デバイスが AWS IoT に接続できる場合、AWS IoT は AWS が提供するクラウドサービスにそれらのデバイスを接続できます。
AWS IoT とは - AWS IoT Core

AWSIoTは各種AWSサービスとIoTデバイスとを手軽に連携できるサービスを展開しています。
今回はAWS IoT標準サービスで提供されているMQTTブローカーを利用してMQTT通信にトライします。

システム構成

今回作成するものは室内の温湿度を定期的にセンシングし、Slackに温湿度のプロット図を定期的に送信する仕組みです。

DHT22という温湿度センサをRaspberryPi3B+に取り付けて2時間ごとに温湿度を取得します。取得した温湿度をMQTTによってAWS IoTにPublishします。AWS IoTはDynamoDBと連携させることで、Subscribeしたデータを蓄積します。

また、RaspberryPiではPythonスクリプトも同時に起動しておきます。PythonではBoto3を使用してDynamoDBに向けて定期的にQueryを行います。受け取った情報からtimestampを横軸、温度湿度を縦軸にとったプロット図を作成します。作成したプロット図は2時間ごとにSlackに投稿するという仕組みにしています。
(※冒頭のプロット図は便宜的に1分毎のデータをプロットしています。)

image.png

開発環境

ハードウェア

ソフトウェア

開発はwindows10環境、WSL2上で行いました。標準モジュール以外で使用したものを以下に列挙します。

こちらは余談ですが、VSCodeを使ってRaspberryPi上のソースコードを編集する際はVSCodeのSSH機能が非常に便利です。
以下の記事に詳しく記載されているのでぜひ利用してみてください。
VSCodeのSSH接続機能で、RaspberryPi内のコードを編集してデバッグ - Qiita

実装

実装は以下の手順で進めます。

  1. DHT22から温湿度情報を取得する
  2. AWS IoTを使用してRaspberryPiからのPublish動作確認
  3. DHT22の温湿度情報をAWS IoTへPublish
  4. AWS IoTで取得した温湿度情報をDynamoDBに連携
  5. Boto3を使用してDynamoDBからデータをQuery、データ整形
  6. 取得データをmatplotlibで可視化
  7. 作成したプロット図をSlack APIで画像投稿

1. DHT22から温湿度情報を取得する。

使用した温湿度センサはこちらです。
DSD TECH DHT22 温湿度センサーモジュール AM2302チップ付き

DHT22.JPG

まずはこちらの温湿度センサをジャンパワイヤーを使用してRaspberryPiに接続します。

接続するピンはVccが物理ピン1、GNDが物理ピン6,DATがGPIO2(物理ピン3)です。

DHT22からGo言語を使用して温湿度情報を取得するためにこちらのモジュールを利用させていただきました。

非常にシンプルに記述されており、摂氏と華氏の変換も実装されていたため使いやすかったです。

以下のコマンドでモジュールを取得してください。

go get github.com/MichaelS11/go-dht

まずは、温湿度情報を格納するMyDHT22構造体を定義します。

model.go
package dht22

import (
"fmt"
"github.com/MichaelS11/go-dht"
"time"
)

type MyDHT22 struct {
Temperature float64
Humidity float64
Timestamp time.Time
}

この構造体に対して温湿度を情報を取得するReadメソッドを定義します。

model.go
func (d MyDHT22) Read() MyDHT22 {
err := dht.HostInit()
if err != nil {
fmt.Println("HostInit error:", err)
return d
}

dht, err := dht.NewDHT("GPIO2", dht.Celsius, "")
if err != nil {
fmt.Println("NewDHT error:", err)
return d
}

humidity, temperature, err := dht.ReadRetry(11)
if err != nil {
fmt.Println("Read error:", err)
return d
}
d.Humidity = humidity
d.Temperature = temperature
d.Timestamp = time.Now()
return d
}

dht.NewDHT("GPIO2", dht.Celsius, "")にてDATの接続先を指定してください。また、Celsius(摂氏)とFahrenheit(華氏)が選択できるため、Celsiusを入力します。
それでは、DHT22よりセンシング情報を正しく取得できているか確かめます。
sampleフォルダを作成し、以下の様にmodel.godht22配下に格納します。

.
├── dht22
│ └── model.go
├── go.mod
├── go.sum
└── main.go

main.goは以下の様に記述します。

main.go
package main

import (
"fmt"
"time"

"github.com/sample/dht22"
)

func main() {
for {
var mydht dht22.MyDHT22
PubMsg := mydht.Read()
fmt.Println(PubMsg)
time.Sleep(2 * time.Second)
}
}

go mod initgo mod tidyを実行した後、上記のようなディレクトリ構成となるはずです。
それではgo run main.gomain.goを実行します。以下の様にターミナル上に表示されれば成功です。
1列目が温度、2列目が湿度、3列目が取得時刻です。

{25.3 48.1 2021-09-20 18:59:59.716042512 +0900 JST m=+7.022936648}
{25.3 48.1 2021-09-20 19:00:02.724946254 +0900 JST m=+10.031840338}
{25.4 50.5 2021-09-20 19:00:07.733240959 +0900 JST m=+15.040135043}
{25.3 50.4 2021-09-20 19:00:10.7415645 +0900 JST m=+18.048459365}
{25.3 48.1 2021-09-20 19:00:13.749694254 +0900 JST m=+21.056588391}
...

ここで作成したmodel.goは後の工程でも使用するので削除しないようにしてください。

2. AWS IoTを使用してPublishの動作確認

AWS IoTとRaspberryPiの連携は、「ポリシーの作成」から始まります。
「ポリシーの作成」から「モノの作成」までの工程はこちらのページに記載されている通りに行ってください。
AWS IoT Core の設定 :: AWS IoT Core 初級 ハンズオン
手順通り進めると以下の様に5つのファイルが作成されるはずです。こちらは後程使用するため、RaspberryPi上に格納してください。

.
├── xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx-certificate.pem.crt
├── xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx-private.pem.key
├── xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx-public.pem.key
├── AmazonRootCA1.pem
└── AmazonRootCA3.pem

それでは、RaspberryPiからMQTTを使用してメッセージを送信します。

実装はGo言語で行います。実装の際には以下2点の記事を大いに参考にさせていただきました。

以下、実装したコードです。

main.go
package main

import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"log"

mqtt "github.com/eclipse/paho.mqtt.golang"
)

const (
ThingName = "xxxxxxxxxxxxxxxxx"
RootCAFile = "AmazonRootCA1.pem"
CertFile = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx-certificate.pem.crt"
KeyFile = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx-private.pem.key"
PubTopic = "topic/to/publish"
endpoint = "xxxxxxxxxxxxxxxx.iot.ap-northeast-1.amazonaws.com"
QoS = 1
)

func main() {

tlsConfig, err := newTLSConfig()
if err != nil {
panic(fmt.Sprintf("failed to construct tls config: %v", err))
}
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("ssl://%s:%d", endpoint, 443))
opts.SetTLSConfig(tlsConfig)
opts.SetClientID(ThingName)
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(fmt.Sprintf("failed to connect broker: %v", token.Error()))
}
PubMsg := `{"MQTT":{"message":"Messaging from my RaspberryPi!!"}}`
log.Printf("publishing %s...\n", PubTopic)
if token := client.Publish(PubTopic, QoS, false, PubMsg); token.Wait() && token.Error() != nil {
panic(fmt.Sprintf("failed to publish %s: %v", PubTopic, token.Error()))
}
fmt.Println(PubMsg)
client.Disconnect(250)
}

func newTLSConfig() (*tls.Config, error) {
rootCA, err := ioutil.ReadFile(RootCAFile)
if err != nil {
return nil, err
}
certpool := x509.NewCertPool()
certpool.AppendCertsFromPEM(rootCA)
cert, err := tls.LoadX509KeyPair(CertFile, KeyFile)
if err != nil {
return nil, err
}
cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0])
if err != nil {
return nil, err
}
return &tls.Config{
RootCAs: certpool,
InsecureSkipVerify: true,
Certificates: []tls.Certificate{cert},
NextProtos: []string{"x-amzn-mqtt-ca"},
}, nil
}

こちらのスクリプトではPubMsgで定義されたJSONを送信しています。

それでは上記スクリプトを実行したときにAWS IoTコンソール上でSubscribeが正しく動作しているか確かめます。

以下、画像上部はVSCode上でmain.goを実行しています。画像上部はAWS IoTコンソールにてRaspberryPiからのメッセージをSubscribeしています。

main.goの実行とともにコンソール上でも配信を受け取っていることが確認できます。

mqttdemo.gif

メッセージの送受信が確認できたところで、次に先ほどの実装で取得した温湿度をpayloadとして配信します。

3. DHT22の温湿度情報をPublish

「2. AWS IoTを使用してPublishの動作確認」にて取得した各種証明書とmain.goを同階層に格納してください。
同様に「1. DHT22から温湿度情報を取得する」にて実装したmodel.godht22サブディレクトリとして格納してください。

ディレクトリ構成

.
├── xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx-certificate.pem.crt
├── xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx-private.pem.key
├── xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx-public.pem.key
├── AmazonRootCA1.pem
├── AmazonRootCA3.pem
├── dht22
│ └── model.go
├── go.mod
├── go.sum
└── main.go

「2. AWS IoTを使用してPublishの動作確認」の実装よりmain関数部分を少し変更します。
「1. DHT22から温湿度情報を取得する」にて実装したMYDHT22構造体を呼び出し、json.MarshallでJSONにしてPubMsgに渡しています。

データの取得と送信は2秒間隔でおこなっています。

main.go
...(省略)

func main() {

tlsConfig, err := newTLSConfig()
if err != nil {
panic(fmt.Sprintf("failed to construct tls config: %v", err))
}
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("ssl://%s:%d", endpoint, 443))
opts.SetTLSConfig(tlsConfig)
opts.SetClientID(ThingName)
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(fmt.Sprintf("failed to connect broker: %v", token.Error()))
}
for {
var mydht dht22.MyDHT22
PubMsg, _ := json.Marshal(mydht.Read())

log.Printf("publishing %s...\n", PubTopic)
if token := client.Publish(PubTopic, QoS, false, PubMsg); token.Wait() && token.Error() != nil {
panic(fmt.Sprintf("failed to publish %s: %v", PubTopic, token.Error()))
}

time.Sleep(2 * time.Second)
}
}

...(省略)

下図上部がmain.goの実行、下部がSubscriptionの様子を示しています。

TimeStampに注目すると、2秒毎に新規データが蓄積されていることがわかります。

mqttdemo2.gif

前編まとめ

前編では温湿度センサーDHT22より取得したデータをMQTTでAWS IoTにPublishするところまでを行いました。

後編ではAWS IoTで受け取ったデータをDynamoDBに連携します。

DynamoDBに蓄積されたデータをBoto3によって取得し、Slackbotで配信するところまで行います。