【Logstash】インデックス名を動的につけてElasticsearchにデータを投入する

今回は、LogstashからElasticsearchにデータを投入する際に、インデックス名を動的につけて投入する方法を紹介します。

例えば株価データを取り込む場合、データの項目として株価というのは共通ですが、会社ごとにインデックスを分けたい場合があります。

そういったときに、今回紹介する方法は非常に便利です^^

Elastic Stackについて

Elasticsearchと、可視化用のKibana環境の構築は、以下の記事を参考にしてみてください。

LogstashからElasticsearchにデータを送る基礎についてはこちら

今回の前提

今回は、csvファイルを、Logstashを通してElasticsearchに投入します。

それぞれ中身は同じ株価データですが、会社が異なります。なので、会社ごとにインデックスを分けるように登録していきます。

Logstashの設定

それでは、Logstashの設定をしていきます。

configフォルダ内の〇〇.confを設定します。今回は、stock.confというファイルを作り、設定を書いていきます。

まず、今回の設定一覧を紹介します。

stock.conf

設定内容のほとんどは、冒頭で紹介した前回の記事と同じです。

今回のメインは、filterプラグイン内の grok 処理です。

inputプラグイン

今回は、取り込むファイル名が異なります。

ファイル名の違いに対応できるように、inputプラグインのpathでワイルドカード(*)を使っています。

grok

“grok”とは「意味をとらえる」という意味の英単語です。Logstashにおけるgrokプラグインは、非構造データを構造データとして扱うことを可能にします。

難しい表現ですが、要は文字列内の要素を、自分で作ったルールのもとで取り出すことができるといった感じでしょうか。

今回は、ファイル名から、インデックスに使う会社名を抽出します。ファイル名は、inputプラグインの”path”から取り出します。

GREEDYDATAは、任意の文字列を意味します。

ここでは、pathの最後の “/” までの文字列を”path”という変数(便宜上変数と呼ぶ)に格納しています。

そして、会社名(company)はファイル名の”_stock.csv”までの部分を取り出すようにしています。たとえば”amazon_stock.csv”の場合、以下のように区切られます。

outputプラグイン

outputプラグインでは、データ送信先のelasticsearchに関する設定を行います。

indexの指定部分で、先ほどgrokで取り出したcompanyを使います。こうすることで、会社ごとにインデックスが作成されるようになります。

作成されたインデックスを確認する

設定ファイルを保存したら、Logstashを起動します。pipeline.ymlの編集も忘れないでくださいね。

エラーが出ていないことを確認したら、Kibanaで作成されたインデックスを確認してみましょう。

会社別にインデックスが作成されています^^

おまけ:インデックスに年月日を入れる

インデックスに年月日を入れる場合は、%{+YYYY.MM.dd}のように記述します。

この場合、インデックス名の最後に年月が追加されます。

このときの年月は、”@timestamp”の値によって決まります。つまり、データの中身を見て、それぞれの年月のデータごとに振り分けるといった処理が行われます。

まとめ

Logstashからデータを投入する際に、動的にインデックス名をつける方法を解説しました。

grokは便利ですが、とっつきにくさもあると思います。是非、チャレンジする気持ちで使ってみてください^^

ではでは👋