なからなLife

geekに憧れと敬意を抱きながら、SE、ITコンサル、商品企画、事業企画、管理会計、総務・情シス、再び受託でDB屋さんと流浪する人のブログです。

JDBCでInputする - Kibanaを立ててみた

ファイルではなくDBの中身を取得

これまで、Logstashやfluentdが、「ファイル」を読み込んでelasticsearchに流し込む事例を扱ってきましたが、どちらも、「データベース」から直接読み取って、elasticsearchに流し込むことができます。


今回は、MySQLの中身を直接読み取ってElasticsearch/Kibanaに流し込むケースにトライします。


過去ブログで取り上げた「SHOW FULL PROCESSLIST(=SELECT * FROM INFORMATION_SCHEMA.PROCESSLIST)」に取得のタイムスタンプをつけたものを、ファイルを経由せずにElasticsearchに取り込むようにします。


LogstashでJDBC経由でDBの中身をElasticsearchに流し込む手順

Logstashの場合「プラグイン」と呼んでいますが、デフォルトインストールパッケージに含まれるので、設定ファイルで指定してあげるだけで使えます。

ただし、JDBCドライバは別途必要になります。
ですので、MySQLJDBCドライバ「Conector/J」を入手しておきます。


執筆時点の最新版「5.1.43」は、以下のように取得、展開可能します。

wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.43.tar.gz
tar xzf mysql-connector-java-5.1.43.tar.gz
export CLASSPATH=/JDBCドライバのjarファイル展開先パス/mysql-connector-java-ver-bin.jar:$CLASSPATH

続いて、Logstashの設定をします。
こんな感じで、/etc/logstash/conf.d/の下の.confファイルを作成します。


テンプレート定義、指定は省略していますが、とりあえず数値として欲しい情報はElasticsearch側でNumberで認識されていました。

input {
  jdbc {
    jdbc_driver_library => "mysql-connector-java-5.1.43-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://MySQLサーバーのホスト:3306/INFORMATION_SCHEMA"
    jdbc_user => "MySQLにログインするユーザ名"
    jdbc_password => "パスワード"
    statement => "SELECT NOW() as logtime, ID as Session_id, USER as Session_User, HOST as Session_Host, DB as Session_Db, COMMAND as Session_Command, TIME as Session_Timer, STATE as Session_State, INFO as Session_Info FROM INFORMATION_SCHEMA.PROCESSLIST;"
    tracking_column => "logtime"
    schedule => "*/1 * * * *"
    type => "jdbc-mysql-processlist"
    last_run_metadata_path => "/var/log/logstash/.logstash_jdbc_last_run"
  }
}
output {
    if [type] == "jdbc-mysql-processlist" {
      elasticsearch {
        hosts => [ "localhost:9200" ]
        index => "lgs-jdbc-mysql-processlist-%{+YYYY.MM.dd}"
      }
    }
}
Input部

前半は、よくあるJDBCベースのプログラミングで必要な接続情報です。


実際に発行するSQLを「statement」に記述するわけですが、ファイルに出力するときはsedで加工して付与していたTimestamp情報をSQLの中で取得させるため、SHOWコマンドではなくInformation_schema.processlistテーブルから取得するSELECT文を使用し、冒頭でNOW関数を使っています。列の名前もElasticsearchにそのまま反映されるので、意図的にasで名前を変えています。

「tracking_column」では、前回取得との差を認識させるためのカラムを指定します。一般的なテーブルだt「最終更新日時」的な列を指定しますが、porcesslistテーブルには該当する列がなく、取得時に強制的にnow()で付与しているため、その列を指定しています。

ファイルから取得するときと違う特徴的なものが「schedule」です。
Linuxのcronと同様の形式で、SQLステートメントの実行間隔を指定できます。今回は「"*/1 * * * *"」つまり1分間隔です。

「last_run_metadata_path」は、最終実行時刻を保持するファイルの場所の指定です。
権限さえあればどこでもよいですが、デフォルトはホームディレクトリです。

Filter部

ファイルからの取込のときにはFilter部でgrokを書いていたのですが、今回はgrokなし、Filter部なしでイケてしまいました。細かな制御をするときにはJDBCでもFilter部を書くことになるかと思いますが、最低限の動作であれば、まったく必要ありません。

Output部

Fileのときと特に差はありません。
Fileのときは、MappingテンプレートをElasticsearchに定義して、そのテンプレートを使うように指定をいれましたが、今回はその部分を省略しても意図したデータ型で認識されていました。


ここまで設定したら、Logstashを起動してあげると、SQLステートメントの実行結果=Processlistの内容が、ElasticsearchのIndexに登録され、Kibanaからも確認できるようになります。


細かいパラメータなど

ここで取り上げなかった細かいパラメータ仕様などは、公式ドキュメントを作成してください。公式以外にも、各所ブログでチラホラと日本語解説記事が上がっています。
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html


fluentd編は?

ごめんなさい。記事化できるレベルでの検証作業が間に合いませんでした。


調べた範囲でいうと、Logstashとの違いは、

    • fluentdはRubyベースなので、JDBCドライバではなくRubyMySQLアクセス用モジュール経由で。
    • fluentdプラグイン開発が活発な一方で、どのプラグインを選んだらいいかわからない。
    • ファイルのtailのようなリアルタイムじゃなく、定期実行でよいなら、(Fluentdじゃなくて)Embulk+embulk-input-mysqlプラグインがラクそうw

といったところでしょうか。