前面我們搭建了一個簡單的ELK日志收集系統(tǒng),可以看到其中的Logstash起的作用。Logstash 是一個實時數(shù)據收集引擎,可收集各類型數(shù)據并對其進行分析坪哄,過濾和歸納。按照自己條件分析過濾出符合數(shù)據導入到可視化界面呢撞。Logstash的功能很強大损姜,遠不止一個input和一個output那幾行配置起的作用。下面介紹Logstash在經典場景中的用法殊霞。
簡單模式:以logstash作為日志搜索器
架構:logstash采集摧阅、處理、轉發(fā)到elasticsearch存儲绷蹲,在kibana進行展示
特點:這種結構因為需要在各個服務器上部署 Logstash棒卷,而它比較消耗 CPU 和內存資源,所以比較適合計算資源豐富的服務器祝钢,否則容易造成服務器性能下降比规,甚至可能導致無法正常工作(因為Logstash有點重)。
input和output的配置是比較靈活的拦英,首先看最簡單的一種蜒什,從控制臺輸入日志,然后輸出到控制臺:
input { stdin { } }
output{ stdout { } }
前面的內容介紹了從文件讀取疤估,輸出到es當中灾常,輸出也可以到文件中:
output?{
? ? #輸出到文件
? ? file {
? ? ? ? path => "/logs/app/logstash/all.log" #指定寫入文件路徑
? ? ? ? flush_interval => 0????????????????? # 指定刷新間隔,0代表實時寫入
? ? ? ? codec => json
? ? ?}
}
簡單模式基本上能滿足百分之八九十的公司的日志業(yè)務了铃拇,下面介紹的幾種模式適用于日志量十分龐大的系統(tǒng)钞瀑。
安全模式:beats(Filebeat、Metricbeat慷荔、Packetbeat雕什、Winlogbeat等)作為日志搜集器
因為Logstash有點重,所以就有了beats。Beats 是一個面向輕量型采集器的平臺贷岸,這些采集器可從邊緣機器發(fā)送數(shù)據壹士。我們從文件收集日志常用的是filebeat。下面是幾種beats:
Packetbeat(搜集網絡流量數(shù)據)凰盔;
Topbeat(搜集系統(tǒng)墓卦、進程和文件系統(tǒng)級別的 CPU 和內存使用情況等數(shù)據)倦春;
Filebeat(搜集文件數(shù)據)-------最常用
Winlogbeat(搜集 Windows 事件日志數(shù)據)户敬。
從架構圖可以看到,我們的應用服務器收集日志的不再是Logstash睁本,而且對應的beats尿庐,Beats 將搜集到的數(shù)據發(fā)送到 Logstash,經 Logstash 解析呢堰、過濾后抄瑟,將其發(fā)送到 Elasticsearch 存儲,并由 Kibana 呈現(xiàn)給用戶枉疼。
這種架構解決了 Logstash 在各服務器節(jié)點上占用系統(tǒng)資源高的問題皮假。相比 Logstash撮抓,Beats 所占系統(tǒng)的 CPU 和內存幾乎可以忽略不計犬辰。另外,Beats 和 Logstash 之間支持 SSL/TLS 加密傳輸毛萌,客戶端和服務器雙向認證航闺,保證了通信安全褪测。因此這種架構適合對數(shù)據安全性要求較高,同時各服務器性能比較敏感的場景潦刃。?
來看一個filebeat的配置:
#=========== Filebeat prospectors ===========
filebeat.prospectors:?
- input_type: log
?paths:
??? - /home/admin/helloworld/logs/*.log
#--------------------------- Logstash output --------------------------------
output.logstash:
?hosts: ["192.168.80.34:5044"]
Logstash的input配置:
input?{
? ? ?beats {
? ? ? ? port => 5044
? ? ? ? codec => "json"
? ? }
}
Logstash的output配置:
output?{
?#?輸出到控制臺
??? # stdout { }
?#?輸出到redis
??? redis {
??????? host => "192.168.80.32"?? # redis主機地址
??????? port => 6379????????????? # redis端口號
?? ?????password => "123456"????????? # redis 密碼
?????? ?#db => 8?????????????????? # redis數(shù)據庫編號
??????? data_type => "channel"??? # 使用發(fā)布/訂閱模式
??????? key => "logstash_list_0"? # 發(fā)布通道名稱
}
#輸出到kafka
??? kafka {
??????? bootstrap_servers => "192.168.80.42:9092"
??????? topic_id???????? => "test"?
?????? }
#輸出到es
elasticsearch {
??????? hosts => "node18:9200"
??????? codec => json
??????? }
}
從上面可以看出侮措,Logstash不僅可以輸出到es和控制臺,還可以輸出到redis緩存中和kafka中乖杠,通過配置可以實現(xiàn)強大的數(shù)據傳輸功能分扎。
消息模式:Beats 還不支持輸出到消息隊列(新版本除外:5.0版本及以上)
在消息隊列前后兩端只能是 Logstash 實例。logstash從各個數(shù)據源搜集數(shù)據胧洒,不經過任何處理轉換僅轉發(fā)出到消息隊列(kafka畏吓、redis、rabbitMQ等)略荡,后logstash從消息隊列取數(shù)據進行轉換分析過濾庵佣,輸出到elasticsearch,并在kibana進行圖形化展示汛兜。
架構(Logstash進行日志解析所在服務器性能各方面必須要足夠好):
模式特點:這種架構適合于日志規(guī)模比較龐大的情況巴粪。但由于 Logstash 日志解析節(jié)點和 Elasticsearch 的負荷比較重,可將他們配置為集群模式,以分擔負荷肛根。引入消息隊列辫塌,均衡了網絡傳輸,從而降低了網絡閉塞派哲,尤其是丟失數(shù)據的可能性臼氨,但依然存在 Logstash 占用系統(tǒng)資源過多的問題
工作流程:Filebeat采集—>? logstash轉發(fā)到kafka—>? logstash處理從kafka緩存的數(shù)據進行分析—>? 輸出到es—>? 顯示在kibana
從服務器接收日志的Logstash的配置:
input?{
??? beats {
??? port => 5044
??? codec => "json"
?????? }
??? syslog{
?????? }
}
output?{
??? # 輸出到控制臺
??? # stdout { }
??? # 輸出到redis
??? redis {
??????? host => "192.168.80.32"?? # redis主機地址
??????? port => 6379????????????? # redis端口號
??????? password => "123456"????????? # redis 密碼
?????? #db => 8?????????????????? # redis數(shù)據庫編號
??????? data_type => "channel"??? # 使用發(fā)布/訂閱模式
??????? key => "logstash_list_0"? # 發(fā)布通道名稱
??? }
??#輸出到kafka
??? kafka {
??????? bootstrap_servers => "192.168.80.42:9092"
??????? topic_id????????? => "test"?
?????? }?????
}
從kafka接收日志到es的Logstash的配置:
input{
??? kafka {
? ? ? ? ? ?bootstrap_servers => "192.168.80.42:9092"
?????? ??? topics????????? => ["test"]
?????? ??? #decroate_events?? => true
? ? ? ? ? ?group_id????????? => "consumer-test"(消費組)
?????? ??? #decroate_events? => true
? ? ? ? ? ??auto_offset_reset => "earliest"(初始消費,相當于from beginning芭届,不設置储矩,相當于是監(jiān)控啟動后的kafka的消息生產)
? ? ? ? }
}
output?{
?elasticsearch {
?????? hosts => "192.168.80.18:9200"???
?????? codec => json
?????? }
}
消息模式:logstash從kafka消息隊列直接讀取數(shù)據并處理、輸出到es(因為從kafka內部直接讀取褂乍,相當于是已經在緩存內部持隧,直接logstash處理后就可以進行輸出,輸出到文件逃片、es等)
工作模式:【數(shù)據已存在kafka對應主題內】單獨的logstash屡拨,kafka讀取,經過處理輸出到es并在kibana進行展示
Logstash配置如下:
input{
??? kafka {
? ? ? ? ? bootstrap_servers => "192.168.80.42:9092"
?????? ???? topics??? ??????=> ["test"]
? ? ? ? ? ?group_id????? ?=> "consumer-test"
? ? ? ? ? ?#decroate_events? => true
? ? ? ? ? ? auto_offset_reset => "earliest"
? ? ?}
}?
output?{
?????? elasticsearch {
?????? hosts => "192.168.80.18:9200"
?????? codec => json
?????? }
}
filebeat新版本(5.0以上)支持直接支持輸出到kafka褥实,而無需經過logstash接收轉發(fā)到kafka
Filebeat采集完畢直接入到kafka消息隊列呀狼,進而logstash取出數(shù)據,進行處理分析輸出到es损离,并在kibana進行展示哥艇。
filebeat配置:?
#======== Filebeat prospectors=================
filebeat.prospectors:?
- input_type: log
?paths:
??? - /home/admin/helloworld/logs/*.log?
#-----------------------------kafka? output-----------------------------------
output.kafka:
? hosts: ["192.168.80.42:9092"]
? topic: test
? required_acks: 1
Logstash的配置:
input{
??? kafka {
??????? bootstrap_servers => "192.168.80.42:9092"
?????? ???? topics????????? => ["test"]
???????? group_id?????? => "consumer-test"
?????? ? #decroate_events? => true
?????? auto_offset_reset => "earliest"
? ?}
}
output?{
?????? elasticsearch {
?????? hosts => "192.168.80.18:9200"
?????? codec => json
?????? }
}
SSL加密傳輸(增強安全性,僅配置了秘鑰和證書的filebeat服務器和logstash服務器才能進行日志文件數(shù)據的傳輸)
Logstash的配置文件:
ssl_certificate_authorities :filebeat端傳來的證書所在位置
ssl_certificate?=>?本端生成的證書所在的位置
ssl_key?=>?/本端生成的密鑰所在的位置
ssl_verify_mode?=> "force_peer"
input {
??? beats {
??? port => 5044
??? codec => "json"
ssl => true
?? ssl_certificate_authorities => ["/usr/local/logstash-5.6.10/pki/tls/certs/filebeat.crt"]
?? ssl_certificate => "/usr/local/logstash-5.6.10/pki/tls/certs/logstash.crt"
?? ssl_key => "/usr/local/logstash-5.6.10/pki/tls/private/logstash.key"
ssl_verify_mode => "force_peer"#(需與ssl_certificate_authorities一起使用)
?????? }
??? syslog{
?????? }
}
?
output {
??? # 輸出到控制臺
??? # stdout { }
??? # 輸出到redis
??? redis {
??????? host => "192.168.80.32"?? # redis主機地址
??????? port => 6379????????????? # redis端口號
??????? password => "123456"????????? # redis 密碼
?????? #db => 8?????????????????? # redis數(shù)據庫編號
??????? data_type => "channel"??? # 使用發(fā)布/訂閱模式
??????? key => "logstash_list_0" ?# 發(fā)布通道名稱
??? }
??? #輸出到kafka
??? kafka {
??????? bootstrap_servers => "192.168.80.42:9092"
??????? topic_id????????? => "test"?
?????? }?????
??? #輸出到es
??? elasticsearch {
?????? hosts => "node18:9200"
?????? codec => json
?????? }
}
filebeat的配置文件:?
#=================== Filebeat prospectors ========================
filebeat.prospectors:?
- input_type: log
?paths:
??? - /home/admin/helloworld/logs/*.log
#----------------------------- Logstash output --------------------------------
output.logstash:
# The Logstash hosts
?hosts: ["192.168.80.18:5044"]
#加密傳輸
?ssl.certificate_authorities: ["/usr/local/filebeat-5.6.10/pki/tls/certs/logstash.crt"]
? ssl.certificate: "/usr/local/filebeat-5.6.10/pki/tls/certs/filebeat.crt"
? ssl.key: "/usr/local/filebeat-5.6.10/pki/tls/private/filebeat.key"?
logstash(非filebeat)進行文件采集草冈,輸出到kafka緩存她奥,讀取kafka數(shù)據并處理輸出到文件或es
從文件采集到kafka:
input?{
?file?{
??????? path => [
??????????? # 這里填寫需要監(jiān)控的文件
??????????? "/home/admin/helloworld/logs/catalina.out"
??????? ]
??? }
}
output?{
?kafka?{
??? # 輸出到控制臺
??? # stdout { }
?#?輸出到kafka
??bootstrap_servers => "192.168.80.42:9092"
??? topic_id????????? => "test"
??? }
}
從kafka或者redis讀取數(shù)據輸出到es或者文件:
input{
#從redis讀取
?redis {
??????? host => "192.168.80.32"?? # redis主機地址
??????? port => 6379????????????? # redis端口號
?????? password? => "123456"??? ? # redis 密碼
??????? #db => 8?????????????????? # redis數(shù)據庫編號
??????? data_type => "channel"??? # 使用發(fā)布/訂閱模式
??????? key => "logstash_list_0"? # 發(fā)布通道名稱
}
#從kafka讀取
kafka {
??????? bootstrap_servers => "192.168.80.42:9092"
?????? ??? topics????????? => ["test"]
??????? auto_offset_reset => "earliest"
?????? }
}
?
output?{
??#輸出到文件
??? file {
??????? path => "/usr/local/logstash-5.6.10/data/log/logstash/all1.log" # 指定寫入文件路徑
#?????? message_format => "%{host} %{message}"???????? # 指定寫入格式
??????? flush_interval => 0???????????????????????????? # 指定刷新間隔,0代表實時寫入
?? ? codec => json
?????? }
?#輸出到es
?? elasticsearch {
?????? hosts => "node18:9200"
?????? codec => json
?????? }
}
logstash同步mysql數(shù)據庫數(shù)據到es(logstash5版本以上已集成jdbc插件怎棱,無需下載安裝哩俭,直接使用)
從mysql讀取數(shù)據到es:
input {
?stdin { }
??? jdbc {
??jdbc_connection_string => "jdbc:mysql://192.168.80.18:3306/fyyq-mysql"
??????? jdbc_user => "fyyq"
??????? jdbc_password => "fyyq@2017"
jdbc_driver_library => "/usr/local/logstash-5.6.10/mysql-connector-java-5.1.46.jar"
???? ???jdbc_driver_class => "com.mysql.jdbc.Driver"
??????? jdbc_paging_enabled => "true"
??????? statement_filepath => "/usr/local/logstash-5.6.10/mysql2es.sql"
??????? #schedule => "* * * * *"
??? }
?}
?
?output {
???? stdout {
??????? codec => json_lines
??? }
??elasticsearch {
??????? hosts => "node18:9200"
??????? #index => "mainIndex"
??????? #document_type => "user"
??????? #document_id => "%{id}"
??? }
}
Logstash-input插件及插件參數(shù)概覽
所有輸入插件都支持以下配置選項:
codec:可選
json?(json格式編解碼器)
msgpack?(msgpack格式編解碼器)
plain(文本格式編解碼器)
multiline(將多行文本event合并成一個event,eg:將java中的異常跟蹤日志合并成一條消)]
常用輸入插件:
1拳恋、beat-input:Receives events from the Elastic Beats framework凡资,從框架接收事件? ? Settings:
2、file-input:來自文件的Streams事件(path字段必填項)
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html
3谬运、stdin-input:從標準輸入讀取事件
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-stdin.html
4隙赁、syslog-input:將syslog消息作為事件讀取
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-syslog.html
5、tcp-input:從TCP讀取事件(port字段必填項)
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-tcp.html
6梆暖、udp-input:通過UDP讀取事件(port字段必填項)
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-udp.html
7伞访、twitter-input:從Twitter Streaming API讀取事件(相對常用場景)
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-twitter.html
(consumer_key、consumer_secret轰驳、oauth_token厚掷、oauth_token_secret必填項)
8弟灼、redis-input:從Redis實例讀取事件
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-redis.html
(data_type["list", "channel", "pattern_channel"]、key必填項冒黑,)
9田绑、kafka-input:從Kafka主題中讀取事件
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html
(參數(shù)過多,自行查看)
10抡爹、jdbc-input:從JDBC數(shù)據創(chuàng)建事件
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html
(jdbc_connection_string掩驱、jdbc_driver_class、jdbc_user必填項)
11冬竟、http-input:通過HTTP或HTTPS接收事件
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-http.html
12欧穴、elasticsearch-input:從Elasticsearch集群讀取查詢結果
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-elasticsearch.html
13、exec-input:將shell命令的輸出捕獲為事件(command字段必填項)
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-exec.html
非 常用輸入插件:
自行進入logstash的插件中心進行查看诱咏,有需要自行配置
總:https://www.elastic.co/guide/en/logstash/current/input-plugins.html
Logstash-filter插件及插件參數(shù)概覽
所有處理插件均支持的配置:
常用處理插件:
1苔可、 grok-filter:可以將非結構化日志數(shù)據解析為結構化和可查詢的內容
https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html#_grok_basics
grok模式的語法是%{SYNTAX:SEMANTIC}
SYNTAX是與您的文本匹配的模式的名稱
SEMANTIC是您為匹配的文本提供的標識符
grok是通過系統(tǒng)預定義的正則表達式或者通過自己定義正則表達式來匹配日志中的各個值
正則解析式比較容易出錯缴挖,建議先調試(地址):
grok debugger調試:http://grokdebug.herokuapp.com/
grok事先已經預定義好了許多正則表達式規(guī)則袋狞,該規(guī)則文件存放路徑:
/usr/local/logstash-5.6.10/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-4.1.2/patterns
等等,可自行進入查看映屋。
示例一:
初始輸入的message是:
經過grok的正則分析后:
示例二:
COMBINEDAPACHELOG的具體內容見:
https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/httpd
初始輸入message為:
經過grok正則分析后:
示例三(自定義grok表達式mypattern[A-Z]):
初始輸入message:
經過grok正則分析后:
示例四(移除重復字段):
初始輸入message:
經過grok正則解析后(json格式):
示例五(過濾篩選catalina.out文件中的信息苟鸯,message字段已移除):
【Data在pattern中的定義是:.*? GREEDYDATA在pattern中的定義是:.*】
初始輸入message:
經過grok正則解析后(截圖及json格式如下):
常用參數(shù):
1)match:match作用:用來對字段的模式進行匹配
2)patterns_dir:用來指定規(guī)則的匹配路徑,如果使用logstash自定義的規(guī)則時棚点,不需要寫此參數(shù)早处。Patterns_dir可以同時制定多個存放過濾規(guī)則的目錄;
3)remove_field:如果匹配到某個”日志字段瘫析,則將匹配的這個日志字段從這條日志中刪除(多個以逗號隔開)
2砌梆、 clone-filter:克隆過濾器用于復制事件
3、? drop-filter:丟棄所有活動
4贬循、? json-filter:解析JSON事件
5咸包、? kv-filter:解析鍵值對
非常用參數(shù):參考教程:https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
Logstash-output插件及插件參數(shù)概覽
所有輸出插件均支持以下配置:
常用插件:
1、Elasticsearch-output:此插件是在Elasticsearch中存儲日志的推薦方法杖虾。如果您打算使用Kibana Web界面烂瘫,則需要使用此輸出
2、file-output:此輸出將事件寫入磁盤上的文件(path字段必填項)
3奇适、kafka-output:將事件寫入Kafka主題(topic_id是必填項)
4坟比、 redis-output:此輸出將使用RPUSH將事件發(fā)送到Redis隊列
5、stdout-output:一個簡單的輸出嚷往,打印到運行Logstash的shell的STDOUT
非常用插件:參考官網教程鏈接:https://www.elastic.co/guide/en/logstash/current/output-plugins.html
文章原文鏈接:https://www.cnblogs.com/qingqing74647464/p/9378385.html
我們的交流基地葛账,“JAVA互聯(lián)網技術交流:789650498”歡迎小伙伴們一起來交流: