fluentdを使ってみる - Kibanaを立ててみた
fluentdとは
別名「td-agent」であることからわかるように、Tresure Dataのフロントとなるログ収集エージェントという位置付けですが、やはり様々なプラグインを通じて柔軟な処理が可能となっており、こと日本においては、ログ収集管理のエージェントとしてLogstashより人気があるようです。
Elasticsearch+Kibanaと組み合わせるにあたり、Logstashを使う場合にELKと読んでいたのに対抗して、EFKと呼ぶ人もいるようです。
Logstashで扱ったMySQLのSHOW GLOBAL STATUSの結果ファイルを、今度はfluentd(td-agent)でelasticsearchに登録する流れを追ってみることにします。
手順に入る前に、うまく行かなかった話
Logstash編と同様、MySQLの「GLOBAL STATUS*1」を一定時間間隔で出力したファイルを使っているのですが、ダブルクオートで囲んだ数字は、elasticsearch側で文字列として認識されてしまい、なぜかMappingでは変換できませんでした。fluentdのformat定義の正規表現その他の設定で、うまく””を剥がして数値として扱うことができませんでした。
数値を文字列のママ認識させると、Kibanaで数値として分析できなくなってしまいます。
そんなわけで、ログ出力の仕様の時点で、数字はダブルクオートで囲まないように出力させた上で、formatの定義で正規表現を使ってうまく拾ってあげるようにして対応しました。
Logstashで読み込ませた時のログの例
2017/07/11 18:05:01 "Aborted_clients" "5269" 2017/07/11 18:05:01 "Aborted_connects" "190" 2017/07/11 18:05:01 "Binlog_cache_disk_use" "30646" 2017/07/11 18:05:01 "Binlog_cache_use" "162597" 2017/07/11 18:05:01 "Binlog_stmt_cache_disk_use" "0" 2017/07/11 18:05:01 "Binlog_stmt_cache_use" "2894"
fluentdで読み込ませた時のログの例
2017/08/07 13:48:01 Aborted_clients 40422 2017/08/07 13:48:01 Aborted_connects 422 2017/08/07 13:48:01 Binlog_cache_disk_use 32159 2017/08/07 13:48:01 Binlog_cache_use 472229 2017/08/07 13:48:01 Binlog_stmt_cache_disk_use 0
この辺、Logstashでは全然苦労しませんでした。。。。
Fluentdでそんなに苦しむことないよ、って話を知っている人は教えて欲しいです。
セットアップ
まず、Logstashがjavaベースであるのに対し、fluentdはrubyベースですが、fluentdの一式の中にRubyが含まれています。
Logstashも簡単でしたが、fluentd(td-agent)も、インストールするまでのところは非常に簡単です。elasticsearchへの登録にはプラグインを追加でインストールする必要があるので、そこまでの流れは以下の通りとなります。
curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent2.sh | sh /opt/td-agent/embedded/bin/fluent-gem install fluent-plugin-elasticsearch
たったこれだけですね。これだけで、serviceとしても起動可能となります。
設定
Logstashと同様に、In-加工-outについて設定ファイルに定義をしていきます。
インストールした際に、すでに/etc/td-agent/td-agent.confにはデフォルトおよびサンプルの定義がたくさん書いてありますが、この末尾に追記する形で、以下内容を書き加えます。
fluentd自身の挙動を決める
<source> type tail format /(?<logtime>\d{4}\/\d{2}\/\d{2} \d{2}:\d{2}:\d{2}) (?<Variable_name>[^ ]*\t)(?<Variable_value>[^ ].*$)/ path /var/log/mysql_monitor/show_global_status_2.log pos_file /tmp/mysql-status_2.pos read_from_head true tag mysql.status </source> <match mysql.status> type elasticsearch host localhost port 9200 logstash_format true logstash_prefix fld-mysql-status type_name fld-mysql-status </match>
構造をLogstashと比較すると、fluentdでは
fluentdの汎用ファイル読み取りでは、「tail」というtypeを使用します。
文字通りtailコマンドのようにファイルの末尾への情報追加を監視して処理をします。
Logstashでgrokを使用していたところを正規表現で定義します。
正規表現ですが、フィールド名(例:?
このfluentd用の正規表現形式のままチェックできるサイトがありますので、こちらで検証するのが良いかと思います。
Fluentular: a Fluentd regular expression editor
LogstashでInputのなかにTypeを定義、FilterとOutputでもTypeに応じた定義を記述したのと同様に、fluentdでは、Soruceでtagを定義し、matchのブロック宣言の中でそのtagを指定することで対応関係を表現、処理を制御します。
matchに定義するアウトプット先としてelasticsearchを使用するにあたり、定義する情報はLogstashとほぼ同じです。
ただし、別途インストールするプラグイン、という位置付けであるため、そのリファレンスはソースコードと一緒にgithub上で公開されています。(プラグイン紹介自体はfruentd本体のドキュメントとして存在しており、そこにelasticsearchプラグインのgithubへのリンクもあります)
GitHub - uken/fluent-plugin-elasticsearch
データが取り込まれることを確認したら、マッピングが期待している通りになっているかを確認します。
おそらく、Logstashの時と同様に、「Variable_Value」が数値ではなく文字列扱いになってしまっています。まずはmapping定義を修正して、取り込み直してみましょう。
もしmappingだけでは修正できない場合、Logstashと同様に送り込む側、mapping両方修正が必要になるようです。mappingはLogstashのものを流用するとして、Fluentdの設定側を以下のように設定しますが、これにもプラグインが必要です。
/opt/td-agent/embedded/bin/fluent-gem install fluent-plugin-typecast vi /etc/td-agent/td-agent.conf 以下を追加 <match mysql.status> type typecast item_types Variavle_value:integer prefix typed </match> 以下を修正 <match mysql.status> -- 修正前 type elasticsearch ・・・ <match typed.mysql.status> --修正後 ・・・
事例を調べていて「fluent-plugin-typecast」を使うケースが多かったので、それに従ったのですが、fluentd公式サイトのプラグイン一覧では「fluent-plugin-filter_typecast」がCertifiedプラグインとして取り上げられており、その設定方法も違うので注意が必要です。
毎回紹介している、こちらの本、Fluentdについては、構成方法、運用ノウハウなど細かく紹介しています。
サーバ/インフラエンジニア養成読本 ログ収集〜可視化編 [現場主導のデータ分析環境を構築!] Software Design plus
- 出版社/メーカー: 技術評論社
- 発売日: 2014/08/14
- メディア: Kindle版
- この商品を含むブログを見る
また、今回のようにEFK構成することをメインテーマに置いた書籍が2017年9月に発売されるということで、こちらも期待です。
データ分析基盤構築入門[Fluentd、Elasticsearch、Kibanaによるログ収集と可視化]
- 作者: 鈴木健太,吉田健太郎,大谷純,道井俊介
- 出版社/メーカー: 技術評論社
- 発売日: 2017/09/21
- メディア: 単行本(ソフトカバー)
- この商品を含むブログを見る
*1:SHOW GLOBAL STATUS または SELECT * FROM INFORMATION_SCHEMA.GLOBAL_STATUS