なからなLife

geekに憧れと敬意を抱きながら、SE、ITコンサル、商品企画、事業企画、管理会計、総務・情シス、再び受託でDB屋さんと流浪する人のブログです。

Embulkを使ってみる - Kibanaを立ててみた

Embulkとは

「Fruentdのバッチ版」と呼ばれることが多いツールで、FruentdやLogstashが逐次生成されるデータをストリームとして扱って転送するのに対し、Embulkはほぼ変化のないデータの塊を転送するのに向いているエージェント、という位置付けになります。


開発しているのもFluentdのメインコミッターの一人ということで、Fruentdとセットで語られることが多いです。
ですが、今のところFluentdはTresuredata公式での取扱があるのに対し、Embulkはそのポジションには至っていない、関連OSSの1つ、という扱いのようです。


FluentdやLogstashと同様、プラグインによって様々なInput、Filter、Outputに対応しており柔軟に構成を作ることが可能となっています。


今回は、CSVファイルの中身を直接elasticsearchに取り込むところにチャレンジしてみます。


使用するのは、執筆時点の最新版「0.8.28」です。

セットアップ

EmbulkはソースおよびドキュメントともにGithub上で公開されています。
GitHub - embulk/embulk: Embulk: Pluggable Bulk Data Loader. http://www.embulk.org



EmbulkのドキュメントQuickStartに従って進めてみます。


今回はLinux上に立てますので、
https://github.com/embulk/embulk#linux--mac--bsd
に基づいて

curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar"
chmod +x ~/.embulk/bin/embulk
echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
source ~/.bashrc

としたあと、サンプルCSVを扱うコマンドを実行します。

embulk example ./try1
embulk guess ./try1/seed.yml -o config.yml
embulk preview config.yml
embulk run config.yml

これで、サンプルのcsvファイルと、それを取り扱うための設定ファイル(.yml)が設定されます。


このコマンドの意味を、ちょっとだけ説明しておきます。


「embulk example」コマンドでは、カレントディレクトリに「try1」ディレクトリを生成、設定ファイル「seed.yml」と、「csvディレクトリにtar.gzで固めたサンプルデータファイルを生成します。


「embulk guess」コマンドでは、「seed.yml」をもとに、完全な形式の設定ファイル「config.yml」をカレントディレクトリに生成します。


「config.yml」には「decoders: - {type: gzip}」が保管されていることから、「try/csv/」の中にtar.gzで保存されているファイルを処理できるようになっていることも見えます。


「embulk preview config.yml」は、いわゆるDry Runです。設定にもとづいてInputファイルの中身を正しくパース出来ているかを確認できます。ここでは、サンプルのcsvファイルの中身が4行、表示されます。


「embulk run」で、設定ファイルに基づいて実行されます。処理過程のログの中に、先程のpreviewで確認できた4行が表示されます。



さて、Embulkからcsvファイルの中身を読み込んでelasticsearchへ送り込んでKibanaへ表示させるのが目標ですが、Embulkのドキュメントの中に、ピンポイントでコレについて解説したページが用意されています。
http://www.embulk.org/docs/recipe/scheduled-csv-load-to-elasticsearch-kibana5.html


ご丁寧にElasticsearchのダウンロード&インストールの話から書いてありますが、こちらはすでに完了していますので、EmbulkでElasticsearchにOutputさせるプラグインの導入のところから進めます。
Scheduled bulk data loading to Elasticsearch + Kibana 5 from CSV files — Embulk 0.8 documentation


embulk gem install embulk-output-elasticsearch

このelasticsearch用プラグインにかぎらず、embulkはgemコマンドでプラグインを追加できるようになっています。


次に、csvファイルとseed.ymlを用意し、設定を行うのですが、先程使用したサンプルを流用したいと思います。

mkdir ./try2
mkdir ./try2/csv
cp ./try1/csv/* ./try2/csv/
vi ./try2/seed.yml

# 以下内容を記述
in:
  type: file
  path_prefix: "/root/./try2/csv/sample_"
out:
  type: elasticsearch
  index: embulk
  index_type: embulk
  nodes:
    - host: localhost


rootでやってますwというところは気にせずに、手続きを進めます。

embulk guess ./try2/seed.yml -o config.yml
embulk preview config.yml
embulk run config.yml 


これで完了です。


なお、「定期的に実行し、かつ前回からの差分(増えたファイル)のみ取り込みたい」という場合、

embulk run config.yml -c diff.yml

と実行すると、最後に取り込んだファイル名を「diff.yml」に保持するようになり、次回は(アルファベット順で)そのファイル以降から取込を行ってくれます。
このコマンドをcronなどに指定すれば、スケジューリング実行が可能となります。


このように、リアルタイム性を重視するならFluentdやLogstash、バッチによるバルクロードで対応するならembulkを使うなど、要求シーンによって使い分けが可能です。


サンプルCSVではない任意のCSVファイルを扱う場合、「embulk guess」で生成するconfig.ymlの結果から「columns:」を確認した後、意図した型に変換できていない所を含めてseed.ymlに記述してから、再度config.ymlを生成してあげましょう。


以下は、今回自動生成されたサンプルcsvについて「embulk guess」で生成されたconfig.ymlの中に生成されている列定義です。seed.ymlには全然書いていないところから、ここまで自動生成してくれます。

    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}


Embulkの公式の英語ドキュメント、コマンドラインオプションの説明があまりないのですが、日本人には嬉しいことにQiitaにめっちゃ詳しく書いてありますので、こちらを頼りにすると良さそうです。
qiita.com



あらたに取り込んでみたいcsvファイルがあったら、とりあえずざっくりとしたseed.yml書いてguessしてconfig.yml生成、previewでdry runして、どう認識されるか確認しながら、configの内容をseedにコピペして修正して、を繰り返すのがラクそうですね。
このあたり、すごい良くできていると思いました。


ちょうどこの原稿を書いている最中*1に、Voyage Groupさんの
techlog.voyagegroup.com
が公開され、好評のようです。

やっぱり、実運用で使っている方の記事が公開されるのは助かります。


*1:公開より数日前から書き溜めてます