• 加筆
  • 2018.01.26
  • 修正
一 月
14
日曜日

ドアの動き検知システムの構築2

『ドアの動き検知システムの構築1』ではドアに貼り付けたTWELITE 2525Aの加速度情報をMONOSTICK経由でアプリケーションが拾いAWS IoT上に作成したドアを表す「モノ」のステータスを更新するところまで作成した。


現状では最後に発生した加速度情報がモノのシャドウステータスとして記録されるだけであるため時系列にどのような変化が起こったのか分からない。
時系列データとして利用できるよう発生した全ての加速度情報を Amazon Kinesis Data Firehose のストリームへ流しS3上に記録する。

また記録したデータはAmazon Athenaから検索できるようにする。

Firehoseストリームの作成

S3バケットの作成

Firehose経由で加速度情報をS3に保存するためのS3バケットを作成する。

AWSマネージメントコンソールから S3 を開き バケットを作成する

バケット名
naruko-data.nijohando.jp
リージョン
アジアパシフィック(東京)

デリバリーストリームの作成

AWSマネージメントコンソールから Kinesis を開き Data Firehose から Create delivery stream で新規デリバリーストリームを作成。

Delivery stream name
naruko-door1
Source
Direct PUT or other sources
Record transformation
Disabled
Destination
Amazon S3
S3 bucket
naruko-data.nijohando.jp
S3 Prefix
door1/
Buffer size
5 MB
Buffer interval
300 Seconds
S3 compression
GZIP
S3 encryption
Disabled
Error logging
Enabled
IAM role
NarukoFirehoseDelivery
( Create new, or Choose から新規に作成 )

Next Create delivery stream で作成。

Firehose -> S3の疎通確認

AWSマネージメントコンソールから Amazon Kinesis Firehoseコンソール にて先ほど作成したS3 デリバリーストリーム naruko を選択。
Test with demo data からStart sending demo data を押下。

送信したデモデータが指定したBuffer sizeを超えるか、Buffer Intervalが経過することでS3にファイルが作成されることを確認する。

AWS IoT ルールの作成

ここではモノ door1 のシャドウステータス変更時に変更情報がJSONとして流れてくるトピック $aws/things/door1/shadow/update/documents を購読し、取得したJSONを加工して指定のAmazon Kinesis Firehoseストリームへ流すためのルールを作成する。

JSONの構造を確認

トピック $aws/things/door1/shadow/update/documents へパブリッシュされるJSONの構造は以下のようになっている。

{
  "previous": {
    "state": {
      "reported": {
        "publishedAt": "2018-01-11 04:54:22.211",
        "lqi": 180,
        "powerSupplyVoltage": 2695,
        "sensorMode": 10,
        "x": 1,
        "y": -6,
        "z": -100
      }
    },
    "metadata": {
      "reported": {
        "publishedAt": {
          "timestamp": 1515613877
        },
        "lqi": {
          "timestamp": 1515613877
        },
        "powerSupplyVoltage": {
          "timestamp": 1515613877
        },
        "sensorMode": {
          "timestamp": 1515613877
        },
        "x": {
          "timestamp": 1515613877
        },
        "y": {
          "timestamp": 1515613877
        },
        "z": {
          "timestamp": 1515613877
        }
      }
    },
    "version": 272
  },
  "current": {
    "state": {
      "reported": {
        "publishedAt": "2018-01-11 05:19:21.801",
        "lqi": 186,
        "powerSupplyVoltage": 2700,
        "sensorMode": 8,
        "x": -6,
        "y": 0,
        "z": -70
      }
    },
    "metadata": {
      "reported": {
        "publishedAt": {
          "timestamp": 1515615561
        },
        "lqi": {
          "timestamp": 1515615561
        },
        "powerSupplyVoltage": {
          "timestamp": 1515615561
        },
        "sensorMode": {
          "timestamp": 1515615561
        },
        "X": {
          "timestamp": 1515615561
        },
        "Y": {
          "timestamp": 1515615561
        },
        "Z": {
          "timestamp": 1515615561
        }
      }
    },
    "version": 273
  },
  "timestamp": 1515615561
}

previouscurrentでそれぞれ変更前、変更後のステータス情報(state.reported)を持っており、各ステータス情報には metadata として state.reported以下の各プロパティが設定された際のタイムスタンプが含まれている。

また state.reported.publishedAt はアプリケーションが設定したプロパティでドアが動いた事を検知した日時を表す。metadata.reported.XXX.timestamp は AWS IoT側が設定したプロパティでシャドウステータスの XXX プロパティが更新された日時を表している。(いずれもUTC)

JSONの変換

AWS IoTではこれらのトピックに流れてくるJSONに対してSQLライクな記法で射影や選択の操作を行うことができる。 元のJSONは冗長であるため以下の射影を行う事にする。

FROM TO
current.version version
previous.state.reported.<プロパティ名> previous.<プロパティ名>
previous.metadata.reported.publishedAt.timestamp previous.updatedAt
current.state.reported.<プロパティ名> current.<プロパティ名>
current.metadata.reported.publishedAt.timestamp current.updatedAt

射影後の構造は以下となる。

{
  "version": 273,
  "previous": {
    "publishedAt": "2018-01-11 04:54:22.211",
    "lqi": 222,
    "powerSupplyVoltage": 2500,
    "sensorMode": 1,
    "x": -2,
    "y": 9,
    "z": 101,
    "updatedAt": 1515613877
  },
  "current": {
    "publishedAt": "2018-01-11 05:19:21.801",
    "lqi": 222,
    "powerSupplyVoltage": 2500,
    "sensorMode": 10,
    "x": -5,
    "y": 6,
    "z": 97,
    "updatedAt": ,1515615561
  }
}

ルールを作成する

前述のJSON変換実施とAmazon Kinesis Firehoseストリーム経由でS3に保存するためのルールを作成する。

AWSマネージメントコンソールから AWS IoT を開く。

ACT ルールを作成する から新規ルールを作成する。

名前
NarukoDoor1FirehoseS3
SQLバージョンの使用
2016-03-23
属性
current.version as version, previous.state.reported as previous, previous.metadata.reported.publishedAt.timestamp as previous.updatedAt, current.state.reported as current, current.metadata.reported.publishedAt.timestamp as current.updatedAt
トピックフィルター
$aws/things/door1/shadow/update/documents

アクションの追加 から Amazon Kinesis Firehoseストリームにメッセージを送信する を選択しアクション設定画面へ。

ストリーム名
naruko
Separator
\n(改行)
IAMロール名
AWSIoTNarukoZDoor1Firehose
( 新しいロールの作成 から新規に作成 )

アクションの追加ルールの作成 でルールの登録を完了する。

以上でドアを開閉するとJSONデータがAmazon Kinesis Firehoseストリームへ投入され、 ストリーム作成時に指定したBuffer sizeを超えるか、Buffer Intervalが経過することでS3にファイルが作成されるようになる。

Amazon Athenaから検索できるようにする

ドアの動き情報がS3に記録されるようになったのでこれをAtehnaから検索できるようにする。

AWSマネージメントコンソールから Athena を開く。

データベースの作成

Query Editor から CREATE DATABASE 文でデータベースを作成する。

create database naruko

テーブルの作成

DATABASE から naruko を選択し Query Editor から CREATE TABLE 文でテーブルを作成する。

CREATE EXTERNAL TABLE door1 (
  version int,
  `current` struct<
    `publishedAt`:timestamp,
    `lqi`:int,
    `powerSupplyVoltage`:int,
    `sensorMode`:int,
    `x`:int,
    `y`:int,
    `z`:int,
    `updatedAt`:timestamp
  >,
  `previous` struct<
    `publishedAt`:timestamp,
    `lqi`:int,
    `powerSupplyVoltage`:int,
    `sensorMode`:int,
    `x`:int,
    `y`:int,
    `z`:int,
    `updatedAt`:timestamp
  >
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://naruko-data.nijohando.jp/door1/'

以上でテーブルが作成され、S3上のデータが検索可能な状態になった。

検索

ということで色々検索してみる。

最後に扉が動いた際の時刻と加速度情報を検索

SELECT
  version,
  date_add('hour', 9, current.publishedAt),
  current.x,
  current.y,
  current.z
FROM
  door1
ORDER BY
  version DESC
LIMIT 1

日付は全てUTCで記録されているため、date_add関数でJSTに変換している。

指定日のドアの動きを検索

SELECT
  version,
  date_add('hour', 9, current.publishedAt),
  current.x,
  current.y,
  current.z
FROM
  door1
WHERE
  date(date_add('hour', 9, current.publishedAt)) = date('2018-01-14')
ORDER BY
  version DESC

3分以上のドア静止状態からドアが動いた際の直近10件のデータを検索

SELECT
  version,
  date_add('hour', 9, current.publishedAt),
  current.sensormode,
  current.x,
  current.y,
  current.z
FROM
  door1
WHERE
  date_diff('minute', previous.updatedat, current.updatedat) > 3
ORDER BY
  version DESC
LIMIT 10

指定日における3分以上ドア静止状態からドアが動いた際のデータを検索

SELECT
  version,
  date_add('hour', 9, current.publishedAt),
  current.sensormode,
  current.x,
  current.y,
  current.z
FROM
  door1
WHERE
  date_diff('minute', previous.updatedat, current.updatedat) > 3 AND
  date(date_add('hour', 9, current.publishedAt)) = date('2018-01-14')
ORDER BY
  version DESC

以上でドアの動き情報がS3上を蓄積されるようになりクエリで検索できるようになった。
次回の『ドアの動き検知システムの構築3』ではドアが動いた際にリアルタイムにSlackへ通知する機能を追加する。