ELK 是三個開源項目的首字母縮寫满粗,這三個項目分別是:Elasticsearch爱谁、Logstash 和 Kibana晒喷。它是一套以 Elasticsearch 為核心的 OLAP 集成解決方案。
本方案基于CentOS8系統(tǒng)設(shè)計访敌,建議在RedHat/CentOS系統(tǒng)中使用凉敲。方案使用服務(wù)器及網(wǎng)絡(luò)資源較多,建議在實施前做好規(guī)劃工作寺旺,有利于部署工作順利爷抓、有序進行。
目錄
1.前言
2.集群部署拓撲圖
3.Elasticsearch 單點安裝和配置
3.1.安裝和配置
3.2.安全性擴展
3.3.中文分詞擴展
4.Elasticsearch 集群化
4.1.Elasticsearch 集群的概念
4.2.Elasticsearch 集群的示例
5.Kibana 安裝和配置
6.Logstash 安裝和作業(yè)部署
7.Java 開發(fā)集成
7.1.ES 編程語言
7.2.ES + Jest 可復(fù)用類庫
7.3.ELK + Jest 最佳集成方案
1.前言
1阻塑、"集群"指的是 Elasticsearch 組成的集群蓝撇,實現(xiàn)負載均衡,故障轉(zhuǎn)移陈莽,實時熱備和讀寫分離渤昌。但與 Logstash 和 Kibana 無關(guān)。
2传透、核心組件簡介
1)Elasticsearch: 一個搜索和分析引擎耘沼,提供搜集、分析朱盐、存儲數(shù)據(jù)三大功能群嗤。具有分布式,零配置兵琳,自動發(fā)現(xiàn)狂秘,索引自動分片,索引副本機制躯肌,Restful風(fēng)格接口者春,多數(shù)據(jù)源,自動搜索負載等特點清女。Elasticsearch 能夠?qū)B級海量數(shù)據(jù)進行近實時的處理(從寫入到查詢的延時≈1秒)钱烟,且能夠支持數(shù)百臺服務(wù)器節(jié)點的分布式集群架構(gòu)。
2)IKAnalyzer :一個開源的,基于Java語言開發(fā)的輕量級的中文分詞工具包拴袭。
2)Logstash: 一個服務(wù)器端數(shù)據(jù)處理管道读第,能夠同時從多個來源采集數(shù)據(jù),轉(zhuǎn)換數(shù)據(jù)拥刻,然后將數(shù)據(jù)發(fā)送到 Elasticsearch 中怜瞒。Logstash 通過部署JOB來完成工作,JOB工作流程如下:
3)Kibana: 一個 Elasticsearch 的可視化管理用戶接口般哼,使用圖形和圖表對數(shù)據(jù)進行可視化吴汪。
3、ELK 適用于以下應(yīng)用場景:
1)普通用于具有全文檢索蒸眠、海量數(shù)據(jù)檢索類型的應(yīng)用或功能漾橙,作為OLAP數(shù)據(jù)庫支撐業(yè)務(wù);
2)可以用于數(shù)據(jù)分析類型的應(yīng)用或功能黔宛,且能夠滿足較高的實時性要求近刘,作為OLAP數(shù)據(jù)庫支撐業(yè)務(wù);
3)可以用于具有高并發(fā)讀寫臀晃,弱ACID管理觉渴,弱安全性控制,小型文檔型數(shù)據(jù)特點的應(yīng)用或功能徽惋,作為數(shù)據(jù)庫支撐業(yè)務(wù)案淋。
補充知識:ACID是數(shù)據(jù)庫事務(wù)的4個特征,分別是原子性、一致性险绘、隔離性和持久性踢京。
2.集群部署拓撲圖
網(wǎng)絡(luò)資源規(guī)劃:
1、ES Node 1 : OLAP 數(shù)據(jù)庫服務(wù)器
1)操作系統(tǒng):CentOS8宦棺;
2)IP地址和端口號:192.168.216.128:9200瓣距;
3)主機名:ES-1;
4)數(shù)據(jù)庫:Elasticsearch代咸;
5)角色:OLAP 數(shù)據(jù)庫集群節(jié)點蹈丸。
2、ES Node 2 :OLAP 數(shù)據(jù)庫服務(wù)器
1)操作系統(tǒng):CentOS8呐芥;
2)IP地址和端口號:192.168.216.129:9200逻杖;
3)主機名:ES-2;
4)數(shù)據(jù)庫:Elasticsearch思瘟;
5)角色:OLAP 數(shù)據(jù)庫集群節(jié)點荸百。
3、ES Node 3 :OLAP 數(shù)據(jù)庫服務(wù)器
1)操作系統(tǒng):CentOS8滨攻;
2)IP地址和端口號:192.168.216.130:9200够话;
3)主機名:ES-3蓝翰;
4)數(shù)據(jù)庫:Elasticsearch;
5)角色:OLAP 數(shù)據(jù)庫集群節(jié)點更鲁。
4霎箍、Kinaba: 可視化管理客戶端
1)操作系統(tǒng):CentOS8;
2)IP地址和端口號:192.168.216.131:5601澡为;
3)主機名:ESVM;
4)中間件:Kibana景埃;
5)角色:OLAP 數(shù)據(jù)庫可視化管理器媒至。
5、Logstash : 數(shù)據(jù) ETL 服務(wù)器
1)操作系統(tǒng):CentOS8谷徙;
2)IP地址:192.168.216.132拒啰;
3)主機名:ETL;
4)中間件:Logstash完慧;
5)角色:數(shù)據(jù)采集器谋旦。
6、外部數(shù)據(jù)源: 作為 OLAP 數(shù)據(jù)分析的外部數(shù)據(jù)來源數(shù)據(jù)庫屈尼,如:PostgreSQL册着、MySQL 等 RDBMS 數(shù)據(jù)庫 。IP地址:192.168.216.133
3.Elasticsearch 單點安裝和配置
3.1.安裝和配置
以"ES Node 1"節(jié)點為例:
1脾歧、打開 Elasticsearch 官方網(wǎng)站下載頁面【https://www.elastic.co/cn/downloads/elasticsearch】甲捏,下載 Elasticsearch 的編譯程序 tar 包到用戶主目錄中。
2鞭执、解壓縮編譯程序 tar 包到"/usr/local"目錄中司顿。
[centos@ES-1 ~]$ sudo tar zxvf elasticsearch-7.6.2-linux-x86_64.tar.gz -C /usr/local
[centos@ES-1 ~]$ ll /usr/local
drwxr-xr-x. 9 root root 155 3月 26 14:36 elasticsearch-7.6.2
3、創(chuàng)建 Elastcsearch 的數(shù)據(jù)存儲目錄和日志存儲目錄兄纺。
[centos@ES-1 ~]$ sudo mkdir -p /data/elasticsearch/data
[centos@ES-1 ~]$ sudo mkdir -p /data/elasticsearch/logs
4大溜、創(chuàng)建 ELK 管理用戶和組,并設(shè)置為程序安裝目錄估脆、數(shù)據(jù)存儲目錄的擁有者钦奋。
[centos@ES-1 ~ ]$ sudo id elastic
id: “elastic”:無此用戶
[centos@ES-1 ~ ]$ sudo groupadd elastic
[centos@ES-1 ~ ]$ sudo useradd -g elastic -s /bin/false elastic
[centos@ES-1 ~ ]$ sudo chown -R elastic:elastic /usr/local/elasticsearch-7.6.2
[centos@ES-1 ~ ]$ sudo chown -R elastic:elastic /data/elasticsearch
ELK組件的管理用戶和組(如:"elastic")可以共享使用,創(chuàng)建用戶和組之前首先查詢一下賬戶是否存在旁蔼。如果賬戶和組已經(jīng)存在锨苏,可能是因為在安裝其他ELK組件時已創(chuàng)建,這種情況下不必重復(fù)創(chuàng)建用戶和組棺聊,只需要更改程序安裝目錄伞租、數(shù)據(jù)存儲目錄的擁有者即可。
5限佩、設(shè)置 Elastcsearch 的配置文件參數(shù)葵诈。
使用文本編輯器打開配置文件:
[centos@ES-1 ~]$ sudo gedit /usr/local/elasticsearch-7.6.2/config/elasticsearch.yml
修改或驗證文件中的以下參數(shù)并保存:
# --常用設(shè)置項--
node.name: es-node-1
# 設(shè)置當前ES節(jié)點的名稱裸弦,默認隨機指定一個預(yù)置列表中名字。
path.data: /data/elasticsearch/data
# 設(shè)置索引數(shù)據(jù)的存儲路徑作喘,默認是ES根目錄下的data文件夾理疙。可以設(shè)置多個存儲路徑泞坦,用逗號隔開窖贤,例:
# path.data: /path/to/data1,/path/to/data2
path.logs: /data/elasticsearch/logs
# 設(shè)置日志文件的存儲路徑,默認是ES根目錄下的logs文件夾 贰锁。
network.host: 0.0.0.0
# 設(shè)置ES服務(wù)的監(jiān)聽網(wǎng)絡(luò)地址(IPv4 或 IPv6)赃梧,默認是示例地址逮刨。
http.port: 9200
# 設(shè)置ES服務(wù)的HTTP端口反惕,默認為9200。
transport.tcp.port: 9300
# 設(shè)置節(jié)點之間交互的TCP端口诫钓,默認是9300锣险。
http.cors.enabled: true
http.cors.allow-origin: "*"
# 設(shè)置允許HTTP跨域訪問
discovery.seed_hosts: ["127.0.0.1", "[::1]"]
# 設(shè)置集群節(jié)點發(fā)現(xiàn)清單蹄皱,默認為IPv4和IPv6回路地址。
cluster.initial_master_nodes: ["es-node-1"]
# 設(shè)置集群初始化主要節(jié)點清單芯肤,默認是node-1巷折。
# --其他可用設(shè)置項--
# cluster.name: elasticsearch-cluster
# 設(shè)置的集群名稱,默認是elasticsearch纷妆。ES通過廣播方式自動發(fā)現(xiàn)同一網(wǎng)段其他節(jié)點盔几,若同一網(wǎng)段下有多個集群,則通過集群名稱區(qū)分不同的集群掩幢。
# node.master: true
# 設(shè)置當前節(jié)點是否有資格被選舉成為主要節(jié)點逊拍,默認是true。ES默認集群中的第一個節(jié)點為主要節(jié)點际邻,如果這個節(jié)點故障就會重新選舉主要節(jié)點芯丧。
# node.data: true
# 設(shè)置當前節(jié)點是否存儲索引數(shù)據(jù),默認為true世曾。
# path.conf: /path/to/conf
# 設(shè)置配置文件的存儲路徑缨恒,默認是ES根目錄下的config文件夾。
# path.work: /path/to/work
# 設(shè)置臨時文件的存儲路徑轮听,默認是ES根目錄下的work文件夾骗露。
# path.plugins: /path/to/plugins
# 設(shè)置插件的存放路徑,默認是ES根目錄下的plugins文件夾, 插件在ES里面普遍使用血巍,用來增強原系統(tǒng)核心功能萧锉。
# bootstrap.mlockall: true
# 設(shè)置為true來鎖住JVM內(nèi)存。
# network.bind_host: 0.0.0.0
# 設(shè)置監(jiān)聽的ip地址述寡,可以是IPv4或IPv6的柿隙,默認為0.0.0.0叶洞。可以綁定這臺機器的任何一個IP地址禀崖。
# network.publish_host: 0.0.0.0
# 設(shè)置其它節(jié)點和該節(jié)點交互的IP地址衩辟,如果不設(shè)置它會自動判斷,值必須是個真實的IP地址波附。
# transport.tcp.compress: true
# 設(shè)置是否壓縮TCP傳輸時的數(shù)據(jù)艺晴,默認為false。
# http.max_content_length: 100mb
# 設(shè)置HTTP協(xié)議請求內(nèi)容的最大容量掸屡,默認100mb财饥。
# http.enabled: true
# 設(shè)置是否使用HTTP協(xié)議對外提供服務(wù),默認為true折晦。
# gateway.type: local
# 設(shè)置gateway的類型,默認為local即為本地文件系統(tǒng)沾瓦÷牛可以設(shè)置為本地文件系統(tǒng),分布式文件系統(tǒng)贯莺,hadoop的HDFS风喇,和amazon的s3服務(wù)器等。
# gateway.recover_after_nodes: 1
# 設(shè)置集群中N個節(jié)點啟動時進行數(shù)據(jù)恢復(fù)缕探,默認為1魂莫。
# gateway.recover_after_time: 5m
# 設(shè)置初始化數(shù)據(jù)恢復(fù)進程的超時時間,默認是5分鐘爹耗。
# gateway.expected_nodes: 2
# 設(shè)置這個集群中節(jié)點的數(shù)量耙考,默認為2,一旦這N個節(jié)點啟動潭兽,就會立即進行數(shù)據(jù)恢復(fù)倦始。
# cluster.routing.allocation.node_initial_primaries_recoveries: 4
# 初始化數(shù)據(jù)恢復(fù)時,并發(fā)恢復(fù)線程的個數(shù)山卦,默認為4鞋邑。
# cluster.routing.allocation.node_concurrent_recoveries: 2
# 添加刪除節(jié)點或負載均衡時并發(fā)恢復(fù)線程的個數(shù),默認為4账蓉。
# indices.recovery.max_size_per_sec: 0
# 設(shè)置數(shù)據(jù)恢復(fù)時限制的帶寬枚碗,如入100mb,默認為0铸本,即無限制肮雨。
# indices.recovery.concurrent_streams: 5
# 設(shè)置這個參數(shù)來限制從其它分片恢復(fù)數(shù)據(jù)時最大同時打開并發(fā)流的個數(shù),默認為5归敬。
# discovery.zen.ping.timeout: 3s
# 設(shè)置集群中自動發(fā)現(xiàn)其它節(jié)點時ping連接超時時間酷含,默認為3秒鄙早,對于比較差的網(wǎng)絡(luò)環(huán)境可以高點的值來防止自動發(fā)現(xiàn)時出錯。
# discovery.zen.ping.multicast.enabled: true
# 設(shè)置是否打開多播發(fā)現(xiàn)節(jié)點椅亚,默認是true限番。
# discovery.zen.ping.unicast.hosts: ["127.0.0.1", "[::1]"]
# 設(shè)置集群中主要節(jié)點的初始列表,可以通過這些節(jié)點來自動發(fā)現(xiàn)新加入集群的節(jié)點呀舔。
# discovery.zen.minimum_master_nodes: 1
# 設(shè)置集群有主要節(jié)點資格的數(shù)量弥虐,默認為1。對于大的集群來說媚赖,可以設(shè)置大一點的值(2-4)霜瘪。
# xpack.security.enabled: true
# xpack.security.transport.ssl.enabled: true
# xpack.security.transport.ssl.verification_mode: certificate
# xpack.security.transport.ssl.keystore.path: elastic-certificates.p12
# xpack.security.transport.ssl.truststore.path: elastic-certificates.p12
# 啟用 X-Pack 安全認證模塊,啟用前需要導(dǎo)出證書文件惧磺,如:elastic-certificates.p12颖对。
6、設(shè)置 Elasticsearch 運行的Linux系統(tǒng)參數(shù)磨隘。
1)在"/etc/sysctl.conf"文件中設(shè)置內(nèi)核參數(shù)缤底。
使用文本編輯器創(chuàng)建配置文件:
[centos@ES-1 ~ ]$ sudo gedit /etc/sysctl.conf
在文件中追加內(nèi)容并保存如下:
vm.max_map_count=655360
2)在"/etc/security/limits.conf"文件中設(shè)置用戶 SHELL 參數(shù)。
使用文本編輯器創(chuàng)建配置文件:
[centos@ES-1 ~ ]$ sudo gedit /etc/security/limits.conf
在文件中追加內(nèi)容并保存如下:
elastic soft nofile 65536
elastic hard nofile 65536
3)重新啟動操作系統(tǒng)番捂。
[centos@ES-1 ~ ]$ sudo reboot
7个唧、配置 Elasticsearch 服務(wù)開機自啟動。
使用文本編輯器創(chuàng)建配置文件:
[centos@ES-1 ~ ]$ sudo gedit /usr/lib/systemd/system/elasticsearch-server.service
編寫文件內(nèi)容并保存如下:
[Unit]
Description=Elasticsearch Server
After=syslog.target network.target
[Service]
User=elastic
Group=elastic
ExecStart=/usr/local/elasticsearch-7.6.2/bin/elasticsearch
ExecStop=/bin/kill -HUP $MAINPID
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed
KillSignal=SIGINT
LimitNOFILE=100000
LimitNPROC=100000
[Install]
WantedBy=multi-user.target
設(shè)置開機啟動:
[centos@ES-1 ~]$ sudo systemctl daemon-reload
[centos@ES-1 ~]$ sudo systemctl enable elasticsearch-server.service
8设预、啟動 Elasticsearch 服務(wù)徙歼。
[centos@ES-1 ~]$ sudo systemctl start elasticsearch-server.service
9、設(shè)置防火墻端口(CentOS8默認安裝firewall防火墻)鳖枕,允許"9200"魄梯,"9300"端口(Elasticsearch 默認端口)訪問服務(wù)器。
[centos@ES-1 ~]$ sudo firewall-cmd --zone=public --add-port=9200/tcp --permanent
[centos@ES-1 ~]$ sudo firewall-cmd --zone=public --add-port=9300/tcp --permanent
[centos@ES-1 ~]$ sudo firewall-cmd --reload
10耕魄、驗證 Elasticsearch 服務(wù)正常運行画恰。使用瀏覽器客戶端訪問 Elasticsearch 的 Http 服務(wù)接口進行測試,輸入:http://<IP>:<PORT>
11吸奴、Elasticsearch 運維管理
1)啟動 Elasticsearch 服務(wù)(任選一種方式)
[centos@ES-1 ~]$ sudo systemctl start elasticsearch-server.service
或者
[centos@ES-1 ~]$ sudo -u elastic /usr/local/elasticsearch-7.6.2/bin/elasticsearch
2)停止 Elasticsearch 服務(wù)(任選一種方式)
[centos@ES-1 ~]$ sudo systemctl stop elasticsearch-server.service
或者
[centos@ES-1 ~]$ sudo ps -ef | grep Elasticsearch
elastic 3377 1 2 13:15 ? 00:00:37 /usr/local/elasticsearch-7.6.2/jdk/bin/java ...
[centos@ES-1 ~]$ sudo kill -9 3377
3)重啟 Elasticsearch 服務(wù)
[centos@ES-1 ~]$ sudo systemctl restart elasticsearch-server.service
或者
[centos@ES-1 ~]$ sudo ps -ef | grep Elasticsearch
elastic 3377 1 2 13:15 ? 00:00:37 /usr/local/elasticsearch-7.6.2/jdk/bin/java...
[centos@ES-1 ~]$ sudo kill -9 3377
[centos@ES-1 ~]$ sudo -u elastic /usr/local/elasticsearch-7.6.2/bin/elasticsearch
4)查看 Elasticseach 服務(wù)狀態(tài)
[centos@ES-1 ~]$ sudo systemctl status elasticsearch-server.service
或者
[centos@ES-1 ~]$ sudo netstat -ntap | grep -E "9200|9300"
tcp6 0 0 :::9200 :::* LISTEN 3377/java
tcp6 0 0 :::9300 :::* LISTEN 3377/java
5)開啟 Elasticsearch 服務(wù)開機自啟動
[centos@ES-1 ~]$ sudo systemctl enable elasticsearch-server.service
6)禁用 Elasticsearch 服務(wù)開機自啟動
[centos@ES-1 ~]$ sudo systemctl disable elasticsearch-server.service
在多個實例并行的情況下允扇,全部實例共享使用管理用戶和組、Linux內(nèi)核配置则奥,但每個實例獨立安裝(包括獨立的程序安裝目錄和數(shù)據(jù)存儲目錄)考润、獨立端口、獨立啟動服務(wù)读处。
注意:其他"ES Node"節(jié)點上全部需要按照以上步驟配置糊治。
3.2.安全性擴展
Elasticsearch 集成了 X-Pack 作為安全性組件,直接開啟使用即可罚舱。
首先完成 Elasticsearch 的安裝和配置井辜。以"ES Node 1"節(jié)點為例:
1绎谦、導(dǎo)出 Elasticsearch X-Pack 證書文件。導(dǎo)出文件命令需要設(shè)置證書口令為空粥脚。
[centos@ES-1 ~]$ sudo -u elastic /usr/local/elasticsearch-7.6.2/bin/elasticsearch-certutil cert -out /usr/local/elasticsearch-7.6.2/config/elastic-certificates.p12 -pass ""
2窃肠、修改 Elastcsearch 的配置文件參數(shù)。
使用文本編輯器打開配置文件:
[centos@ES-1 ~]$ sudo gedit /usr/local/elasticsearch-7.6.2/config/elasticsearch.yml
追加或驗證文件中的以下參數(shù)并保存:
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: elastic-certificates.p12
3刷允、重新啟動 Elasticsearch 服務(wù)冤留。
[centos@ES-1 ~]$ sudo systemctl restart elasticsearch-server.service
4、設(shè)置 Elasticsearch 服務(wù)內(nèi)置賬號的口令树灶。
[centos@ES-1 ~]$ sudo -u elastic /usr/local/elasticsearch-7.6.2/bin/elasticsearch-setup-passwords interactive
Initiating the setup of passwords for reserved users elastic,apm_system,kibana,logstash_system,beats_system,remote_monitoring_user.
You will be prompted to enter passwords as the process progresses.
Please confirm that you would like to continue [y/N]y
Enter password for [elastic]:
Reenter password for [elastic]:
Enter password for [apm_system]:
Reenter password for [apm_system]:
Enter password for [kibana]:
Reenter password for [kibana]:
Enter password for [logstash_system]:
Reenter password for [logstash_system]:
Enter password for [beats_system]:
Reenter password for [beats_system]:
Enter password for [remote_monitoring_user]:
Reenter password for [remote_monitoring_user]:
Changed password for user [apm_system]
Changed password for user [kibana]
Changed password for user [logstash_system]
Changed password for user [beats_system]
Changed password for user [remote_monitoring_user]
Changed password for user [elastic]
5纤怒、驗證 X-Pack 認證。使用瀏覽器客戶端訪問 Elasticsearch 的 Http 服務(wù)接口進行測試天通,輸入:http://<IP>:<PORT>
注意:其他 "ES Node" 節(jié)點上不需要執(zhí)行此步驟泊窘。但是需要將此服務(wù)器上已經(jīng)創(chuàng)建的 "elastic-certificates.p12" 文件和 "elasticsearch.keystore" 文件拷貝到其他"ES Node"節(jié)點的同一位置上,一致使用像寒。即:集群中所有節(jié)點的安全認證文件是一致的州既。
3.3.中文分詞擴展
在中文環(huán)境下使用 Elasticsearch 一般是需要擴展中文分詞器,建議使用 IkAnalyzer 萝映。IKAnalyzer 是一個開源的,基于 Java 語言開發(fā)的輕量級的中文分詞工具包阐虚。
首先完成 Elasticsearch 的安裝和配置序臂。以"ES Node 1"節(jié)點為例:
1、從【https://github.com/medcl/elasticsearch-analysis-ik/releases】下載 IKAnalyzer 的編譯程序 zip 包到用戶主目錄中实束。使用的 IKAnalyzer 分詞器的版本應(yīng)與 Elasticsearch 的版本保持一致奥秆。
2、解壓縮編譯程序 zip 包到 Elasticsearch 程序安裝目錄下的"plugins"目錄下咸灿,并設(shè)置 ELK 管理用戶和組為擁有者构订。
[centos@ES-1 ~]$ sudo unzip elasticsearch-analysis-ik-7.6.2.zip -d /usr/local/elasticsearch-7.6.2/plugins/ik
[centos@ES-1 ~]$ sudo chown -R elastic:elastic /usr/local/elasticsearch-7.6.2/plugins/ik
3、重新啟動 Elasticsearch 服務(wù)避矢。
[centos@ES-1 ~]$ sudo systemctl restart elasticsearch-server.service
4悼瘾、驗證 IkAnalyzer 插件集成。使用瀏覽器客戶端訪問 Elasticsearch 的 Http 服務(wù)接口進行測試审胸,輸入:http://<IP>:<PORT>/_cat/plugins
注意:其他"ES Node"節(jié)點上全部需要按照以上步驟配置亥宿。
4.Elasticsearch 集群化
4.1.Elasticsearch 集群的概念
1、Elasticsearch 集群的組成
1)節(jié)點(Node)和 集群(Cluster):節(jié)點是單個 Elastcsearch 服務(wù)砂沛,一般部署在單獨的服務(wù)器上烫扼。如果這些單個 Elasticsearch 服務(wù)的配置文件(elasticsearch.yml 文件) 中具有相同的"cluster.name"設(shè)置值,那就自動發(fā)現(xiàn)這些節(jié)點并組成一個集群碍庵,集群中允許有1個或者大于1個的單數(shù)節(jié)點組成映企。
2)文檔(Document)和 索引(Index):文檔是一組由 JSON 結(jié)構(gòu)組成的結(jié)構(gòu)化數(shù)據(jù)悟狱,它由若干屬性和值組成,而索引是存儲文檔的庫堰氓〖方ィ可以將索引比作 RDBMS 中僅有一個"表"的數(shù)據(jù)庫,文檔是"表"中的"行"豆赏,而文檔屬性是"行"中的"列"挣菲。
3)shard(分片):Elasticsearch 將一個索引切分為多個分片,在集群的節(jié)點中分布存儲掷邦,構(gòu)成分布式搜索機制白胀。因此 ES 具有橫向擴展的能力,可以存儲大量數(shù)據(jù)并且讓搜索和分析等操作分布到多臺服務(wù)器上去執(zhí)行抚岗,提升吞吐量和性能或杠。分片數(shù)量在建立索引時一次設(shè)置,不能修改宣蔚,默認5個向抢。
4)shard replica(分片副本):Elasticsearch 允許每個分片創(chuàng)建多個副本。分片副本可以在多個節(jié)點中冗余數(shù)據(jù)胚委,以保障在節(jié)點故障時數(shù)據(jù)不被丟失挟鸠,同時可以提升搜索操作的吞吐量和性能。分片副本數(shù)量可以隨時修改亩冬,默認1個艘希。
2、Elasticsearch 集群節(jié)點的類型
1)主節(jié)點:主節(jié)點負責創(chuàng)建索引硅急、刪除索引覆享、分配分片、追蹤集群中的節(jié)點狀態(tài)等工作营袜。Elasticsearch 中的主節(jié)點的工作量相對較輕撒顿,用戶的請求可以發(fā)往集群中任何一個節(jié)點,由該節(jié)點負責分發(fā)和返回結(jié)果荚板,而不需要經(jīng)過主節(jié)點轉(zhuǎn)發(fā)凤壁。而主節(jié)點是由候選主節(jié)點通過 ZenDiscovery 機制選舉出來的。
2)候選主節(jié)點:在 Elasticsearch 集群初始化或者主節(jié)點宕機的情況下跪另,從候選主節(jié)點中選舉其中一個作為主節(jié)點客扎。候選主節(jié)點的 Elasticsearch 服務(wù)的配置文件(elasticsearch.yml 文件)設(shè)置項特點:
node.master: true
# 設(shè)置當前節(jié)點是否有資格被選舉成為主要節(jié)點,默認是true罚斗。ES默認集群中的第一個節(jié)點為主要節(jié)點徙鱼,如果這個節(jié)點故障就會重新選舉主要節(jié)點。
node.data: false
# 設(shè)置當前節(jié)點是否存儲索引數(shù)據(jù),默認為true袱吆。
3)數(shù)據(jù)節(jié)點:數(shù)據(jù)節(jié)點負責數(shù)據(jù)的存儲和相關(guān)具體操作厌衙,比如CRUD、搜索绞绒、聚合婶希。所以,數(shù)據(jù)節(jié)點對機器配置要求比較高蓬衡,首先需要有足夠的磁盤空間來存儲數(shù)據(jù)喻杈,其次數(shù)據(jù)操作對系統(tǒng)CPU、Memory和IO的性能消耗都很大狰晚。數(shù)據(jù)節(jié)點的 Elasticsearch 服務(wù)的配置文件(elasticsearch.yml 文件)設(shè)置項特點:
node.master: false
# 設(shè)置當前節(jié)點是否有資格被選舉成為主要節(jié)點筒饰,默認是true。ES默認集群中的第一個節(jié)點為主要節(jié)點壁晒,如果這個節(jié)點故障就會重新選舉主要節(jié)點瓷们。
node.data: true
# 設(shè)置當前節(jié)點是否存儲索引數(shù)據(jù),默認為true秒咐。
4)候選主節(jié)點+數(shù)據(jù)節(jié)點:一個節(jié)點可以同時成為候選主節(jié)點和數(shù)據(jù)節(jié)點谬晕,主需要在候選主節(jié)點配置的基礎(chǔ)上開啟數(shù)據(jù)節(jié)點即可。候選主節(jié)點+數(shù)據(jù)節(jié)點的 Elasticsearch 服務(wù)的配置文件(elasticsearch.yml 文件)設(shè)置項特點:
node.master: true
# 設(shè)置當前節(jié)點是否有資格被選舉成為主要節(jié)點携取,默認是true攒钳。ES默認集群中的第一個節(jié)點為主要節(jié)點,如果這個節(jié)點故障就會重新選舉主要節(jié)點雷滋。
node.data: true
# 設(shè)置當前節(jié)點是否存儲索引數(shù)據(jù)夕玩,默認為true。
5)客戶端節(jié)點:客戶端節(jié)點就是既不做候選主節(jié)點也不做數(shù)據(jù)節(jié)點的節(jié)點惊豺,只負責請求的分發(fā)、匯總禽作。這是所有節(jié)點的共性能力尸昧,單獨增加這樣的節(jié)點一般是為了負載均衡。數(shù)據(jù)節(jié)點的 Elasticsearch 服務(wù)的配置文件(elasticsearch.yml 文件)設(shè)置項特點:
node.master: false
# 設(shè)置當前節(jié)點是否有資格被選舉成為主要節(jié)點旷偿,默認是true烹俗。ES默認集群中的第一個節(jié)點為主要節(jié)點,如果這個節(jié)點故障就會重新選舉主要節(jié)點萍程。
node.data: false
# 設(shè)置當前節(jié)點是否存儲索引數(shù)據(jù)幢妄,默認為true。
4.2.Elasticsearch 集群的示例
首先完成 Elasticsearch 的安裝和配置茫负。假設(shè)按照本文規(guī)劃的集群拓撲圖蕉鸳,所有集群節(jié)點都是"候選主節(jié)點+數(shù)據(jù)節(jié)點"的類型,以"ES Node 1"節(jié)點為例:
1、設(shè)置 Elastcsearch 的配置文件參數(shù)潮尝。
使用文本編輯器打開配置文件:
[centos@ES-1 ~]$ sudo gedit /usr/local/elasticsearch-7.6.2/config/elasticsearch.yml
修改或驗證文件中的以下參數(shù)并保存:
# -- 必須一致的配置 --
cluster.name: elasticsearch-cluster
# 設(shè)置的集群名稱榕吼,同一集群的節(jié)點應(yīng)該設(shè)置相同的集群名稱。
cluster.initial_master_nodes: ["es-node-1","es-node-2"]
# 設(shè)置集群初始化主節(jié)點清單勉失。本例中將【es-node-1】和【es-node-2】節(jié)點設(shè)置為初始化主節(jié)點羹蚣。
discovery.send_hosts: ["192.168.216.128:9300", "192.168.216.129:9300"]
# 設(shè)置集群節(jié)點發(fā)現(xiàn)清單,一般是候選主節(jié)點IP地址乱凿,不寫端口號時默認9300端口顽素。
discovery.zen.minimum_master_nodes: 2
# 設(shè)置集群候選主節(jié)點的最少數(shù)量。
# -- 各節(jié)點的個性化配置 --
node.name: es-node-1
# 設(shè)置當前ES節(jié)點的名稱徒蟆,每個節(jié)點應(yīng)設(shè)置不同的名稱胁出。
node.master: true
# 設(shè)置當前節(jié)點是候選主節(jié)點。
node.data: true
# 設(shè)置當前節(jié)點是數(shù)據(jù)節(jié)點后专。
path.data: /data/elasticsearch/data
# 設(shè)置索引數(shù)據(jù)的存儲路徑划鸽,默認是ES根目錄下的data文件夾∑莅ィ可以設(shè)置多個存儲路徑裸诽,用逗號隔開,例:
# path.data: /path/to/data1,/path/to/data2
path.logs: /data/elasticsearch/logs
# 設(shè)置日志文件的存儲路徑型凳,默認是ES根目錄下的logs文件夾 丈冬。
network.host: 0.0.0.0
# 設(shè)置ES服務(wù)的監(jiān)聽網(wǎng)絡(luò)地址(IPv4 或 IPv6),默認是示例地址甘畅。
http.port: 9200
# 設(shè)置ES服務(wù)的HTTP端口埂蕊,默認為9200。
transport.tcp.port: 9300
# 設(shè)置節(jié)點之間交互的TCP端口疏唾,默認是9300蓄氧。
http.cors.enabled: true
http.cors.allow-origin: "*"
# 設(shè)置允許HTTP跨域訪問
# xpack.security.enabled: true
# xpack.security.transport.ssl.enabled: true
# xpack.security.transport.ssl.verification_mode: certificate
# xpack.security.transport.ssl.keystore.path: elastic-certificates.p12
# xpack.security.transport.ssl.truststore.path: elastic-certificates.p12
# 啟用 X-Pack 安全認證模塊,集群中所有節(jié)點的安全認證文件是一致的槐脏。
# 只需要在其中一個節(jié)點中創(chuàng)建認證文件(elastic-certificates.p12 文件和 elasticsearch.keystore 文件)喉童,其他節(jié)點只需要拷貝這兩個文件即可,注意要存放在一致的目錄下顿天。
2堂氯、認證文件一致化設(shè)置。
假設(shè)在 "ES Node 1" 上已經(jīng)創(chuàng)建安全認證文件 "elastic-certificates.p12" 和 "elasticsearch.keystore" 牌废,這些文件存放在 Elasticsearch 程序安裝目錄下的 "config" 目錄中(本例為:/usr/local/elasticsearch-7.6.2/config 目錄)咽白。
將兩個文件分別拷貝至其他 "ES Node" 節(jié)點上的 Elasticsearch 程序安裝目錄下的 "config" 目錄中(本例為:/usr/local/elasticsearch-7.6.2/config 目錄)。
注意:除上述操作以外鸟缕,無需進行任何有關(guān)安全性擴展的配置晶框。
3、重啟啟動集群。
首先啟動"ES Node 1"節(jié)點三妈,然后依次啟動其他"ES Node"節(jié)點畜埋。
[centos@ES-1 ~]$ sudo systemctl restart elasticsearch-server.service
[centos@ES-2 ~]$ sudo systemctl restart elasticsearch-server.service
[centos@ES-3 ~]$ sudo systemctl restart elasticsearch-server.service
4、驗證集群畴蒲。使用瀏覽器客戶端訪問 Elasticsearch 的 Http 服務(wù)接口進行測試悠鞍,輸入:http://<IP>:<PORT>/_cluster/health
5.Kibana 安裝和配置
1、打開 Kibana 官方網(wǎng)站下載頁面【https://www.elastic.co/cn/downloads/kibana】模燥,下載 Kibana 的編譯程序 tar 包到用戶主目錄中咖祭。
2、解壓縮編譯程序 tar 包到"/usr/local"目錄中蔫骂。
[centos@ESVM ~]$ sudo tar zxvf kibana-7.6.2-linux-x86_64.tar.gz -C /usr/local
[centos@ESVM ~]$ ll /usr/local
drwxr-xr-x. 13 root root 266 4月 20 15:25 kibana-7.6.2-linux-x86_64
3么翰、創(chuàng)建 ELK 管理用戶和組,并設(shè)置為程序安裝目錄的擁有者辽旋。
[centos@ESVM ~ ]$ sudo id elastic
id: “elastic”:無此用戶
[centos@ESVM ~ ]$ sudo groupadd elastic
[centos@ESVM ~ ]$ sudo useradd -g elastic -s /bin/false elastic
[centos@ESVM ~ ]$ sudo chown -R elastic:elastic /usr/local/kibana-7.6.2-linux-x86_64
ELK組件的管理用戶和組(如:"elastic")可以共享使用很洋,創(chuàng)建用戶和組之前首先查詢一下賬戶是否存在峡竣。如果賬戶和組已經(jīng)存在,可能是因為在安裝其他ELK組件時已創(chuàng)建,這種情況下不必重復(fù)創(chuàng)建用戶和組仑撞,只需要更改程序安裝目錄艘刚、數(shù)據(jù)存儲目錄的擁有者即可凝垛。
4策橘、設(shè)置 Kibana 的配置文件參數(shù)。
使用文本編輯器打開配置文件:
[centos@ESVM ~]$ sudo gedit /usr/local/kibana-7.6.2-linux-x86_64/config/kibana.yml
修改或驗證文件中的以下參數(shù)并保存:
server.port: 5601
# 服務(wù)監(jiān)聽端口瓶逃。
server.host: "192.168.216.131"
# 服務(wù)監(jiān)聽地址束铭。
server.name: "ESVM Kibana"
# 服務(wù)的顯示名稱。
elasticsearch.hosts: ["http://192.168.216.128:9200","http://192.168.216.129:9200","http://192.168.216.130:9200"]
# ElasticSearch 集群 URLs 集合厢绝。
kibana.index: ".kibana"
# Kibana 在 ElasticSearch 中存儲搜索契沫、可視化和儀表板數(shù)據(jù)的索引名字。
elasticsearch.username: "elastic"
# Kibana 訪問 ElasticSearch 的賬號昔汉,"elastic" 賬號是 Elasticsearch 的預(yù)置用戶懈万。
elasticsearch.password: "password"
# Kibana 訪問 ElasticSearch 賬號的口令。
i18n.locale: "zh-CN"
# 本地化區(qū)域語言設(shè)置挤庇,支持英文和中文,默認是中文贷掖。
5嫡秕、配置 Kibana 服務(wù)開機自啟動。
使用文本編輯器創(chuàng)建配置文件:
[centos@ESVM ~ ]$ sudo gedit /usr/lib/systemd/system/kibana-server.service
編寫文件內(nèi)容并保存如下:
[Unit]
Description=Kibana Server
After=syslog.target network.target
[Service]
User=elastic
Group=elastic
ExecStart=/usr/local/kibana-7.6.2-linux-x86_64/bin/kibana
ExecStop=/bin/kill -HUP $MAINPID
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed
KillSignal=SIGINT
LimitNOFILE=100000
LimitNPROC=100000
[Install]
WantedBy=multi-user.target
設(shè)置開機啟動:
[centos@ESVM ~]$ sudo systemctl daemon-reload
[centos@ESVM ~]$ sudo systemctl enable kibana-server.service
6苹威、啟動 Kibana 服務(wù)昆咽。
[centos@ESVM ~]$ sudo systemctl start kibana-server.service
7、設(shè)置防火墻端口(CentOS8默認安裝firewall防火墻),允許"5601"端口(Kibana 默認端口)訪問服務(wù)器掷酗。
[centos@ESVM ~]$ sudo firewall-cmd --zone=public --add-port=5601/tcp --permanent
[centos@ESVM ~]$ sudo firewall-cmd --reload
8调违、驗證 Kibana 服務(wù)正常運行。使用瀏覽器客戶端訪問 Kibana 的 Http 服務(wù)接口進行測試泻轰,輸入:http://<IP>:<PORT>
9技肩、Kibana 運維管理
1)啟動 Kibana 服務(wù)(任選一種方式)
[centos@ESVM ~]$ sudo systemctl start kibana-server.service
或者
[centos@ESVM ~]$ sudo -u elastic /usr/local/kibana-7.6.2-linux-x86_64/bin/kibana
2)停止 Kibana 服務(wù)(任選一種方式)
[centos@ESVM ~]$ sudo systemctl stop kibana-server.service
或者
[centos@ESVM ~]$ sudo ps -ef | grep kibana
elastic 12268 1 1 4月20 ? 00:12:00 /usr/local/kibana-7.6.2-linux-x86_64/bin/..
[centos@ESVM ~]$ sudo kill -9 12268
3)重啟 Kibana 服務(wù)
[centos@ESVM ~]$ sudo systemctl restart kibana-server.service
或者
[centos@ESVM ~]$ sudo ps -ef | grep kibana
elastic 12268 1 1 4月20 ? 00:12:00 /usr/local/kibana-7.6.2-linux-x86_64/bin/..
[centos@ESVM ~]$ sudo kill -9 12268
[centos@ESVM ~]$ sudo -u elastic /usr/local/kibana-7.6.2-linux-x86_64/bin/kibana
4)查看 Kibana 服務(wù)狀態(tài)
[centos@ESVM ~]$ sudo systemctl status kibana-server.service
或者
[centos@ESVM ~]$ sudo netstat -ntap | grep 5601
tcp 0 0 192.168.216.131:5601 0.0.0.0:* LISTEN 12268/node
5)啟動 Kibana 服務(wù)開機自啟動
[centos@ESVM ~]$ sudo systemctl enable kibana-server.service
6)禁用 Kibana 服務(wù)開機自啟動
[centos@ESVM ~]$ sudo systemctl disable kibana-server.service
6.Logstash 安裝和作業(yè)部署
1、安裝 OpenJDK 1.8 或 OracleJDK 1.8浮声。
1)使用 YUM 源安裝OpenJDK 1.8虚婿。
[centos@ETL ~]$ sudo dnf install java-1.8.0-openjdk
2)驗證Java運行環(huán)境。
[centos@ETL ~]$ java -vserion
openjdk version "1.8.0_242"
OpenJDK Runtime Environment (build 1.8.0_242-b08)
OpenJDK 64-Bit Server VM (build 25.242-b08, mixed mode)
2泳挥、打開 Logstash 官方網(wǎng)站下載頁面【https://www.elastic.co/cn/downloads/logstash】然痊,下載 Logstash 的編譯程序 tar 包到用戶主目錄中。
3屉符、解壓縮編譯程序 tar 包到"/usr/local"目錄中剧浸。
[centos@ETL ~]$ sudo tar zxvf logstash-7.6.2.tar.gz -C /usr/local
[centos@ETL ~]$ ll /usr/local
drwxr-xr-x. 12 631 503 255 3月 26 17:42 logstash-7.6.2
4、創(chuàng)建 Logstash 的數(shù)據(jù)矗钟、日志唆香、作業(yè)配置存儲目錄。
[centos@ETL ~]$ sudo mkdir -p /data/logstash/data
[centos@ETL ~]$ sudo mkdir -p /data/logstash/logs
[centos@ETL ~]$ sudo mkdir -p /data/logstash/config
5真仲、創(chuàng)建 ELK 管理用戶和組袋马,并設(shè)置為程序安裝目錄、數(shù)據(jù)存儲目錄的擁有者秸应。
[centos@ETL ~ ]$ sudo id elastic
id: “elastic”:無此用戶
[centos@ETL ~ ]$ sudo groupadd elastic
[centos@ETL ~ ]$ sudo useradd -g elastic -s /bin/false elastic
[centos@ETL ~ ]$ sudo chown -R elastic:elastic /usr/local/logstash-7.6.2
[centos@ETL ~ ]$ sudo chown -R elastic:elastic /data/logstash
ELK組件的管理用戶和組(如:"elastic")可以共享使用虑凛,創(chuàng)建用戶和組之前首先查詢一下賬戶是否存在。如果賬戶和組已經(jīng)存在软啼,可能是因為在安裝其他ELK組件時已創(chuàng)建桑谍,這種情況下不必重復(fù)創(chuàng)建用戶和組,只需要更改程序安裝目錄祸挪、數(shù)據(jù)存儲目錄的擁有者即可锣披。
6、設(shè)置 Logstash 的配置文件參數(shù)贿条。
使用文本編輯器打開配置文件:
[centos@ETL ~]$ sudo gedit /usr/local/logstash-7.6.2/config/logstash.yml
修改或驗證文件中的以下參數(shù)并保存:
# --常用設(shè)置項--
path.data: /data/logstash/data
# 插件數(shù)據(jù)持久化目錄雹仿,默認為程序安裝目錄下的"data"目錄。
path.logs: /data/logstash/logs
# 設(shè)置日志文件目錄整以,默認輸出到控制臺胧辽。
path.config: /data/logstash/config
# 設(shè)置管道配置文件目錄。
config.reload.automatic: true
# 開啟管道配置文件自動加載公黑,默認為false邑商。
config.reload.interval: 3s
# 設(shè)置管道配置文件檢查更新的時間摄咆。
# --其他可用設(shè)置項--
# node.name: ETL Logstash
# 節(jié)點名稱,默認為主機名稱人断。
# pipeline.workers: 2
# 輸出通道的工作workers數(shù)據(jù)量(提升輸出效率)默認為CPU核數(shù)吭从。
# pipeline.batch.size: 125
# 每批次輸入數(shù)據(jù)的條數(shù),默認125條恶迈。
# pipeline.batch.delay: 50
# 每批次延時等待的時間(單位毫秒)涩金,默認50毫秒。
# http.host: "127.0.0.1"
# 用戶指標收集REST服務(wù)綁定主機地址蝉绷,默認為"127.0.0.1"鸭廷。
# http.port: 9600-9700
# 用戶指標收集REST服務(wù)綁定主機端口,默認為9600-9700熔吗。
# log.level: info
# 日志輸出級別辆床,如果config.debug開啟,這里一定要是debug日志 桅狠,默認為 info讼载。
# path.plugins: []
# 自定義插件目錄
7、配置 ETL 作業(yè)中跌。
以從 PostgreSQL 數(shù)據(jù)庫中抽取數(shù)據(jù)咨堤,并寫入 Elasticsearch 中為例:
1)下載 PostgreSQL JDBC 的 jar 包到指定位置。打開 PostgreSQL JDBC 官方網(wǎng)站下載頁面【https://jdbc.postgresql.org/download.html】漩符,下載 PostgreSQL JDBC 的 jar 包到用戶主目錄中一喘。**
2)將 PostgreSQL JDBC 的 jar 包拷貝到 Logstash 程序目錄中的指定位置:
[centos@ETL ~]$ sudo -u elastic cp postgresql-42.2.12.jar /usr/local/logstash-7.6.2/logstash-core/lib/jars
3)在 Logstash 的作業(yè)配置存儲目錄中創(chuàng)建 JOB 配置文件。
使用文本編輯器創(chuàng)建配置文件:
[centos@ETL ~]$ sudo -u elastic gedit /data/logstash/config/pgsql.conf
編寫以下配置腳本并保存:
input {
stdin {
}
jdbc {
# 數(shù)據(jù)庫 JDBC 的 jar 包位置嗜暴,JDBC 的 jar 包需要自行下載準備
jdbc_driver_library => "/usr/local/logstash-7.6.2/logstash-core/lib/jars/postgresql-42.2.12.jar"
# 數(shù)據(jù)庫驅(qū)動
jdbc_driver_class => "org.postgresql.Driver"
# 數(shù)據(jù)庫連接字符串
jdbc_connection_string => "jdbc:postgresql://192.168.216.133:5432/pgsql"
# 數(shù)據(jù)庫用戶名密碼
jdbc_user => "postgres"
jdbc_password => "postgres"
# 數(shù)據(jù)庫重連嘗試次數(shù)
connection_retry_attempts => "3"
# 判斷數(shù)據(jù)庫連接是否可用凸克,默認false不開啟
jdbc_validate_connection => "true"
# 數(shù)據(jù)庫連接可用校驗超時時間,默認3600S
jdbc_validation_timeout => "3600"
# 開啟分頁查詢(默認false不開啟)
jdbc_paging_enabled => "true"
# 單次分頁查詢條數(shù)(默認100000,若字段較多且更新頻率較高闷沥,建議調(diào)低此值)
jdbc_page_size => "5000"
# sql腳本文件或腳本
# statement_filepath => "pgsql.sql"
statement => "select row_number() over(order by last_modify_dt asc) as rn,* from table where last_modify_dt >= :sql_last_value"
# 是否將列名轉(zhuǎn)為小寫萎战,默認為true
lowercase_column_names => "false"
# 定期執(zhí)行周期,"*"從左到右依次代表: 分 時 天 月 年
schedule => "* * * * *"
# 是否記錄上次執(zhí)行結(jié)果, 如果為真,將會把上次執(zhí)行到的 tracking_column 字段的值記錄下來,保存到 last_run_metadata_path 指定的文件中
record_last_run => "true"
# 是否需要記錄某個column 的值,如果record_last_run為真,可以自定義我們需要 track 的 column 名稱舆逃,此時該參數(shù)就要為 true. 否則默認 track 的是 timestamp 的值.
use_column_value => "true"
# 如果 use_column_value 為真,需配置數(shù)據(jù)表增量字段的類型蚂维,只能是"numeric"或"timestamp"
tracking_column_type => "timestamp"
# 如果 use_column_value 為真,需配置數(shù)據(jù)表增量字段名,該字段必須是遞增的. 一般序號或時間戳
tracking_column => "last_modify_dt"
last_run_metadata_path => "/data/logstash/config/sql_last_value"
}
}
filter {
mutate {
# 將 keyword_tag 字段的內(nèi)容通過 "," 分割成數(shù)組
split => { "keyword_tag" => "," }
}
mutate {
# 將 taxonomy 字段的內(nèi)容中的前綴"/"和"http://"替換為""。
gsub => ["taxonomy", "^/(/)?", ""]
}
}
output {
elasticsearch {
# Elasticsearch 服務(wù)IP和端口,賬號和密碼
hosts => ["192.168.216.128:9200", "192.168.216.129:9200", "192.168.216.130:9200"]
user => "elastic"
password => "elastic"
# 索引名
index => "pgsql_index"
# 需要關(guān)聯(lián)的數(shù)據(jù)表中有一個uuid字段路狮,對應(yīng)索引文檔的id號
document_id => "%{uuid}"
}
stdout {
codec => rubydebug
}
}
4)驗證 JOB 配置文件虫啥。
[centos@ETL ~]$ sudo -u elastic /usr/local/logstash-7.6.2/bin/logstash -f /data/logstash/config/pgsql.conf -t
其他配置請參加官方文檔:
- input(輸入)配置:【https://www.elastic.co/guide/en/logstash/current/input-plugins.html】
- filter(過濾器)配置:【https://www.elastic.co/guide/en/logstash/current/filter-plugins.html】
- output(輸出)配置:【https://www.elastic.co/guide/en/logstash/current/output-plugins.html】
注意:每個作業(yè)編寫?yīng)毩⒌呐渲媚_本。
8奄妨、配置 Logstash 開機自啟動涂籽。
使用文本編輯器創(chuàng)建配置文件:
[centos@ETL ~ ]$ sudo gedit /usr/lib/systemd/system/logstash.service
編寫文件內(nèi)容并保存如下:
[Unit]
Description=Logstash
After=syslog.target network.target
[Service]
User=elastic
Group=elastic
ExecStart=/usr/local/logstash-7.6.2/bin/logstash
ExecStop=/bin/kill -HUP $MAINPID
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed
KillSignal=SIGINT
LimitNOFILE=100000
LimitNPROC=100000
[Install]
WantedBy=multi-user.target
設(shè)置開機啟動:
[centos@ETL ~]$ sudo systemctl daemon-reload
[centos@ETL ~]$ sudo systemctl enable logstash.service
9、啟動 Logstash 服務(wù)展蒂。
[centos@ETL ~]$ sudo systemctl start logstash.service
10又活、Logstash 運維管理
1)啟動 Logstash 服務(wù)(任選一種方式)
[centos@ETL ~]$ sudo systemctl start logstash.service
或者
[centos@ETL ~]$ sudo -u elastic /usr/local/logstash-7.6.2/bin/logstash
2)停止 Logstash 服務(wù)(任選一種方式)
[centos@ETL ~]$ sudo systemctl stop logstash.service
或者
[centos@ETL ~]$ sudo ps -ef | grep logstash
elastic 26393 26391 5 11:01 ? 00:01:38 /bin/java...
[centos@ETL ~]$ sudo kill -9 26393
3)重啟 Logstash 服務(wù)
[centos@ETL ~]$ sudo systemctl restart logstash.service
或者
[centos@ETL ~]$ sudo ps -ef | grep logstash
elastic 26393 26391 5 11:01 ? 00:01:38 /bin/java...
[centos@ETL ~]$ sudo kill -9 26393
[centos@ETL ~]$ sudo -u elastic /usr/local/logstash-7.6.2/bin/logstash
4)查看 Logstash 服務(wù)狀態(tài)
[centos@ETL ~]$ sudo systemctl status logstash.service
或者
[centos@ETL ~]$ sudo -u elastic tail /data/logstash/logs/logstash-plain.log
5)啟動 Logstash 服務(wù)開機自啟動
[centos@ETL ~]$ sudo systemctl enable logstash.service
6)禁用 Logstash 服務(wù)開機自啟動
[centos@ETL ~]$ sudo systemctl disable logstash.service
7.Java 開發(fā)集成
7.1.ES 編程語言
- Elasticsearch: 權(quán)威指南:【https://www.elastic.co/guide/cn/elasticsearch/guide/current/index.html】
- Elasticsearch 7.x 實戰(zhàn)入門:【http://www.reibang.com/nb/44794154】
7.2.ES + Jest 可復(fù)用類庫
JestClient 是 Elasticsearch 的 JavaAPI 包,以下是 Maven 項目的程序案例锰悼。
1柳骄、從 Maven 庫中引入 JestClient 包。
<dependency>
<groupId>io.searchbox</groupId>
<artifactId>jest-common</artifactId>
<version>6.3.1</version>
</dependency>
2箕般、編寫 Elasticsearch 會話工廠類 "SearchSessionFacade.java"耐薯。
/**
*
* ElasticSearch Libs
*
* ?2020 張毅
*
*/
import com.google.gson.GsonBuilder;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.config.HttpClientConfig;
/**
* ElasticSearch 會話工廠類 (ElasticSearch Session Factory)
*/
public class SearchSessionFactory {
/** 請求服務(wù)地址 */
private String[] serverUris;
/** 是否啟用多線程 */
private boolean multiThreaded = true;
/** 連接超時時長 */
private int connTimeout = 10000;
/** 數(shù)據(jù)讀取超時時長 */
private int readTimeout = 10000;
/** 日期格式 */
private String dateFormat = "yyyy-MM-dd HH:mm:ss";
/** 訪問賬號 */
private String user;
/** 訪問密碼 */
private String pass;
/** Jest客戶端 */
private JestClient jestClient;
/**
* 獲取請求服務(wù)地址。
*
* @return 請求服務(wù)地址丝里。
*/
public String[] getServerUris() {
return serverUris;
}
/**
* 設(shè)置請求服務(wù)地址曲初。
*
* @param serverUris 請求服務(wù)地址。
*/
public void setServerUris(String[] serverUris) {
this.serverUris = serverUris;
}
/**
* 獲取是否啟用多線程杯聚。
*
* @return 啟用多線程返回true臼婆,否則返回false。
*/
public boolean isMultiThreaded() {
return multiThreaded;
}
/**
* 設(shè)置是否啟用多線程幌绍。
*
* @param multiThreaded 設(shè)置為true啟用颁褂,false停用。
*/
public void setMultiThreaded(boolean multiThreaded) {
this.multiThreaded = multiThreaded;
}
/**
* 獲取連接超時時長(毫秒)傀广。
*
* @return 連接超時時長(毫秒)颁独。
*/
public int getConnTimeout() {
return connTimeout;
}
/**
* 設(shè)置連接超時時長(毫秒)。
*
* @param connTimeout 連接超時時長(毫秒)伪冰。
*/
public void setConnTimeout(int connTimeout) {
this.connTimeout = connTimeout;
}
/**
* 獲取數(shù)據(jù)讀取超時時長(毫秒)誓酒。
*
* @return 數(shù)據(jù)讀取超時時長(毫秒)。
*/
public int getReadTimeout() {
return readTimeout;
}
/**
* 設(shè)置數(shù)據(jù)讀取超時時長(毫秒)贮聂。
*
* @param readTimeout 數(shù)據(jù)讀取超時時長(毫秒)靠柑。
*/
public void setReadTimeout(int readTimeout) {
this.readTimeout = readTimeout;
}
/**
* 獲取日期格式。
*
* @return 日期格式寂汇。
*/
public String getDateFormat() {
return dateFormat;
}
/**
* 設(shè)置日期格式病往。
*
* @param dateFormat 日期格式。
*/
public void setDateFormat(String dateFormat) {
this.dateFormat = dateFormat;
}
/**
* 獲取訪問賬號骄瓣。
*
* @return 訪問賬號停巷。
*/
public String getUser() {
return user;
}
/**
* 設(shè)置訪問賬號。
*
* @param user 訪問賬號榕栏。
*/
public void setUser(String user) {
this.user = user;
}
/**
* 獲取訪問密碼畔勤。
*
* @return 訪問密碼。
*/
public String getPass() {
return pass;
}
/**
* 設(shè)置訪問訪問密碼扒磁。
*
* @param pass 訪問密碼庆揪。
*/
public void setPass(String pass) {
this.pass = pass;
}
/**
* 獲取Jest連接客戶端。
*
* @return Jest客戶端妨托。
*/
public JestClient getJestClient() {
if (jestClient == null) {
jestClient = this.createJestClient();
}
return jestClient;
}
/**
* 創(chuàng)建Jest客戶端實例缸榛。
*
* @return Jest客戶端實例吝羞。
*/
protected JestClient createJestClient() {
JestClientFactory factory = new JestClientFactory();
if (this.getUser() != null && !this.getUser().equals("") && this.getPass() != null
&& !this.getPass().equals("")) {
factory.setHttpClientConfig(new HttpClientConfig.Builder(serverUris).multiThreaded(multiThreaded)
.connTimeout(connTimeout).readTimeout(readTimeout)
.gson(new GsonBuilder().setDateFormat(dateFormat).create()).defaultCredentials(user, pass).build());
this.jestClient = factory.getObject();
} else {
factory.setHttpClientConfig(new HttpClientConfig.Builder(serverUris).multiThreaded(multiThreaded)
.connTimeout(connTimeout).readTimeout(readTimeout)
.gson(new GsonBuilder().setDateFormat(dateFormat).create()).build());
this.jestClient = factory.getObject();
}
return this.jestClient;
}
}
3、編寫 Elasticsearch 會話類 "SearchSession.java"内颗。
/**
*
* ElasticSearch Libs
*
* ?2020 張毅
*
*/
import java.util.Collection;
import java.util.List;
import java.util.Map;
import com.google.gson.JsonObject;
import io.searchbox.action.BulkableAction;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestResult;
import io.searchbox.cluster.GetSettings;
import io.searchbox.cluster.Health;
import io.searchbox.cluster.NodesInfo;
import io.searchbox.cluster.NodesStats;
import io.searchbox.core.Bulk;
import io.searchbox.core.Count;
import io.searchbox.core.CountResult;
import io.searchbox.core.Delete;
import io.searchbox.core.DeleteByQuery;
import io.searchbox.core.Get;
import io.searchbox.core.Index;
import io.searchbox.core.MultiGet;
import io.searchbox.core.Search;
import io.searchbox.core.Search.Builder;
import io.searchbox.core.SearchResult;
import io.searchbox.core.SearchScroll;
import io.searchbox.core.Update;
import io.searchbox.core.UpdateByQuery;
import io.searchbox.indices.ClearCache;
import io.searchbox.indices.CloseIndex;
import io.searchbox.indices.CreateIndex;
import io.searchbox.indices.DeleteIndex;
import io.searchbox.indices.Flush;
import io.searchbox.indices.IndicesExists;
import io.searchbox.indices.OpenIndex;
import io.searchbox.indices.Optimize;
import io.searchbox.indices.aliases.AddAliasMapping;
import io.searchbox.indices.aliases.AliasExists;
import io.searchbox.indices.aliases.GetAliases;
import io.searchbox.indices.aliases.ModifyAliases;
import io.searchbox.indices.aliases.RemoveAliasMapping;
import io.searchbox.indices.mapping.DeleteMapping;
import io.searchbox.indices.mapping.GetMapping;
import io.searchbox.indices.mapping.PutMapping;
import io.searchbox.indices.settings.UpdateSettings;
import io.searchbox.indices.template.DeleteTemplate;
import io.searchbox.indices.template.GetTemplate;
import io.searchbox.indices.template.PutTemplate;
import io.searchbox.params.Parameters;
/**
* ElasticSearch 會話類 (ElasticSearch Session)
*/
public class SearchSession {
/** Jest客戶端 */
protected final JestClient jestClient;
/**
* 獲取Jest連接客戶端钧排。
*
* @return Jest客戶端。
*/
protected JestClient getJestClient() {
return jestClient;
}
/**
* 構(gòu)造器均澳。
*
* @param factory ElasticSearch創(chuàng)建工廠恨溜。
*/
public SearchSession(SearchSessionFactory factory) {
this.jestClient = factory.getJestClient();
}
/**
* 執(zhí)行批量事務(wù)。
*
* @param actions 事務(wù)動作集合找前。
* @return 執(zhí)行結(jié)果模型糟袁。
*/
@SuppressWarnings("rawtypes")
public JestResult executeBulk(Collection<? extends BulkableAction> actions) {
try {
Bulk bulk = new Bulk.Builder().addAction(actions).build();
JestResult jestResult = jestClient.execute(bulk);
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 優(yōu)化。
*
* @return 執(zhí)行結(jié)果模型躺盛。
*/
public JestResult optimize() {
try {
JestResult jestResult = jestClient.execute(new Optimize.Builder().build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 刷新项戴。
*
* @return 執(zhí)行結(jié)果模型。
*/
public JestResult flush() {
try {
JestResult jestResult = jestClient.execute(new Flush.Builder().build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 清除緩存槽惫。
*
* @return 執(zhí)行結(jié)果模型肯尺。
*/
public JestResult clearCache() {
try {
JestResult jestResult = jestClient.execute(new ClearCache.Builder().build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 獲取集群健康信息。
*
* @return 執(zhí)行結(jié)果模型躯枢。
*/
public JestResult getClusterHealth() {
try {
JestResult jestResult = jestClient.execute(new Health.Builder().build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 獲取節(jié)點信息则吟。
*
* @return 執(zhí)行結(jié)果模型。
*/
public JestResult getNodesInfo() {
try {
JestResult jestResult = jestClient.execute(new NodesInfo.Builder().build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 獲取節(jié)點狀態(tài)信息锄蹂。
*
* @return 執(zhí)行結(jié)果模型氓仲。
*/
public JestResult getNodesStats() {
try {
JestResult jestResult = jestClient.execute(new NodesStats.Builder().build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 更新配置。
*
* @param setting 配置更新腳本JSON字符串得糜。敬扛。
* @return 執(zhí)行結(jié)果模型。
*/
public JestResult updateSettings(String setting) {
try {
JestResult jestResult = jestClient.execute(new UpdateSettings.Builder(setting).build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 更新配置朝抖。
*
* @param setting 配置更新腳本JSON模型啥箭。
* @return 執(zhí)行結(jié)果模型。
*/
public JestResult updateSettings(JsonObject setting) {
return this.updateSettings(setting.toString());
}
/**
* 獲取配置治宣。
*
* @return 執(zhí)行結(jié)果模型急侥。
*/
public JestResult geSettings() {
try {
JestResult jestResult = jestClient.execute(new GetSettings.Builder().build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 創(chuàng)建索引。
*
* @param index 索引名字侮邀。
* @return 執(zhí)行結(jié)果模型坏怪。
*/
public JestResult createIndex(String index) {
try {
if (this.indicesExists(index).isSucceeded()) {
this.deleteIndex(index);
}
JestResult jestResult = jestClient.execute(new CreateIndex.Builder(index).build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 刪除索引。
*
* @param index 索引名字绊茧。
* @return 執(zhí)行結(jié)果模型铝宵。
*/
public JestResult deleteIndex(String index) {
try {
JestResult jestResult = jestClient.execute(new DeleteIndex.Builder(index).build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 判斷索引是否存在。
*
* @param index 索引名字华畏。
* @return 執(zhí)行結(jié)果模型鹏秋。
*/
public JestResult indicesExists(String index) {
try {
JestResult jestResult = jestClient.execute(new IndicesExists.Builder(index).build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 關(guān)閉索引尊蚁。
*
* @param index 索引名字。
* @return 執(zhí)行結(jié)果模型侣夷。
*/
public JestResult closeIndex(String index) {
try {
JestResult jestResult = jestClient.execute(new CloseIndex.Builder(index).build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 打開索引枝誊。
*
* @param index 索引名字。
* @return 執(zhí)行結(jié)果模型惜纸。
*/
public JestResult openIndex(String index) {
try {
JestResult jestResult = jestClient.execute(new OpenIndex.Builder(index).build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 創(chuàng)建映射。
*
* @param index 索引名字绝骚。
* @param type 索引類型耐版。
* @param mapping 映射創(chuàng)建腳本JSON字符串。
* @return 執(zhí)行結(jié)果模型压汪。
*/
public JestResult putMappting(String index, String type, String mapping) {
try {
JestResult jestResult = jestClient.execute(new PutMapping.Builder(index, type, mapping).build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 創(chuàng)建映射粪牲。
*
* @param index 索引名字。
* @param type 索引類型止剖。
* @param mapping 映射創(chuàng)建腳本JSON模型腺阳。
* @return 執(zhí)行結(jié)果模型。
*/
public JestResult putMappting(String index, String type, JsonObject mapping) {
return this.putMappting(index, type, mapping.toString());
}
/**
* 刪除映射穿香。
*
* @param index 索引名字亭引。
* @param type 索引類型。
* @return 執(zhí)行結(jié)果模型皮获。
*/
public JestResult deleteMappting(String index, String type) {
try {
JestResult jestResult = jestClient.execute(new DeleteMapping.Builder(index, type).build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 獲取映射焙蚓。
*
* @param index 索引名字。
* @param type 索引類型洒宝。
* @return 執(zhí)行結(jié)果模型购公。
*/
public JestResult getMappting(String index, String type) {
try {
JestResult jestResult = jestClient.execute(new GetMapping.Builder().addIndex(index).addType(type).build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 創(chuàng)建別名。
*
* @param indexes 索引名字列表雁歌。
* @param alias 別名宏浩。
* @return 執(zhí)行結(jié)果模型。
*/
public JestResult addAliasMapping(List<String> indexes, String alias) {
try {
AddAliasMapping build = new AddAliasMapping.Builder(indexes, alias).build();
JestResult jestResult = jestClient.execute(new ModifyAliases.Builder(build).build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 刪除別名靠瞎。
*
* @param indexes 索引名字列表比庄。
* @param alias 別名。
* @return 執(zhí)行結(jié)果模型乏盐。
*/
public JestResult deleteAliasMapping(List<String> indexes, String alias) {
try {
RemoveAliasMapping build = new RemoveAliasMapping.Builder(indexes, alias).build();
JestResult jestResult = jestClient.execute(new ModifyAliases.Builder(build).build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 判斷別名是否存在印蔗。
*
* @param alias 別名。
* @return 執(zhí)行結(jié)果模型丑勤。
*/
public JestResult aliasExists(String alias) {
try {
JestResult jestResult = jestClient.execute(new AliasExists.Builder().alias(alias).build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 獲取別名华嘹。
*
* @param indexes 索引名字列表。
* @param alias 別名法竞。
* @return 執(zhí)行結(jié)果模型耙厚。
*/
public JestResult getAliases(String index) {
try {
JestResult jestResult = jestClient.execute(new GetAliases.Builder().addIndex(index).build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 創(chuàng)建模板强挫。
*
* @param name 模板名字。
* @param template 模板創(chuàng)建腳本JSON字符串薛躬。
* @return 執(zhí)行結(jié)果模型俯渤。
*/
public JestResult putTemplate(String name, String template) {
try {
JestResult jestResult = jestClient.execute(new PutTemplate.Builder(name, template).build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 創(chuàng)建模板。
*
* @param name 模板名字型宝。
* @param template 模板創(chuàng)建腳本JSON模型八匠。
* @return 執(zhí)行結(jié)果模型。
*/
public JestResult putTemplate(String name, JsonObject template) {
return this.putTemplate(name, template.toString());
}
/**
* 刪除模板趴酣。
*
* @param name 模板名字梨树。
* @return 執(zhí)行結(jié)果模型。
*/
public JestResult deleteTemplate(String name) {
try {
JestResult jestResult = jestClient.execute(new DeleteTemplate.Builder(name).build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 獲取模板岖寞。
*
* @param name 模板名字抡四。
* @return 執(zhí)行結(jié)果模型。
*/
public JestResult getTemplate(String name) {
try {
JestResult jestResult = jestClient.execute(new GetTemplate.Builder(name).build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 插入文檔仗谆。
*
* @param doc 文檔指巡。
* @param index 索引名字。
* @param type 索引類型隶垮。
* @return 執(zhí)行結(jié)果模型藻雪。
*/
public JestResult insertDocuments(List<Map<String, Object>> docs, String index, String type) {
try {
Bulk.Builder bulk = new Bulk.Builder().defaultIndex(index);
for (Map<String, Object> doc : docs) {
Index action;
if (doc.get("id") != null) {
action = new Index.Builder(doc).id(doc.get("id").toString()).index(index).type(type).build();
bulk.addAction(action);
} else {
action = new Index.Builder(doc).index(index).build();
bulk.addAction(action);
}
}
JestResult jestResult = jestClient.execute(bulk.build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 插入文檔。
*
* @param doc 文檔狸吞。
* @param id 文檔ID阔涉。
* @param index 索引名字。
* @param type 索引類型捷绒。
* @return 執(zhí)行結(jié)果模型瑰排。
*/
public JestResult insertDocument(Map<String, Object> doc, String id, String index, String type) {
try {
JestResult jestResult = jestClient.execute(new Index.Builder(doc).id(id).index(index).type(type).build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 插入文檔。
*
* @param doc 文檔暖侨。
* @param index 索引名字椭住。
* @param type 索引類型。
* @return 執(zhí)行結(jié)果模型字逗。
*/
public JestResult insertDocument(Map<String, Object> doc, String index, String type) {
try {
if (doc.get("id") != null) {
JestResult jestResult = jestClient
.execute(new Index.Builder(doc).id(doc.get("id").toString()).index(index).type(type).build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} else {
JestResult jestResult = jestClient.execute(new Index.Builder(doc).index(index).type(type).build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 更新文檔京郑。
*
* @param doc 文檔。
* @param index 索引名字葫掉。
* @param type 索引類型些举。
* @return 執(zhí)行結(jié)果模型。
*/
public JestResult updateDocuments(List<Map<String, Object>> docs, String index, String type) {
try {
Bulk.Builder bulk = new Bulk.Builder().defaultIndex(index);
for (Map<String, Object> doc : docs) {
Update action;
if (doc.get("id") != null) {
action = new Update.Builder(doc).id(doc.get("id").toString()).index(index).type(type).build();
bulk.addAction(action);
}
}
JestResult jestResult = jestClient.execute(bulk.build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 更新文檔俭厚。
*
* @param doc 文檔户魏。
* @param id 文檔ID。
* @param index 索引名字。
* @param type 索引類型叼丑。
* @return 執(zhí)行結(jié)果模型关翎。
*/
public JestResult updateDocument(Map<String, Object> doc, String id, String index, String type) {
try {
JestResult jestResult = jestClient.execute(new Update.Builder(doc).id(id).index(index).type(type).build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 更新文檔。
*
* @param doc 文檔鸠信。
* @param id 文檔ID纵寝。
* @param index 索引名字。
* @param type 索引類型星立。
* @return 執(zhí)行結(jié)果模型爽茴。
*/
public JestResult updateDocument(Map<String, Object> doc, String index, String type) {
try {
if (doc.get("id") != null) {
JestResult jestResult = jestClient
.execute(new Update.Builder(doc).id(doc.get("id").toString()).index(index).type(type).build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
}
return null;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 更新文檔。
*
* @param query 查詢腳本JSON字符串绰垂。
* @param index 索引名字室奏。
* @param type 索引類型。
* @return 執(zhí)行結(jié)果模型辕坝。
*/
public JestResult updateDocumentByQuery(String query, String index, String type) {
try {
JestResult jestResult = jestClient
.execute(new UpdateByQuery.Builder(query).addIndex(index).addType(type).build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 更新文檔。
*
* @param query 查詢腳本JSON模型荐健。
* @param index 索引名字酱畅。
* @param type 索引類型。
* @return 執(zhí)行結(jié)果模型江场。
*/
public JestResult updateDocumentByQuery(JsonObject query, String index, String type) {
try {
JestResult jestResult = jestClient
.execute(new UpdateByQuery.Builder(query.toString()).addIndex(index).addType(type).build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 刪除文檔纺酸。
*
* @param ids 文檔ID。
* @param index 索引名字址否。
* @param type 索引類型餐蔬。
* @return 執(zhí)行結(jié)果模型。
*/
public JestResult deleteDocuments(String[] ids, String index, String type) {
try {
Bulk.Builder bulk = new Bulk.Builder().defaultIndex(index).defaultType(type);
for (String id : ids) {
Delete action = new Delete.Builder(id).build();
bulk.addAction(action);
}
JestResult jestResult = jestClient.execute(bulk.build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 刪除文檔佑附。
*
* @param id 文檔ID樊诺。
* @param index 索引名字。
* @param type 索引類型音同。
* @return 執(zhí)行結(jié)果模型词爬。
*/
public JestResult deleteDocument(String id, String index, String type) {
try {
JestResult jestResult = jestClient.execute(new Delete.Builder(id).index(index).type(type).build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 刪除文檔。
*
* @param query 查詢腳本权均。
* @param index 索引名字顿膨。
* @param type 索引類型。
* @return 執(zhí)行結(jié)果模型叽赊。
*/
public JestResult deleteDocumentByQuery(String query, String index, String type) {
try {
JestResult jestResult = jestClient
.execute(new DeleteByQuery.Builder(query).addIndex(index).addType(index).build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 刪除文檔恋沃。
*
* @param query 查詢腳本JSON模型。
* @param index 索引名字必指。
* @param type 索引類型囊咏。
* @return 執(zhí)行結(jié)果模型。
*/
public JestResult deleteDocumentByQuery(JsonObject query, String index, String type) {
try {
JestResult jestResult = jestClient
.execute(new DeleteByQuery.Builder(query.toString()).addIndex(index).addType(index).build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 獲取指定ID的文檔。
*
* @param ids 文檔ID集合匆笤。
* @param index 索引名字研侣。
* @param type 索引類型。
* @return 指定ID的文檔JSON模型集合炮捧。
*/
public List<JsonObject> getDocumentsByIds(List<String> ids, String index, String type) {
try {
JestResult jestResult = jestClient.execute(new MultiGet.Builder.ById(index, type).addId(ids).build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
List<JsonObject> docs = jestResult.getSourceAsObjectList(JsonObject.class);
return docs;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 獲取指定ID庶诡。
*
* @param id 文檔ID。
* @param index 索引名字咆课。
* @param type 索引類型末誓。
* @return 指定ID的文檔JSON模型。
*/
public JsonObject getDocumentsById(String id, String index, String type) {
try {
JestResult jestResult = jestClient.execute(new Get.Builder(index, id).type(type).build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
JsonObject doc = jestResult.getSourceAsObject(JsonObject.class);
return doc;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 獲取文檔數(shù)量书蚪。
*
* @param search 查詢腳本JSON字符串喇澡。
* @param index 索引名字。
* @param type 索引類型殊校。
* @return 文檔數(shù)量晴玖。
*/
public Double getDocumentCount(String query, String index, String type) {
try {
CountResult jestResult = jestClient
.execute(new Count.Builder().query(query).addIndex(index).addType(type).build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult.getCount();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 獲取文檔數(shù)量。
*
* @param query 查詢腳本JSON模型为流。
* @param index 索引名字呕屎。
* @param type 索引類型。
* @return 文檔數(shù)量敬察。
*/
public Double getDocumentCount(JsonObject query, String index, String type) {
return this.getDocumentCount(query.toString(), index, type);
}
/**
* 查詢文檔秀睛。
*
* @param query 查詢腳本JSON字符串。
* @param from 頁起點莲祸。
* @param size 頁長度蹂安。
* @param index 索引名字。
* @param type 索引類型锐帜。
* @return 查詢結(jié)果模型田盈。
*/
public SearchResult search(String query, Integer from, Integer size, String index, String type) {
try {
Builder builder = new Search.Builder(query).addIndex(index).addType(type)
.setParameter(Parameters.SIZE, size != null && size > 0 ? size : 15)
.setParameter(Parameters.FROM, from != null && from > 0 ? from : 0);
SearchResult result = jestClient.execute(builder.build());
if (!result.isSucceeded()) {
throw new Exception(result.getErrorMessage());
}
return result;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 查詢文檔。
*
* @param query 查詢腳本JSON模型缴阎。
* @param from 頁起點缠黍。
* @param size 頁長度。
* @param index 索引名字药蜻。
* @param type 索引類型阐滩。
* @return 查詢結(jié)果模型抡锈。
*/
public SearchResult search(JsonObject query, Integer from, Integer size, String index, String type) {
return this.search(query.toString(), from, size, index, type);
}
/**
* 向后滾動查詢文檔滤馍。
*
* @param scrollId 向后滾動ID拓劝。
* @param scroll 有效時間,如:"5m"踱卵。
* @return 查詢結(jié)果模型廊驼。
*/
public JestResult search(String scrollId, String scroll) {
try {
JestResult jestResult = jestClient.execute(new SearchScroll.Builder(scrollId, scroll).build());
if (!jestResult.isSucceeded()) {
throw new Exception(jestResult.getErrorMessage());
}
return jestResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 向后滾動查詢文檔据过。
*
* @param scrollId 向后滾動ID。
* @return 查詢結(jié)果模型妒挎。
*/
public JestResult search(String scrollId) {
return this.search(scrollId, "5m");
}
/**
* 實例回收方法绳锅。
*/
@Override
protected void finalize() throws Throwable {
if (this.jestClient != null) {
this.jestClient.close();
}
super.finalize();
}
}
4、編寫主程序酝掩。
1)編寫 Elasticsearch 連接文件 "elasticsearch.properties" 鳞芙。
es.serverUri=http://192.168.216.128:9200
es.multiThreaded=true
es.connTimeout=10000
es.readTimeout=10000
es.dateFormat=yyyy-MM-dd HH:mm:ss
es.user=elastic
es.pass=elastic
2)編寫主程序文件 "JestApp.java"。
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.ResourceBundle;
public class JestApp {
public static void main( String[] args ) {
// 獲取 Elasticsearch 連接文件信息
ResourceBundle resource = ResourceBundle.getBundle("elasticsearch");
String serverUri = resource.getString("serverUri");
String multiThreaded = resource.getString("multiThreaded");
String connTimeout = resource.getString("connTimeout");
String readTimeout = resource.getString("readTimeout");
String dateFormat = resource.getString("dateFormat");
String user = resource.getString("user");
String pass = resource.getString("pass");
// 定義 Elasticsearch 連接工廠的實例
SearchSessionFactory factory = new SearchSessionFactory();
factory.setServerUri(serverUri);
factory.setMultiThreaded(Boolean.valueOf(multiThreaded));
factory.setConnTimeout(Integer.valueOf(connTimeout));
factory.setReadTimeout(Integer.valueOf(readTimeout));
factory.setDateFormat(dateFormat);
factory.setUser(user);
factory.setPass(pass);
// 定義 Elasticsearch 連接會話
SearchSession session = new SearchSession(factory);
// 索引名稱
String index = "pgsql_index";
// 索引文檔類型期虾,固定值
String type = "_doc";
// 文檔ID
String id = "document_id";
// 文檔內(nèi)容原朝,一個空文檔。
Map<String, Object> doc = new LinkedHashMap<String, Object>();
// 創(chuàng)建索引
session.createIndex(index);
// 刪除索引
session.deleteIndex(index);
// 驗證索引是否存在
session.indicesExists(index);
// 打開索引
session.openIndex(index);
// 關(guān)閉索引
session.closeIndex(index);
// 插入文檔
session.insertDocument(doc, id, index, type);
// 更新文檔
session.updateDocument(doc, id, index, type);
// 刪除文檔
session.deleteDocument(id, index, type);
// 獲取指定ID的文檔
session.getDocumentsById(id, index, type);
// 查詢?nèi)繑?shù)據(jù)的前10個文檔
String query = "{\"query\": {\"match_all\": {}}}";
session.search(query, 0, 10, index, type);
}
}
7.3.ELK + Jest 最佳集成方案
Elasticsearch 作為 OLAP 數(shù)據(jù)庫镶苞,它的數(shù)據(jù)一般都是來源于業(yè)務(wù)系統(tǒng)的持久化數(shù)據(jù)庫中喳坠,如:PostgreSQL、MySQL茂蚓、Oracle壕鹉、SQLServer 等 RDBMS 數(shù)據(jù)庫或 MongoDB 等 NoSQL數(shù)據(jù)庫×牵基于類似場景晾浴,建議通過以下方式來完成 Elasticsearch、Kibana牛郑、Logstash怠肋、Jest 的集成使用:
1敬鬓、使用 Kibana 創(chuàng)建和管理 Elasticsearch 模板淹朋。
Elasticsearch 支持通過模板來定義索引格式,模板可以應(yīng)用于索引名稱中包含指定字符的索引钉答,它實現(xiàn)在創(chuàng)建索引時础芍,將索引中包含指定字符的屬性設(shè)置為特定的格式。使用 Kibana 在 Elasticsearch 中一次性創(chuàng)建模板数尿。例如:
PUT _template/pgsql_template
{
"version": 1,
"order": 1,
"index_patterns": [
"pgsql-*"
],
"settings": {
"index": {
"analysis": {
"analyzer": {
"ik_smart_analyzer": {
"tokenizer": "ik_smart"
}
}
},
"number_of_shards": "3",
"number_of_replicas": "3",
"refresh_interval": "5s"
}
},
"mappings": {
"dynamic_templates": [
{
"tag_field": {
"mapping": {
"norms": false,
"type": "keyword"
},
"match_mapping_type": "string",
"match": "*_tag"
}
},
{
"msg_field": {
"mapping": {
"anaylzer": "ik_smart_analyzer",
"norms": false,
"type": "text"
},
"match_mapping_type": "string",
"match": "*_msg"
}
},
{
"str_fields": {
"mapping": {
"anaylzer": "ik_smart_analyzer",
"norms": false,
"type": "text",
"fields": {
"keyword": {
"ignore_above": 256,
"type": "keyword"
}
}
},
"match_mapping_type": "string",
"match": "*"
}
}
]
}
}
在上面的模板設(shè)置中仑性,它設(shè)置了以下內(nèi)容:
1)"index_patterns" 中定義將索引名稱前綴為 "pgsql-" 的索引在創(chuàng)建時應(yīng)用此模板。
2) "settings" 中定義了索引使用的分詞器右蹦、分片和刷新時間等參數(shù)诊杆。
3) "mappings" 中定義了以下規(guī)則:
- 名稱以 "_tag" 結(jié)尾的屬性,其類型為不支持分詞的 keyword 數(shù)組何陆;
- 名稱以 "_msg" 結(jié)尾的屬性晨汹,其類型為支持中文分詞、拼音分詞的文本贷盲,且對全部分詞進行索引淘这;
- 其他所有屬性,其類型為支持中文分詞、拼音分詞的 text 文本铝穷,但只對前256個字符進行索引钠怯。
2、使用 Logstash 創(chuàng)建索引和插入曙聂、更新文檔晦炊。
Logstash 可以高效的從數(shù)據(jù)庫層面完成從業(yè)務(wù)數(shù)據(jù)庫抽取數(shù)據(jù),經(jīng)過簡單的過濾處理之后筹陵,向 Elasticsearch 中根據(jù)文檔ID來插入或更新文檔刽锤,但不能物理刪除文檔。在插入文檔時朦佩,如果索引不存在則自動創(chuàng)建索引并思。
3、使用 Jest 刪除索引语稠、文檔宋彼。
因為 Logstash 不能完成物理刪除文檔的處理,因此應(yīng)用在業(yè)務(wù)數(shù)據(jù)庫中進行物理數(shù)據(jù)刪除操作時仙畦,需要使用 Jest 調(diào)用 API 同步將這些數(shù)據(jù)在 Elasticsearch 中刪除输涕。