版本
filebeat6.6.0 + kafka2.11 +elk7.3.1
elk 是docker 部署的游沿,其他是本地服務(wù)
下載地址:https://mirrors.huaweicloud.com/filebeat/6.6.0/
再次申明,博客真的不靠譜吓揪。最好看官方文檔
filebeat.yml
詳見https://www.elastic.co/guide/en/beats/filebeat/6.6/filebeat-input-log.html
https://www.elastic.co/guide/en/beats/filebeat/6.6/kafka-output.html
#============== Filebeat prospectors ===========
filebeat.inputs: # 6.3以前是 filebeat.prospectors:
- type: log # input類型,默認(rèn)為log彤灶,6.0以前配置是 - input_type: log
paths:
- /usr/local/logs/app-collector.log
multiline.pattern: '^\[' #指定匹配的表達(dá)式
multiline.negate: true #是否匹配到
multiline.match: after #如果沒有匹配到,就合并到上一行的末尾
multiline.max_lines: 2000 #最大行數(shù)
multiline.timeout: 2s #如果在規(guī)定時(shí)間沒有新的日志事件就不等待后面的日志了梁钾,開始把數(shù)據(jù)推送出去
fields:
logbiz: collector
logtopic: app-log-collector #按服務(wù)劃分用作kafka topic
evn: dev
- type: log
paths:
#app-服務(wù)名.log
- /usr/local/logs/error-collector.log
#定義寫入ES時(shí)的_type 值
document_type: "error-log"
multiline.pattern: '^\[' #指定匹配的表達(dá)式
multiline.negate: true #是否匹配到
multiline.match: after #如果沒有匹配到栈源,就合并到上一行的末尾
multiline.max_lines: 2000 #最大行數(shù)
mulitilne.timeout: 2s #如果在規(guī)定時(shí)間沒有新的日志事件就不等待后面的日志了,開始把數(shù)據(jù)推送出去
fields:
logbiz: collector
logtopic: error-log-collector #按服務(wù)劃分用作kafka topic
evn: dev
output.kafka:
hosts: ["192.168.159.128:9092"]
topic: '%{[fields.logtopic]}'
partition.hash:
reachable_only: true
compression: gzip
max_message_bytes: 1000000
#acks=0:生產(chǎn)者在成功寫入消息之前不會(huì)等待任何來自服務(wù)器的響應(yīng)
#acks=1:只要集群的首領(lǐng)節(jié)點(diǎn)收到消息荡含,生產(chǎn)者就會(huì)收到一個(gè)來自服務(wù)器成功響應(yīng)
#acks=-1:表示分區(qū)leader必須等待消息被成功寫入到所有的ISR副本中才認(rèn)為producer請(qǐng)求成功咒唆。
required_acks: 1
logging.to_files: true
docker-compose.yml
version: '3'
services:
elasticsearch: #服務(wù)名稱(不是容器名)
image: elasticsearch:7.3.1
ports:
- "9200:9200" #暴露的端口信息和docker run -d -p 80:80 一樣
- "9300:9300"
restart: "always" #重啟策略,能夠使服務(wù)保持始終運(yùn)行释液,生產(chǎn)環(huán)境推薦使用
container_name: elasticsearch #容器名稱
hostname: elasticsearch
environment:
- "discovery.type=single-node" #配置es啟動(dòng)單節(jié)點(diǎn)
- "cluster.name=EsForLog" #配置es集群名稱
- "ES_JAVA_OPTS=-Xms512m -Xmx512m" #配置es啟動(dòng)參數(shù)
kibana:
image: kibana:7.3.1
restart: "always" #重啟策略全释,能夠使服務(wù)保持始終運(yùn)行,生產(chǎn)環(huán)境推薦使用
container_name: kibana #容器名稱
hostname: kibana
#掛載文件
volumes:
- /mydata/kibana/kibana.yml:/usr/share/kibana/config/kibana.yml
links:
- elasticsearch:es01 #容器關(guān)聯(lián)es01是別名
depends_on:
- elasticsearch #依賴es均澳,將會(huì)在es創(chuàng)建成功后才執(zhí)行
ports:
- "5601:5601" #暴露的端口信息和docker run -d -p 80:80 一樣
logstash:
image: logstash:7.3.1
restart: "always" #重啟策略恨溜,能夠使服務(wù)保持始終運(yùn)行,生產(chǎn)環(huán)境推薦使用
container_name: logstash #容器名稱
hostname: logstash
#掛載文件logstash啟動(dòng)配置文件
volumes:
- /mydata/logstash/logstash-springboot.conf:/usr/share/logstash/pipeline/logstash.conf
links:
- elasticsearch:es01 #容器關(guān)聯(lián)es01是別名
depends_on:
- elasticsearch #依賴es找前,將會(huì)在es創(chuàng)建成功后才執(zhí)行
ports:
- "5044:5044" #暴露的端口信息和docker run -d -p 80:80 一樣
logstash.yml
input {
kafka {
## app-log-服務(wù)名稱
topics_pattern => "app-log-.*"
bootstrap_servers => "192.168.159.128:9092"
codec => json
consumer_threads => 1 ## 增加consumer的并行消費(fèi)線程數(shù)
decorate_events => true
group_id => "app-log-group"
}
kafka {
topics_pattern => "error-log-.*"
bootstrap_servers => "192.168.159.128:9092"
codec => json
consumer_threads => 1
decorate_events => true
group_id => "error-log-group"
}
}
filter {
#時(shí)區(qū)轉(zhuǎn)換
ruby{
code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
}
# [fields][logtopic]這串東西 對(duì)應(yīng)的是filebeat的配置文件filebeat.yml里面的fields下的logtopic屬性糟袁,具體的回頭看filebeat的內(nèi)容
if "app-log" in [fields][logtopic]{
grok{
#這個(gè)是匹配日志的格式的,日志的格式可以匹配成功這條數(shù)據(jù)就不過濾躺盛,否則就過濾掉
match => ["message","\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{NOTSPACE:hostName}\] \[%{NOTSPACE:ip}\] \[%{NOTSPACE:applicationName}\] \[%{NOTSPACE:location}\] \[%{NOTSPACE:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
}
}
if "error-log" in [fields][logtopic]{
grok{
match => ["message","\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{NOTSPACE:hostName}\] \[%{NOTSPACE:ip}\] \[%{NOTSPACE:applicationName}\] \[%{NOTSPACE:location}\] \[%{NOTSPACE:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
}
}
}
#輸出到控制臺(tái)
output {
if "app-log" in [fields][logtopic]{
#es插件
elasticsearch{
hosts => ["192.168.159.128:9200"]
#索引名 +號(hào)開頭的项戴,就會(huì)姿容任務(wù)后面是時(shí)間格式
#javalog-app-service-2019.01.23
index => "app-log-%{[fields][logbiz]}-%{index_time}"
#是否嗅探集群ip:一般設(shè)置true
#通過嗅探機(jī)制進(jìn)行es集群負(fù)載均衡發(fā)日志消息
sniffing => true
#logstash默認(rèn)值自帶一個(gè)mapping模板,進(jìn)行模板覆蓋
template_overwrite => true
}
}
if "error-log" in [fields][logtopic]{
elasticsearch{
hosts => ["192.168.159.128:9200"]
index => "app-log-%{[fields][logbiz]}-%{index_time}"
sniffing => true
template_overwrite => true
}
}
stdout {
codec => rubydebug
}
}
kibana.yml
elasticsearch.hosts: http://192.168.159.128:9200 #es01是docker-compose中l(wèi)inks的別名
server.host: "0.0.0.0"
server.name: kibana
xpack.monitoring.ui.container.elasticsearch.enabled: true
i18n.locale: zh-CN #中文