fluent-plugin-sakuraioをリリースしました。
fluent-plugin-sakuraioはsakura.ioのWebSocket APIからデータを受け取るためのFluentdのInput Pluginです。
Fluentdとfluent-plugin-sakuraioを使うことで、連携サービスで対応していない、S3などのデータストアや他サービスへのデータ送信、データ送信前のデータの前処理を簡単に行うことができるようになります。
この記事では、基本的な使い方とrecord_transformerプラグインを使ったデータの意味付けの手順をご紹介します。
使い方
WebSocketのサービスの追加
sakura.ioのコンソールにログインし、sakura.ioリファレンスを参考にプロジェクトにWebSocketのサービスを追加します。
追加すると wss://
で始まるURLが表示されるので、これをメモしておきます。
Fluentdのインストール
Fluentdをサーバにインストールします。
今回は Ubuntu 16.04 を想定しているため、下記のコマンドでインストールします。
curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-xenial-td-agent3.sh | sh
その他のインストール方法はFluentdの公式ドキュメントを見てください。
fluent-plugin-sakuraioのインストール
下記のコマンドを実行します。
sudo apt install pkg-config
sudo td-agent-gem install fluent-plugin-sakuraio
gemでfluentdをインストールした場合は下記のようになります。
gem install fluent-plugin-sakuraio
Fluentdのコンフィグファイルの作成
/etc/td-agent/td-agent.conf
を下記のように修正します。
<source>
@type sakuraio
url wss://api.sakura.io/ws/v1/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
</source>
<match **>
@type file
pash /var/log/td-agent/sakuraio
append true
flush_interval 1
</match>
url
には先程メモしたWebSocketのURLを貼り付けます。
Fluentdの再起動と出力の確認
下記のコマンドでFluentdを再起動します。
sudo systemctl restart td-agent
下記のコマンドを実行すると通信モジュールから送信されてきたデータを確認できるはずです。
tailf /var/log/td-agent/sakuraio.`date +%Y%m%d`.log
出力は下記のようになります。
2017-10-31T17:27:40+09:00 uXXXXXXXXXXX.channels.0 {"module":"uXXXXXXXXXXX","channel":0,"type":"f","value":29.095764}
2017-10-31T17:27:40+09:00 uXXXXXXXXXXX.channels.1 {"module":"uXXXXXXXXXXX","channel":1,"type":"f","value":18.597412}
2017-10-31T17:27:40+09:00 uXXXXXXXXXXX.channels.2 {"module":"uXXXXXXXXXXX","channel":2,"type":"I","value":150873}
record_transformer で意味付けする
sakura.ioから送信されてくるデータはchannel、type、valueしかなく、どれが何を表すのかは後からマッピングする必要がありますが、record_transformerを使うと簡単に実現できます。
今回は例として、ハンズオンで作ることができる、温度、湿度データを送信するシステムを使います。
Fluentdの設定は下記の通りです。
<source>
@type sakuraio
url wss://api.sakura.io/ws/v1/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
</source>
<filter *.channels.0>
@type record_transformer
remove_keys channel,type,value
<record>
temperature ${record["value"]}
</record>
</filter>
<filter *.channels.1>
@type record_transformer
remove_keys channel,type,value
<record>
humidity ${record["value"]}
</record>
</filter>
<filter *.channels.2>
@type record_transformer
remove_keys channel,type,value
<record>
count ${record["value"]}
</record>
</filter>
<match **>
@type file
pash /var/log/td-agent/sakuraio
append true
flush_interval 1
</match>
出力は下記のようになります。
2017-10-31T17:33:41+09:00 uXXXXXXXXXXX.channels.0 {"module":"uXXXXXXXXXXX","temperature":29.04541}
2017-10-31T17:33:41+09:00 uXXXXXXXXXXX.channels.1 {"module":"uXXXXXXXXXXX","humidity":18.597412}
2017-10-31T17:33:41+09:00 uXXXXXXXXXXX.channels.2 {"module":"uXXXXXXXXXXX","count":150879}
最後に
今回の記事はMVNOチームの@higebuが書きました。
通信モジュールにデータを送信する、Output Pluginも近いうちに追加したいと思っています。
何か不具合やご要望がありましたら、気軽にGitHubにIssueを登録していただければと思います。
本記事の内容とはあまり関係ありませんが、MVNOチームではモバイルネットワークのバックエンドに興味のあるエンジニアを募集しています。
興味のある方は@higebuまでご連絡ください。
以上です。