數(shù)據(jù)庫架構(gòu)之【ELK(Elasticsearch+Logstash+Kibana)】OLAP 集群方案

ELK 是三個開源項目的首字母縮寫满粗,這三個項目分別是:Elasticsearch爱谁、Logstash 和 Kibana晒喷。它是一套以 Elasticsearch 為核心的 OLAP 集成解決方案。

本方案基于CentOS8系統(tǒng)設(shè)計访敌,建議在RedHat/CentOS系統(tǒng)中使用凉敲。方案使用服務(wù)器及網(wǎng)絡(luò)資源較多,建議在實施前做好規(guī)劃工作寺旺,有利于部署工作順利爷抓、有序進行。

目錄

1.前言

2.集群部署拓撲圖

3.Elasticsearch 單點安裝和配置
3.1.安裝和配置
3.2.安全性擴展
3.3.中文分詞擴展

4.Elasticsearch 集群化
4.1.Elasticsearch 集群的概念
4.2.Elasticsearch 集群的示例

5.Kibana 安裝和配置

6.Logstash 安裝和作業(yè)部署

7.Java 開發(fā)集成
7.1.ES 編程語言
7.2.ES + Jest 可復(fù)用類庫
7.3.ELK + Jest 最佳集成方案


1.前言

1阻塑、"集群"指的是 Elasticsearch 組成的集群蓝撇,實現(xiàn)負載均衡,故障轉(zhuǎn)移陈莽,實時熱備和讀寫分離渤昌。但與 Logstash 和 Kibana 無關(guān)。

2传透、核心組件簡介

1)Elasticsearch: 一個搜索和分析引擎耘沼,提供搜集、分析朱盐、存儲數(shù)據(jù)三大功能群嗤。具有分布式,零配置兵琳,自動發(fā)現(xiàn)狂秘,索引自動分片,索引副本機制躯肌,Restful風(fēng)格接口者春,多數(shù)據(jù)源,自動搜索負載等特點清女。Elasticsearch 能夠?qū)B級海量數(shù)據(jù)進行近實時的處理(從寫入到查詢的延時≈1秒)钱烟,且能夠支持數(shù)百臺服務(wù)器節(jié)點的分布式集群架構(gòu)。

2)IKAnalyzer :一個開源的,基于Java語言開發(fā)的輕量級的中文分詞工具包拴袭。

2)Logstash: 一個服務(wù)器端數(shù)據(jù)處理管道读第,能夠同時從多個來源采集數(shù)據(jù),轉(zhuǎn)換數(shù)據(jù)拥刻,然后將數(shù)據(jù)發(fā)送到 Elasticsearch 中怜瞒。Logstash 通過部署JOB來完成工作,JOB工作流程如下:

Logstash JOB 工作流程

3)Kibana: 一個 Elasticsearch 的可視化管理用戶接口般哼,使用圖形和圖表對數(shù)據(jù)進行可視化吴汪。

3、ELK 適用于以下應(yīng)用場景:

1)普通用于具有全文檢索蒸眠、海量數(shù)據(jù)檢索類型的應(yīng)用或功能漾橙,作為OLAP數(shù)據(jù)庫支撐業(yè)務(wù);

2)可以用于數(shù)據(jù)分析類型的應(yīng)用或功能黔宛,且能夠滿足較高的實時性要求近刘,作為OLAP數(shù)據(jù)庫支撐業(yè)務(wù);

3)可以用于具有高并發(fā)讀寫臀晃,弱ACID管理觉渴,弱安全性控制,小型文檔型數(shù)據(jù)特點的應(yīng)用或功能徽惋,作為數(shù)據(jù)庫支撐業(yè)務(wù)案淋。

補充知識:ACID是數(shù)據(jù)庫事務(wù)的4個特征,分別是原子性、一致性险绘、隔離性和持久性踢京。


2.集群部署拓撲圖

ELK集群部署拓撲圖

網(wǎng)絡(luò)資源規(guī)劃:

1、ES Node 1 : OLAP 數(shù)據(jù)庫服務(wù)器

1)操作系統(tǒng):CentOS8宦棺;

2)IP地址和端口號:192.168.216.128:9200瓣距;

3)主機名:ES-1;

4)數(shù)據(jù)庫:Elasticsearch代咸;

5)角色:OLAP 數(shù)據(jù)庫集群節(jié)點蹈丸。

2、ES Node 2 :OLAP 數(shù)據(jù)庫服務(wù)器

1)操作系統(tǒng):CentOS8呐芥;

2)IP地址和端口號:192.168.216.129:9200逻杖;

3)主機名:ES-2;

4)數(shù)據(jù)庫:Elasticsearch思瘟;

5)角色:OLAP 數(shù)據(jù)庫集群節(jié)點荸百。

3、ES Node 3 :OLAP 數(shù)據(jù)庫服務(wù)器

1)操作系統(tǒng):CentOS8滨攻;

2)IP地址和端口號:192.168.216.130:9200够话;

3)主機名:ES-3蓝翰;

4)數(shù)據(jù)庫:Elasticsearch;

5)角色:OLAP 數(shù)據(jù)庫集群節(jié)點更鲁。

4霎箍、Kinaba: 可視化管理客戶端

1)操作系統(tǒng):CentOS8;

2)IP地址和端口號:192.168.216.131:5601澡为;

3)主機名:ESVM;

4)中間件:Kibana景埃;

5)角色:OLAP 數(shù)據(jù)庫可視化管理器媒至。

5、Logstash : 數(shù)據(jù) ETL 服務(wù)器

1)操作系統(tǒng):CentOS8谷徙;

2)IP地址:192.168.216.132拒啰;

3)主機名:ETL;

4)中間件:Logstash完慧;

5)角色:數(shù)據(jù)采集器谋旦。

6、外部數(shù)據(jù)源: 作為 OLAP 數(shù)據(jù)分析的外部數(shù)據(jù)來源數(shù)據(jù)庫屈尼,如:PostgreSQL册着、MySQL 等 RDBMS 數(shù)據(jù)庫 。IP地址:192.168.216.133


3.Elasticsearch 單點安裝和配置

3.1.安裝和配置

以"ES Node 1"節(jié)點為例:

1脾歧、打開 Elasticsearch 官方網(wǎng)站下載頁面【https://www.elastic.co/cn/downloads/elasticsearch】甲捏,下載 Elasticsearch 的編譯程序 tar 包到用戶主目錄中。

Elasticsearch 官方下載頁面

2鞭执、解壓縮編譯程序 tar 包到"/usr/local"目錄中司顿。

[centos@ES-1 ~]$ sudo tar zxvf elasticsearch-7.6.2-linux-x86_64.tar.gz -C /usr/local
[centos@ES-1 ~]$ ll /usr/local
drwxr-xr-x. 9 root root 155 3月  26 14:36 elasticsearch-7.6.2

3、創(chuàng)建 Elastcsearch 的數(shù)據(jù)存儲目錄和日志存儲目錄兄纺。

[centos@ES-1 ~]$ sudo mkdir -p /data/elasticsearch/data
[centos@ES-1 ~]$ sudo mkdir -p /data/elasticsearch/logs

4大溜、創(chuàng)建 ELK 管理用戶和組,并設(shè)置為程序安裝目錄估脆、數(shù)據(jù)存儲目錄的擁有者钦奋。

[centos@ES-1 ~ ]$ sudo id elastic
id: “elastic”:無此用戶
[centos@ES-1 ~ ]$ sudo groupadd elastic
[centos@ES-1 ~ ]$ sudo useradd -g elastic -s /bin/false elastic
[centos@ES-1 ~ ]$ sudo chown -R elastic:elastic /usr/local/elasticsearch-7.6.2
[centos@ES-1 ~ ]$ sudo chown -R elastic:elastic /data/elasticsearch

ELK組件的管理用戶和組(如:"elastic")可以共享使用,創(chuàng)建用戶和組之前首先查詢一下賬戶是否存在旁蔼。如果賬戶和組已經(jīng)存在锨苏,可能是因為在安裝其他ELK組件時已創(chuàng)建,這種情況下不必重復(fù)創(chuàng)建用戶和組棺聊,只需要更改程序安裝目錄伞租、數(shù)據(jù)存儲目錄的擁有者即可。

5限佩、設(shè)置 Elastcsearch 的配置文件參數(shù)葵诈。

使用文本編輯器打開配置文件:

[centos@ES-1 ~]$ sudo gedit /usr/local/elasticsearch-7.6.2/config/elasticsearch.yml

修改或驗證文件中的以下參數(shù)并保存:

# --常用設(shè)置項--

node.name: es-node-1
# 設(shè)置當前ES節(jié)點的名稱裸弦,默認隨機指定一個預(yù)置列表中名字。

path.data: /data/elasticsearch/data
# 設(shè)置索引數(shù)據(jù)的存儲路徑作喘,默認是ES根目錄下的data文件夾理疙。可以設(shè)置多個存儲路徑泞坦,用逗號隔開窖贤,例:
# path.data: /path/to/data1,/path/to/data2

path.logs: /data/elasticsearch/logs
# 設(shè)置日志文件的存儲路徑,默認是ES根目錄下的logs文件夾 贰锁。

network.host: 0.0.0.0
# 設(shè)置ES服務(wù)的監(jiān)聽網(wǎng)絡(luò)地址(IPv4 或 IPv6)赃梧,默認是示例地址逮刨。

http.port: 9200
# 設(shè)置ES服務(wù)的HTTP端口反惕,默認為9200。

transport.tcp.port: 9300
# 設(shè)置節(jié)點之間交互的TCP端口诫钓,默認是9300锣险。

http.cors.enabled: true
http.cors.allow-origin: "*"
# 設(shè)置允許HTTP跨域訪問

discovery.seed_hosts: ["127.0.0.1", "[::1]"]
# 設(shè)置集群節(jié)點發(fā)現(xiàn)清單蹄皱,默認為IPv4和IPv6回路地址。

cluster.initial_master_nodes: ["es-node-1"]
# 設(shè)置集群初始化主要節(jié)點清單芯肤,默認是node-1巷折。


# --其他可用設(shè)置項--

# cluster.name: elasticsearch-cluster
# 設(shè)置的集群名稱,默認是elasticsearch纷妆。ES通過廣播方式自動發(fā)現(xiàn)同一網(wǎng)段其他節(jié)點盔几,若同一網(wǎng)段下有多個集群,則通過集群名稱區(qū)分不同的集群掩幢。

# node.master: true
# 設(shè)置當前節(jié)點是否有資格被選舉成為主要節(jié)點逊拍,默認是true。ES默認集群中的第一個節(jié)點為主要節(jié)點际邻,如果這個節(jié)點故障就會重新選舉主要節(jié)點芯丧。

# node.data: true
# 設(shè)置當前節(jié)點是否存儲索引數(shù)據(jù),默認為true世曾。

# path.conf: /path/to/conf
# 設(shè)置配置文件的存儲路徑缨恒,默認是ES根目錄下的config文件夾。

# path.work: /path/to/work
# 設(shè)置臨時文件的存儲路徑轮听,默認是ES根目錄下的work文件夾骗露。

# path.plugins: /path/to/plugins
# 設(shè)置插件的存放路徑,默認是ES根目錄下的plugins文件夾, 插件在ES里面普遍使用血巍,用來增強原系統(tǒng)核心功能萧锉。

# bootstrap.mlockall: true
# 設(shè)置為true來鎖住JVM內(nèi)存。

# network.bind_host: 0.0.0.0
# 設(shè)置監(jiān)聽的ip地址述寡,可以是IPv4或IPv6的柿隙,默認為0.0.0.0叶洞。可以綁定這臺機器的任何一個IP地址禀崖。

# network.publish_host: 0.0.0.0
# 設(shè)置其它節(jié)點和該節(jié)點交互的IP地址衩辟,如果不設(shè)置它會自動判斷,值必須是個真實的IP地址波附。

# transport.tcp.compress: true
# 設(shè)置是否壓縮TCP傳輸時的數(shù)據(jù)艺晴,默認為false。

# http.max_content_length: 100mb
# 設(shè)置HTTP協(xié)議請求內(nèi)容的最大容量掸屡,默認100mb财饥。

# http.enabled: true
# 設(shè)置是否使用HTTP協(xié)議對外提供服務(wù),默認為true折晦。

# gateway.type: local
# 設(shè)置gateway的類型,默認為local即為本地文件系統(tǒng)沾瓦÷牛可以設(shè)置為本地文件系統(tǒng),分布式文件系統(tǒng)贯莺,hadoop的HDFS风喇,和amazon的s3服務(wù)器等。

# gateway.recover_after_nodes: 1
# 設(shè)置集群中N個節(jié)點啟動時進行數(shù)據(jù)恢復(fù)缕探,默認為1魂莫。

# gateway.recover_after_time: 5m
# 設(shè)置初始化數(shù)據(jù)恢復(fù)進程的超時時間,默認是5分鐘爹耗。

# gateway.expected_nodes: 2
# 設(shè)置這個集群中節(jié)點的數(shù)量耙考,默認為2,一旦這N個節(jié)點啟動潭兽,就會立即進行數(shù)據(jù)恢復(fù)倦始。

# cluster.routing.allocation.node_initial_primaries_recoveries: 4
# 初始化數(shù)據(jù)恢復(fù)時,并發(fā)恢復(fù)線程的個數(shù)山卦,默認為4鞋邑。

# cluster.routing.allocation.node_concurrent_recoveries: 2
# 添加刪除節(jié)點或負載均衡時并發(fā)恢復(fù)線程的個數(shù),默認為4账蓉。

# indices.recovery.max_size_per_sec: 0
# 設(shè)置數(shù)據(jù)恢復(fù)時限制的帶寬枚碗,如入100mb,默認為0铸本,即無限制肮雨。

# indices.recovery.concurrent_streams: 5
# 設(shè)置這個參數(shù)來限制從其它分片恢復(fù)數(shù)據(jù)時最大同時打開并發(fā)流的個數(shù),默認為5归敬。

# discovery.zen.ping.timeout: 3s
# 設(shè)置集群中自動發(fā)現(xiàn)其它節(jié)點時ping連接超時時間酷含,默認為3秒鄙早,對于比較差的網(wǎng)絡(luò)環(huán)境可以高點的值來防止自動發(fā)現(xiàn)時出錯。

# discovery.zen.ping.multicast.enabled: true
# 設(shè)置是否打開多播發(fā)現(xiàn)節(jié)點椅亚,默認是true限番。

# discovery.zen.ping.unicast.hosts: ["127.0.0.1", "[::1]"]
# 設(shè)置集群中主要節(jié)點的初始列表,可以通過這些節(jié)點來自動發(fā)現(xiàn)新加入集群的節(jié)點呀舔。

# discovery.zen.minimum_master_nodes: 1
# 設(shè)置集群有主要節(jié)點資格的數(shù)量弥虐,默認為1。對于大的集群來說媚赖,可以設(shè)置大一點的值(2-4)霜瘪。

# xpack.security.enabled: true
# xpack.security.transport.ssl.enabled: true
# xpack.security.transport.ssl.verification_mode: certificate
# xpack.security.transport.ssl.keystore.path: elastic-certificates.p12
# xpack.security.transport.ssl.truststore.path: elastic-certificates.p12
# 啟用 X-Pack 安全認證模塊,啟用前需要導(dǎo)出證書文件惧磺,如:elastic-certificates.p12颖对。

6、設(shè)置 Elasticsearch 運行的Linux系統(tǒng)參數(shù)磨隘。

1)在"/etc/sysctl.conf"文件中設(shè)置內(nèi)核參數(shù)缤底。

使用文本編輯器創(chuàng)建配置文件:

[centos@ES-1 ~ ]$ sudo gedit /etc/sysctl.conf

在文件中追加內(nèi)容并保存如下:

vm.max_map_count=655360

2)在"/etc/security/limits.conf"文件中設(shè)置用戶 SHELL 參數(shù)。
使用文本編輯器創(chuàng)建配置文件:

[centos@ES-1 ~ ]$ sudo gedit /etc/security/limits.conf

在文件中追加內(nèi)容并保存如下:

elastic soft nofile 65536
elastic hard nofile 65536

3)重新啟動操作系統(tǒng)番捂。

[centos@ES-1 ~ ]$ sudo reboot

7个唧、配置 Elasticsearch 服務(wù)開機自啟動。

使用文本編輯器創(chuàng)建配置文件:

[centos@ES-1 ~ ]$ sudo gedit /usr/lib/systemd/system/elasticsearch-server.service

編寫文件內(nèi)容并保存如下:

[Unit]
Description=Elasticsearch Server
After=syslog.target network.target

[Service]
User=elastic
Group=elastic

ExecStart=/usr/local/elasticsearch-7.6.2/bin/elasticsearch
ExecStop=/bin/kill -HUP $MAINPID
ExecReload=/bin/kill -HUP $MAINPID

KillMode=mixed
KillSignal=SIGINT

LimitNOFILE=100000
LimitNPROC=100000

[Install]
WantedBy=multi-user.target

設(shè)置開機啟動:

[centos@ES-1 ~]$ sudo systemctl daemon-reload
[centos@ES-1 ~]$ sudo systemctl enable elasticsearch-server.service

8设预、啟動 Elasticsearch 服務(wù)徙歼。

[centos@ES-1 ~]$ sudo systemctl start elasticsearch-server.service

9、設(shè)置防火墻端口(CentOS8默認安裝firewall防火墻)鳖枕,允許"9200"魄梯,"9300"端口(Elasticsearch 默認端口)訪問服務(wù)器。

[centos@ES-1 ~]$ sudo firewall-cmd --zone=public --add-port=9200/tcp --permanent
[centos@ES-1 ~]$ sudo firewall-cmd --zone=public --add-port=9300/tcp --permanent
[centos@ES-1 ~]$ sudo firewall-cmd --reload

10耕魄、驗證 Elasticsearch 服務(wù)正常運行画恰。使用瀏覽器客戶端訪問 Elasticsearch 的 Http 服務(wù)接口進行測試,輸入:http://<IP>:<PORT>

Elastisearch 服務(wù)正常運行反饋消息

11吸奴、Elasticsearch 運維管理

1)啟動 Elasticsearch 服務(wù)(任選一種方式)

[centos@ES-1 ~]$ sudo systemctl start elasticsearch-server.service

或者

[centos@ES-1 ~]$ sudo -u elastic /usr/local/elasticsearch-7.6.2/bin/elasticsearch

2)停止 Elasticsearch 服務(wù)(任選一種方式)

[centos@ES-1 ~]$ sudo systemctl stop elasticsearch-server.service

或者

[centos@ES-1 ~]$ sudo ps -ef | grep Elasticsearch
elastic    3377      1  2 13:15 ?        00:00:37 /usr/local/elasticsearch-7.6.2/jdk/bin/java ...
[centos@ES-1 ~]$ sudo kill -9 3377

3)重啟 Elasticsearch 服務(wù)

[centos@ES-1 ~]$ sudo systemctl restart elasticsearch-server.service

或者

[centos@ES-1 ~]$ sudo ps -ef | grep Elasticsearch
elastic    3377      1  2 13:15 ?        00:00:37 /usr/local/elasticsearch-7.6.2/jdk/bin/java...
[centos@ES-1 ~]$ sudo kill -9 3377
[centos@ES-1 ~]$ sudo -u elastic /usr/local/elasticsearch-7.6.2/bin/elasticsearch

4)查看 Elasticseach 服務(wù)狀態(tài)

[centos@ES-1 ~]$ sudo systemctl status elasticsearch-server.service

或者

[centos@ES-1 ~]$ sudo netstat -ntap | grep -E "9200|9300"
tcp6       0      0 :::9200                 :::*                    LISTEN      3377/java           
tcp6       0      0 :::9300                 :::*                    LISTEN      3377/java  

5)開啟 Elasticsearch 服務(wù)開機自啟動

[centos@ES-1 ~]$ sudo systemctl enable elasticsearch-server.service

6)禁用 Elasticsearch 服務(wù)開機自啟動

[centos@ES-1 ~]$ sudo systemctl disable elasticsearch-server.service

在多個實例并行的情況下允扇,全部實例共享使用管理用戶和組、Linux內(nèi)核配置则奥,但每個實例獨立安裝(包括獨立的程序安裝目錄和數(shù)據(jù)存儲目錄)考润、獨立端口、獨立啟動服務(wù)读处。

注意:其他"ES Node"節(jié)點上全部需要按照以上步驟配置糊治。


3.2.安全性擴展

Elasticsearch 集成了 X-Pack 作為安全性組件,直接開啟使用即可罚舱。

首先完成 Elasticsearch 的安裝和配置井辜。以"ES Node 1"節(jié)點為例:

1绎谦、導(dǎo)出 Elasticsearch X-Pack 證書文件。導(dǎo)出文件命令需要設(shè)置證書口令為空粥脚。

[centos@ES-1 ~]$ sudo -u elastic /usr/local/elasticsearch-7.6.2/bin/elasticsearch-certutil cert -out /usr/local/elasticsearch-7.6.2/config/elastic-certificates.p12 -pass ""

2窃肠、修改 Elastcsearch 的配置文件參數(shù)。

使用文本編輯器打開配置文件:

[centos@ES-1 ~]$ sudo gedit /usr/local/elasticsearch-7.6.2/config/elasticsearch.yml

追加或驗證文件中的以下參數(shù)并保存:

xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: elastic-certificates.p12

3刷允、重新啟動 Elasticsearch 服務(wù)冤留。

[centos@ES-1 ~]$ sudo systemctl restart elasticsearch-server.service

4、設(shè)置 Elasticsearch 服務(wù)內(nèi)置賬號的口令树灶。

[centos@ES-1 ~]$ sudo -u elastic  /usr/local/elasticsearch-7.6.2/bin/elasticsearch-setup-passwords interactive

Initiating the setup of passwords for reserved users elastic,apm_system,kibana,logstash_system,beats_system,remote_monitoring_user.
You will be prompted to enter passwords as the process progresses.
Please confirm that you would like to continue [y/N]y

Enter password for [elastic]: 
Reenter password for [elastic]: 
Enter password for [apm_system]: 
Reenter password for [apm_system]: 
Enter password for [kibana]: 
Reenter password for [kibana]: 
Enter password for [logstash_system]: 
Reenter password for [logstash_system]: 
Enter password for [beats_system]: 
Reenter password for [beats_system]: 
Enter password for [remote_monitoring_user]: 
Reenter password for [remote_monitoring_user]: 

Changed password for user [apm_system]
Changed password for user [kibana]
Changed password for user [logstash_system]
Changed password for user [beats_system]
Changed password for user [remote_monitoring_user]
Changed password for user [elastic]

5纤怒、驗證 X-Pack 認證。使用瀏覽器客戶端訪問 Elasticsearch 的 Http 服務(wù)接口進行測試天通,輸入:http://<IP>:<PORT>

X-Pack 認證界面

注意:其他 "ES Node" 節(jié)點上不需要執(zhí)行此步驟泊窘。但是需要將此服務(wù)器上已經(jīng)創(chuàng)建的 "elastic-certificates.p12" 文件和 "elasticsearch.keystore" 文件拷貝到其他"ES Node"節(jié)點的同一位置上,一致使用像寒。即:集群中所有節(jié)點的安全認證文件是一致的州既。


3.3.中文分詞擴展

在中文環(huán)境下使用 Elasticsearch 一般是需要擴展中文分詞器,建議使用 IkAnalyzer 萝映。IKAnalyzer 是一個開源的,基于 Java 語言開發(fā)的輕量級的中文分詞工具包阐虚。

首先完成 Elasticsearch 的安裝和配置序臂。以"ES Node 1"節(jié)點為例:

1、從【https://github.com/medcl/elasticsearch-analysis-ik/releases】下載 IKAnalyzer 的編譯程序 zip 包到用戶主目錄中实束。使用的 IKAnalyzer 分詞器的版本應(yīng)與 Elasticsearch 的版本保持一致奥秆。

IKAnalyzer 下載頁面

2、解壓縮編譯程序 zip 包到 Elasticsearch 程序安裝目錄下的"plugins"目錄下咸灿,并設(shè)置 ELK 管理用戶和組為擁有者构订。

[centos@ES-1 ~]$ sudo unzip elasticsearch-analysis-ik-7.6.2.zip -d /usr/local/elasticsearch-7.6.2/plugins/ik
[centos@ES-1 ~]$ sudo chown -R elastic:elastic /usr/local/elasticsearch-7.6.2/plugins/ik

3、重新啟動 Elasticsearch 服務(wù)避矢。

[centos@ES-1 ~]$ sudo systemctl restart elasticsearch-server.service

4悼瘾、驗證 IkAnalyzer 插件集成。使用瀏覽器客戶端訪問 Elasticsearch 的 Http 服務(wù)接口進行測試审胸,輸入:http://<IP>:<PORT>/_cat/plugins

IkAnalyzer 正確集成反饋消息

注意:其他"ES Node"節(jié)點上全部需要按照以上步驟配置亥宿。


4.Elasticsearch 集群化

4.1.Elasticsearch 集群的概念

1、Elasticsearch 集群的組成

1)節(jié)點(Node)和 集群(Cluster):節(jié)點是單個 Elastcsearch 服務(wù)砂沛,一般部署在單獨的服務(wù)器上烫扼。如果這些單個 Elasticsearch 服務(wù)的配置文件(elasticsearch.yml 文件) 中具有相同的"cluster.name"設(shè)置值,那就自動發(fā)現(xiàn)這些節(jié)點并組成一個集群碍庵,集群中允許有1個或者大于1個的單數(shù)節(jié)點組成映企。

2)文檔(Document)和 索引(Index):文檔是一組由 JSON 結(jié)構(gòu)組成的結(jié)構(gòu)化數(shù)據(jù)悟狱,它由若干屬性和值組成,而索引是存儲文檔的庫堰氓〖方ィ可以將索引比作 RDBMS 中僅有一個"表"的數(shù)據(jù)庫,文檔是"表"中的"行"豆赏,而文檔屬性是"行"中的"列"挣菲。

3)shard(分片):Elasticsearch 將一個索引切分為多個分片,在集群的節(jié)點中分布存儲掷邦,構(gòu)成分布式搜索機制白胀。因此 ES 具有橫向擴展的能力,可以存儲大量數(shù)據(jù)并且讓搜索和分析等操作分布到多臺服務(wù)器上去執(zhí)行抚岗,提升吞吐量和性能或杠。分片數(shù)量在建立索引時一次設(shè)置,不能修改宣蔚,默認5個向抢。

4)shard replica(分片副本):Elasticsearch 允許每個分片創(chuàng)建多個副本。分片副本可以在多個節(jié)點中冗余數(shù)據(jù)胚委,以保障在節(jié)點故障時數(shù)據(jù)不被丟失挟鸠,同時可以提升搜索操作的吞吐量和性能。分片副本數(shù)量可以隨時修改亩冬,默認1個艘希。

2、Elasticsearch 集群節(jié)點的類型

1)主節(jié)點:主節(jié)點負責創(chuàng)建索引硅急、刪除索引覆享、分配分片、追蹤集群中的節(jié)點狀態(tài)等工作营袜。Elasticsearch 中的主節(jié)點的工作量相對較輕撒顿,用戶的請求可以發(fā)往集群中任何一個節(jié)點,由該節(jié)點負責分發(fā)和返回結(jié)果荚板,而不需要經(jīng)過主節(jié)點轉(zhuǎn)發(fā)凤壁。而主節(jié)點是由候選主節(jié)點通過 ZenDiscovery 機制選舉出來的。

2)候選主節(jié)點:在 Elasticsearch 集群初始化或者主節(jié)點宕機的情況下跪另,從候選主節(jié)點中選舉其中一個作為主節(jié)點客扎。候選主節(jié)點的 Elasticsearch 服務(wù)的配置文件(elasticsearch.yml 文件)設(shè)置項特點:

node.master: true
# 設(shè)置當前節(jié)點是否有資格被選舉成為主要節(jié)點,默認是true罚斗。ES默認集群中的第一個節(jié)點為主要節(jié)點徙鱼,如果這個節(jié)點故障就會重新選舉主要節(jié)點。

node.data: false
# 設(shè)置當前節(jié)點是否存儲索引數(shù)據(jù),默認為true袱吆。

3)數(shù)據(jù)節(jié)點:數(shù)據(jù)節(jié)點負責數(shù)據(jù)的存儲和相關(guān)具體操作厌衙,比如CRUD、搜索绞绒、聚合婶希。所以,數(shù)據(jù)節(jié)點對機器配置要求比較高蓬衡,首先需要有足夠的磁盤空間來存儲數(shù)據(jù)喻杈,其次數(shù)據(jù)操作對系統(tǒng)CPU、Memory和IO的性能消耗都很大狰晚。數(shù)據(jù)節(jié)點的 Elasticsearch 服務(wù)的配置文件(elasticsearch.yml 文件)設(shè)置項特點:

node.master: false
# 設(shè)置當前節(jié)點是否有資格被選舉成為主要節(jié)點筒饰,默認是true。ES默認集群中的第一個節(jié)點為主要節(jié)點壁晒,如果這個節(jié)點故障就會重新選舉主要節(jié)點瓷们。

node.data: true
# 設(shè)置當前節(jié)點是否存儲索引數(shù)據(jù),默認為true秒咐。

4)候選主節(jié)點+數(shù)據(jù)節(jié)點:一個節(jié)點可以同時成為候選主節(jié)點和數(shù)據(jù)節(jié)點谬晕,主需要在候選主節(jié)點配置的基礎(chǔ)上開啟數(shù)據(jù)節(jié)點即可。候選主節(jié)點+數(shù)據(jù)節(jié)點的 Elasticsearch 服務(wù)的配置文件(elasticsearch.yml 文件)設(shè)置項特點:

node.master: true
# 設(shè)置當前節(jié)點是否有資格被選舉成為主要節(jié)點携取,默認是true攒钳。ES默認集群中的第一個節(jié)點為主要節(jié)點,如果這個節(jié)點故障就會重新選舉主要節(jié)點雷滋。

node.data: true
# 設(shè)置當前節(jié)點是否存儲索引數(shù)據(jù)夕玩,默認為true。

5)客戶端節(jié)點:客戶端節(jié)點就是既不做候選主節(jié)點也不做數(shù)據(jù)節(jié)點的節(jié)點惊豺,只負責請求的分發(fā)、匯總禽作。這是所有節(jié)點的共性能力尸昧,單獨增加這樣的節(jié)點一般是為了負載均衡。數(shù)據(jù)節(jié)點的 Elasticsearch 服務(wù)的配置文件(elasticsearch.yml 文件)設(shè)置項特點:

node.master: false
# 設(shè)置當前節(jié)點是否有資格被選舉成為主要節(jié)點旷偿,默認是true烹俗。ES默認集群中的第一個節(jié)點為主要節(jié)點,如果這個節(jié)點故障就會重新選舉主要節(jié)點萍程。

node.data: false
# 設(shè)置當前節(jié)點是否存儲索引數(shù)據(jù)幢妄,默認為true。

4.2.Elasticsearch 集群的示例

首先完成 Elasticsearch 的安裝和配置茫负。假設(shè)按照本文規(guī)劃的集群拓撲圖蕉鸳,所有集群節(jié)點都是"候選主節(jié)點+數(shù)據(jù)節(jié)點"的類型,以"ES Node 1"節(jié)點為例:

1、設(shè)置 Elastcsearch 的配置文件參數(shù)潮尝。

使用文本編輯器打開配置文件:

[centos@ES-1 ~]$ sudo gedit /usr/local/elasticsearch-7.6.2/config/elasticsearch.yml

修改或驗證文件中的以下參數(shù)并保存:

# -- 必須一致的配置 --

cluster.name: elasticsearch-cluster
# 設(shè)置的集群名稱榕吼,同一集群的節(jié)點應(yīng)該設(shè)置相同的集群名稱。

cluster.initial_master_nodes: ["es-node-1","es-node-2"]
# 設(shè)置集群初始化主節(jié)點清單勉失。本例中將【es-node-1】和【es-node-2】節(jié)點設(shè)置為初始化主節(jié)點羹蚣。

discovery.send_hosts: ["192.168.216.128:9300", "192.168.216.129:9300"]
# 設(shè)置集群節(jié)點發(fā)現(xiàn)清單,一般是候選主節(jié)點IP地址乱凿,不寫端口號時默認9300端口顽素。

discovery.zen.minimum_master_nodes: 2
# 設(shè)置集群候選主節(jié)點的最少數(shù)量。

# -- 各節(jié)點的個性化配置 --

node.name: es-node-1
# 設(shè)置當前ES節(jié)點的名稱徒蟆,每個節(jié)點應(yīng)設(shè)置不同的名稱胁出。

node.master: true
# 設(shè)置當前節(jié)點是候選主節(jié)點。

node.data: true
# 設(shè)置當前節(jié)點是數(shù)據(jù)節(jié)點后专。

path.data: /data/elasticsearch/data
# 設(shè)置索引數(shù)據(jù)的存儲路徑划鸽,默認是ES根目錄下的data文件夾∑莅ィ可以設(shè)置多個存儲路徑裸诽,用逗號隔開,例:
# path.data: /path/to/data1,/path/to/data2

path.logs: /data/elasticsearch/logs
# 設(shè)置日志文件的存儲路徑型凳,默認是ES根目錄下的logs文件夾 丈冬。

network.host: 0.0.0.0
# 設(shè)置ES服務(wù)的監(jiān)聽網(wǎng)絡(luò)地址(IPv4 或 IPv6),默認是示例地址甘畅。

http.port: 9200
# 設(shè)置ES服務(wù)的HTTP端口埂蕊,默認為9200。

transport.tcp.port: 9300
# 設(shè)置節(jié)點之間交互的TCP端口疏唾,默認是9300蓄氧。

http.cors.enabled: true
http.cors.allow-origin: "*"
# 設(shè)置允許HTTP跨域訪問

# xpack.security.enabled: true
# xpack.security.transport.ssl.enabled: true
# xpack.security.transport.ssl.verification_mode: certificate
# xpack.security.transport.ssl.keystore.path: elastic-certificates.p12
# xpack.security.transport.ssl.truststore.path: elastic-certificates.p12
# 啟用 X-Pack 安全認證模塊,集群中所有節(jié)點的安全認證文件是一致的槐脏。
# 只需要在其中一個節(jié)點中創(chuàng)建認證文件(elastic-certificates.p12 文件和 elasticsearch.keystore 文件)喉童,其他節(jié)點只需要拷貝這兩個文件即可,注意要存放在一致的目錄下顿天。

2堂氯、認證文件一致化設(shè)置。

假設(shè)在 "ES Node 1" 上已經(jīng)創(chuàng)建安全認證文件 "elastic-certificates.p12" 和 "elasticsearch.keystore" 牌废,這些文件存放在 Elasticsearch 程序安裝目錄下的 "config" 目錄中(本例為:/usr/local/elasticsearch-7.6.2/config 目錄)咽白。

將兩個文件分別拷貝至其他 "ES Node" 節(jié)點上的 Elasticsearch 程序安裝目錄下的 "config" 目錄中(本例為:/usr/local/elasticsearch-7.6.2/config 目錄)。

注意:除上述操作以外鸟缕,無需進行任何有關(guān)安全性擴展的配置晶框。

3、重啟啟動集群。

首先啟動"ES Node 1"節(jié)點三妈,然后依次啟動其他"ES Node"節(jié)點畜埋。

[centos@ES-1 ~]$ sudo systemctl restart elasticsearch-server.service

[centos@ES-2 ~]$ sudo systemctl restart elasticsearch-server.service

[centos@ES-3 ~]$ sudo systemctl restart elasticsearch-server.service

4、驗證集群畴蒲。使用瀏覽器客戶端訪問 Elasticsearch 的 Http 服務(wù)接口進行測試悠鞍,輸入:http://<IP>:<PORT>/_cluster/health

Elasticsearch 集群驗證頁面

5.Kibana 安裝和配置

1、打開 Kibana 官方網(wǎng)站下載頁面【https://www.elastic.co/cn/downloads/kibana】模燥,下載 Kibana 的編譯程序 tar 包到用戶主目錄中咖祭。

Kibana 官方下載頁面

2、解壓縮編譯程序 tar 包到"/usr/local"目錄中蔫骂。

[centos@ESVM ~]$ sudo tar zxvf kibana-7.6.2-linux-x86_64.tar.gz -C /usr/local
[centos@ESVM ~]$ ll /usr/local
drwxr-xr-x. 13 root    root    266 4月  20 15:25 kibana-7.6.2-linux-x86_64

3么翰、創(chuàng)建 ELK 管理用戶和組,并設(shè)置為程序安裝目錄的擁有者辽旋。

[centos@ESVM ~ ]$ sudo id elastic
id: “elastic”:無此用戶
[centos@ESVM ~ ]$ sudo groupadd elastic
[centos@ESVM ~ ]$ sudo useradd -g elastic -s /bin/false elastic
[centos@ESVM ~ ]$ sudo chown -R elastic:elastic /usr/local/kibana-7.6.2-linux-x86_64

ELK組件的管理用戶和組(如:"elastic")可以共享使用很洋,創(chuàng)建用戶和組之前首先查詢一下賬戶是否存在峡竣。如果賬戶和組已經(jīng)存在,可能是因為在安裝其他ELK組件時已創(chuàng)建,這種情況下不必重復(fù)創(chuàng)建用戶和組仑撞,只需要更改程序安裝目錄艘刚、數(shù)據(jù)存儲目錄的擁有者即可凝垛。

4策橘、設(shè)置 Kibana 的配置文件參數(shù)。

使用文本編輯器打開配置文件:

[centos@ESVM ~]$ sudo gedit /usr/local/kibana-7.6.2-linux-x86_64/config/kibana.yml

修改或驗證文件中的以下參數(shù)并保存:

server.port: 5601
# 服務(wù)監(jiān)聽端口瓶逃。

server.host: "192.168.216.131"
# 服務(wù)監(jiān)聽地址束铭。

server.name: "ESVM Kibana"
# 服務(wù)的顯示名稱。

elasticsearch.hosts: ["http://192.168.216.128:9200","http://192.168.216.129:9200","http://192.168.216.130:9200"]
# ElasticSearch 集群 URLs 集合厢绝。

kibana.index: ".kibana"
# Kibana 在 ElasticSearch 中存儲搜索契沫、可視化和儀表板數(shù)據(jù)的索引名字。

elasticsearch.username: "elastic"
# Kibana 訪問 ElasticSearch 的賬號昔汉,"elastic" 賬號是 Elasticsearch 的預(yù)置用戶懈万。

elasticsearch.password: "password"
# Kibana 訪問 ElasticSearch 賬號的口令。

i18n.locale: "zh-CN"
# 本地化區(qū)域語言設(shè)置挤庇,支持英文和中文,默認是中文贷掖。

5嫡秕、配置 Kibana 服務(wù)開機自啟動。

使用文本編輯器創(chuàng)建配置文件:

[centos@ESVM ~ ]$ sudo gedit /usr/lib/systemd/system/kibana-server.service

編寫文件內(nèi)容并保存如下:

[Unit]
Description=Kibana Server
After=syslog.target network.target

[Service]
User=elastic
Group=elastic

ExecStart=/usr/local/kibana-7.6.2-linux-x86_64/bin/kibana
ExecStop=/bin/kill -HUP $MAINPID
ExecReload=/bin/kill -HUP $MAINPID

KillMode=mixed
KillSignal=SIGINT

LimitNOFILE=100000
LimitNPROC=100000

[Install]
WantedBy=multi-user.target

設(shè)置開機啟動:

[centos@ESVM ~]$ sudo systemctl daemon-reload
[centos@ESVM ~]$ sudo systemctl enable kibana-server.service

6苹威、啟動 Kibana 服務(wù)昆咽。

[centos@ESVM ~]$ sudo systemctl start kibana-server.service

7、設(shè)置防火墻端口(CentOS8默認安裝firewall防火墻),允許"5601"端口(Kibana 默認端口)訪問服務(wù)器掷酗。

[centos@ESVM ~]$ sudo firewall-cmd --zone=public --add-port=5601/tcp --permanent
[centos@ESVM ~]$ sudo firewall-cmd --reload

8调违、驗證 Kibana 服務(wù)正常運行。使用瀏覽器客戶端訪問 Kibana 的 Http 服務(wù)接口進行測試泻轰,輸入:http://<IP>:<PORT>

Kibana 登錄頁面
Kibana 主頁面

9技肩、Kibana 運維管理

1)啟動 Kibana 服務(wù)(任選一種方式)

[centos@ESVM ~]$ sudo systemctl start kibana-server.service

或者

[centos@ESVM ~]$ sudo -u elastic /usr/local/kibana-7.6.2-linux-x86_64/bin/kibana

2)停止 Kibana 服務(wù)(任選一種方式)

[centos@ESVM ~]$ sudo systemctl stop kibana-server.service

或者

[centos@ESVM ~]$ sudo ps -ef | grep kibana
elastic   12268      1  1 4月20 ?       00:12:00 /usr/local/kibana-7.6.2-linux-x86_64/bin/..
[centos@ESVM ~]$ sudo kill -9 12268

3)重啟 Kibana 服務(wù)

[centos@ESVM ~]$ sudo systemctl restart kibana-server.service

或者

[centos@ESVM ~]$ sudo ps -ef | grep kibana
elastic   12268      1  1 4月20 ?       00:12:00 /usr/local/kibana-7.6.2-linux-x86_64/bin/..
[centos@ESVM ~]$ sudo kill -9 12268
[centos@ESVM ~]$ sudo -u elastic /usr/local/kibana-7.6.2-linux-x86_64/bin/kibana

4)查看 Kibana 服務(wù)狀態(tài)

[centos@ESVM ~]$ sudo systemctl status kibana-server.service

或者

[centos@ESVM ~]$ sudo netstat -ntap | grep 5601
tcp        0      0 192.168.216.131:5601    0.0.0.0:*               LISTEN      12268/node

5)啟動 Kibana 服務(wù)開機自啟動

[centos@ESVM ~]$ sudo systemctl enable kibana-server.service

6)禁用 Kibana 服務(wù)開機自啟動

[centos@ESVM ~]$ sudo systemctl disable kibana-server.service

6.Logstash 安裝和作業(yè)部署

1、安裝 OpenJDK 1.8 或 OracleJDK 1.8浮声。

1)使用 YUM 源安裝OpenJDK 1.8虚婿。

[centos@ETL ~]$ sudo dnf install java-1.8.0-openjdk

2)驗證Java運行環(huán)境。

[centos@ETL ~]$ java -vserion
openjdk version "1.8.0_242"
OpenJDK Runtime Environment (build 1.8.0_242-b08)
OpenJDK 64-Bit Server VM (build 25.242-b08, mixed mode)

2泳挥、打開 Logstash 官方網(wǎng)站下載頁面【https://www.elastic.co/cn/downloads/logstash】然痊,下載 Logstash 的編譯程序 tar 包到用戶主目錄中。

Logstash 官方下載頁面

3屉符、解壓縮編譯程序 tar 包到"/usr/local"目錄中剧浸。

[centos@ETL ~]$ sudo tar zxvf logstash-7.6.2.tar.gz -C /usr/local
[centos@ETL ~]$ ll /usr/local
drwxr-xr-x. 12     631     503 255 3月  26 17:42 logstash-7.6.2

4、創(chuàng)建 Logstash 的數(shù)據(jù)矗钟、日志唆香、作業(yè)配置存儲目錄。

[centos@ETL ~]$ sudo mkdir -p /data/logstash/data
[centos@ETL ~]$ sudo mkdir -p /data/logstash/logs
[centos@ETL ~]$ sudo mkdir -p /data/logstash/config

5真仲、創(chuàng)建 ELK 管理用戶和組袋马,并設(shè)置為程序安裝目錄、數(shù)據(jù)存儲目錄的擁有者秸应。

[centos@ETL ~ ]$ sudo id elastic
id: “elastic”:無此用戶
[centos@ETL ~ ]$ sudo groupadd elastic
[centos@ETL ~ ]$ sudo useradd -g elastic -s /bin/false elastic
[centos@ETL ~ ]$ sudo chown -R elastic:elastic /usr/local/logstash-7.6.2
[centos@ETL ~ ]$ sudo chown -R elastic:elastic /data/logstash

ELK組件的管理用戶和組(如:"elastic")可以共享使用虑凛,創(chuàng)建用戶和組之前首先查詢一下賬戶是否存在。如果賬戶和組已經(jīng)存在软啼,可能是因為在安裝其他ELK組件時已創(chuàng)建桑谍,這種情況下不必重復(fù)創(chuàng)建用戶和組,只需要更改程序安裝目錄祸挪、數(shù)據(jù)存儲目錄的擁有者即可锣披。

6、設(shè)置 Logstash 的配置文件參數(shù)贿条。

使用文本編輯器打開配置文件:

[centos@ETL ~]$ sudo gedit /usr/local/logstash-7.6.2/config/logstash.yml

修改或驗證文件中的以下參數(shù)并保存:

# --常用設(shè)置項--

path.data: /data/logstash/data
# 插件數(shù)據(jù)持久化目錄雹仿,默認為程序安裝目錄下的"data"目錄。

path.logs: /data/logstash/logs
# 設(shè)置日志文件目錄整以,默認輸出到控制臺胧辽。

path.config: /data/logstash/config
# 設(shè)置管道配置文件目錄。

config.reload.automatic: true
# 開啟管道配置文件自動加載公黑,默認為false邑商。

config.reload.interval: 3s
# 設(shè)置管道配置文件檢查更新的時間摄咆。

# --其他可用設(shè)置項--

# node.name: ETL Logstash
# 節(jié)點名稱,默認為主機名稱人断。

# pipeline.workers: 2
# 輸出通道的工作workers數(shù)據(jù)量(提升輸出效率)默認為CPU核數(shù)吭从。

# pipeline.batch.size: 125
# 每批次輸入數(shù)據(jù)的條數(shù),默認125條恶迈。

# pipeline.batch.delay: 50
# 每批次延時等待的時間(單位毫秒)涩金,默認50毫秒。

# http.host: "127.0.0.1"
# 用戶指標收集REST服務(wù)綁定主機地址蝉绷,默認為"127.0.0.1"鸭廷。

# http.port: 9600-9700
# 用戶指標收集REST服務(wù)綁定主機端口,默認為9600-9700熔吗。

# log.level: info
# 日志輸出級別辆床,如果config.debug開啟,這里一定要是debug日志 桅狠,默認為 info讼载。

# path.plugins: []
# 自定義插件目錄

7、配置 ETL 作業(yè)中跌。

以從 PostgreSQL 數(shù)據(jù)庫中抽取數(shù)據(jù)咨堤,并寫入 Elasticsearch 中為例:

1)下載 PostgreSQL JDBC 的 jar 包到指定位置。打開 PostgreSQL JDBC 官方網(wǎng)站下載頁面【https://jdbc.postgresql.org/download.html】漩符,下載 PostgreSQL JDBC 的 jar 包到用戶主目錄中一喘。**

PostgreSQL JDBC 的 jar 包下載頁面

2)將 PostgreSQL JDBC 的 jar 包拷貝到 Logstash 程序目錄中的指定位置:

[centos@ETL ~]$ sudo -u elastic cp postgresql-42.2.12.jar /usr/local/logstash-7.6.2/logstash-core/lib/jars

3)在 Logstash 的作業(yè)配置存儲目錄中創(chuàng)建 JOB 配置文件。

使用文本編輯器創(chuàng)建配置文件:

[centos@ETL ~]$ sudo -u elastic gedit /data/logstash/config/pgsql.conf

編寫以下配置腳本并保存:

input {
    stdin {
    }
    jdbc {
          # 數(shù)據(jù)庫 JDBC 的 jar 包位置嗜暴,JDBC 的 jar 包需要自行下載準備
          jdbc_driver_library => "/usr/local/logstash-7.6.2/logstash-core/lib/jars/postgresql-42.2.12.jar"
          # 數(shù)據(jù)庫驅(qū)動
          jdbc_driver_class => "org.postgresql.Driver"
          # 數(shù)據(jù)庫連接字符串
          jdbc_connection_string => "jdbc:postgresql://192.168.216.133:5432/pgsql"
          # 數(shù)據(jù)庫用戶名密碼
          jdbc_user => "postgres"
          jdbc_password => "postgres"
          # 數(shù)據(jù)庫重連嘗試次數(shù)
          connection_retry_attempts => "3"
          # 判斷數(shù)據(jù)庫連接是否可用凸克,默認false不開啟
          jdbc_validate_connection => "true"
          # 數(shù)據(jù)庫連接可用校驗超時時間,默認3600S
          jdbc_validation_timeout => "3600"
          # 開啟分頁查詢(默認false不開啟)
          jdbc_paging_enabled => "true"
          # 單次分頁查詢條數(shù)(默認100000,若字段較多且更新頻率較高闷沥,建議調(diào)低此值)
          jdbc_page_size => "5000"
          # sql腳本文件或腳本
          # statement_filepath => "pgsql.sql"
          statement => "select row_number() over(order by last_modify_dt  asc) as rn,* from table where last_modify_dt >= :sql_last_value"
          # 是否將列名轉(zhuǎn)為小寫萎战,默認為true
          lowercase_column_names => "false"
          # 定期執(zhí)行周期,"*"從左到右依次代表: 分 時 天 月 年  
          schedule => "* * * * *"
          # 是否記錄上次執(zhí)行結(jié)果, 如果為真,將會把上次執(zhí)行到的 tracking_column 字段的值記錄下來,保存到 last_run_metadata_path 指定的文件中
          record_last_run => "true"
          # 是否需要記錄某個column 的值,如果record_last_run為真,可以自定義我們需要 track 的 column 名稱舆逃,此時該參數(shù)就要為 true. 否則默認 track 的是 timestamp 的值.
          use_column_value => "true"
          # 如果 use_column_value 為真,需配置數(shù)據(jù)表增量字段的類型蚂维,只能是"numeric"或"timestamp"
          tracking_column_type => "timestamp"
              # 如果 use_column_value 為真,需配置數(shù)據(jù)表增量字段名,該字段必須是遞增的. 一般序號或時間戳
          tracking_column => "last_modify_dt"
          last_run_metadata_path => "/data/logstash/config/sql_last_value"
    }
}
filter {
    mutate {
        # 將 keyword_tag 字段的內(nèi)容通過 "," 分割成數(shù)組
        split => { "keyword_tag" => "," }
    }
    mutate {
        # 將 taxonomy 字段的內(nèi)容中的前綴"/"和"http://"替換為""。
        gsub => ["taxonomy", "^/(/)?", ""]
    }
}
output {
    elasticsearch {
        # Elasticsearch 服務(wù)IP和端口,賬號和密碼
        hosts => ["192.168.216.128:9200", "192.168.216.129:9200", "192.168.216.130:9200"]
        user => "elastic"
        password => "elastic"
        # 索引名
        index => "pgsql_index"
        # 需要關(guān)聯(lián)的數(shù)據(jù)表中有一個uuid字段路狮,對應(yīng)索引文檔的id號
        document_id => "%{uuid}"
    }
    stdout {
        codec => rubydebug
    }
}

4)驗證 JOB 配置文件虫啥。

[centos@ETL ~]$ sudo -u elastic /usr/local/logstash-7.6.2/bin/logstash -f /data/logstash/config/pgsql.conf -t

其他配置請參加官方文檔:

注意:每個作業(yè)編寫?yīng)毩⒌呐渲媚_本。

8奄妨、配置 Logstash 開機自啟動涂籽。

使用文本編輯器創(chuàng)建配置文件:

[centos@ETL ~ ]$ sudo gedit /usr/lib/systemd/system/logstash.service

編寫文件內(nèi)容并保存如下:

[Unit]
Description=Logstash
After=syslog.target network.target

[Service]
User=elastic
Group=elastic

ExecStart=/usr/local/logstash-7.6.2/bin/logstash
ExecStop=/bin/kill -HUP $MAINPID
ExecReload=/bin/kill -HUP $MAINPID

KillMode=mixed
KillSignal=SIGINT

LimitNOFILE=100000
LimitNPROC=100000

[Install]
WantedBy=multi-user.target

設(shè)置開機啟動:

[centos@ETL ~]$ sudo systemctl daemon-reload
[centos@ETL ~]$ sudo systemctl enable logstash.service

9、啟動 Logstash 服務(wù)展蒂。

[centos@ETL ~]$ sudo systemctl start logstash.service

10又活、Logstash 運維管理

1)啟動 Logstash 服務(wù)(任選一種方式)

[centos@ETL ~]$ sudo systemctl start logstash.service

或者

[centos@ETL ~]$ sudo -u elastic /usr/local/logstash-7.6.2/bin/logstash

2)停止 Logstash 服務(wù)(任選一種方式)

[centos@ETL ~]$ sudo systemctl stop logstash.service

或者

[centos@ETL ~]$ sudo ps -ef | grep logstash
elastic   26393  26391  5 11:01 ?        00:01:38 /bin/java...
[centos@ETL ~]$ sudo kill -9 26393

3)重啟 Logstash 服務(wù)

[centos@ETL ~]$ sudo systemctl restart logstash.service

或者

[centos@ETL ~]$ sudo ps -ef | grep logstash
elastic   26393  26391  5 11:01 ?        00:01:38 /bin/java...
[centos@ETL ~]$ sudo kill -9 26393
[centos@ETL ~]$ sudo -u elastic /usr/local/logstash-7.6.2/bin/logstash

4)查看 Logstash 服務(wù)狀態(tài)

[centos@ETL ~]$ sudo systemctl status logstash.service

或者

[centos@ETL ~]$ sudo -u elastic tail /data/logstash/logs/logstash-plain.log

5)啟動 Logstash 服務(wù)開機自啟動

[centos@ETL ~]$ sudo systemctl enable logstash.service

6)禁用 Logstash 服務(wù)開機自啟動

[centos@ETL ~]$ sudo systemctl disable logstash.service

7.Java 開發(fā)集成

7.1.ES 編程語言


7.2.ES + Jest 可復(fù)用類庫

JestClient 是 Elasticsearch 的 JavaAPI 包,以下是 Maven 項目的程序案例锰悼。

1柳骄、從 Maven 庫中引入 JestClient 包。

<dependency>
    <groupId>io.searchbox</groupId>
    <artifactId>jest-common</artifactId>
    <version>6.3.1</version>
</dependency>

2箕般、編寫 Elasticsearch 會話工廠類 "SearchSessionFacade.java"耐薯。

/**
 * 
 * ElasticSearch Libs
 * 
 * ?2020 張毅
 * 
 */

import com.google.gson.GsonBuilder;

import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.config.HttpClientConfig;

/**
 * ElasticSearch 會話工廠類 (ElasticSearch Session Factory)
 */
public class SearchSessionFactory {

    /** 請求服務(wù)地址 */
    private String[] serverUris;

    /** 是否啟用多線程 */
    private boolean multiThreaded = true;

    /** 連接超時時長 */
    private int connTimeout = 10000;

    /** 數(shù)據(jù)讀取超時時長 */
    private int readTimeout = 10000;

    /** 日期格式 */
    private String dateFormat = "yyyy-MM-dd HH:mm:ss";

    /** 訪問賬號 */
    private String user;

    /** 訪問密碼 */
    private String pass;

    /** Jest客戶端 */
    private JestClient jestClient;

    /**
     * 獲取請求服務(wù)地址。
     * 
     * @return 請求服務(wù)地址丝里。
     */
    public String[] getServerUris() {
        return serverUris;
    }

    /**
     * 設(shè)置請求服務(wù)地址曲初。
     * 
     * @param serverUris 請求服務(wù)地址。
     */
    public void setServerUris(String[] serverUris) {
        this.serverUris = serverUris;
    }

    /**
     * 獲取是否啟用多線程杯聚。
     * 
     * @return 啟用多線程返回true臼婆,否則返回false。
     */
    public boolean isMultiThreaded() {
        return multiThreaded;
    }

    /**
     * 設(shè)置是否啟用多線程幌绍。
     * 
     * @param multiThreaded 設(shè)置為true啟用颁褂,false停用。
     */
    public void setMultiThreaded(boolean multiThreaded) {
        this.multiThreaded = multiThreaded;
    }

    /**
     * 獲取連接超時時長(毫秒)傀广。
     * 
     * @return 連接超時時長(毫秒)颁独。
     */
    public int getConnTimeout() {
        return connTimeout;
    }

    /**
     * 設(shè)置連接超時時長(毫秒)。
     * 
     * @param connTimeout 連接超時時長(毫秒)伪冰。
     */
    public void setConnTimeout(int connTimeout) {
        this.connTimeout = connTimeout;
    }

    /**
     * 獲取數(shù)據(jù)讀取超時時長(毫秒)誓酒。
     * 
     * @return 數(shù)據(jù)讀取超時時長(毫秒)。
     */
    public int getReadTimeout() {
        return readTimeout;
    }

    /**
     * 設(shè)置數(shù)據(jù)讀取超時時長(毫秒)贮聂。
     * 
     * @param readTimeout 數(shù)據(jù)讀取超時時長(毫秒)靠柑。
     */
    public void setReadTimeout(int readTimeout) {
        this.readTimeout = readTimeout;
    }

    /**
     * 獲取日期格式。
     * 
     * @return 日期格式寂汇。
     */
    public String getDateFormat() {
        return dateFormat;
    }

    /**
     * 設(shè)置日期格式病往。
     * 
     * @param dateFormat 日期格式。
     */
    public void setDateFormat(String dateFormat) {
        this.dateFormat = dateFormat;
    }

    /**
     * 獲取訪問賬號骄瓣。
     * 
     * @return 訪問賬號停巷。
     */
    public String getUser() {
        return user;
    }

    /**
     * 設(shè)置訪問賬號。
     * 
     * @param user 訪問賬號榕栏。
     */
    public void setUser(String user) {
        this.user = user;
    }

    /**
     * 獲取訪問密碼畔勤。
     * 
     * @return 訪問密碼。
     */
    public String getPass() {
        return pass;
    }

    /**
     * 設(shè)置訪問訪問密碼扒磁。
     * 
     * @param pass 訪問密碼庆揪。
     */
    public void setPass(String pass) {
        this.pass = pass;
    }

    /**
     * 獲取Jest連接客戶端。
     * 
     * @return Jest客戶端妨托。
     */
    public JestClient getJestClient() {

        if (jestClient == null) {
            jestClient = this.createJestClient();
        }
        return jestClient;
    }

    /**
     * 創(chuàng)建Jest客戶端實例缸榛。
     * 
     * @return Jest客戶端實例吝羞。
     */
    protected JestClient createJestClient() {

        JestClientFactory factory = new JestClientFactory();
        if (this.getUser() != null && !this.getUser().equals("") && this.getPass() != null
                && !this.getPass().equals("")) {
            factory.setHttpClientConfig(new HttpClientConfig.Builder(serverUris).multiThreaded(multiThreaded)
                    .connTimeout(connTimeout).readTimeout(readTimeout)
                    .gson(new GsonBuilder().setDateFormat(dateFormat).create()).defaultCredentials(user, pass).build());
            this.jestClient = factory.getObject();
        } else {
            factory.setHttpClientConfig(new HttpClientConfig.Builder(serverUris).multiThreaded(multiThreaded)
                    .connTimeout(connTimeout).readTimeout(readTimeout)
                    .gson(new GsonBuilder().setDateFormat(dateFormat).create()).build());
            this.jestClient = factory.getObject();
        }

        return this.jestClient;
    }
}

3、編寫 Elasticsearch 會話類 "SearchSession.java"内颗。

/**
 * 
 * ElasticSearch Libs
 * 
 * ?2020 張毅
 * 
 */

import java.util.Collection;
import java.util.List;
import java.util.Map;

import com.google.gson.JsonObject;

import io.searchbox.action.BulkableAction;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestResult;
import io.searchbox.cluster.GetSettings;
import io.searchbox.cluster.Health;
import io.searchbox.cluster.NodesInfo;
import io.searchbox.cluster.NodesStats;
import io.searchbox.core.Bulk;
import io.searchbox.core.Count;
import io.searchbox.core.CountResult;
import io.searchbox.core.Delete;
import io.searchbox.core.DeleteByQuery;
import io.searchbox.core.Get;
import io.searchbox.core.Index;
import io.searchbox.core.MultiGet;
import io.searchbox.core.Search;
import io.searchbox.core.Search.Builder;
import io.searchbox.core.SearchResult;
import io.searchbox.core.SearchScroll;
import io.searchbox.core.Update;
import io.searchbox.core.UpdateByQuery;
import io.searchbox.indices.ClearCache;
import io.searchbox.indices.CloseIndex;
import io.searchbox.indices.CreateIndex;
import io.searchbox.indices.DeleteIndex;
import io.searchbox.indices.Flush;
import io.searchbox.indices.IndicesExists;
import io.searchbox.indices.OpenIndex;
import io.searchbox.indices.Optimize;
import io.searchbox.indices.aliases.AddAliasMapping;
import io.searchbox.indices.aliases.AliasExists;
import io.searchbox.indices.aliases.GetAliases;
import io.searchbox.indices.aliases.ModifyAliases;
import io.searchbox.indices.aliases.RemoveAliasMapping;
import io.searchbox.indices.mapping.DeleteMapping;
import io.searchbox.indices.mapping.GetMapping;
import io.searchbox.indices.mapping.PutMapping;
import io.searchbox.indices.settings.UpdateSettings;
import io.searchbox.indices.template.DeleteTemplate;
import io.searchbox.indices.template.GetTemplate;
import io.searchbox.indices.template.PutTemplate;
import io.searchbox.params.Parameters;

/**
 * ElasticSearch 會話類 (ElasticSearch Session)
 */
public class SearchSession {

    /** Jest客戶端 */
    protected final JestClient jestClient;

    /**
     * 獲取Jest連接客戶端钧排。
     * 
     * @return Jest客戶端。
     */
    protected JestClient getJestClient() {

        return jestClient;
    }

    /**
     * 構(gòu)造器均澳。
     * 
     * @param factory ElasticSearch創(chuàng)建工廠恨溜。
     */
    public SearchSession(SearchSessionFactory factory) {

        this.jestClient = factory.getJestClient();
    }

    /**
     * 執(zhí)行批量事務(wù)。
     * 
     * @param actions 事務(wù)動作集合找前。
     * @return 執(zhí)行結(jié)果模型糟袁。
     */
    @SuppressWarnings("rawtypes")
    public JestResult executeBulk(Collection<? extends BulkableAction> actions) {

        try {
            Bulk bulk = new Bulk.Builder().addAction(actions).build();
            JestResult jestResult = jestClient.execute(bulk);
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 優(yōu)化。
     * 
     * @return 執(zhí)行結(jié)果模型躺盛。
     */
    public JestResult optimize() {

        try {
            JestResult jestResult = jestClient.execute(new Optimize.Builder().build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 刷新项戴。
     * 
     * @return 執(zhí)行結(jié)果模型。
     */
    public JestResult flush() {

        try {
            JestResult jestResult = jestClient.execute(new Flush.Builder().build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 清除緩存槽惫。
     * 
     * @return 執(zhí)行結(jié)果模型肯尺。
     */
    public JestResult clearCache() {

        try {
            JestResult jestResult = jestClient.execute(new ClearCache.Builder().build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 獲取集群健康信息。
     * 
     * @return 執(zhí)行結(jié)果模型躯枢。
     */
    public JestResult getClusterHealth() {

        try {
            JestResult jestResult = jestClient.execute(new Health.Builder().build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 獲取節(jié)點信息则吟。
     * 
     * @return 執(zhí)行結(jié)果模型。
     */
    public JestResult getNodesInfo() {

        try {
            JestResult jestResult = jestClient.execute(new NodesInfo.Builder().build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 獲取節(jié)點狀態(tài)信息锄蹂。
     * 
     * @return 執(zhí)行結(jié)果模型氓仲。
     */
    public JestResult getNodesStats() {

        try {
            JestResult jestResult = jestClient.execute(new NodesStats.Builder().build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 更新配置。
     * 
     * @param setting 配置更新腳本JSON字符串得糜。敬扛。
     * @return 執(zhí)行結(jié)果模型。
     */
    public JestResult updateSettings(String setting) {

        try {
            JestResult jestResult = jestClient.execute(new UpdateSettings.Builder(setting).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 更新配置朝抖。
     * 
     * @param setting 配置更新腳本JSON模型啥箭。
     * @return 執(zhí)行結(jié)果模型。
     */
    public JestResult updateSettings(JsonObject setting) {

        return this.updateSettings(setting.toString());
    }

    /**
     * 獲取配置治宣。
     * 
     * @return 執(zhí)行結(jié)果模型急侥。
     */
    public JestResult geSettings() {
        try {
            JestResult jestResult = jestClient.execute(new GetSettings.Builder().build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 創(chuàng)建索引。
     * 
     * @param index 索引名字侮邀。
     * @return 執(zhí)行結(jié)果模型坏怪。
     */
    public JestResult createIndex(String index) {

        try {
            if (this.indicesExists(index).isSucceeded()) {
                this.deleteIndex(index);
            }
            JestResult jestResult = jestClient.execute(new CreateIndex.Builder(index).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 刪除索引。
     * 
     * @param index 索引名字绊茧。
     * @return 執(zhí)行結(jié)果模型铝宵。
     */
    public JestResult deleteIndex(String index) {

        try {
            JestResult jestResult = jestClient.execute(new DeleteIndex.Builder(index).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 判斷索引是否存在。
     * 
     * @param index 索引名字华畏。
     * @return 執(zhí)行結(jié)果模型鹏秋。
     */
    public JestResult indicesExists(String index) {
        try {
            JestResult jestResult = jestClient.execute(new IndicesExists.Builder(index).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 關(guān)閉索引尊蚁。
     * 
     * @param index 索引名字。
     * @return 執(zhí)行結(jié)果模型侣夷。
     */
    public JestResult closeIndex(String index) {

        try {
            JestResult jestResult = jestClient.execute(new CloseIndex.Builder(index).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 打開索引枝誊。
     * 
     * @param index 索引名字。
     * @return 執(zhí)行結(jié)果模型惜纸。
     */
    public JestResult openIndex(String index) {

        try {
            JestResult jestResult = jestClient.execute(new OpenIndex.Builder(index).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 創(chuàng)建映射。
     * 
     * @param index   索引名字绝骚。
     * @param type    索引類型耐版。
     * @param mapping 映射創(chuàng)建腳本JSON字符串。
     * @return 執(zhí)行結(jié)果模型压汪。
     */
    public JestResult putMappting(String index, String type, String mapping) {

        try {
            JestResult jestResult = jestClient.execute(new PutMapping.Builder(index, type, mapping).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 創(chuàng)建映射粪牲。
     * 
     * @param index   索引名字。
     * @param type    索引類型止剖。
     * @param mapping 映射創(chuàng)建腳本JSON模型腺阳。
     * @return 執(zhí)行結(jié)果模型。
     */
    public JestResult putMappting(String index, String type, JsonObject mapping) {

        return this.putMappting(index, type, mapping.toString());
    }

    /**
     * 刪除映射穿香。
     * 
     * @param index 索引名字亭引。
     * @param type  索引類型。
     * @return 執(zhí)行結(jié)果模型皮获。
     */
    public JestResult deleteMappting(String index, String type) {

        try {
            JestResult jestResult = jestClient.execute(new DeleteMapping.Builder(index, type).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 獲取映射焙蚓。
     * 
     * @param index 索引名字。
     * @param type  索引類型洒宝。
     * @return 執(zhí)行結(jié)果模型购公。
     */
    public JestResult getMappting(String index, String type) {

        try {
            JestResult jestResult = jestClient.execute(new GetMapping.Builder().addIndex(index).addType(type).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 創(chuàng)建別名。
     * 
     * @param indexes 索引名字列表雁歌。
     * @param alias   別名宏浩。
     * @return 執(zhí)行結(jié)果模型。
     */
    public JestResult addAliasMapping(List<String> indexes, String alias) {

        try {
            AddAliasMapping build = new AddAliasMapping.Builder(indexes, alias).build();
            JestResult jestResult = jestClient.execute(new ModifyAliases.Builder(build).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 刪除別名靠瞎。
     * 
     * @param indexes 索引名字列表比庄。
     * @param alias   別名。
     * @return 執(zhí)行結(jié)果模型乏盐。
     */
    public JestResult deleteAliasMapping(List<String> indexes, String alias) {

        try {
            RemoveAliasMapping build = new RemoveAliasMapping.Builder(indexes, alias).build();
            JestResult jestResult = jestClient.execute(new ModifyAliases.Builder(build).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 判斷別名是否存在印蔗。
     * 
     * @param alias 別名。
     * @return 執(zhí)行結(jié)果模型丑勤。
     */
    public JestResult aliasExists(String alias) {

        try {
            JestResult jestResult = jestClient.execute(new AliasExists.Builder().alias(alias).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 獲取別名华嘹。
     * 
     * @param indexes 索引名字列表。
     * @param alias   別名法竞。
     * @return 執(zhí)行結(jié)果模型耙厚。
     */
    public JestResult getAliases(String index) {

        try {
            JestResult jestResult = jestClient.execute(new GetAliases.Builder().addIndex(index).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 創(chuàng)建模板强挫。
     * 
     * @param name     模板名字。
     * @param template 模板創(chuàng)建腳本JSON字符串薛躬。
     * @return 執(zhí)行結(jié)果模型俯渤。
     */
    public JestResult putTemplate(String name, String template) {

        try {
            JestResult jestResult = jestClient.execute(new PutTemplate.Builder(name, template).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 創(chuàng)建模板。
     * 
     * @param name     模板名字型宝。
     * @param template 模板創(chuàng)建腳本JSON模型八匠。
     * @return 執(zhí)行結(jié)果模型。
     */
    public JestResult putTemplate(String name, JsonObject template) {

        return this.putTemplate(name, template.toString());
    }

    /**
     * 刪除模板趴酣。
     * 
     * @param name 模板名字梨树。
     * @return 執(zhí)行結(jié)果模型。
     */
    public JestResult deleteTemplate(String name) {

        try {
            JestResult jestResult = jestClient.execute(new DeleteTemplate.Builder(name).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 獲取模板岖寞。
     * 
     * @param name 模板名字抡四。
     * @return 執(zhí)行結(jié)果模型。
     */
    public JestResult getTemplate(String name) {

        try {
            JestResult jestResult = jestClient.execute(new GetTemplate.Builder(name).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 插入文檔仗谆。
     * 
     * @param doc   文檔指巡。
     * @param index 索引名字。
     * @param type  索引類型隶垮。
     * @return 執(zhí)行結(jié)果模型藻雪。
     */
    public JestResult insertDocuments(List<Map<String, Object>> docs, String index, String type) {

        try {
            Bulk.Builder bulk = new Bulk.Builder().defaultIndex(index);
            for (Map<String, Object> doc : docs) {
                Index action;
                if (doc.get("id") != null) {
                    action = new Index.Builder(doc).id(doc.get("id").toString()).index(index).type(type).build();
                    bulk.addAction(action);
                } else {
                    action = new Index.Builder(doc).index(index).build();
                    bulk.addAction(action);
                }
            }
            JestResult jestResult = jestClient.execute(bulk.build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;

        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 插入文檔。
     * 
     * @param doc   文檔狸吞。
     * @param id    文檔ID阔涉。
     * @param index 索引名字。
     * @param type  索引類型捷绒。
     * @return 執(zhí)行結(jié)果模型瑰排。
     */
    public JestResult insertDocument(Map<String, Object> doc, String id, String index, String type) {

        try {
            JestResult jestResult = jestClient.execute(new Index.Builder(doc).id(id).index(index).type(type).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 插入文檔。
     * 
     * @param doc   文檔暖侨。
     * @param index 索引名字椭住。
     * @param type  索引類型。
     * @return 執(zhí)行結(jié)果模型字逗。
     */
    public JestResult insertDocument(Map<String, Object> doc, String index, String type) {

        try {
            if (doc.get("id") != null) {
                JestResult jestResult = jestClient
                        .execute(new Index.Builder(doc).id(doc.get("id").toString()).index(index).type(type).build());
                if (!jestResult.isSucceeded()) {
                    throw new Exception(jestResult.getErrorMessage());
                }
                return jestResult;
            } else {
                JestResult jestResult = jestClient.execute(new Index.Builder(doc).index(index).type(type).build());
                if (!jestResult.isSucceeded()) {
                    throw new Exception(jestResult.getErrorMessage());
                }
                return jestResult;
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 更新文檔京郑。
     * 
     * @param doc   文檔。
     * @param index 索引名字葫掉。
     * @param type  索引類型些举。
     * @return 執(zhí)行結(jié)果模型。
     */
    public JestResult updateDocuments(List<Map<String, Object>> docs, String index, String type) {

        try {
            Bulk.Builder bulk = new Bulk.Builder().defaultIndex(index);
            for (Map<String, Object> doc : docs) {
                Update action;
                if (doc.get("id") != null) {
                    action = new Update.Builder(doc).id(doc.get("id").toString()).index(index).type(type).build();
                    bulk.addAction(action);
                }
            }
            JestResult jestResult = jestClient.execute(bulk.build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;

        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 更新文檔俭厚。
     * 
     * @param doc   文檔户魏。
     * @param id    文檔ID。
     * @param index 索引名字。
     * @param type  索引類型叼丑。
     * @return 執(zhí)行結(jié)果模型关翎。
     */
    public JestResult updateDocument(Map<String, Object> doc, String id, String index, String type) {

        try {
            JestResult jestResult = jestClient.execute(new Update.Builder(doc).id(id).index(index).type(type).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 更新文檔。
     * 
     * @param doc   文檔鸠信。
     * @param id    文檔ID纵寝。
     * @param index 索引名字。
     * @param type  索引類型星立。
     * @return 執(zhí)行結(jié)果模型爽茴。
     */
    public JestResult updateDocument(Map<String, Object> doc, String index, String type) {

        try {
            if (doc.get("id") != null) {
                JestResult jestResult = jestClient
                        .execute(new Update.Builder(doc).id(doc.get("id").toString()).index(index).type(type).build());
                if (!jestResult.isSucceeded()) {
                    throw new Exception(jestResult.getErrorMessage());
                }
                return jestResult;
            }
            return null;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 更新文檔。
     * 
     * @param query 查詢腳本JSON字符串绰垂。
     * @param index 索引名字室奏。
     * @param type  索引類型。
     * @return 執(zhí)行結(jié)果模型辕坝。
     */
    public JestResult updateDocumentByQuery(String query, String index, String type) {

        try {
            JestResult jestResult = jestClient
                    .execute(new UpdateByQuery.Builder(query).addIndex(index).addType(type).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    
    /**
     * 更新文檔。
     * 
     * @param query 查詢腳本JSON模型荐健。
     * @param index 索引名字酱畅。
     * @param type  索引類型。
     * @return 執(zhí)行結(jié)果模型江场。
     */
    public JestResult updateDocumentByQuery(JsonObject query, String index, String type) {

        try {
            JestResult jestResult = jestClient
                    .execute(new UpdateByQuery.Builder(query.toString()).addIndex(index).addType(type).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 刪除文檔纺酸。
     * 
     * @param ids   文檔ID。
     * @param index 索引名字址否。
     * @param type  索引類型餐蔬。
     * @return 執(zhí)行結(jié)果模型。
     */
    public JestResult deleteDocuments(String[] ids, String index, String type) {

        try {
            Bulk.Builder bulk = new Bulk.Builder().defaultIndex(index).defaultType(type);
            for (String id : ids) {
                Delete action = new Delete.Builder(id).build();
                bulk.addAction(action);
            }
            JestResult jestResult = jestClient.execute(bulk.build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 刪除文檔佑附。
     * 
     * @param id    文檔ID樊诺。
     * @param index 索引名字。
     * @param type  索引類型音同。
     * @return 執(zhí)行結(jié)果模型词爬。
     */
    public JestResult deleteDocument(String id, String index, String type) {

        try {
            JestResult jestResult = jestClient.execute(new Delete.Builder(id).index(index).type(type).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 刪除文檔。
     * 
     * @param query 查詢腳本权均。
     * @param index 索引名字顿膨。
     * @param type  索引類型。
     * @return 執(zhí)行結(jié)果模型叽赊。
     */
    public JestResult deleteDocumentByQuery(String query, String index, String type) {

        try {
            JestResult jestResult = jestClient
                    .execute(new DeleteByQuery.Builder(query).addIndex(index).addType(index).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 刪除文檔恋沃。
     * 
     * @param query 查詢腳本JSON模型。
     * @param index 索引名字必指。
     * @param type  索引類型囊咏。
     * @return 執(zhí)行結(jié)果模型。
     */
    public JestResult deleteDocumentByQuery(JsonObject query, String index, String type) {

        try {
            JestResult jestResult = jestClient
                    .execute(new DeleteByQuery.Builder(query.toString()).addIndex(index).addType(index).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 獲取指定ID的文檔。
     * 
     * @param ids   文檔ID集合匆笤。
     * @param index 索引名字研侣。
     * @param type  索引類型。
     * @return 指定ID的文檔JSON模型集合炮捧。
     */
    public List<JsonObject> getDocumentsByIds(List<String> ids, String index, String type) {

        try {
            JestResult jestResult = jestClient.execute(new MultiGet.Builder.ById(index, type).addId(ids).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            List<JsonObject> docs = jestResult.getSourceAsObjectList(JsonObject.class);
            return docs;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 獲取指定ID庶诡。
     * 
     * @param id    文檔ID。
     * @param index 索引名字咆课。
     * @param type  索引類型末誓。
     * @return 指定ID的文檔JSON模型。
     */
    public JsonObject getDocumentsById(String id, String index, String type) {

        try {
            JestResult jestResult = jestClient.execute(new Get.Builder(index, id).type(type).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            JsonObject doc = jestResult.getSourceAsObject(JsonObject.class);
            return doc;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 獲取文檔數(shù)量书蚪。
     * 
     * @param search 查詢腳本JSON字符串喇澡。
     * @param index  索引名字。
     * @param type   索引類型殊校。
     * @return 文檔數(shù)量晴玖。
     */
    public Double getDocumentCount(String query, String index, String type) {

        try {
            CountResult jestResult = jestClient
                    .execute(new Count.Builder().query(query).addIndex(index).addType(type).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult.getCount();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 獲取文檔數(shù)量。
     * 
     * @param query 查詢腳本JSON模型为流。
     * @param index 索引名字呕屎。
     * @param type  索引類型。
     * @return 文檔數(shù)量敬察。
     */
    public Double getDocumentCount(JsonObject query, String index, String type) {

        return this.getDocumentCount(query.toString(), index, type);
    }

    /**
     * 查詢文檔秀睛。
     * 
     * @param query 查詢腳本JSON字符串。
     * @param from  頁起點莲祸。
     * @param size  頁長度蹂安。
     * @param index 索引名字。
     * @param type  索引類型锐帜。
     * @return 查詢結(jié)果模型田盈。
     */
    public SearchResult search(String query, Integer from, Integer size, String index, String type) {

        try {
            Builder builder = new Search.Builder(query).addIndex(index).addType(type)
                    .setParameter(Parameters.SIZE, size != null && size > 0 ? size : 15)
                    .setParameter(Parameters.FROM, from != null && from > 0 ? from : 0);
            SearchResult result = jestClient.execute(builder.build());
            if (!result.isSucceeded()) {
                throw new Exception(result.getErrorMessage());
            }
            return result;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 查詢文檔。
     * 
     * @param query 查詢腳本JSON模型缴阎。
     * @param from  頁起點缠黍。
     * @param size  頁長度。
     * @param index 索引名字药蜻。
     * @param type  索引類型阐滩。
     * @return 查詢結(jié)果模型抡锈。
     */
    public SearchResult search(JsonObject query, Integer from, Integer size, String index, String type) {

        return this.search(query.toString(), from, size, index, type);
    }

    /**
     * 向后滾動查詢文檔滤馍。
     * 
     * @param scrollId 向后滾動ID拓劝。
     * @param scroll   有效時間,如:"5m"踱卵。
     * @return 查詢結(jié)果模型廊驼。
     */
    public JestResult search(String scrollId, String scroll) {

        try {
            JestResult jestResult = jestClient.execute(new SearchScroll.Builder(scrollId, scroll).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 向后滾動查詢文檔据过。
     * 
     * @param scrollId 向后滾動ID。
     * @return 查詢結(jié)果模型妒挎。
     */
    public JestResult search(String scrollId) {

        return this.search(scrollId, "5m");
    }

    /**
     * 實例回收方法绳锅。
     */
    @Override
    protected void finalize() throws Throwable {

        if (this.jestClient != null) {
            this.jestClient.close();
        }
        super.finalize();
    }
}

4、編寫主程序酝掩。

1)編寫 Elasticsearch 連接文件 "elasticsearch.properties" 鳞芙。

es.serverUri=http://192.168.216.128:9200
es.multiThreaded=true
es.connTimeout=10000
es.readTimeout=10000
es.dateFormat=yyyy-MM-dd HH:mm:ss
es.user=elastic
es.pass=elastic

2)編寫主程序文件 "JestApp.java"。

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.ResourceBundle;

public class JestApp {
    
    public static void main( String[] args ) {
        
        // 獲取 Elasticsearch 連接文件信息
        ResourceBundle resource = ResourceBundle.getBundle("elasticsearch");
        
        String serverUri = resource.getString("serverUri");
        String multiThreaded = resource.getString("multiThreaded");
        String connTimeout = resource.getString("connTimeout");
        String readTimeout = resource.getString("readTimeout");
        String dateFormat = resource.getString("dateFormat");
        String user = resource.getString("user");
        String pass = resource.getString("pass");
        
        // 定義 Elasticsearch 連接工廠的實例
        SearchSessionFactory factory = new SearchSessionFactory();
        
        factory.setServerUri(serverUri);
        factory.setMultiThreaded(Boolean.valueOf(multiThreaded));
        factory.setConnTimeout(Integer.valueOf(connTimeout));
        factory.setReadTimeout(Integer.valueOf(readTimeout));
        factory.setDateFormat(dateFormat);
        factory.setUser(user);
        factory.setPass(pass);
        
        // 定義  Elasticsearch 連接會話
        SearchSession session = new SearchSession(factory);
        
        // 索引名稱
        String index = "pgsql_index";
        
        // 索引文檔類型期虾,固定值
        String type = "_doc";
        
        // 文檔ID
        String id = "document_id";
        
        // 文檔內(nèi)容原朝,一個空文檔。
        Map<String, Object> doc = new LinkedHashMap<String, Object>();
        
        // 創(chuàng)建索引
        session.createIndex(index);
        
        // 刪除索引
        session.deleteIndex(index);
        
        // 驗證索引是否存在
        session.indicesExists(index);
        
        // 打開索引
        session.openIndex(index);
        
        // 關(guān)閉索引
        session.closeIndex(index);
        
        // 插入文檔
        session.insertDocument(doc, id, index, type);
        
        // 更新文檔
        session.updateDocument(doc, id, index, type);
        
        // 刪除文檔
        session.deleteDocument(id, index, type);
        
        // 獲取指定ID的文檔
        session.getDocumentsById(id, index, type);
        
        // 查詢?nèi)繑?shù)據(jù)的前10個文檔
        String query = "{\"query\": {\"match_all\": {}}}";
        session.search(query, 0, 10, index, type);
    }
}

7.3.ELK + Jest 最佳集成方案

Elasticsearch 作為 OLAP 數(shù)據(jù)庫镶苞,它的數(shù)據(jù)一般都是來源于業(yè)務(wù)系統(tǒng)的持久化數(shù)據(jù)庫中喳坠,如:PostgreSQL、MySQL茂蚓、Oracle壕鹉、SQLServer 等 RDBMS 數(shù)據(jù)庫或 MongoDB 等 NoSQL數(shù)據(jù)庫×牵基于類似場景晾浴,建議通過以下方式來完成 Elasticsearch、Kibana牛郑、Logstash怠肋、Jest 的集成使用:

1敬鬓、使用 Kibana 創(chuàng)建和管理 Elasticsearch 模板淹朋。

Elasticsearch 支持通過模板來定義索引格式,模板可以應(yīng)用于索引名稱中包含指定字符的索引钉答,它實現(xiàn)在創(chuàng)建索引時础芍,將索引中包含指定字符的屬性設(shè)置為特定的格式。使用 Kibana 在 Elasticsearch 中一次性創(chuàng)建模板数尿。例如:

PUT _template/pgsql_template
{
  "version": 1,
  "order": 1,
  "index_patterns": [
    "pgsql-*"
  ],
  "settings": {
    "index": {
      "analysis": {
        "analyzer": {
          "ik_smart_analyzer": {
            "tokenizer": "ik_smart"
          }
        }
      },
      "number_of_shards": "3",
      "number_of_replicas": "3",
      "refresh_interval": "5s"
    }
  },
  "mappings": {
    "dynamic_templates": [
      {
        "tag_field": {
          "mapping": {
            "norms": false,
            "type": "keyword"
          },
          "match_mapping_type": "string",
          "match": "*_tag"
        }
      },
      {
        "msg_field": {
          "mapping": {
            "anaylzer": "ik_smart_analyzer",
            "norms": false,
            "type": "text"
          },
          "match_mapping_type": "string",
          "match": "*_msg"
        }
      },
      {
        "str_fields": {
          "mapping": {
            "anaylzer": "ik_smart_analyzer",
            "norms": false,
            "type": "text",
            "fields": {
              "keyword": {
                "ignore_above": 256,
                "type": "keyword"
              }
            }
          },
          "match_mapping_type": "string",
          "match": "*"
        }
      }
    ]
  }
}

在上面的模板設(shè)置中仑性,它設(shè)置了以下內(nèi)容:

1)"index_patterns" 中定義將索引名稱前綴為 "pgsql-" 的索引在創(chuàng)建時應(yīng)用此模板。

2) "settings" 中定義了索引使用的分詞器右蹦、分片和刷新時間等參數(shù)诊杆。

3) "mappings" 中定義了以下規(guī)則:

  • 名稱以 "_tag" 結(jié)尾的屬性,其類型為不支持分詞的 keyword 數(shù)組何陆;
  • 名稱以 "_msg" 結(jié)尾的屬性晨汹,其類型為支持中文分詞、拼音分詞的文本贷盲,且對全部分詞進行索引淘这;
  • 其他所有屬性,其類型為支持中文分詞、拼音分詞的 text 文本铝穷,但只對前256個字符進行索引钠怯。

2、使用 Logstash 創(chuàng)建索引和插入曙聂、更新文檔晦炊。

Logstash 可以高效的從數(shù)據(jù)庫層面完成從業(yè)務(wù)數(shù)據(jù)庫抽取數(shù)據(jù),經(jīng)過簡單的過濾處理之后筹陵,向 Elasticsearch 中根據(jù)文檔ID來插入或更新文檔刽锤,但不能物理刪除文檔。在插入文檔時朦佩,如果索引不存在則自動創(chuàng)建索引并思。

3、使用 Jest 刪除索引语稠、文檔宋彼。

因為 Logstash 不能完成物理刪除文檔的處理,因此應(yīng)用在業(yè)務(wù)數(shù)據(jù)庫中進行物理數(shù)據(jù)刪除操作時仙畦,需要使用 Jest 調(diào)用 API 同步將這些數(shù)據(jù)在 Elasticsearch 中刪除输涕。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市慨畸,隨后出現(xiàn)的幾起案子莱坎,更是在濱河造成了極大的恐慌,老刑警劉巖寸士,帶你破解...
    沈念sama閱讀 216,372評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件檐什,死亡現(xiàn)場離奇詭異,居然都是意外死亡弱卡,警方通過查閱死者的電腦和手機乃正,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來婶博,“玉大人瓮具,你說我怎么就攤上這事》踩耍” “怎么了名党?”我有些...
    開封第一講書人閱讀 162,415評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長挠轴。 經(jīng)常有香客問我传睹,道長,這世上最難降的妖魔是什么忠荞? 我笑而不...
    開封第一講書人閱讀 58,157評論 1 292
  • 正文 為了忘掉前任蒋歌,我火速辦了婚禮帅掘,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘堂油。我一直安慰自己修档,他們只是感情好,可當我...
    茶點故事閱讀 67,171評論 6 388
  • 文/花漫 我一把揭開白布府框。 她就那樣靜靜地躺著吱窝,像睡著了一般。 火紅的嫁衣襯著肌膚如雪迫靖。 梳的紋絲不亂的頭發(fā)上院峡,一...
    開封第一講書人閱讀 51,125評論 1 297
  • 那天,我揣著相機與錄音系宜,去河邊找鬼照激。 笑死,一個胖子當著我的面吹牛盹牧,可吹牛的內(nèi)容都是我干的俩垃。 我是一名探鬼主播,決...
    沈念sama閱讀 40,028評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼汰寓,長吁一口氣:“原來是場噩夢啊……” “哼口柳!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起有滑,我...
    開封第一講書人閱讀 38,887評論 0 274
  • 序言:老撾萬榮一對情侶失蹤跃闹,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后毛好,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體望艺,經(jīng)...
    沈念sama閱讀 45,310評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,533評論 2 332
  • 正文 我和宋清朗相戀三年睛榄,在試婚紗的時候發(fā)現(xiàn)自己被綠了荣茫。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片想帅。...
    茶點故事閱讀 39,690評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡场靴,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出港准,到底是詐尸還是另有隱情旨剥,我是刑警寧澤,帶...
    沈念sama閱讀 35,411評論 5 343
  • 正文 年R本政府宣布浅缸,位于F島的核電站轨帜,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏衩椒。R本人自食惡果不足惜蚌父,卻給世界環(huán)境...
    茶點故事閱讀 41,004評論 3 325
  • 文/蒙蒙 一哮兰、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧苟弛,春花似錦喝滞、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至缤削,卻和暖如春窘哈,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背亭敢。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評論 1 268
  • 我被黑心中介騙來泰國打工滚婉, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人帅刀。 一個月前我還...
    沈念sama閱讀 47,693評論 2 368
  • 正文 我出身青樓满哪,卻偏偏與公主長得像,于是被迫代替她去往敵國和親劝篷。 傳聞我的和親對象是個殘疾皇子哨鸭,可洞房花燭夜當晚...
    茶點故事閱讀 44,577評論 2 353

推薦閱讀更多精彩內(nèi)容