『ドアの動き検知システムの構築1』ではドアに貼り付けたTWELITE 2525Aの加速度情報をMONOSTICK経由でアプリケーションが拾いAWS IoT上に作成したドアを表す「モノ」のステータスを更新するところまで作成した。
現状では最後に発生した加速度情報がモノのシャドウステータスとして記録されるだけであるため時系列にどのような変化が起こったのか分からない。
時系列データとして利用できるよう発生した全ての加速度情報を Amazon Kinesis Data Firehose のストリームへ流しS3上に記録する。
また記録したデータはAmazon Athenaから検索できるようにする。
Firehoseストリームの作成
S3バケットの作成
Firehose経由で加速度情報をS3に保存するためのS3バケットを作成する。
AWSマネージメントコンソールから S3 を開き バケットを作成する
- バケット名
naruko-data.nijohando.jp
- リージョン
- アジアパシフィック(東京)
デリバリーストリームの作成
AWSマネージメントコンソールから Kinesis を開き Data Firehose から Create delivery stream で新規デリバリーストリームを作成。
- Delivery stream name
naruko-door1
- Source
- Direct PUT or other sources
- Record transformation
- Disabled
- Destination
- Amazon S3
- S3 bucket
naruko-data.nijohando.jp
- S3 Prefix
door1/
- Buffer size
5 MB
- Buffer interval
300 Seconds
- S3 compression
- GZIP
- S3 encryption
- Disabled
- Error logging
- Enabled
- IAM role
NarukoFirehoseDelivery
( Create new, or Choose から新規に作成 )
Next Create delivery stream で作成。
Firehose -> S3の疎通確認
AWSマネージメントコンソールから Amazon Kinesis Firehoseコンソール にて先ほど作成したS3 デリバリーストリーム naruko
を選択。
Test with demo data からStart sending demo data を押下。
送信したデモデータが指定したBuffer sizeを超えるか、Buffer Intervalが経過することでS3にファイルが作成されることを確認する。
AWS IoT ルールの作成
ここではモノ door1
のシャドウステータス変更時に変更情報がJSONとして流れてくるトピック $aws/things/door1/shadow/update/documents
を購読し、取得したJSONを加工して指定のAmazon Kinesis Firehoseストリームへ流すためのルールを作成する。
JSONの構造を確認
トピック $aws/things/door1/shadow/update/documents
へパブリッシュされるJSONの構造は以下のようになっている。
{
"previous": {
"state": {
"reported": {
"publishedAt": "2018-01-11 04:54:22.211",
"lqi": 180,
"powerSupplyVoltage": 2695,
"sensorMode": 10,
"x": 1,
"y": -6,
"z": -100
}
},
"metadata": {
"reported": {
"publishedAt": {
"timestamp": 1515613877
},
"lqi": {
"timestamp": 1515613877
},
"powerSupplyVoltage": {
"timestamp": 1515613877
},
"sensorMode": {
"timestamp": 1515613877
},
"x": {
"timestamp": 1515613877
},
"y": {
"timestamp": 1515613877
},
"z": {
"timestamp": 1515613877
}
}
},
"version": 272
},
"current": {
"state": {
"reported": {
"publishedAt": "2018-01-11 05:19:21.801",
"lqi": 186,
"powerSupplyVoltage": 2700,
"sensorMode": 8,
"x": -6,
"y": 0,
"z": -70
}
},
"metadata": {
"reported": {
"publishedAt": {
"timestamp": 1515615561
},
"lqi": {
"timestamp": 1515615561
},
"powerSupplyVoltage": {
"timestamp": 1515615561
},
"sensorMode": {
"timestamp": 1515615561
},
"X": {
"timestamp": 1515615561
},
"Y": {
"timestamp": 1515615561
},
"Z": {
"timestamp": 1515615561
}
}
},
"version": 273
},
"timestamp": 1515615561
}
previous
、current
でそれぞれ変更前、変更後のステータス情報(state.reported
)を持っており、各ステータス情報には metadata
として state.reported
以下の各プロパティが設定された際のタイムスタンプが含まれている。
また state.reported.publishedAt
はアプリケーションが設定したプロパティでドアが動いた事を検知した日時を表す。metadata.reported.XXX.timestamp
は AWS IoT側が設定したプロパティでシャドウステータスの XXX
プロパティが更新された日時を表している。(いずれもUTC)
JSONの変換
AWS IoTではこれらのトピックに流れてくるJSONに対してSQLライクな記法で射影や選択の操作を行うことができる。 元のJSONは冗長であるため以下の射影を行う事にする。
FROM | TO |
---|---|
current.version | version |
previous.state.reported.<プロパティ名> | previous.<プロパティ名> |
previous.metadata.reported.publishedAt.timestamp | previous.updatedAt |
current.state.reported.<プロパティ名> | current.<プロパティ名> |
current.metadata.reported.publishedAt.timestamp | current.updatedAt |
射影後の構造は以下となる。
{
"version": 273,
"previous": {
"publishedAt": "2018-01-11 04:54:22.211",
"lqi": 222,
"powerSupplyVoltage": 2500,
"sensorMode": 1,
"x": -2,
"y": 9,
"z": 101,
"updatedAt": 1515613877
},
"current": {
"publishedAt": "2018-01-11 05:19:21.801",
"lqi": 222,
"powerSupplyVoltage": 2500,
"sensorMode": 10,
"x": -5,
"y": 6,
"z": 97,
"updatedAt": ,1515615561
}
}
ルールを作成する
前述のJSON変換実施とAmazon Kinesis Firehoseストリーム経由でS3に保存するためのルールを作成する。
AWSマネージメントコンソールから AWS IoT を開く。
ACT ルールを作成する から新規ルールを作成する。
- 名前
NarukoDoor1FirehoseS3
- SQLバージョンの使用
2016-03-23
- 属性
current.version as version, previous.state.reported as previous, previous.metadata.reported.publishedAt.timestamp as previous.updatedAt, current.state.reported as current, current.metadata.reported.publishedAt.timestamp as current.updatedAt
- トピックフィルター
$aws/things/door1/shadow/update/documents
アクションの追加 から Amazon Kinesis Firehoseストリームにメッセージを送信する を選択しアクション設定画面へ。
- ストリーム名
naruko
- Separator
\n(改行)
- IAMロール名
AWSIoTNarukoZDoor1Firehose
( 新しいロールの作成 から新規に作成 )
アクションの追加 、ルールの作成 でルールの登録を完了する。
以上でドアを開閉するとJSONデータがAmazon Kinesis Firehoseストリームへ投入され、 ストリーム作成時に指定したBuffer sizeを超えるか、Buffer Intervalが経過することでS3にファイルが作成されるようになる。
Amazon Athenaから検索できるようにする
ドアの動き情報がS3に記録されるようになったのでこれをAtehnaから検索できるようにする。
AWSマネージメントコンソールから Athena を開く。
データベースの作成
Query Editor から CREATE DATABASE
文でデータベースを作成する。
create database naruko
テーブルの作成
DATABASE から naruko
を選択し Query Editor から CREATE TABLE
文でテーブルを作成する。
CREATE EXTERNAL TABLE door1 (
version int,
`current` struct<
`publishedAt`:timestamp,
`lqi`:int,
`powerSupplyVoltage`:int,
`sensorMode`:int,
`x`:int,
`y`:int,
`z`:int,
`updatedAt`:timestamp
>,
`previous` struct<
`publishedAt`:timestamp,
`lqi`:int,
`powerSupplyVoltage`:int,
`sensorMode`:int,
`x`:int,
`y`:int,
`z`:int,
`updatedAt`:timestamp
>
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://naruko-data.nijohando.jp/door1/'
以上でテーブルが作成され、S3上のデータが検索可能な状態になった。
検索
ということで色々検索してみる。
最後に扉が動いた際の時刻と加速度情報を検索
SELECT
version,
date_add('hour', 9, current.publishedAt),
current.x,
current.y,
current.z
FROM
door1
ORDER BY
version DESC
LIMIT 1
日付は全てUTCで記録されているため、date_add
関数でJSTに変換している。
指定日のドアの動きを検索
SELECT
version,
date_add('hour', 9, current.publishedAt),
current.x,
current.y,
current.z
FROM
door1
WHERE
date(date_add('hour', 9, current.publishedAt)) = date('2018-01-14')
ORDER BY
version DESC
3分以上のドア静止状態からドアが動いた際の直近10件のデータを検索
SELECT
version,
date_add('hour', 9, current.publishedAt),
current.sensormode,
current.x,
current.y,
current.z
FROM
door1
WHERE
date_diff('minute', previous.updatedat, current.updatedat) > 3
ORDER BY
version DESC
LIMIT 10
指定日における3分以上ドア静止状態からドアが動いた際のデータを検索
SELECT
version,
date_add('hour', 9, current.publishedAt),
current.sensormode,
current.x,
current.y,
current.z
FROM
door1
WHERE
date_diff('minute', previous.updatedat, current.updatedat) > 3 AND
date(date_add('hour', 9, current.publishedAt)) = date('2018-01-14')
ORDER BY
version DESC
以上でドアの動き情報がS3上を蓄積されるようになりクエリで検索できるようになった。
次回の『ドアの動き検知システムの構築3』ではドアが動いた際にリアルタイムにSlackへ通知する機能を追加する。