配置&解释
日志进入Kafka之后,多个端进行消费,本身W3A SOC 的Agent端就会进行读取分析,同时也需要将这个日志放到ES上(主要用于日志查询,在平台里有个Web日志查询的功能,直接联动ES的,实现简单的查询服务,后续会做一些特殊的查询功能放到平台里,具体有需要可以提需求定制也行),这个时候就可以配置一个Filebeat端进行日志的推送,从Kafka消费到ES里。在docker-compose里我有写好的配置文件可作参考,具体解释下:
- hosts: [“elasticsearch:9200”] 中 「elasticsearch」是实际的es的地址,这里用的是docker-compose的服务名,如果你有自己的ES,可以直接写自己的ES的IP地址或域名,直接替换就行,但是记得一定要能通啊,不然发不过去。
- hosts: [“kafka:9092”] 中「kafka」是实际的Kafka数据源地址,如果有自己的Kafka并且已经配置好配置Nginx采集器的话,直接就用那个kafka地址就行,我这里写的是docker-compose的服务名称。
filebeat.inputs:
- type: kafka
enabled: true
hosts: ["kafka:9092"]
topics: ["weblogs"]
group_id: "weblogs"
json.keys_under_root: true
json.add_error_key: true
json.overwrite_keys: true
output.elasticsearch:
hosts: ["elasticsearch:9200"]
index: "weblog-%{+yyyy.MM.dd}"
setup.template.name: "filebeattest"
setup.template.pattern: "filebeattest-*"
processors:
# kafka的消息会在message字段,通过该processor将json解析出来
- decode_json_fields:
fields: ["message"]
process_array: true
max_depth: 1
target: ""
overwrite_keys: true
add_error_key: true
# 下边这两个处理器根据自身需求设置
# 将json中start_time字段的时间放到@timestamp中
- timestamp:
# 格式化时间值 给 时间戳
field: start_time
# 使用我国东八区时间 格式化log时间
timezone: Asia/Shanghai
layouts:
# - '2006-01-02 15:04:05'
- '2006-01-02 15:04:05.999'
test:
- '2019-06-22 16:33:51.111'