代码之家  ›  专栏  ›  技术社区  ›  Thomas Decaux

logstash:如果输出失败,则将事件发送到其他地方

  •  -1
  • Thomas Decaux  · 技术社区  · 7 年前

    给出以下logstash管道:

    input
    {
        generator
        {
            lines => [
            '{"name" : "search", "product" : { "module" : "search" , "name" : "api"}, "data" : { "query" : "toto"}}',
            '{"name" : "user_interaction", "product" : { "module" : "search" , "name" : "front"}, "data" : { "query" : "toto"}}',
            '{"name" : "search", "product" : { "module" : "search" , "name" : "api"}, "data" : { "query" : "toto"}}',
            '{"hello": "world"}',
            '{"name" :"wrong data", "data" : "I am wrong !"}',
            '{"name" :"wrong data", "data" : { "hello" : "world" }}'
            ]
            codec => json
            count => 1
        }
    }
    
    filter
    {
      mutate
      {
        remove_field => ["sequence", "host", "@version"]
      }
    }
    
    output
    {
       elasticsearch
       {
         hosts => ["elasticsearch:9200"]
         index => "events-dev6-test"
         document_type => "_doc"
         manage_template => false
       }
    
       stdout
       {
           codec => rubydebug
       }
    }
    

    ElasticSearch对此索引有严格的映射,因此,某些事件会给出400个错误 "mapping set to strict, dynamic introduction of [hello] within [data] is not allowed" (这是正常的)。

    如何将失败事件发送到其他地方(文本日志或其他ElasticSearch索引)(这样我就不会丢失事件)?

    1 回复  |  直到 7 年前
        1
  •  1
  •   Alcanzar    7 年前

    介绍logstash 6.2 Dead Letter Queues 这可以用来做你想做的事。您需要启用 dead_letter_queue.enable: true 在你的 logstash.yml .

    然后把它作为输入处理:

    input {
      dead_letter_queue {
        path => "/path/to/data/dead_letter_queue" 
        commit_offsets => true 
        pipeline_id => "main" 
      }
    }
    
    output {
      file {
        path => ...
           codec => line { format => "%{message}"}
       }    
    }
    

    在6.2之前,我不相信有什么方法可以做到你想要的。