組件介紹
Elasticsearch
是一個(gè)基于Lucene的搜索服務(wù)器,提供搜集、分析直焙、存儲(chǔ)數(shù)據(jù)三大功能,它提供了一個(gè)分布式多用戶能力的全文搜索引擎砂轻,基于restful web接口奔誓,Elasticsearch是用java開(kāi)發(fā)的,能夠達(dá)到實(shí)時(shí)搜索、穩(wěn)定厨喂、可靠和措、快速,安裝使用方便蜕煌。
Logstash
主要是用來(lái)日志的搜集派阱、分析、過(guò)濾日志的工具斜纪,用于管理日志和事件的工具贫母,你可以用它去搜集日志、轉(zhuǎn)換日志盒刚、解析日志并將它們作為數(shù)據(jù)提供給其他模塊調(diào)用腺劣。
Kibana
是一個(gè)優(yōu)秀的前端日志展示框架,他可以非常詳細(xì)的將日志轉(zhuǎn)換為各種圖表因块,為用戶提供強(qiáng)大的數(shù)據(jù)可視話支持橘原。它能夠搜索、展示存儲(chǔ)在Elasticsearch中索引數(shù)據(jù)涡上,使用它可以很方便的用圖標(biāo)趾断、表格、地圖展示和分析數(shù)據(jù)吓懈。
Kafka
數(shù)據(jù)緩沖隊(duì)列歼冰,作為消息隊(duì)列解耦了處理過(guò)程,同時(shí)提高了可擴(kuò)展性耻警,具有峰值處理能力隔嫡,使用消息隊(duì)列能夠使關(guān)鍵組件頂住突發(fā)的訪問(wèn)壓力,而不會(huì)因?yàn)橥话l(fā)的超負(fù)荷的請(qǐng)求而完全崩潰甘穿。
Filebeat
隸屬于Beats腮恩,是一個(gè)輕量級(jí)數(shù)據(jù)收集引擎,基于原先的Logstash-fowarder的源碼改造温兼,也會(huì)是ELK Stack的第一選擇秸滴。
目前Beats包含四種工具:
- Packetbeat(搜索網(wǎng)絡(luò)流量數(shù)據(jù))
- Metricbeat(搜集系統(tǒng)、進(jìn)程和文件系統(tǒng)級(jí)別的CPU和內(nèi)存使用情況等數(shù)據(jù)募判,通過(guò)從操作系統(tǒng)和服務(wù)器收集指標(biāo)荡含,幫助監(jiān)控福區(qū)群以及其托管的服務(wù))
- Filebeat(收集文件數(shù)據(jù)-常用文件日志收集分析)
- Winlogbeat(搜集windows事件日志數(shù)據(jù))
版本說(shuō)明
elk相關(guān)的組件版本最好保持一致
- jdk1.8+
- elasticsearch: 6.6.1
- logstash: 6.6.1
- kibana: 6.6.1
- Filebeat: 6.6.1
- kafka: 2.11-1
組件架構(gòu)圖
依賴組件安裝
jdk安裝
jdk安裝包自行下載解壓
環(huán)境變量配置
linux中jdk環(huán)境變量:
echo '
JAVA_HOME=/usr/local/java
PATH=$JAVA_HOME/bin:$PATH
export JAVA_HOME PATH
' >>/etc/profile
source /etc/profile
elk部署前環(huán)境的檢查
服務(wù)器設(shè)置成固定ip
通過(guò)命令查看
/etc/sysconfig/network-scripts/ifcfg-ens33
主機(jī)名修改
方便集群配置時(shí)服務(wù)器的區(qū)分
hostname : 顯示默認(rèn)主機(jī)名
- 使用vim命令修改”/etc/hostname“主機(jī)名稱文件
- 將原始主機(jī)名稱刪除后追加”新主機(jī)名
- 重啟主機(jī)
防火墻是否關(guān)閉
systemctl status firewalld
確保每個(gè)節(jié)點(diǎn)防火墻高狀態(tài)是關(guān)閉的,這樣每個(gè)節(jié)點(diǎn)才能被相互訪問(wèn)
selinux狀態(tài)
通過(guò)命令:getenforce
如果狀態(tài)不是disabled時(shí)通過(guò)下面命令進(jìn)行設(shè)置
vim /etc/selinux/config
selinux=disabled
iptables規(guī)則檢查
不管你在安裝linux時(shí)是否啟動(dòng)了防火墻,如果你想配置屬于自己的防火墻,那就清除現(xiàn)在filter的所有規(guī)則,為了保證后面elk部署節(jié)點(diǎn)通信正常
通過(guò)命令進(jìn)行查看
iptables -nL
如果不是上圖所示届垫,則需要進(jìn)行iptables規(guī)則的清除
iptables -F 清除預(yù)設(shè)表filter中的所有規(guī)則鏈的規(guī)則
iptables -X 清除預(yù)設(shè)表filter中使用者自定鏈中的規(guī)則
服務(wù)器上文件的傳遞
為了方便各個(gè)服務(wù)器上進(jìn)行文件的拷貝可以進(jìn)行如下設(shè)置释液。
公鑰的生成
ssh-keygen 服務(wù)器公鑰生成,直接回車(chē)即可
ip的解析
多個(gè)服務(wù)器上傳遞該公鑰的時(shí)候可以做ip和主機(jī)名映射装处,方便傳輸
#中添加ip和主機(jī)名的映射如下, 下面操作需要在每個(gè)節(jié)點(diǎn)進(jìn)行配置误债,復(fù)制即可
vim /etc/hosts
172.16.80.159 lph_node1
172.16.80.160 lph_node2
各節(jié)點(diǎn)映射完成后可以進(jìn)行數(shù)據(jù)和文件的拷貝
公鑰的拷貝-各節(jié)點(diǎn)
ssh-copy-id -i #{ip映射的主機(jī)名}
占位符可以用真實(shí)的主機(jī)名替換
軟件包的拷貝
公鑰準(zhǔn)備好之后進(jìn)行組件軟件包的拷貝
// `pwd`獲取軟件包素在路徑,將該軟件拷貝到指定節(jié)點(diǎn)下的對(duì)應(yīng)路徑下,#{占位符}
scp #{kafka_2.11-2.2.2.tgz} lph_client:`pwd`
elasticsearch安裝部署
在安裝elasticsearch前可以先添加一個(gè)普通用戶用來(lái)后續(xù)elasticsearch的啟動(dòng)操作寝蹈,因?yàn)閑lasticsearch不能用root用戶啟動(dòng)李命。
// 集群環(huán)境下每個(gè)節(jié)點(diǎn)都需要配置
useradd lphelk
// 設(shè)置密碼,密碼可以隨意設(shè)值
echo "1" | passwd --stdin lphelk
安裝包的解壓
// 集群環(huán)境下每個(gè)節(jié)點(diǎn)都需要解壓
tar -zxvf #{安裝包位置} -C #{鎮(zhèn)定解壓到哪個(gè)位置}
配置文件的修改
命令:vim /elk/elasticsearch-6.6.1/config/elasticsearch.yml
# 注釋掉的在是在集群環(huán)境下配置
#cluster.name: clustername
node.name: lph_node
node.master: true
node.data: true
# 需要?jiǎng)?chuàng)建相應(yīng)的數(shù)據(jù)和日志路徑
path.data: /data/elasticsearch/data
path.logs: /data/elasticsearch/logs
bootstrap.memory_lock: false
bootstrap.system_call_filter: false
# 方便每個(gè)節(jié)點(diǎn)正常通行
network.host: 0.0.0.0
http.port: 9200
#discovery.zen.ping.unicast.hosts: ["172.16.80.160"]
#discovery.zen.minimum_master_nodes: 1
#discovery.zen.ping_timeout: 150s
#discovery.zen.fd.ping_retries: 10
#client.transprt.ping_timeout: 60s
http.cors.enabled: true
http.cors.allow-origin: "*"
配置項(xiàng)聲明
配置 | 描述 |
---|---|
cluster.name | 集群名稱箫老,各個(gè)幾點(diǎn)配置相同的集群名稱封字。 |
node.name | 節(jié)點(diǎn)名稱,各個(gè)節(jié)點(diǎn)配置不同槽惫。 |
node.master | 指示某個(gè)節(jié)點(diǎn)是否符合成為主節(jié)點(diǎn)的條件周叮。 |
node.data | 指示節(jié)點(diǎn)是否為數(shù)據(jù)節(jié)點(diǎn),數(shù)據(jù)節(jié)點(diǎn)包含并管理索引的一部分界斜。 |
path.data | 數(shù)據(jù)存儲(chǔ)目錄仿耽。 |
path.logs | 日志存儲(chǔ)目錄。 |
bootstrap.memory_lock | 內(nèi)存鎖定各薇,是否禁用交換项贺。 |
bootstrap.system_call_filter | 系統(tǒng)調(diào)用過(guò)濾器。 |
network.host | 綁定節(jié)點(diǎn)ip峭判。 |
http.port | rest api端口开缎。 |
discovery.zen.ping.unicast.hosts | 提供其他elasticsearch服務(wù)節(jié)點(diǎn)單點(diǎn)廣播發(fā)現(xiàn)功能。 |
discovery.zen.minimum_master_nodes | 集群中可工作的具有master節(jié)點(diǎn)資格的最下數(shù)量林螃,官方的推薦值是(N/2)+1奕删,其中N是具有master資格的節(jié)點(diǎn)的數(shù)量。 |
discovery.zen.ping_timeout | 節(jié)點(diǎn)在發(fā)現(xiàn)過(guò)程中的等待時(shí)間疗认。 |
discovery.zen.fd.ping_retries | 節(jié)點(diǎn)發(fā)現(xiàn)重試次數(shù)完残。 |
http.cors.enabled | 是否允許跨源rest請(qǐng)求,用允許head插件訪問(wèn)elasticsearch横漏。 |
http.cors.allow-origin | 允許的源地址谨设。 |
設(shè)置jvm堆大小
可以根據(jù)服務(wù)器本身配置進(jìn)行優(yōu)化
設(shè)置jvm堆大小(根據(jù)服務(wù)器內(nèi)存大小進(jìn)行設(shè)置,用2g替換1g)
sed -i 's/-Xms1g/-Xms2g/' /elk/elasticsearch-6.6.1/config/jvm.options
sed -i 's/-Xmx1g/-Xmx2g/' /elk/elasticsearch-6.6.1/config/jvm.options
注意:
確保堆內(nèi)存最大和最小一樣缎浇,防止程序運(yùn)行時(shí)改變堆內(nèi)存大小
創(chuàng)建ES數(shù)據(jù)及日志存儲(chǔ)目錄
mkdir -pv /data/elasticsearch/data
mkdir -pv /data/elasticsearch/logs
修改安裝目錄及存儲(chǔ)目錄權(quán)限
查看用戶:cat /etc/passwd|grep #{上面創(chuàng)建啟動(dòng)es的普通用戶}
chown -R lphelk:lphelk /data/elasticsearch
chown -R lphelk:lphelk /elk/elasticsearch-6.6.1
elasticsearch啟動(dòng)報(bào)錯(cuò)
上面如果修改了/etc/sysctl.conf文件后需要輸入下面命令進(jìn)行重新加載
sysctl -p
啟動(dòng)elasticsearch
// 切換到es普通用戶扎拣,集群環(huán)境下多個(gè)節(jié)點(diǎn)需要同時(shí)啟動(dòng)(間隔時(shí)間切記太長(zhǎng))
su - lphelk
cd /elk/elasticsearch-6.6.1
nohup bin/elasticsearch &
kibana安裝部署
安裝包的解壓
tar -zxvf #{安裝包位置} -C #{鎮(zhèn)定解壓到哪個(gè)位置}
配置文件修改
vim /elk/kibana-6.6.1-linux-x86_64/config/kibana.yml
echo '
server.port: 5601
server.host: "172.16.80.159"
elasticsearch.url: "http://172.16.80.159:9200"
kibana.index: ".kibana"
' >> /elk/kibana-6.6.1-linux-x86_64/config/kibana.yml
配置項(xiàng)聲明
配置 | 描述 |
---|---|
server.port | kibana服務(wù)端口,默認(rèn)為5601素跺。 |
server.host | kibana主機(jī)ip地址二蓝,默認(rèn)localhost。 |
elasticsearch.url | 用來(lái)做查詢的es節(jié)點(diǎn)的url指厌,默認(rèn)http://localhost:9200刊愚。 |
kibana.index | kibana在elasticsearch中使用的索引來(lái)存儲(chǔ)保存的searches,visualizations和dashboards,默認(rèn)是 .kibana仑乌。 |
kibana啟動(dòng)
cd /elk/kibana-6.6.1-linux-x86_64/
nohup ./bin/kibana &
kibana啟動(dòng)后是可以直接訪問(wèn)的百拓,不需要進(jìn)行認(rèn)證,測(cè)試環(huán)境可行
需要認(rèn)證的話可以通過(guò)nginx和nginx中http-tools
http-tools用于生成nginx認(rèn)證訪問(wèn)的用戶密碼文件晰甚,需要可自行百度
kafka安裝部署
本文檔是kafka單節(jié)點(diǎn)部署衙传,集群自行百度。
安裝配置jdk8
由于kafka厕九、zookeeper(簡(jiǎn)稱:zk)運(yùn)行依賴于jdk,jdk已安裝可跳過(guò)蓖捶。
安裝配置zk
kafka運(yùn)行依賴zk,kafka官網(wǎng)提供的tar包中包含了zk,這里不需要額外下載zk包。
安裝包的解壓
tar -zxvf #{安裝包位置} -C #{鎮(zhèn)定解壓到哪個(gè)位置}
配置文件修改
#vim /elk/kafka_2.11-2.2.2/config/zookeeper.properties
# 通過(guò)echo方式注入的話需要將對(duì)應(yīng)配置文件中的相同配置項(xiàng)注釋掉
echo '
dataDir=/data/zookeeper/data
dataLogDir=/data/zookeeper/logs
clientPort=2181
tickTime=2000
initLimit=20
syncLimit=10
#kafka集群ip:port
server.1=172.16.80.159:2888:3888
#server.1=172.16.80.160:2888:3888
#server.1=172.16.80.161:2888:3888
'>>/elk/kafka_2.11-2.2.2/config/zookeeper.properties
配置項(xiàng)聲明
配置 | 描述 |
---|---|
dataDir | zk數(shù)據(jù)存放目錄扁远。 |
dataLogDir | zk日志存放目錄俊鱼。 |
clientPort | 客戶端連接zk服務(wù)器的端口。 |
tickTime | zk服務(wù)器之間或客戶端與服務(wù)器之間維持心跳的時(shí)間間隔畅买。 |
initLimit | 允許follower連接并同步到Leader的初始化連接時(shí)間并闲,以tickTime為單位,當(dāng)舒適化連接時(shí)間超過(guò)該值谷羞,則表示連接失敗帝火。 |
syncLimit | Leader與Follower之間發(fā)生消息時(shí),請(qǐng)求和應(yīng)答時(shí)間長(zhǎng)度湃缎,如果 follower在設(shè)置數(shù)據(jù)啊in內(nèi)不能與leader通信犀填,那么此follower將會(huì)被丟棄。 |
server.1 | 2888是follower與leader交換信息的端口嗓违,38888是當(dāng)leader掛了時(shí)用來(lái)執(zhí)行選舉時(shí)服務(wù)器相互通信的端口九巡。 |
創(chuàng)建數(shù)據(jù)和日志目錄
mkdir -pv /data/zookeeper/data
mkdir -pv /data/zookeeper/logs
創(chuàng)建myid文件
# 每臺(tái)kafka服務(wù)器都需要做成唯一的id,myid中對(duì)應(yīng)的值對(duì)應(yīng)上面配置文件中server.1中的值
echo 1 > /data/zookeeper/data/myid
配置kafka
修改配置文件
修改前可以先將原配置文件中的有效配置注釋掉蹂季,可以通過(guò)下面命令:
sed -i 's/^[^#]/#&/' /elk/kafka_2.11-2.2.2/config/server.properties
echo '
broker.id=1
listeners=PLAINTEXT://172.16.80.159:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/logs
num.partitions=6
num.recovery.threads.per.data.dir=1
# 單節(jié)點(diǎn)這個(gè)必須為1冕广,否則可能會(huì)導(dǎo)致kafka消費(fèi)失敗
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=300000
#zookeeper.connect=172.16.80.159:2181,172.16.80.160:2181
zookeeper.connect=172.16.80.159:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
' >>/elk/kafka_2.11-2.2.2/config/server.properties
配置項(xiàng)聲明
配置 | 描述 |
---|---|
broker.id | 每個(gè)server需要單獨(dú)配置broker id,如果不配置系統(tǒng)會(huì)自動(dòng)配置乏盐,需要和上一步id一致佳窑。 |
listeners | 監(jiān)聽(tīng)地址,格式PLAINTEXT://IP:PORT父能。 |
num.network.threads | 接收和發(fā)送網(wǎng)絡(luò)信息的線程數(shù)神凑。 |
num.io.threads | 服務(wù)器用于處理請(qǐng)求的線程數(shù),其中肯能包括磁盤(pán)i/o何吝。 |
socket.send.buffer.bytes | 套接字服務(wù)器使用的發(fā)送緩沖區(qū)溉委。 |
socket.receive.buffer.bytes | 套接字服務(wù)器使用接收緩沖區(qū)。 |
socket.request.max.bytes | 套接字服務(wù)器請(qǐng)求的最大大邪拧(防止oom)瓣喊。 |
log.dirs | 日志文件目錄。 |
num.partitions | partition數(shù)量黔酥。 |
num.recovery.threads.per.data.dir | 在啟動(dòng)時(shí)恢復(fù)日志藻三、關(guān)閉時(shí)刷盤(pán)日志每個(gè)數(shù)據(jù)目錄的線程的數(shù)量洪橘。默認(rèn)。 |
offsets.topic.replication.factor | 偏移量話題的復(fù)制因子棵帽。 |
transaction.state.log.replication.factor | - |
transaction.state.log.min.isr | - |
log.retention.hours | 日志文件刪除之前保留的時(shí)間單位小時(shí)熄求,默認(rèn)168。 |
log.segment.bytes | 單個(gè)日志文件的大小逗概,默認(rèn)1073741824 弟晚。 |
log.retention.check.interval.ms | 檢查日志段以查看是否可以根據(jù)保留策略刪除他們的時(shí)間間隔。 |
zookeeper.connect | zk的主機(jī)地址逾苫,如果zookeeper是集群的話則以逗號(hào)隔開(kāi)卿城。 |
zookeeper.connection.timeout.ms | 接收到zk的超時(shí)時(shí)間。 |
group.initial.rebalance.delay.ms | - |
創(chuàng)建log目錄
mkdir -pv /data/kafka/logs
其他節(jié)點(diǎn)配置
針對(duì)集群環(huán)境下铅搓,只需要把配置好的安裝包分發(fā)到其他節(jié)點(diǎn)瑟押,然后修改zk的myid,kafkade broker.id和listeners就可以了,單節(jié)點(diǎn)不用配置這一步星掰。
啟動(dòng)勉耀,驗(yàn)證zk
(1) 啟動(dòng)
cd /elk/kafka_2.11-2.2.2/
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
集群的話需要每個(gè)節(jié)點(diǎn)依次執(zhí)行啟動(dòng)操作
(2)驗(yàn)證
yum -y install nc
echo conf | nc 127.0.0.1 2181
(3)查看zk的狀態(tài)
echo stat | nc 127.0.0.1 2181
啟動(dòng),驗(yàn)證kafka
(1) 啟動(dòng)
cd /elk/kafka_2.11-2.2.2
nohup bin/kafka-server-start.sh config/server.properties &
集群環(huán)境下依次啟動(dòng)即可
(2) 驗(yàn)證
在kafka服務(wù)器上創(chuàng)建topic
cd /elk/kafka_2.11-2.2.2
bin/kafka-topics.sh --create --zookeeper 172.16.80.159:2181 --replication-factor 1 --partitions 1 --topic testtopic
查看topic:
bin/kafka-topics.sh --zookeeper 172.16.80.159:2181 --list
監(jiān)控kafka manager
elk中主要收集日志蹋偏,不需要kafka manager便斥,需要的話可以自行官網(wǎng)下載
logstash安裝部署
Logstash運(yùn)行同樣依賴jdk,需要在服務(wù)器上安裝jdk版本。
(1) 安裝
tar -zxvf #{安裝包位置} -C #{指定解壓到哪個(gè)位置}
(2) 配置
創(chuàng)建目錄,我將所有的input威始、filter枢纠、output配置文件全部放到該目錄中。
mkdir -pv /elk/logstash-6.6.1/etc/conf.d
vim /elk/logstash-6.6.1/etc/conf.d/input.conf
input {
# 從kafka獲取日志數(shù)據(jù)到logstash
kafka {
bootstrap_servers => "172.16.4.35:9092"
auto_offset_reset => "latest"
consumer_threads => 1
decorate_events => true
topics => ["emp_log_topic"]
codec => "json"
}
}
vim /elk/logstash-6.6.1/etc/conf.d/output.conf
output {
# 輸出到elasticsearch中
elasticsearch {
index => "emp-log-%{+YYYY-MM-dd}"
hosts => ["172.16.4.35:9200"]
}
# 輸出到控制臺(tái)黎棠,在測(cè)試的時(shí)候可以配置
stdout {
codec => rubydebug
}
}
vim /elk/logstash-6.6.1/etc/conf.d/filter.conf
filter {
# 過(guò)濾日志中不包含指定信息:【lph123578】的日志輸出到es中晋渺,看需求配置
if ([message] =~ "^(?!.*?lph123578).*$") {
drop {}
}
# 通過(guò)grok 正則的方式進(jìn)行日志內(nèi)容的提取并賦值到指定的字段上,如下面的messageDate脓斩、appName等
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:messageDate} \[(?<appName>[^\[\]]+)\] \[(?<thread>[^\[\]]+)\] \[(?<tradeId>[^\[\]]+)\] \[(?<keyword1>[^\[\]]*)\] %{LOGLEVEL:level}" }
}
}
具體grok配置可以參考下面網(wǎng)頁(yè)
http://www.reibang.com/p/443f1ea7b640
http://www.reibang.com/p/5df5055070b2?utm_campaign=maleskine&utm_content=note&utm_medium=seo_notes&utm_source=recommendation
(3) 啟動(dòng)
cd /elk/logstash-6.6.1
nohup bin/logstash -f etc/conf.d/ --config.reload.automatic &
(4) 報(bào)錯(cuò)信息
啟動(dòng)logstash有如下錯(cuò)誤時(shí)可以按照下面的方式嘗試解決木西,沒(méi)有報(bào)錯(cuò)既可以跳過(guò)。
解決方案
Filebeat安裝部署
為什么用Filebeat,而不用原來(lái)的Logstash呢?原因很簡(jiǎn)單随静,每個(gè)服務(wù)上按照一個(gè)logstash對(duì)資源消耗比較大八千。
解壓
tar -zxvf #{安裝包位置} -C #{指定解壓到哪個(gè)位置}
cd /elk/filebeat-6.6.1-linux-x86_64
修改配置
修改filebeat配置,支持收集本地目錄日志,并輸出日志到kafka中。
cd /elk/filebeat
// 之前的filebeat.yml可以備份下
mv filebeat.yml filebeat.yml.bak
vim filebeat.yml
filebeat.prospectors:
- input_type: log
paths:
- /usr/local/easymainance/easymaintenance-log-manager-1.0-SNAPSHOT/D:/log/emp-log-manager/config-service.log
# 解析的是text文本內(nèi)容燎猛,不規(guī)則的
multiline:
# 指的是每行以時(shí)間格式開(kāi)始的字符串
pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'
negate: true
match: after
max_lines: 2000
timeout: 2s
# 解析的是json格式的內(nèi)容
#json.keys_under_root: true
# json.add_error_key: true
output.kafka:
enabled: true
hosts: ["172.16.4.35:9092"]
topic: 'emp_log_topic'
Filebeat6.0之后一些配置參數(shù)變動(dòng)比較大恋捆,比如document_type就不支持,需要用fields來(lái)代替等等重绷。
配置項(xiàng)聲明
參數(shù) | 描述 |
---|---|
json.keys_under_root | 可以讓字段位于根節(jié)點(diǎn)沸停,默認(rèn)為false |
json.overwrite_keys | 對(duì)于同名的key,覆蓋原有key值 |
json.message_key | - |
json.add_error_key | 將錯(cuò)誤消息記錄存儲(chǔ)在error.message字段中 |
檢查配置是否正確
cd /elk/filebeat
./filebeat -c filebeat.yml -configtest
啟動(dòng)
cd /elk/filebeat
nohup ./filebeat -e -c filebeat.yml &
日志收集
日志格式
%date{yyyy-MM-dd HH:mm:ss.SSS} [%X{appname:-}] [%thread] [%X{traceId:-}] [%X{keyword1:-}] %-5level %logger{50}:%L - %msg%n
格式說(shuō)明
- %date{yyyy-MM-dd HH:mm:ss.SSS}:日志的時(shí)間2020-11-16 17:14:58.661
- %X{appname:-} : 應(yīng)用服務(wù)名稱
- %thread: 輸出產(chǎn)生該日志事件的線程名
- %X{traceId:-} : 自定義流程追蹤Id
- %X{keyword1:-}: 關(guān)鍵字1
- %-5level: 輸出日志級(jí)別昭卓,-5表示左對(duì)齊并且固定輸出5個(gè)字符如果不足在右邊補(bǔ)空格愤钾,類型有DEBUG,INFO,WARN,ERROR
- %logger{50}: 為輸出日志的class命名 限定長(zhǎng)度50
- %msg%n:輸出的日志內(nèi)容
日志格式輸出內(nèi)容展示
2020-11-16 17:14:58.661 [emp-system-manager] [main] [tradeId] [keyword1] INFO com.hzsun.emp.config.ConfigApplication:61 - Started ConfigApplication in 8.757 seconds (JVM running for 9.172)
日志埋點(diǎn)
單服務(wù)\單節(jié)點(diǎn)日志埋點(diǎn)
package com.hzsun.emp.aop;
import com.hzsun.emp.constant.FileLogContants;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.UUID;
// 處理的單服務(wù)日志埋點(diǎn)
@Aspect
@Component
public class FileLogAspect {
public static final String exepress = "execution(public * com.hzsun.emp.*.controllers.*.*(..))";
@Value("${spring.application.name}")
private String applicationName;
@Pointcut(exepress)
public void setUpBuriedPoint() {
}
@Around("setUpBuriedPoint()")
public Object markFileLog(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
System.err.println("服務(wù)名稱:"+ applicationName);
// 日志埋點(diǎn)
insertMDC();
Object result = proceedingJoinPoint.proceed();
MDC.clear();
return result;
}
private void insertMDC() {
String traceId = UUID.randomUUID().toString().replace("-", "");
MDC.put(FileLogContants.TRADE_ID, traceId + FileLogContants.FILTER_FIELD);
MDC.put(FileLogContants.SERVICE_NAME, applicationName);
MDC.put(FileLogContants.APP_NAME, FileLogContants.EASY_MAINTENANCE);
}
}
分布式服務(wù)\多服務(wù)調(diào)用日志埋點(diǎn)
LogTrackInterceptorConfig.class
LogTrackInterceptor.class
EmpFeignConfig.class
LogService.class
package com.hzsun.emp.jwt;
import interceptor.LogTrackInterceptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
/**
* 添加攔截器
* @author lph
*/
@Configuration
public class LogTrackInterceptorConfig implements WebMvcConfigurer {
@Bean
public LogTrackInterceptor getLogTrackInterceptor() {
return new LogTrackInterceptor();
}
@Override
public void addInterceptors(InterceptorRegistry registry) {
/**
* 根據(jù)具體情況設(shè)置
* LoginInterceptor是自定義的攔截器
* addPathPatterns參數(shù)/**是通配符表示攔截所有的請(qǐng)求
* excludePathPatterns方法的參數(shù)是可變參數(shù),可以輸n個(gè)字符串類型的參數(shù),用來(lái)添加白名單
*/
String[] excludePath = {
"/doc.html",
"/swagger-resources/**",
"/webjars/**",
"/v2/**",
"/swagger-ui.html/**"
};
registry.addInterceptor(getLogTrackInterceptor()).addPathPatterns("/**").excludePathPatterns(excludePath);
}
}
package interceptor;
import com.hzsun.emp.constant.FileLogContants;
import org.slf4j.MDC;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.HandlerInterceptor;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.Optional;
import java.util.UUID;
/**
* 日志追蹤攔截器瘟滨,feign服務(wù)間調(diào)用確保traceId一致
*
* @version V1.0
* @author: lph
*/
@Component
public class LogTrackInterceptor implements HandlerInterceptor {
private static final ThreadLocal<String> TRACE_ID_THREAD_LOCAL = new ThreadLocal<>();
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 請(qǐng)求頭中不存在則生成
String traceId = Optional.ofNullable(request.getHeader(FileLogContants.TRACE_ID)).orElse(UUID.randomUUID().toString().replaceAll("-", ""));
TRACE_ID_THREAD_LOCAL.set(traceId);
MDC.put(FileLogContants.TRACE_ID, getTraceId());
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
// 完成后移除,防止內(nèi)存占用過(guò)大
TRACE_ID_THREAD_LOCAL.remove();
MDC.clear();
}
public static String getTraceId() {
return TRACE_ID_THREAD_LOCAL.get();
}
public static void setTraceId(String traceId) {
TRACE_ID_THREAD_LOCAL.set(traceId);
}
}
package com.hzsun.emp.configuration;
import com.hzsun.emp.constant.FileLogContants;
import feign.RequestInterceptor;
import feign.RequestTemplate;
import interceptor.LogTrackInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
import java.util.Optional;
import java.util.UUID;
/**
* Feign調(diào)用時(shí)添加追蹤id到請(qǐng)求頭
*
* @author :lph
* @version: 1.0
*/
@Configuration
public class EmpFeignConfig implements RequestInterceptor {
@Override
public void apply(RequestTemplate requestTemplate) {
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
// 非空校驗(yàn)的目的是防止在啟動(dòng)類中通過(guò)feign調(diào)用第三方接口時(shí)拋出空指針異常
if (attributes != null) {
// 將traceId傳遞到下個(gè)服務(wù)
String traceId = Optional.ofNullable(LogTrackInterceptor.getTraceId()).orElse(UUID.randomUUID().toString().replaceAll("-",""));
requestTemplate.header(FileLogContants.TRACE_ID, traceId);
}
}
}
package com.hzsun.emp.serviceapi;
import com.hzsun.emp.configuration.EmpFeignConfig;
import com.hzsun.emp.pojo.po.ScOperationLogsPO;
import com.hzsun.emp.response.HZSunResponse;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
/**
* feign調(diào)用時(shí)在注解中配置
* value: 指的是被調(diào)用方項(xiàng)目在nacos唯一標(biāo)識(shí) configuration:指的是feign請(qǐng)求的攔截器
* @FeignClient(value = "emp-log-manager", configuration = EmpFeignConfig.class)
* @author :lph
* @version: 1.0
*/
@FeignClient(value = "emp-log-manager", configuration = EmpFeignConfig.class)
public interface LogService {
/**
* 獲取數(shù)據(jù)并插入消息記錄表
*
* @param logsPo 封裝請(qǐng)求參數(shù)對(duì)象
*/
@PostMapping("/logmanager/log/insert")
HZSunResponse insertLog(ScOperationLogsPO logsPo);
}
grok提取日志內(nèi)容
在vim /elk/logstash-6.6.1/etc/conf.d/filter.conf配置
方式一:
處理的日志格式:
%date{yyyy-MM-dd HH:mm:ss.SSS} [%X{appName:-${app_name}}] [%X{serviceName:-${processName}}] [%thread|%X{traceId:-}] [%X{key1:-}] [%X{key2:-}] %-5level %logger{50}:%L - %msg%n
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:messageDate} \[(?<appName>[^\[\]]+)\] \[(?<thread>[^\[\]]+)\] \[(?<tradeId>[^\[\]]+)\] \[(?<keyword1>[^\[\]]*)\] %{LOGLEVEL:level}" }
}
方式二:
處理的日志格式:
%date{yyyy-MM-dd HH:mm:ss.SSS} [%X{appName:-${app_name}}|%X{serviceName:-${processName}}|%thread|%X{traceId:-}|%X{key1:-}|%X{key2:-}] %-5level %logger{50}:%L - %msg%n
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:messageDate} \[(?<appName>[^\[\]]*)\|(?<serviceName>[^\[\]]*)\|(?<thread>[^\[\]]*)\|(?<traceId>[^\[\]]*)\|(?<key1>[^\[\]]*)\|(?<key2>[^\[\]]*)\] %{LOGLEVEL:level}"}
}
# 可以選擇性的使用remove_field去掉原始內(nèi)容message這一行的信息能颁。
mutate{
remove_field => ["message","timestamp"]
}
springboot整合elasticsearch
導(dǎo)入依賴
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.elasticsearch.client/transport -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>6.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.elasticsearch.plugin/transport-netty4-client-->
<dependency>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>transport-netty4-client</artifactId>
<version>6.6.1</version>
</dependency>
常量類
public class FileLogContants {
public static final String APP_NAME = "appName";
public static final String SERVICE_NAME = "serviceName";
public static final String TRADE_ID = "tradeId";
public static final String KEY_WORD_1 = "keyword1";
public static final String EASY_MAINTENANCE = "易維";
public static final String FILTER_FIELD = "lph123578";
public static final String MESSAGEDATE = "messageDate";
// 通配符
public static final String STARS_SYMBOL = "*";
// 排序
public static final String TIMESTAMP = "@timestamp";
// total用于es記錄總數(shù)的查詢
public static final Integer TOTAL = 0;
// 關(guān)鍵字標(biāo)識(shí)室奏,es字符串凡是標(biāo)識(shí)為關(guān)鍵字則不會(huì)進(jìn)行分詞處理
public static final String ES_KEYWORD_SYMBOL = ".keyword";
}
elasticsearch配置類
package com.hzsun.emp.log.config;
import com.hzsun.emp.utils.StringUtil;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import java.net.InetAddress;
import java.net.UnknownHostException;
@Configuration
public class ElasticSearchConfig {
public static final Logger logger = LoggerFactory.getLogger(ElasticSearchConfig.class);
@Value("${elasticsearch.host}")
private String host;
@Value("${elasticsearch.port}")
private Integer port;
@Value("${elasticsearch.cluster-name}")
private String clusterName;
private TransportClient transportClient;
@Bean
public TransportClient transportClient(){
Settings settings = Settings.EMPTY;
if(!StringUtil.isEmpty(clusterName)){
settings = Settings.builder()
.put("cluster.name", clusterName)
.build();
}
try {
transportClient = new PreBuiltTransportClient(settings)
.addTransportAddress(new TransportAddress(InetAddress.getByName(host), port));
// 集群環(huán)境可以嘗試用addTransportAddresses()方法
} catch (UnknownHostException e) {
logger.error("創(chuàng)建elasticsearch客戶端失敗");
e.printStackTrace();
}
logger.info("創(chuàng)建elasticsearch客戶端成功");
return transportClient;
}
@Bean
public BulkProcessor bulkProcessor() throws UnknownHostException {
Settings settings = Settings.EMPTY;
if(!StringUtil.isEmpty(clusterName)){
settings = Settings.builder()
.put("cluster.name", clusterName)
.build();
}
TransportClient transportClient = new PreBuiltTransportClient(settings)
.addTransportAddress(new TransportAddress(InetAddress.getByName(host), port));
return BulkProcessor.builder(transportClient, new BulkProcessor.Listener() {
@Override
public void beforeBulk(long l, BulkRequest bulkRequest) {
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
logger.error("{} data bulk failed,reason :{}", bulkRequest.numberOfActions(), throwable);
}
}).setBulkActions(1000)//分批,每10000條請(qǐng)求當(dāng)成一批請(qǐng)求劲装。默認(rèn)值為1000
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))//每次5MB,刷新一次bulk昌简。默認(rèn)為5m
.setFlushInterval(TimeValue.timeValueSeconds(5))//每5秒一定執(zhí)行占业,不管已經(jīng)隊(duì)列積累了多少。默認(rèn)不設(shè)置這個(gè)值
.setConcurrentRequests(1)//設(shè)置并發(fā)請(qǐng)求數(shù)纯赎,如果是0谦疾,那表示只有一個(gè)請(qǐng)求就可以被執(zhí)行,如果為1犬金,則可以積累并被執(zhí)行念恍。默認(rèn)為1.
.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))//這里有個(gè)backoff策略,最初等待100ms,然后按照指數(shù)增長(zhǎng)并重試3次晚顷。每當(dāng)一個(gè)或者多個(gè)bulk請(qǐng)求失敗,并出現(xiàn)EsRejectedExecutionException異常時(shí).就會(huì)嘗試重試峰伙。這個(gè)異常表示用于處理請(qǐng)求的可用計(jì)算資源太少。如果要禁用這個(gè)backoff策略该默,需要用backoff.nobackoff()瞳氓。
.build();
}
@PostConstruct
void init() {
System.setProperty("es.set.netty.runtime.available.processors", "false");
}
}
elasticsearch檢索
package com.hzsun.emp.log.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.hzsun.emp.constant.FileLogContants;
import com.hzsun.emp.log.bean.FileLogDTO;
import com.hzsun.emp.log.bean.FileLogQO;
import com.hzsun.emp.log.service.IFileLogService;
import com.hzsun.emp.utils.DateUtil;
import com.hzsun.emp.utils.ObjectMapperUtil;
import com.hzsun.emp.utils.StringUtil;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
@Service
public class FileLogServiceImpl implements IFileLogService {
public static final Logger logger = LoggerFactory.getLogger(FileLogServiceImpl.class);
@Autowired
private TransportClient transportClient;
@Value("${elasticsearch.index}")
private String elasticIndex;
@Value("${elasticsearch.type}")
private String elasticType;
@Override
public JSONObject selectLogFromElastic(FileLogQO logQo) {
logger.info("進(jìn)行日志埋點(diǎn)");
// 進(jìn)行日志埋點(diǎn)操作
MDC.put(FileLogContants.KEY_WORD_1, "關(guān)鍵字");
// 根據(jù)傳入的參數(shù)進(jìn)行查詢條件的拼接
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
if (!StringUtil.isEmpty(logQo.getAppName())) {
// termQuery中參數(shù)加上FileLogContants.ES_KEYWORD_SYMBOL是防止es分詞器進(jìn)行分詞影響查詢
queryBuilder.must(QueryBuilders.termQuery(FileLogContants.APP_NAME + FileLogContants.ES_KEYWORD_SYMBOL, logQo.getAppName()));
}
if (!StringUtil.isEmpty(logQo.getServiceName())) {
queryBuilder.must(QueryBuilders.termQuery(FileLogContants.SERVICE_NAME + FileLogContants.ES_KEYWORD_SYMBOL, logQo.getServiceName()));
}
if (!StringUtil.isEmpty(logQo.getKeyword1())) {
queryBuilder.must(QueryBuilders.wildcardQuery(FileLogContants.KEY_WORD_1, FileLogContants.STARS_SYMBOL + logQo.getKeyword1() + FileLogContants.STARS_SYMBOL));
}
if (!StringUtil.isEmpty(logQo.getTradeId())) {
queryBuilder.must(QueryBuilders.wildcardQuery(FileLogContants.TRADE_ID, FileLogContants.STARS_SYMBOL + logQo.getTradeId() + FileLogContants.STARS_SYMBOL));
}
if (!StringUtil.isEmpty(logQo.getBeginTime()) && !StringUtil.isEmpty(logQo.getEndTime())) {
// rangeQuery中參數(shù)加上FileLogContants.ES_KEYWORD_SYMBOL是防止es分詞器進(jìn)行分詞影響時(shí)間范圍查詢
queryBuilder.must(QueryBuilders.rangeQuery(FileLogContants.MESSAGEDATE + FileLogContants.ES_KEYWORD_SYMBOL).from(logQo.getBeginTime()).to(logQo.getEndTime()));
}
int offset = logQo.getOffset();
SearchRequestBuilder builder = transportClient.prepareSearch(elasticIndex).setTypes(elasticType)
.setQuery(queryBuilder)
.setFrom(offset)
.setSize(logQo.getPageSize())
// 根據(jù)指定字段進(jìn)行排序
.addSort(FileLogContants.TIMESTAMP, SortOrder.DESC)
//explain為true表示根據(jù)數(shù)據(jù)相關(guān)度排序,和關(guān)鍵字匹配最高的排在前面
.setExplain(true);
List<FileLogDTO> result = new ArrayList<>();
SearchResponse searchResponse = builder.get();
for (SearchHit hit : searchResponse.getHits()) {
String messageJson = ObjectMapperUtil.writeValueAsString(hit.getSourceAsMap());
FileLogDTO fileLogDTO = ObjectMapperUtil.readValue(messageJson, FileLogDTO.class);
assert fileLogDTO != null;
fileLogDTO.setId(hit.getId());
result.add(fileLogDTO);
}
JSONObject object = new JSONObject();
object.put("file_log_list", result);
return object;
}
@Override
public long getTotalSize() {
// 查詢總條數(shù)
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
SearchHits hits = transportClient.prepareSearch(elasticIndex).setTypes(elasticType)
.setQuery(boolQueryBuilder)
// 設(shè)置為0的時(shí)候即查詢所有數(shù)據(jù)總數(shù)
.setSize(FileLogContants.TOTAL)
.get().getHits();
return hits.getTotalHits();
}
}
備注
- 日志系統(tǒng)密碼設(shè)置
elk訪問(wèn)密碼設(shè)置-參考