一菱皆、Logstash簡(jiǎn)介
??Logstash是一個(gè)開源數(shù)據(jù)收集引擎,具有實(shí)時(shí)管道功能搂妻。Logstash可以動(dòng)態(tài)地將來(lái)自不同數(shù)據(jù)源的數(shù)據(jù)統(tǒng)一起來(lái)议慰,并將數(shù)據(jù)標(biāo)準(zhǔn)化到你所選擇的目的地蠢古。
??Logstash管道有兩個(gè)必需的元素,輸入和輸出别凹,以及一個(gè)可選元素過濾器草讶。輸入插件從數(shù)據(jù)源那里消費(fèi)數(shù)據(jù),過濾器插件根據(jù)你的期望修改數(shù)據(jù)炉菲,輸出插件將數(shù)據(jù)寫入目的地堕战。
輸入
:采集各種樣式、大小和來(lái)源的數(shù)據(jù)
數(shù)據(jù)往往以各種各樣的形式拍霜,或分散或集中地存在于很多系統(tǒng)中嘱丢。Logstash 支持各種輸入選擇 ,可以在同一時(shí)間從眾多常用來(lái)源捕捉事件。能夠以連續(xù)的流式傳輸方式刊侯,輕松地從您的日志蚣抗、指標(biāo)、Web 應(yīng)用缀旁、數(shù)據(jù)存儲(chǔ)以及各種 AWS 服務(wù)采集數(shù)據(jù)。
過濾器
:實(shí)時(shí)解析和轉(zhuǎn)換數(shù)據(jù)
數(shù)據(jù)從源傳輸?shù)酱鎯?chǔ)庫(kù)的過程中勺鸦,Logstash 過濾器能夠解析各個(gè)事件并巍,識(shí)別已命名的字段以構(gòu)建結(jié)構(gòu),并將它們轉(zhuǎn)換成通用格式换途,以便更輕松懊渡、更快速地分析和實(shí)現(xiàn)商業(yè)價(jià)值。
Logstash 能夠動(dòng)態(tài)地轉(zhuǎn)換和解析數(shù)據(jù)军拟,不受格式或復(fù)雜度的影響:
利用 Grok 從非結(jié)構(gòu)化數(shù)據(jù)中派生出結(jié)構(gòu)
從 IP 地址破譯出地理坐標(biāo)
將 PII(個(gè)人驗(yàn)證信息) 數(shù)據(jù)匿名化距贷,完全排除敏感字段
整體處理不受數(shù)據(jù)源、格式或架構(gòu)的影響
輸出
:選擇你的存儲(chǔ)吻谋,導(dǎo)出你的數(shù)據(jù)
盡管 Elasticsearch 是我們的首選輸出方向忠蝗,能夠?yàn)槲覀兊乃阉骱头治鰩?lái)無(wú)限可能,但它并非唯一選擇漓拾。
Logstash 提供眾多輸出選擇阁最,您可以將數(shù)據(jù)發(fā)送到您要指定的地方,并且能夠靈活地解鎖眾多下游用例骇两。
logstash是做數(shù)據(jù)采集的速种,類似于flume。
Logstash架構(gòu):
Batcher負(fù)責(zé)批量的從queue中取數(shù)據(jù);
Queue分類:
In Memory : 無(wú)法處理進(jìn)程Crash低千、機(jī)器宕機(jī)等情況配阵,會(huì)導(dǎo)致數(shù)據(jù)丟失
Persistent Queue In Disk:可處理進(jìn)程Crash等情況馏颂,保證數(shù)據(jù)不丟失,保證數(shù)據(jù)至少消費(fèi)一次棋傍,充當(dāng)緩沖區(qū)救拉,可以替代kafka等消息隊(duì)列的作用。
官網(wǎng):https://www.elastic.co/cn/products/logstash
二瘫拣、下載安裝
- 下載安裝
# 下載
[root@localhost ~]# wget https://artifacts.elastic.co/downloads/logstash/logstash-6.4.3.tar.gz
# 解壓
[root@localhost ~]# tar -zxvf logstash-6.4.3.tar.gz -C /usr/local
# 查看內(nèi)容
[root@localhost ~]# ls /usr/local/logstash-6.4.3/
# 測(cè)試logstash-6.4.3
[root@localhost logstash-6.4.3]# ./bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'
Sending Logstash logs to /usr/local/logstash-6.4.3/logs which is now configured via log4j2.properties
[2019-09-25T15:12:41,903][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2019-09-25T15:12:42,613][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.4.3"}
[2019-09-25T15:12:45,490][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2019-09-25T15:12:45,845][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x310540d4 run>"}
The stdin plugin is now waiting for input:
[2019-09-25T15:12:46,543][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-09-25T15:12:47,020][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
如果啟動(dòng)成功會(huì)出現(xiàn)一下的提示語(yǔ)句:
接著屏幕就等著你輸入了亿絮,比如輸入一個(gè)Hello World,會(huì)出現(xiàn)以下的提示語(yǔ)句麸拄。
HelloWorld
{
"@timestamp" => 2019-09-25T07:14:40.491Z,
"host" => "localhost",
"@version" => "1",
"message" => "HelloWorld"
}
- 配置文件簡(jiǎn)單測(cè)試
pipeline配置簡(jiǎn)介:
Pipeline用于配置input派昧、filter和output插件
input {
...
}
filter {
...
}
output {
...
}
創(chuàng)建配置文件logstash.conf:
[root@localhost logstash-6.4.3]# vi config/logstash.conf
# 輸入內(nèi)容
input {
stdin { }
}
output {
stdout {
codec => rubydebug { }
}
elasticsearch {
hosts => ["0.0.0.0:9200"]
user => elastic
password => xW9dqAxThD5U4ShQV1JT
}
}
# 啟動(dòng)elasticsearch
# 指定配置文件啟動(dòng)
[root@localhost logstash-6.4.3]# ./bin/logstash -f config/logstash.conf
# 同樣命令行等著你輸入指令
Hello World
{
"@version" => "1",
"host" => "localhost",
"@timestamp" => 2019-09-25T07:25:03.292Z,
"message" => "Hello World"
}
訪問:
http://192.168.77.132:9200/_search?q=Hello
常見問題:
- 錯(cuò)誤提示:Unrecognized VM option 'UseParNewGC'
JDK版本不正確。 - (LoadError) Unsupported platform: x86_64-linux
切換JDK為1.8版本
# 安裝JDK
[root@localhost ~]# yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
# 切換JDK
[root@localhost logstash-6.4.3]# alternatives --config java
共有 2 個(gè)提供“java”的程序拢切。
選項(xiàng) 命令
-----------------------------------------------
+ 1 java-11-openjdk.x86_64 (/usr/lib/jvm/java-11-openjdk-11.0.4.11-1.el7_7.x86_64/bin/java)
* 2 java-1.8.0-openjdk.x86_64 (/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b10-1.el7_7.x86_64/jre/bin/java)
按 Enter 保留當(dāng)前選項(xiàng)[+]蒂萎,或者鍵入選項(xiàng)編號(hào):2
- error=>"Got response code '401' contacting Elasticsearch at URL 'http://0.0.0.0:9200/'"
原因:
沒有指定Elasticsearch 權(quán)限,修改配置添加elasticsearch 的用戶與密碼:
output {
stdout {
codec => rubydebug { }
}
elasticsearch {
hosts => ["0.0.0.0:9200"]
user => elastic
password => xW9dqAxThD5U4ShQV1JT
}
}
三淮椰、采集MySql數(shù)據(jù)庫(kù)數(shù)據(jù)
- 下載MySql-JDBC
MySql-JDBC下載地址:https://dev.mysql.com/downloads/connector/j/5.1.html
5.1下載:
[root@localhost ~]# wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.48.tar.gz
解壓:
[root@localhost ~]# tar -zxvf mysql-connector-java-5.1.48.tar.gz
8.0下載地址:
https://dev.mysql.com/downloads/connector/j/8.0.html
[root@localhost ~]# wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-8.0.17.tar.gz
- 安裝MySql-JDBC插件
# 安裝插件
[root@localhost logstash-6.4.3]# ./bin/logstash-plugin install logstash-input-jdbc
Validating logstash-input-jdbc
Installing logstash-input-jdbc
Installation successful
-
創(chuàng)建數(shù)據(jù)庫(kù)五慈、表、與sql文件
數(shù)據(jù)庫(kù):log_test 表:news
數(shù)據(jù)腳本:
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for news
-- ----------------------------
DROP TABLE IF EXISTS `news`;
CREATE TABLE `news` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主鍵',
`title` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '標(biāo)題',
`content` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL COMMENT '內(nèi)容',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;
創(chuàng)建執(zhí)行SQL:
# 創(chuàng)建目錄
[root@localhost logstash-6.4.3]# mkdir sql
# 添加內(nèi)容
[root@localhost sql]# vi sql/jdbc.sql
# 內(nèi)容為
select * from news
要注意:這里的內(nèi)容就是logstash依賴執(zhí)行的sql命令实苞,所以這里的表名要跟你實(shí)際的數(shù)據(jù)庫(kù)表名一致豺撑,否則會(huì)失敗烈疚。
至于jdbc.sql的內(nèi)容就是你的業(yè)務(wù)sql黔牵,只能有一條,且末尾不要加分號(hào)爷肝,否則出錯(cuò)猾浦!
- 單數(shù)據(jù)源同步
單數(shù)據(jù)源同步是指,數(shù)據(jù)只寫入一個(gè)index下(注意:6.x版本下灯抛,一個(gè)index下只能有一個(gè)type)金赦,jdbc塊和elasticsearch塊也是 一 一 對(duì)應(yīng)的關(guān)系,具體看下同步conf的配置(示例配置文件名稱: singledb.conf):
# 創(chuàng)建配置文件
[root@localhost logstash-6.4.3]# vi config/singledb.conf
# 內(nèi)容
input {
stdin {
}
jdbc {
# mysql jdbc connection 連接地址
jdbc_connection_string => "jdbc:mysql://10.2.129.166:3306/log_test?serverTimezone=Asia/Shanghai&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
# 登錄數(shù)據(jù)庫(kù)的用戶名对嚼、密碼
jdbc_user => "root"
jdbc_password => "1234"
# jdbc 驅(qū)動(dòng)包路徑
jdbc_driver_library => "/usr/local/logstash-6.4.3/config/mysql-connector-java-8.0.17.jar" # 連接驅(qū)動(dòng)類名
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
statement_filepath => "/usr/local/logstash-6.4.3/config/jdbc.sql"
# 以下表示定時(shí)執(zhí)行任務(wù)夹抗,使用cron表達(dá)式,本文只全量同步一次纵竖,所以不配置定時(shí)器漠烧,如果要實(shí)現(xiàn)增量更新,需要配合定時(shí)器以及上次查詢的最后一個(gè)值靡砌,具體要根據(jù)你的業(yè)務(wù)來(lái)定已脓。
# schedule => "* * * * *"
type => "jdbc"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
elasticsearch {
hosts => ["192.168.77.132:9200"]
index => "mynews"
document_id => "%{id}"
user => elastic
password => xW9dqAxThD5U4ShQV1JT
}
stdout {
codec => json_lines
}
}
# 復(fù)制jdbc jar 文件
[root@localhost logstash-6.4.3]# cp /root/mysql-connector-java-8.0.17/mysql-connector-java-8.0.17.jar config
# 啟動(dòng)同步
[root@localhost logstash-6.4.3]# ./bin/logstash -f config/singledb.conf
注意:配置文件中的jar與sql一定要使用絕對(duì)路徑。
啟動(dòng)同步通殃,同步的內(nèi)容會(huì)輸出到屏幕上度液。執(zhí)行完成后可在es中查看數(shù)據(jù)。
- 多數(shù)據(jù)源同步
多數(shù)據(jù)源同步是指,需要同步多種類型的數(shù)據(jù)到es中堕担,input的配置添加相應(yīng)的jdbc模塊已慢,output中根據(jù)type類型判斷添加對(duì)應(yīng)的elasticsearch模塊:
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://192.168.91.149:3306/test"
jdbc_user => "root"
jdbc_password => "1234"
# 此處的路徑最好是絕對(duì)路徑,相對(duì)路徑取決與允許命令的目錄
jdbc_driver_library => "/usr/local/logstash-6.4.3/config/mysql-connector-java-8.0.17.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
# 此處的路徑最好是絕對(duì)路徑照宝,相對(duì)路徑取決與允許命令的目錄
statement_filepath => "/usr/local/logstash-6.4.3/config/jdbc0.sql"
type => "user"
}
jdbc {
jdbc_connection_string => "jdbc:mysql://192.168.91.149:3306/test"
jdbc_user => "root"
jdbc_password => "1234"
# 此處的路徑最好是絕對(duì)路徑蛇受,相對(duì)路徑取決與允許命令的目錄
jdbc_driver_library => "/usr/local/logstash-6.4.3/config/mysql-connector-java-8.0.17.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
# 此處的路徑最好是絕對(duì)路徑,相對(duì)路徑取決與允許命令的目錄
statement_filepath => "/usr/local/logstash-6.4.3/config/jdbc1.sql"
type => "user_address"
}
}
output {
#設(shè)置窗口日志輸出
stdout {
codec => json_lines
}
# 與JDBC定義的type對(duì)應(yīng)
if[type] == "user" {
elasticsearch {
hosts => ["192.168.77.132:9200"]
# 注意index的值不支持大寫字母
index => "user"
# document_type自行設(shè)置厕鹃,不設(shè)置時(shí)兢仰,默認(rèn)為doc
# document_type => ""
# 此處的值來(lái)自查詢sql中的列名稱,根據(jù)需要自行配置
document_id => "%{id}"
user => elastic
password => xW9dqAxThD5U4ShQV1JT
}
}
if[type] == "user_news" {
elasticsearch {
hosts => ["192.168.77.132:9200"]
# 注意index的值不支持大寫字母
index => "user_news"
# document_type自行設(shè)置剂碴,不設(shè)置時(shí)把将,默認(rèn)為doc
# document_type => ""
# 此處的值來(lái)自查詢sql中的列名稱,根據(jù)需要自行配置
document_id => "%{id}"
user => elastic
password => xW9dqAxThD5U4ShQV1JT
}
}
}
全量同步
以上的單數(shù)據(jù)源/多數(shù)據(jù)源同步都是全量同步忆矛,即沒有任何條件地進(jìn)行同步察蹲。增量同步
增量同步需要在jdbc模塊添加相應(yīng)的增量配置
配置參數(shù)
Schedule:是cron格式的同步周期,其它幾個(gè)都是用來(lái)記錄同步增量指標(biāo)的催训,
Tracking_column:是數(shù)據(jù)庫(kù)中的增量指標(biāo)字段名
Tracking_columu_type:目前只支持兩種numeric,timestamp洽议,
Last_run_metadata_path:是保存上次同步的增量指標(biāo)值。
而 :sql_last_value如果input里面use_column_value => true漫拭, 即如果設(shè)置為true的話亚兄,可以是我們?cè)O(shè)定的字段的上一次的值。
默認(rèn) use_column_value => false采驻, 這樣 :sql_last_value為上一次更新的最后時(shí)刻值审胚。
也就是說(shuō),對(duì)于新增的值礼旅,才會(huì)更新膳叨。這樣就實(shí)現(xiàn)了增量更新的目的。
四痘系、相關(guān)配置
- 持久隊(duì)列基本配置(pipelines.yml)
queue.type:persisted # 默認(rèn)是memory
queue.max_bytes:4gb # 隊(duì)列存儲(chǔ)最大數(shù)據(jù)量
- 線程相關(guān)配置(logstash.yml)
pipeline.worksers | -w
# pipeline線程數(shù)菲嘴,即filter_output的處理線程數(shù),默認(rèn)是cpu核數(shù)
pipeline.batch.size | -b
# Batcher一次批量獲取的待處理文檔數(shù)汰翠,默認(rèn)是125龄坪,可以根據(jù)輸出進(jìn)行調(diào)整,越大會(huì)占用越多的heap空間奴璃,可以通過jvm.options調(diào)整
pipeline.batch.delay | -u
# Batcher等待的時(shí)長(zhǎng)悉默,單位為ms
- Logstash配置文件:
logstash設(shè)置相關(guān)的配置文件(在conf文件夾中)
logstash.yml:logstash相關(guān)配置,比如node.name苟穆、path.data抄课、pipeline.workers唱星、queue.type等,這其中的配置可以被命令行參數(shù)中的相關(guān)參數(shù)覆蓋
jvm.options:修改jvm的相關(guān)參數(shù)跟磨,比如修改heap size等
pipeline配置文件:定義數(shù)據(jù)處理流程的文件间聊,以.conf結(jié)尾
logstash.yml配置項(xiàng):
node.name: 節(jié)點(diǎn)名稱,便于識(shí)別
path.data: 持久化存儲(chǔ)數(shù)據(jù)的文件夾抵拘,默認(rèn)是logstash home目錄下的data
path.config: 設(shè)定pipeline配置文件的目錄(如果指定文件夾哎榴,會(huì)默認(rèn)把文件夾下的所有.conf文件按照字母順序拼接為一個(gè)文件)
path.log: 設(shè)定pipeline日志文件的目錄
pipeline.workers: 設(shè)定pipeline的線程數(shù)(filter+output),優(yōu)化的常用項(xiàng)
pipeline.batch.size/delay: 設(shè)定批量處理數(shù)據(jù)的數(shù)據(jù)和延遲
queue.type: 設(shè)定隊(duì)列類型僵蛛,默認(rèn)是memory
queue.max_bytes: 隊(duì)列總?cè)萘可序颍J(rèn)是1g
五、常見問題
- Error: Java::JavaSql::SQLException: null, message from server: "Host 'DESKTOP-T92R1EE' is not allowed to connect to this MySQL server"
數(shù)據(jù)庫(kù)沒有開啟遠(yuǎn)程連接
select user,host from user;
# 在Mysql數(shù)據(jù)庫(kù)中執(zhí)行以下sql
update user set host='%' where user ='root';
flush privileges;
最好重啟下Mysql