fluent-plugin-sakuraio をリリースしました

Pocket

sakuraio_to_fluentd

fluent-plugin-sakuraioをリリースしました。

fluent-plugin-sakuraiosakura.ioのWebSocket APIからデータを受け取るためのFluentdのInput Pluginです。

Fluentdfluent-plugin-sakuraioを使うことで、連携サービスで対応していない、S3などのデータストアや他サービスへのデータ送信、データ送信前のデータの前処理を簡単に行うことができるようになります。

この記事では、基本的な使い方とrecord_transformerプラグインを使ったデータの意味付けの手順をご紹介します。

使い方

WebSocketのサービスの追加

sakura.ioのコンソールにログインし、sakura.ioリファレンスを参考にプロジェクトにWebSocketのサービスを追加します。

追加すると wss:// で始まるURLが表示されるので、これをメモしておきます。

Fluentdのインストール

Fluentdをサーバにインストールします。

今回は Ubuntu 16.04 を想定しているため、下記のコマンドでインストールします。

curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-xenial-td-agent3.sh | sh

その他のインストール方法はFluentdの公式ドキュメントを見てください。

fluent-plugin-sakuraioのインストール

下記のコマンドを実行します。

sudo apt install pkg-config
sudo td-agent-gem install fluent-plugin-sakuraio

gemでfluentdをインストールした場合は下記のようになります。

gem install fluent-plugin-sakuraio

Fluentdのコンフィグファイルの作成

/etc/td-agent/td-agent.conf を下記のように修正します。

<source>
  @type sakuraio
  url wss://api.sakura.io/ws/v1/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
</source>
<match **>
  @type file
  pash /var/log/td-agent/sakuraio
  append true
  flush_interval 1
</match>

url には先程メモしたWebSocketのURLを貼り付けます。

Fluentdの再起動と出力の確認

下記のコマンドでFluentdを再起動します。

sudo systemctl restart td-agent

下記のコマンドを実行すると通信モジュールから送信されてきたデータを確認できるはずです。

tailf /var/log/td-agent/sakuraio.`date +%Y%m%d`.log

出力は下記のようになります。

2017-10-31T17:27:40+09:00       uXXXXXXXXXXX.channels.0 {"module":"uXXXXXXXXXXX","channel":0,"type":"f","value":29.095764}
2017-10-31T17:27:40+09:00       uXXXXXXXXXXX.channels.1 {"module":"uXXXXXXXXXXX","channel":1,"type":"f","value":18.597412}
2017-10-31T17:27:40+09:00       uXXXXXXXXXXX.channels.2 {"module":"uXXXXXXXXXXX","channel":2,"type":"I","value":150873}

record_transformer で意味付けする

sakura.ioから送信されてくるデータはchannel、type、valueしかなく、どれが何を表すのかは後からマッピングする必要がありますが、record_transformerを使うと簡単に実現できます。

今回は例として、ハンズオンで作ることができる、温度、湿度データを送信するシステムを使います。

Fluentdの設定は下記の通りです。

<source>
  @type sakuraio
  url wss://api.sakura.io/ws/v1/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
</source>

<filter *.channels.0>
  @type record_transformer
  remove_keys channel,type,value
  <record>
    temperature ${record["value"]}
  </record>
</filter>
<filter *.channels.1>
  @type record_transformer
  remove_keys channel,type,value
  <record>
    humidity ${record["value"]}
  </record>
</filter>
<filter *.channels.2>
  @type record_transformer
  remove_keys channel,type,value
  <record>
    count ${record["value"]}
  </record>
</filter>

<match **>
  @type file
  pash /var/log/td-agent/sakuraio
  append true
  flush_interval 1
</match>

出力は下記のようになります。

2017-10-31T17:33:41+09:00   uXXXXXXXXXXX.channels.0 {"module":"uXXXXXXXXXXX","temperature":29.04541}
2017-10-31T17:33:41+09:00   uXXXXXXXXXXX.channels.1 {"module":"uXXXXXXXXXXX","humidity":18.597412}
2017-10-31T17:33:41+09:00   uXXXXXXXXXXX.channels.2 {"module":"uXXXXXXXXXXX","count":150879}

最後に

今回の記事はMVNOチームの@higebuが書きました。

通信モジュールにデータを送信する、Output Pluginも近いうちに追加したいと思っています。

何か不具合やご要望がありましたら、気軽にGitHubにIssueを登録していただければと思います。

本記事の内容とはあまり関係ありませんが、MVNOチームではモバイルネットワークのバックエンドに興味のあるエンジニアを募集しています。

興味のある方は@higebuまでご連絡ください。

以上です。

Pocket

Close Menu