JDBCでInputする - Kibanaを立ててみた
ファイルではなくDBの中身を取得
これまで、Logstashやfluentdが、「ファイル」を読み込んでelasticsearchに流し込む事例を扱ってきましたが、どちらも、「データベース」から直接読み取って、elasticsearchに流し込むことができます。
今回は、MySQLの中身を直接読み取ってElasticsearch/Kibanaに流し込むケースにトライします。
過去ブログで取り上げた「SHOW FULL PROCESSLIST(=SELECT * FROM INFORMATION_SCHEMA.PROCESSLIST)」に取得のタイムスタンプをつけたものを、ファイルを経由せずにElasticsearchに取り込むようにします。
LogstashでJDBC経由でDBの中身をElasticsearchに流し込む手順
Logstashの場合「プラグイン」と呼んでいますが、デフォルトインストールパッケージに含まれるので、設定ファイルで指定してあげるだけで使えます。
ただし、JDBCドライバは別途必要になります。
ですので、MySQLのJDBCドライバ「Conector/J」を入手しておきます。
執筆時点の最新版「5.1.43」は、以下のように取得、展開可能します。
wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.43.tar.gz tar xzf mysql-connector-java-5.1.43.tar.gz export CLASSPATH=/JDBCドライバのjarファイル展開先パス/mysql-connector-java-ver-bin.jar:$CLASSPATH
続いて、Logstashの設定をします。
こんな感じで、/etc/logstash/conf.d/の下の.confファイルを作成します。
テンプレート定義、指定は省略していますが、とりあえず数値として欲しい情報はElasticsearch側でNumberで認識されていました。
input { jdbc { jdbc_driver_library => "mysql-connector-java-5.1.43-bin.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://MySQLサーバーのホスト:3306/INFORMATION_SCHEMA" jdbc_user => "MySQLにログインするユーザ名" jdbc_password => "パスワード" statement => "SELECT NOW() as logtime, ID as Session_id, USER as Session_User, HOST as Session_Host, DB as Session_Db, COMMAND as Session_Command, TIME as Session_Timer, STATE as Session_State, INFO as Session_Info FROM INFORMATION_SCHEMA.PROCESSLIST;" tracking_column => "logtime" schedule => "*/1 * * * *" type => "jdbc-mysql-processlist" last_run_metadata_path => "/var/log/logstash/.logstash_jdbc_last_run" } } output { if [type] == "jdbc-mysql-processlist" { elasticsearch { hosts => [ "localhost:9200" ] index => "lgs-jdbc-mysql-processlist-%{+YYYY.MM.dd}" } } }
Input部
前半は、よくあるJDBCベースのプログラミングで必要な接続情報です。
実際に発行するSQLを「statement」に記述するわけですが、ファイルに出力するときはsedで加工して付与していたTimestamp情報をSQLの中で取得させるため、SHOWコマンドではなくInformation_schema.processlistテーブルから取得するSELECT文を使用し、冒頭でNOW関数を使っています。列の名前もElasticsearchにそのまま反映されるので、意図的にasで名前を変えています。
「tracking_column」では、前回取得との差を認識させるためのカラムを指定します。一般的なテーブルだt「最終更新日時」的な列を指定しますが、porcesslistテーブルには該当する列がなく、取得時に強制的にnow()で付与しているため、その列を指定しています。
ファイルから取得するときと違う特徴的なものが「schedule」です。
Linuxのcronと同様の形式で、SQLステートメントの実行間隔を指定できます。今回は「"*/1 * * * *"」つまり1分間隔です。
「last_run_metadata_path」は、最終実行時刻を保持するファイルの場所の指定です。
権限さえあればどこでもよいですが、デフォルトはホームディレクトリです。
Filter部
ファイルからの取込のときにはFilter部でgrokを書いていたのですが、今回はgrokなし、Filter部なしでイケてしまいました。細かな制御をするときにはJDBCでもFilter部を書くことになるかと思いますが、最低限の動作であれば、まったく必要ありません。
細かいパラメータなど
ここで取り上げなかった細かいパラメータ仕様などは、公式ドキュメントを作成してください。公式以外にも、各所ブログでチラホラと日本語解説記事が上がっています。
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html
fluentd編は?
ごめんなさい。記事化できるレベルでの検証作業が間に合いませんでした。
調べた範囲でいうと、Logstashとの違いは、
といったところでしょうか。
サーバ/インフラエンジニア養成読本 ログ収集~可視化編 [現場主導のデータ分析環境を構築!] (Software Design plus)
- 作者: 鈴木健太,吉田健太郎,大谷純,道井俊介
- 出版社/メーカー: 技術評論社
- 発売日: 2014/08/08
- メディア: 大型本
- この商品を含むブログ (1件) を見る