代码之家  ›  专栏  ›  技术社区  ›  Danish Bin Sofwan

Logstash将不同的字段输出到不同的弹性搜索索引

  •  0
  • Danish Bin Sofwan  · 技术社区  · 7 年前

    我有一个 Filebeat Apache Logstash . 伐木场 字段1、字段2&字段3 )至 elastic search 。流程简单;工作这是我的pipeline.conf

    input{
        beats{
            port => "5043"
        }
    }
    filter 
    {
    
        grok 
        {
            patterns_dir => ["/usr/share/logstash/patterns"]
            match =>{   "message" => ["%{IPORHOST:[client_ip]} - %{DATA:[user_name]} \[%{HTTPDATE:[access_time]}\] \"%{WORD:[method]} %{DATA:[url]} HTTP/%{NUMBER:[http_version]}\" %{NUMBER:[response_code]} %{NUMBER:[bytes]}( \"%{DATA:[referrer]}\")?( \"%{DATA:[user_agent]}\")?",
                        "%{IPORHOST:[remote_ip]} - %{DATA:[user_name]} \\[%{HTTPDATE:[time]}\\] \"-\" %{NUMBER:[response_code]} -" ] 
                    }
            remove_field => "@version"
            remove_field => "beat"
            remove_field => "input_type"
            remove_field => "source"
            remove_field => "type"
            remove_field => "tags"
            remove_field => "http_version"
            remove_field => "@timestamp"
            remove_field => "message"
        }
        mutate
        {
            add_field => { "field1" => "%{access_time}" }
            add_field => { "field2" => "%{host}" }
            add_field => { "field3" => "%{read_timestamp}" }
        }
    }
    output {
        elasticsearch{
            hosts => ["localhost:9200"]
            index => "indexA"
        }
    }
    

    现在我要做的是添加其他三个字段 并将其添加到名为 索引B .所以最后 字段1字段2 虽然 索引B 持有 字段5

    input{
        beats{
            port => "5043"
        }
    }
    filter 
    {
    
        grok 
        {
            patterns_dir => ["/usr/share/logstash/patterns"]
            match =>{   "message" => ["%{IPORHOST:[client_ip]} - %{DATA:[user_name]} \[%{HTTPDATE:[access_time]}\] \"%{WORD:[method]} %{DATA:[url]} HTTP/%{NUMBER:[http_version]}\" %{NUMBER:[response_code]} %{NUMBER:[bytes]}( \"%{DATA:[referrer]}\")?( \"%{DATA:[user_agent]}\")?",
                        "%{IPORHOST:[remote_ip]} - %{DATA:[user_name]} \\[%{HTTPDATE:[time]}\\] \"-\" %{NUMBER:[response_code]} -" ] 
                    }
            remove_field => "@version"
            remove_field => "beat"
            remove_field => "input_type"
            remove_field => "type"
            remove_field => "http_version"
            remove_field => "@timestamp"
            remove_field => "message"
        }
        mutate
        {
            add_field => { "field1" => "%{access_time}" }
            add_field => { "field2" => "%{host}" }
            add_field => { "field3" => "%{read_timestamp}" }
        }   
    }
    output {
        elasticsearch{
            hosts => ["localhost:9200"]
            index => "indexA"
        }
    }
    filter
    {
        mutate
        {
            add_field => { "field4" => "%{source}" }
            add_field => { "field5" => "%{tags}" }
            remove_field => "field1"
            remove_field => "field2"
            remove_field => "field3"
        }
    }
    output {
        elasticsearch{
            hosts => ["localhost:9200"]
            index => "indexB"
        }
    }   
    

    有人能指出我哪里做错了,或者有什么替代方案吗。

    1 回复  |  直到 7 年前
        1
  •  2
  •   Val    7 年前

    您需要使用 clone filter

    input{
        beats{
            port => "5043"
        }
    }
    filter 
    {
    
        grok 
        {
            patterns_dir => ["/usr/share/logstash/patterns"]
            match =>{   "message" => ["%{IPORHOST:[client_ip]} - %{DATA:[user_name]} \[%{HTTPDATE:[access_time]}\] \"%{WORD:[method]} %{DATA:[url]} HTTP/%{NUMBER:[http_version]}\" %{NUMBER:[response_code]} %{NUMBER:[bytes]}( \"%{DATA:[referrer]}\")?( \"%{DATA:[user_agent]}\")?",
                        "%{IPORHOST:[remote_ip]} - %{DATA:[user_name]} \\[%{HTTPDATE:[time]}\\] \"-\" %{NUMBER:[response_code]} -" ] 
                    }
            remove_field => "@version"
            remove_field => "beat"
            remove_field => "input_type"
            remove_field => "type"
            remove_field => "http_version"
            remove_field => "@timestamp"
            remove_field => "message"
        }
        clone {
            clones => ["log1", "log2"]
        }
        if [type] == "log1" {
            mutate
            {
                add_field => { "field1" => "%{access_time}" }
                add_field => { "field2" => "%{host}" }
                add_field => { "field3" => "%{read_timestamp}" }
            }
        } else {   
            mutate
            {
                add_field => { "field4" => "%{source}" }
                add_field => { "field5" => "%{tags}" }
            }
        }
    }
    output {
        if [type] == "log1" {
            elasticsearch{
                hosts => ["localhost:9200"]
                index => "indexA"
            }
        } else {   
            elasticsearch{
                hosts => ["localhost:9200"]
                index => "indexB"
            }
        }
    }