代码之家  ›  专栏  ›  技术社区  ›  qre0ct

将Cloudtrail gzip日志从S3发送到ElasticSearch

  •  0
  • qre0ct  · 技术社区  · 7 年前

    我想做的是将存储在S3上的cloudtrail日志发送到本地托管(我指的是非AWS)ELK设置中。我不会在任何地方使用Filebeat。我相信使用它不是强制性的。Logstash可以直接将数据传递给ES。

    一旦数据在ES中,我只想在Kibana中将其可视化。

    使用S3 logstash插件

    的内容 /etc/logstash/conf.d/aws\u ct\u s3.conf文件

    input {
    s3 {
    access_key_id => "access_key_id"
    bucket => "bucket_name_here"
    secret_access_key => "secret_access_key"
    prefix => "AWSLogs/<account_number>/CloudTrail/ap-southeast-1/2019/01/09"
    sincedb_path => "/tmp/s3ctlogs.sincedb"
    region => "us-east-2"
    codec => "json"
    add_field => { source => gzfiles }
    }
    }
    
    output {
    stdout { codec => json }
    elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "attack-%{+YYYY.MM.dd}"
    }
    }
    

    每个gzip文件的数据格式如下:

    {
      "Records": [
        dictionary_D1,
        dictionary_D2,
        .
        .
        .
      ]
    }
    

    我想把上面词典列表中的每一本词典作为Kibana的一个单独事件。通过谷歌搜索,我明白我可以使用 split 过滤以实现我想要的。现在我的 aws_ct_s3.conf

    input {
    s3 {
    access_key_id => "access_key_id"
    bucket => "bucket_name_here"
    secret_access_key => "secret_access_key"
    prefix => "AWSLogs/<account_number>/CloudTrail/ap-southeast-1/2019/01/09"
    sincedb_path => "/tmp/s3ctlogs.sincedb"
    region => "us-east-2"
    codec => "json"
    add_field => { source => gzfiles }
    }
    }
    
    filter {
    split {
       field => "Records"
     }
    }
    
    output {
    stdout { codec => json }
    elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "attack-%{+YYYY.MM.dd}"
    }
    }
    

    事实上,通过这个我得到了我所需要的关于Kibana的数据。

    现在的问题是

    我不知道我做错了什么。有人能帮忙吗?

    java -XshowSettings:vm => Max Heap Size: 8.9 GB

    elasticsearch jvm options => max and min heap size: 6GB

    logstash jvm options => max and min heap size: 2GB

    ES version - 6.6.0

    LS version - 6.6.0

    Kibana version - 6.6.0

    当前堆的使用情况如下所示: enter image description here

    0 回复  |  直到 7 年前