今回は、LogstashからElasticsearchにデータを投入する際に、インデックス名を動的につけて投入する方法を紹介します。
例えば株価データを取り込む場合、データの項目として株価というのは共通ですが、会社ごとにインデックスを分けたい場合があります。
そういったときに、今回紹介する方法は非常に便利です^^
Elastic Stackについて
Elasticsearchと、可視化用のKibana環境の構築は、以下の記事を参考にしてみてください。
LogstashからElasticsearchにデータを送る基礎についてはこちら
今回の前提
今回は、csvファイルを、Logstashを通してElasticsearchに投入します。
それぞれ中身は同じ株価データですが、会社が異なります。なので、会社ごとにインデックスを分けるように登録していきます。
Logstashの設定
それでは、Logstashの設定をしていきます。
configフォルダ内の〇〇.confを設定します。今回は、stock.confというファイルを作り、設定を書いていきます。
まず、今回の設定一覧を紹介します。
設定内容のほとんどは、冒頭で紹介した前回の記事と同じです。
今回のメインは、filterプラグイン内の grok 処理です。
inputプラグイン
今回は、取り込むファイル名が異なります。
ファイル名の違いに対応できるように、inputプラグインのpathでワイルドカード(*)を使っています。
grok
“grok”とは「意味をとらえる」という意味の英単語です。Logstashにおけるgrokプラグインは、非構造データを構造データとして扱うことを可能にします。
難しい表現ですが、要は文字列内の要素を、自分で作ったルールのもとで取り出すことができるといった感じでしょうか。
今回は、ファイル名から、インデックスに使う会社名を抽出します。ファイル名は、inputプラグインの”path”から取り出します。
GREEDYDATAは、任意の文字列を意味します。
ここでは、pathの最後の “/” までの文字列を”path”という変数(便宜上変数と呼ぶ)に格納しています。
そして、会社名(company)はファイル名の”_stock.csv”までの部分を取り出すようにしています。たとえば”amazon_stock.csv”の場合、以下のように区切られます。
outputプラグイン
outputプラグインでは、データ送信先のelasticsearchに関する設定を行います。
indexの指定部分で、先ほどgrokで取り出したcompanyを使います。こうすることで、会社ごとにインデックスが作成されるようになります。
作成されたインデックスを確認する
設定ファイルを保存したら、Logstashを起動します。pipeline.ymlの編集も忘れないでくださいね。
エラーが出ていないことを確認したら、Kibanaで作成されたインデックスを確認してみましょう。
会社別にインデックスが作成されています^^
おまけ:インデックスに年月日を入れる
インデックスに年月日を入れる場合は、%{+YYYY.MM.dd}のように記述します。
この場合、インデックス名の最後に年月が追加されます。
このときの年月は、”@timestamp”の値によって決まります。つまり、データの中身を見て、それぞれの年月のデータごとに振り分けるといった処理が行われます。
まとめ
Logstashからデータを投入する際に、動的にインデックス名をつける方法を解説しました。
grokは便利ですが、とっつきにくさもあると思います。是非、チャレンジする気持ちで使ってみてください^^
ではでは👋