我想做的是将存储在S3上的cloudtrail日志发送到本地托管(我指的是非AWS)ELK设置中。我不会在任何地方使用Filebeat。我相信使用它不是强制性的。Logstash可以直接将数据传递给ES。
-
一旦数据在ES中,我只想在Kibana中将其可视化。
使用S3 logstash插件
的内容
/etc/logstash/conf.d/aws\u ct\u s3.conf文件
input {
s3 {
access_key_id => "access_key_id"
bucket => "bucket_name_here"
secret_access_key => "secret_access_key"
prefix => "AWSLogs/<account_number>/CloudTrail/ap-southeast-1/2019/01/09"
sincedb_path => "/tmp/s3ctlogs.sincedb"
region => "us-east-2"
codec => "json"
add_field => { source => gzfiles }
}
}
output {
stdout { codec => json }
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "attack-%{+YYYY.MM.dd}"
}
}
每个gzip文件的数据格式如下:
{
"Records": [
dictionary_D1,
dictionary_D2,
.
.
.
]
}
我想把上面词典列表中的每一本词典作为Kibana的一个单独事件。通过谷歌搜索,我明白我可以使用
split
过滤以实现我想要的。现在我的
aws_ct_s3.conf
input {
s3 {
access_key_id => "access_key_id"
bucket => "bucket_name_here"
secret_access_key => "secret_access_key"
prefix => "AWSLogs/<account_number>/CloudTrail/ap-southeast-1/2019/01/09"
sincedb_path => "/tmp/s3ctlogs.sincedb"
region => "us-east-2"
codec => "json"
add_field => { source => gzfiles }
}
}
filter {
split {
field => "Records"
}
}
output {
stdout { codec => json }
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "attack-%{+YYYY.MM.dd}"
}
}
事实上,通过这个我得到了我所需要的关于Kibana的数据。
现在的问题是
我不知道我做错了什么。有人能帮忙吗?
java -XshowSettings:vm => Max Heap Size: 8.9 GB
elasticsearch jvm options => max and min heap size: 6GB
logstash jvm options => max and min heap size: 2GB
ES version - 6.6.0
LS version - 6.6.0
Kibana version - 6.6.0
当前堆的使用情况如下所示: