目次

12.ディレクトリに置いたファイルを自動登録する

以下の流れで実現しています。

BeatsもLogstashも自動登録はバイナリファイル向きじゃないですね。
バイナリファイル向きじゃない理由

成功事例

■処理の流れ
ファイル配置→Logstash→ShellScript→ElasticSearch

■処理イメージ図

Logstash.conf

・input:読み込んだバイナリファイルが改行して複数messageとして扱われないようにmultilineで1つの処理につながるようにしています。
・filter:outputでmessageが出るのを防ぐため、messageを削除しています。
・output:読み込んだファイルパスをShellScriptに渡しています。

※inputセクションの[close_older ⇒ 5]はとても大事で、読み込んだファイルがcloseしていないと後工程に進みません!!・・・デフォ値の1時間たてば進むか。。。

input {
#  stdin{}
  file{
    path => "/tmp/filebeat/*"
    close_older => 5
    #mode => "read"                       #file_completed_actionとセットで、読み込んだファイルを削除できますが、
    #file_completed_action => "delete"    #消えるタイミングがoutput処理前なのでコメントアウト
    start_position => "beginning"
    #sincedb_clean_after => "1 seconds"
    sincedb_path => "/dev/sincedb"
    codec => multiline {
             pattern => "^\s,^\S"         #どんな行が来ても結合する正規表現らしい
             max_lines => 100000000000    #指定した行まで1つのmessageとする
             negate => true
             what => "previous"
#             charset => "BINARY"
             }
  }
}
filter {
    mutate {
      remove_field => [ "message" ]
    }
    urldecode { field => "path" }
}
output {
    exec {
      command => "bash /tmp/logstash.sh %{path}"
    }
    stdout{}
}

/tmp/logstash.sh

Aws/ElasticSearch/11.日本語ファイルを登録するの処理をそのまま流用

#! /bin/bash
file_path=$1

file=$(base64 $file_path | perl -pe 's/\n//g')

echo -e "{ \"index\" : { \"_index\" : \"{インデックス名}\", \"_type\" : \"_doc\", \"_id\" : \"{ID}\", \"pipeline\": \"{pipeline名}\" }\n{ \"@timestamp\" : \"`date +'%Y-%m-%dT%H:%M:%S.%NZ'`\", \"data\" : \"$file\" }" > input.json

curl -X POST -H 'Content-Type: application/json' '{Elasticsearchエンドポイント}/_bulk?pretty' --data-binary @input.json

Template、Pipeline

TemplateとPipelineは下記を参考に設定願います。
Aws/ElasticSearch/3.Template
Aws/ElasticSearch/4.Pipeline

失敗事例

その1

Logstashが受け取ったmessageをfilterセクションでbase64に変換して、outputでESに登録する。

■ボツ理由
・色々コーデックを試したが、正しいコーデックどれかわからなかった。
・バイナリファイルをmultilineで繋げるとデータが壊れるらしい?どこかのサイトに書いてあった。

input {
#  stdin{}
  file{
    path => "/tmp/filebeat/*"
    close_older => 5
    #mode => "read"
    #file_completed_action => "delete"
    start_position => "beginning"
    #sincedb_clean_after => "1 seconds"
    sincedb_path => "/dev/sincedb"
    codec => multiline {
             pattern => "^\s,^\S"
             max_lines => 100000000000
             negate => true
             what => "previous"
             charset => "BINARY"
             }
  }
}
filter {
    ruby {
        code => 'event.set("message", Base64.encode64(event.get("message")))'
    }
    mutate {
       gsub => ["message", "\r\n|\r|\n", ""]
    }
}
output {
  elasticsearch {
    hosts => ["{Elasticsearchエンドポイント}:443"]
    index => "{インデックス名}"
    document_type => "_doc"
    pipeline => "{pipeline名}"
    ilm_enabled => false
  }
    stdout { }
}

その2

成功事例の、outputセクションのexecでLogstashを呼び出す。
■ボツ理由
・テスト用端末のせいか、JAVAのエラーがでた。※もちろんpipelines.ymlは設定してあるぞ。
・失敗事例その1でも書いたが、そもそもバイナリファイルをmultilineで繋げるとデータが壊れるらしい?

input {
#  stdin{}
  file{
    path => "/tmp/filebeat/*"
    close_older => 5
    #mode => "read"
    #file_completed_action => "delete"
    start_position => "beginning"
    #sincedb_clean_after => "1 seconds"
    sincedb_path => "/dev/sincedb"
    codec => multiline {
             pattern => "^\s,^\S"
             max_lines => 100000000000
             negate => true
             what => "previous"
#             charset => "BINARY"
             }
  }
}
filter {
    mutate {
      remove_field => [ "message" ]
    }
    urldecode { field => "path" }
}
output {
    exec {
      command => "bash base64 %{path} | perl -pe 's/\n//g' | grep ^ | /usr/share/logstash/bin/logstash -f logstash.conf"
    }
    stdout{}
}