こんにちは、Michaelです。
今回は、「AWS IoTルールの基本」の第6回として、入力したメッセージをトリガーにしてAWS Lambdaを起動するアクションを紹介します。
今回の構成
今回はIoTデバイスからのメッセージを受けて、気象APIからの先3時間内の予想気象情報を返す仕組みを想定します。
AWS IoTのトピック「test/pub」にPublishされたメッセージでLambdaを起動します。
起動したLambdaが、DynamoDBから機器の地点情報を取り出し、気象Web API「OpenWeatherMap」からその地点の先3時間内の予想気象情報を取得してデバイスに返します。
Lambda関数の作成
Lambdaと連携するにあたり、あらかじめ以下のようなLambda関数を作成しておきます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# coding: utf-8 import json import urllib.request import urllib.parse import boto3 from boto3.dynamodb.conditions import Key, Attr # ①Functionのロードをログに出力 print('Loading function') # ②DynamoDBオブジェクトを取得 dynamodb = boto3.resource('dynamodb') # ③AWS IoT Data Planeオブジェクトを取得 iot = boto3.client('iot-data') def lambda_handler(event, context): # ④DynamoDBテーブル名を指定 table_name = "device_properties" # ⑤参照データのキーの設定 partition_key = {"client_id": event["client_id"]} # ⑥DynamoDBテーブルのオブジェクトを取得 dynamotable = dynamodb.Table(table_name) # ⑦デバイス地点データの読み取り res = dynamotable.get_item(Key=partition_key) device_info = res["Item"] # ⑧気象API(OpneWeatherMap)から気象予報データ取得 api_key = "4f6611449a354542415da6e7393a5b21" url_qs = { "APPID": api_key, "lat": device_info["lat"], "lon": device_info["lon"], "units": "metric" } url = "http://api.openweathermap.org/data/2.5/forecast?%s" % urllib.parse.urlencode(url_qs) res = urllib.request.urlopen(url) data = json.loads(res.read()) # ⑨直近3時間の気象予報値の取得 timestamp = event["timestamp"] / 1000 for forcast in data["list"]: if forcast["dt"] > timestamp and forcast["dt"] <= timestamp+10800: forcast_time = forcast["dt"] forcast_data = forcast["main"] break # ⑩送信用データ payload = { "client_id": event["client_id"], "forcast_time": forcast_time } payload.update(forcast_data) # ⑪AWS IoTにデータをPublish try: iot.publish( topic="test/sub", qos=0, payload=json.dumps(payload, ensure_ascii=False) ) print("Publish to AWS IoT: OK") except Exception as e: print("Publish to AWS IoT: Error") print(e) |
ルールアクションの設定
アクションとして「メッセージデータを渡す Lambda 関数を呼び出す」を選択します。
設定内容は起動するLambda関数のみとなります。
設定をすると、自動的にLambda関数に対してAWS IoTからのアクセス許可が追加されます。
メッセージソースの設定
メッセージソースには以下のAWS IoT SQLクエリを設定します。
組み込み関数「clientid()」、「timestamp()」でClientId「client_id」と受信時刻「timestamp」を付加します。
1 |
SELECT clientid() AS client_id, timestamp() AS timestamp, temperature + 273.15 AS kelvin, * FROM 'test/pub' |
実行結果
MQTT.fxを送信デバイスとしてテストを実行してみます。
ClientIdに「device001」を設定し、返送用トピックの「test/sub」をSubscribeした状態にしておきます。
Publish先トピックに「test/pub」を設定し、以下のようなセンサーデータをPublishします。
1 2 3 4 5 |
{ "temperature": 10.6, "humidity": 51, "pressure": 1023.1 } |
メッセージをPublishすると、Subscribeしている「test/sub」に予想気象情報が返ってきていることがわかりました。
まとめ
AWS IoTとAWS Lambdaの組み合わせは、AWSを使ったIoTインフラの一部としてよく使われます。AWS Lambdaであれば、AWS IoTではできないより高度な処理も可能なため、Web APIだけでなくAmazon Machine Learningなど他のAWSサービスとの連携も容易になります。
今回は以上となります。
次回もお楽しみに!