elk安裝部署指南

組件介紹

Elasticsearch

是一個(gè)基于Lucene的搜索服務(wù)器,提供搜集、分析直焙、存儲(chǔ)數(shù)據(jù)三大功能,它提供了一個(gè)分布式多用戶能力的全文搜索引擎砂轻,基于restful web接口奔誓,Elasticsearch是用java開(kāi)發(fā)的,能夠達(dá)到實(shí)時(shí)搜索、穩(wěn)定厨喂、可靠和措、快速,安裝使用方便蜕煌。

Logstash

主要是用來(lái)日志的搜集派阱、分析、過(guò)濾日志的工具斜纪,用于管理日志和事件的工具贫母,你可以用它去搜集日志、轉(zhuǎn)換日志盒刚、解析日志并將它們作為數(shù)據(jù)提供給其他模塊調(diào)用腺劣。

Kibana

是一個(gè)優(yōu)秀的前端日志展示框架,他可以非常詳細(xì)的將日志轉(zhuǎn)換為各種圖表因块,為用戶提供強(qiáng)大的數(shù)據(jù)可視話支持橘原。它能夠搜索、展示存儲(chǔ)在Elasticsearch中索引數(shù)據(jù)涡上,使用它可以很方便的用圖標(biāo)趾断、表格、地圖展示和分析數(shù)據(jù)吓懈。

Kafka

數(shù)據(jù)緩沖隊(duì)列歼冰,作為消息隊(duì)列解耦了處理過(guò)程,同時(shí)提高了可擴(kuò)展性耻警,具有峰值處理能力隔嫡,使用消息隊(duì)列能夠使關(guān)鍵組件頂住突發(fā)的訪問(wèn)壓力,而不會(huì)因?yàn)橥话l(fā)的超負(fù)荷的請(qǐng)求而完全崩潰甘穿。

Filebeat

隸屬于Beats腮恩,是一個(gè)輕量級(jí)數(shù)據(jù)收集引擎,基于原先的Logstash-fowarder的源碼改造温兼,也會(huì)是ELK Stack的第一選擇秸滴。

目前Beats包含四種工具:

  • Packetbeat(搜索網(wǎng)絡(luò)流量數(shù)據(jù))
  • Metricbeat(搜集系統(tǒng)、進(jìn)程和文件系統(tǒng)級(jí)別的CPU和內(nèi)存使用情況等數(shù)據(jù)募判,通過(guò)從操作系統(tǒng)和服務(wù)器收集指標(biāo)荡含,幫助監(jiān)控福區(qū)群以及其托管的服務(wù))
  • Filebeat(收集文件數(shù)據(jù)-常用文件日志收集分析)
  • Winlogbeat(搜集windows事件日志數(shù)據(jù))

版本說(shuō)明

elk相關(guān)的組件版本最好保持一致

  • jdk1.8+
  • elasticsearch: 6.6.1
  • logstash: 6.6.1
  • kibana: 6.6.1
  • Filebeat: 6.6.1
  • kafka: 2.11-1

組件下載地址
官網(wǎng)搭建

組件架構(gòu)圖

elk架構(gòu)圖

依賴組件安裝

jdk安裝

jdk安裝包自行下載解壓

環(huán)境變量配置

linux中jdk環(huán)境變量:
echo '
JAVA_HOME=/usr/local/java
PATH=$JAVA_HOME/bin:$PATH
export JAVA_HOME PATH
' >>/etc/profile

source /etc/profile

elk部署前環(huán)境的檢查

服務(wù)器設(shè)置成固定ip

通過(guò)命令查看

/etc/sysconfig/network-scripts/ifcfg-ens33
服務(wù)器ip的查看

主機(jī)名修改

方便集群配置時(shí)服務(wù)器的區(qū)分
hostname : 顯示默認(rèn)主機(jī)名

  • 使用vim命令修改”/etc/hostname“主機(jī)名稱文件
  • 將原始主機(jī)名稱刪除后追加”新主機(jī)名
  • 重啟主機(jī)

防火墻是否關(guān)閉

systemctl status firewalld

確保每個(gè)節(jié)點(diǎn)防火墻高狀態(tài)是關(guān)閉的,這樣每個(gè)節(jié)點(diǎn)才能被相互訪問(wèn)

selinux狀態(tài)

通過(guò)命令:getenforce
如果狀態(tài)不是disabled時(shí)通過(guò)下面命令進(jìn)行設(shè)置

vim /etc/selinux/config
selinux=disabled

iptables規(guī)則檢查

不管你在安裝linux時(shí)是否啟動(dòng)了防火墻,如果你想配置屬于自己的防火墻,那就清除現(xiàn)在filter的所有規(guī)則,為了保證后面elk部署節(jié)點(diǎn)通信正常
通過(guò)命令進(jìn)行查看

iptables -nL
iptables

如果不是上圖所示届垫,則需要進(jìn)行iptables規(guī)則的清除

iptables -F      清除預(yù)設(shè)表filter中的所有規(guī)則鏈的規(guī)則
iptables -X      清除預(yù)設(shè)表filter中使用者自定鏈中的規(guī)則

服務(wù)器上文件的傳遞

為了方便各個(gè)服務(wù)器上進(jìn)行文件的拷貝可以進(jìn)行如下設(shè)置释液。

公鑰的生成

ssh-keygen  服務(wù)器公鑰生成,直接回車(chē)即可

ip的解析

多個(gè)服務(wù)器上傳遞該公鑰的時(shí)候可以做ip和主機(jī)名映射装处,方便傳輸

#中添加ip和主機(jī)名的映射如下, 下面操作需要在每個(gè)節(jié)點(diǎn)進(jìn)行配置误债,復(fù)制即可
vim /etc/hosts
172.16.80.159 lph_node1
172.16.80.160 lph_node2

各節(jié)點(diǎn)映射完成后可以進(jìn)行數(shù)據(jù)和文件的拷貝

公鑰的拷貝-各節(jié)點(diǎn)

ssh-copy-id -i #{ip映射的主機(jī)名}
占位符可以用真實(shí)的主機(jī)名替換

軟件包的拷貝

公鑰準(zhǔn)備好之后進(jìn)行組件軟件包的拷貝

// `pwd`獲取軟件包素在路徑,將該軟件拷貝到指定節(jié)點(diǎn)下的對(duì)應(yīng)路徑下,#{占位符}
scp #{kafka_2.11-2.2.2.tgz} lph_client:`pwd` 

elasticsearch安裝部署

在安裝elasticsearch前可以先添加一個(gè)普通用戶用來(lái)后續(xù)elasticsearch的啟動(dòng)操作寝蹈,因?yàn)閑lasticsearch不能用root用戶啟動(dòng)李命。
// 集群環(huán)境下每個(gè)節(jié)點(diǎn)都需要配置
useradd lphelk
// 設(shè)置密碼,密碼可以隨意設(shè)值
echo "1" | passwd --stdin lphelk

安裝包的解壓

// 集群環(huán)境下每個(gè)節(jié)點(diǎn)都需要解壓
tar -zxvf #{安裝包位置} -C #{鎮(zhèn)定解壓到哪個(gè)位置}

配置文件的修改

命令:vim /elk/elasticsearch-6.6.1/config/elasticsearch.yml

# 注釋掉的在是在集群環(huán)境下配置
#cluster.name: clustername
node.name: lph_node
node.master: true
node.data: true
# 需要?jiǎng)?chuàng)建相應(yīng)的數(shù)據(jù)和日志路徑
path.data: /data/elasticsearch/data
path.logs: /data/elasticsearch/logs
bootstrap.memory_lock: false
bootstrap.system_call_filter: false
# 方便每個(gè)節(jié)點(diǎn)正常通行
network.host: 0.0.0.0
http.port: 9200
#discovery.zen.ping.unicast.hosts: ["172.16.80.160"]
#discovery.zen.minimum_master_nodes: 1
#discovery.zen.ping_timeout: 150s
#discovery.zen.fd.ping_retries: 10
#client.transprt.ping_timeout: 60s
http.cors.enabled: true
http.cors.allow-origin: "*"

配置項(xiàng)聲明

配置 描述
cluster.name 集群名稱箫老,各個(gè)幾點(diǎn)配置相同的集群名稱封字。
node.name 節(jié)點(diǎn)名稱,各個(gè)節(jié)點(diǎn)配置不同槽惫。
node.master 指示某個(gè)節(jié)點(diǎn)是否符合成為主節(jié)點(diǎn)的條件周叮。
node.data 指示節(jié)點(diǎn)是否為數(shù)據(jù)節(jié)點(diǎn),數(shù)據(jù)節(jié)點(diǎn)包含并管理索引的一部分界斜。
path.data 數(shù)據(jù)存儲(chǔ)目錄仿耽。
path.logs 日志存儲(chǔ)目錄。
bootstrap.memory_lock 內(nèi)存鎖定各薇,是否禁用交換项贺。
bootstrap.system_call_filter 系統(tǒng)調(diào)用過(guò)濾器。
network.host 綁定節(jié)點(diǎn)ip峭判。
http.port rest api端口开缎。
discovery.zen.ping.unicast.hosts 提供其他elasticsearch服務(wù)節(jié)點(diǎn)單點(diǎn)廣播發(fā)現(xiàn)功能。
discovery.zen.minimum_master_nodes 集群中可工作的具有master節(jié)點(diǎn)資格的最下數(shù)量林螃,官方的推薦值是(N/2)+1奕删,其中N是具有master資格的節(jié)點(diǎn)的數(shù)量。
discovery.zen.ping_timeout 節(jié)點(diǎn)在發(fā)現(xiàn)過(guò)程中的等待時(shí)間疗认。
discovery.zen.fd.ping_retries 節(jié)點(diǎn)發(fā)現(xiàn)重試次數(shù)完残。
http.cors.enabled 是否允許跨源rest請(qǐng)求,用允許head插件訪問(wèn)elasticsearch横漏。
http.cors.allow-origin 允許的源地址谨设。

設(shè)置jvm堆大小

可以根據(jù)服務(wù)器本身配置進(jìn)行優(yōu)化

設(shè)置jvm堆大小(根據(jù)服務(wù)器內(nèi)存大小進(jìn)行設(shè)置,用2g替換1g)
sed -i 's/-Xms1g/-Xms2g/' /elk/elasticsearch-6.6.1/config/jvm.options
sed -i 's/-Xmx1g/-Xmx2g/' /elk/elasticsearch-6.6.1/config/jvm.options

注意:
確保堆內(nèi)存最大和最小一樣缎浇,防止程序運(yùn)行時(shí)改變堆內(nèi)存大小

創(chuàng)建ES數(shù)據(jù)及日志存儲(chǔ)目錄

mkdir -pv /data/elasticsearch/data
mkdir -pv /data/elasticsearch/logs

修改安裝目錄及存儲(chǔ)目錄權(quán)限

查看用戶:cat /etc/passwd|grep #{上面創(chuàng)建啟動(dòng)es的普通用戶}

chown -R lphelk:lphelk /data/elasticsearch
chown -R lphelk:lphelk /elk/elasticsearch-6.6.1

elasticsearch啟動(dòng)報(bào)錯(cuò)

elasticsearch啟動(dòng)報(bào)錯(cuò)
elasticsearch啟動(dòng)報(bào)錯(cuò)

上面如果修改了/etc/sysctl.conf文件后需要輸入下面命令進(jìn)行重新加載
sysctl -p

啟動(dòng)elasticsearch

// 切換到es普通用戶扎拣,集群環(huán)境下多個(gè)節(jié)點(diǎn)需要同時(shí)啟動(dòng)(間隔時(shí)間切記太長(zhǎng))
su - lphelk
cd /elk/elasticsearch-6.6.1 
nohup bin/elasticsearch &

kibana安裝部署

安裝包的解壓

tar -zxvf #{安裝包位置} -C #{鎮(zhèn)定解壓到哪個(gè)位置}

配置文件修改

vim /elk/kibana-6.6.1-linux-x86_64/config/kibana.yml
echo '
server.port: 5601
server.host: "172.16.80.159"
elasticsearch.url: "http://172.16.80.159:9200"
kibana.index: ".kibana"
' >> /elk/kibana-6.6.1-linux-x86_64/config/kibana.yml

配置項(xiàng)聲明

配置 描述
server.port kibana服務(wù)端口,默認(rèn)為5601素跺。
server.host kibana主機(jī)ip地址二蓝,默認(rèn)localhost。
elasticsearch.url 用來(lái)做查詢的es節(jié)點(diǎn)的url指厌,默認(rèn)http://localhost:9200刊愚。
kibana.index kibana在elasticsearch中使用的索引來(lái)存儲(chǔ)保存的searches,visualizations和dashboards,默認(rèn)是 .kibana仑乌。

其他配置項(xiàng)可以參考

kibana啟動(dòng)

cd /elk/kibana-6.6.1-linux-x86_64/
nohup ./bin/kibana &

kibana啟動(dòng)后是可以直接訪問(wèn)的百拓,不需要進(jìn)行認(rèn)證,測(cè)試環(huán)境可行

需要認(rèn)證的話可以通過(guò)nginx和nginx中http-tools
http-tools用于生成nginx認(rèn)證訪問(wèn)的用戶密碼文件晰甚,需要可自行百度

kafka安裝部署

本文檔是kafka單節(jié)點(diǎn)部署衙传,集群自行百度。

安裝配置jdk8

由于kafka厕九、zookeeper(簡(jiǎn)稱:zk)運(yùn)行依賴于jdk,jdk已安裝可跳過(guò)蓖捶。

安裝配置zk

kafka運(yùn)行依賴zk,kafka官網(wǎng)提供的tar包中包含了zk,這里不需要額外下載zk包。

安裝包的解壓
tar -zxvf #{安裝包位置} -C #{鎮(zhèn)定解壓到哪個(gè)位置}
配置文件修改
#vim /elk/kafka_2.11-2.2.2/config/zookeeper.properties

# 通過(guò)echo方式注入的話需要將對(duì)應(yīng)配置文件中的相同配置項(xiàng)注釋掉

echo '
dataDir=/data/zookeeper/data
dataLogDir=/data/zookeeper/logs
clientPort=2181
tickTime=2000
initLimit=20
syncLimit=10
#kafka集群ip:port
server.1=172.16.80.159:2888:3888
#server.1=172.16.80.160:2888:3888
#server.1=172.16.80.161:2888:3888

'>>/elk/kafka_2.11-2.2.2/config/zookeeper.properties

配置項(xiàng)聲明
配置 描述
dataDir zk數(shù)據(jù)存放目錄扁远。
dataLogDir zk日志存放目錄俊鱼。
clientPort 客戶端連接zk服務(wù)器的端口。
tickTime zk服務(wù)器之間或客戶端與服務(wù)器之間維持心跳的時(shí)間間隔畅买。
initLimit 允許follower連接并同步到Leader的初始化連接時(shí)間并闲,以tickTime為單位,當(dāng)舒適化連接時(shí)間超過(guò)該值谷羞,則表示連接失敗帝火。
syncLimit Leader與Follower之間發(fā)生消息時(shí),請(qǐng)求和應(yīng)答時(shí)間長(zhǎng)度湃缎,如果 follower在設(shè)置數(shù)據(jù)啊in內(nèi)不能與leader通信犀填,那么此follower將會(huì)被丟棄。
server.1 2888是follower與leader交換信息的端口嗓违,38888是當(dāng)leader掛了時(shí)用來(lái)執(zhí)行選舉時(shí)服務(wù)器相互通信的端口九巡。
創(chuàng)建數(shù)據(jù)和日志目錄
mkdir -pv /data/zookeeper/data
mkdir -pv /data/zookeeper/logs
創(chuàng)建myid文件
# 每臺(tái)kafka服務(wù)器都需要做成唯一的id,myid中對(duì)應(yīng)的值對(duì)應(yīng)上面配置文件中server.1中的值
echo 1 > /data/zookeeper/data/myid

配置kafka

修改配置文件
修改前可以先將原配置文件中的有效配置注釋掉蹂季,可以通過(guò)下面命令:
sed -i 's/^[^#]/#&/' /elk/kafka_2.11-2.2.2/config/server.properties
echo '
broker.id=1
listeners=PLAINTEXT://172.16.80.159:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/logs
num.partitions=6
num.recovery.threads.per.data.dir=1
# 單節(jié)點(diǎn)這個(gè)必須為1冕广,否則可能會(huì)導(dǎo)致kafka消費(fèi)失敗
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=300000
#zookeeper.connect=172.16.80.159:2181,172.16.80.160:2181
zookeeper.connect=172.16.80.159:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
' >>/elk/kafka_2.11-2.2.2/config/server.properties

配置項(xiàng)聲明
配置 描述
broker.id 每個(gè)server需要單獨(dú)配置broker id,如果不配置系統(tǒng)會(huì)自動(dòng)配置乏盐,需要和上一步id一致佳窑。
listeners 監(jiān)聽(tīng)地址,格式PLAINTEXT://IP:PORT父能。
num.network.threads 接收和發(fā)送網(wǎng)絡(luò)信息的線程數(shù)神凑。
num.io.threads 服務(wù)器用于處理請(qǐng)求的線程數(shù),其中肯能包括磁盤(pán)i/o何吝。
socket.send.buffer.bytes 套接字服務(wù)器使用的發(fā)送緩沖區(qū)溉委。
socket.receive.buffer.bytes 套接字服務(wù)器使用接收緩沖區(qū)。
socket.request.max.bytes 套接字服務(wù)器請(qǐng)求的最大大邪拧(防止oom)瓣喊。
log.dirs 日志文件目錄。
num.partitions partition數(shù)量黔酥。
num.recovery.threads.per.data.dir 在啟動(dòng)時(shí)恢復(fù)日志藻三、關(guān)閉時(shí)刷盤(pán)日志每個(gè)數(shù)據(jù)目錄的線程的數(shù)量洪橘。默認(rèn)。
offsets.topic.replication.factor 偏移量話題的復(fù)制因子棵帽。
transaction.state.log.replication.factor -
transaction.state.log.min.isr -
log.retention.hours 日志文件刪除之前保留的時(shí)間單位小時(shí)熄求,默認(rèn)168。
log.segment.bytes 單個(gè)日志文件的大小逗概,默認(rèn)1073741824 弟晚。
log.retention.check.interval.ms 檢查日志段以查看是否可以根據(jù)保留策略刪除他們的時(shí)間間隔。
zookeeper.connect zk的主機(jī)地址逾苫,如果zookeeper是集群的話則以逗號(hào)隔開(kāi)卿城。
zookeeper.connection.timeout.ms 接收到zk的超時(shí)時(shí)間。
group.initial.rebalance.delay.ms -
創(chuàng)建log目錄
mkdir -pv /data/kafka/logs
其他節(jié)點(diǎn)配置

針對(duì)集群環(huán)境下铅搓,只需要把配置好的安裝包分發(fā)到其他節(jié)點(diǎn)瑟押,然后修改zk的myid,kafkade broker.id和listeners就可以了,單節(jié)點(diǎn)不用配置這一步星掰。

啟動(dòng)勉耀,驗(yàn)證zk

(1) 啟動(dòng)

cd /elk/kafka_2.11-2.2.2/
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

集群的話需要每個(gè)節(jié)點(diǎn)依次執(zhí)行啟動(dòng)操作

(2)驗(yàn)證

yum -y install nc

echo conf | nc 127.0.0.1 2181
zk啟動(dòng)驗(yàn)證

(3)查看zk的狀態(tài)

echo stat | nc 127.0.0.1 2181
查看zk的狀態(tài)
查看zk的狀態(tài)
啟動(dòng),驗(yàn)證kafka

(1) 啟動(dòng)

cd /elk/kafka_2.11-2.2.2
nohup bin/kafka-server-start.sh config/server.properties &

集群環(huán)境下依次啟動(dòng)即可

(2) 驗(yàn)證
在kafka服務(wù)器上創(chuàng)建topic

cd /elk/kafka_2.11-2.2.2

bin/kafka-topics.sh --create --zookeeper 172.16.80.159:2181 --replication-factor 1 --partitions 1 --topic testtopic


查看topic:
bin/kafka-topics.sh --zookeeper 172.16.80.159:2181 --list


kafka集群環(huán)境下的驗(yàn)證topic
監(jiān)控kafka manager

elk中主要收集日志蹋偏,不需要kafka manager便斥,需要的話可以自行官網(wǎng)下載

logstash安裝部署

Logstash運(yùn)行同樣依賴jdk,需要在服務(wù)器上安裝jdk版本。
(1) 安裝

tar -zxvf #{安裝包位置} -C #{指定解壓到哪個(gè)位置}

(2) 配置
創(chuàng)建目錄,我將所有的input威始、filter枢纠、output配置文件全部放到該目錄中。

mkdir -pv /elk/logstash-6.6.1/etc/conf.d

vim /elk/logstash-6.6.1/etc/conf.d/input.conf

input {
  # 從kafka獲取日志數(shù)據(jù)到logstash
  kafka {
    bootstrap_servers => "172.16.4.35:9092"
    auto_offset_reset => "latest"
    consumer_threads => 1
    decorate_events => true
    topics => ["emp_log_topic"]
    codec => "json"
  }
}



vim /elk/logstash-6.6.1/etc/conf.d/output.conf

output {
        # 輸出到elasticsearch中
        elasticsearch {
        index => "emp-log-%{+YYYY-MM-dd}"
        hosts => ["172.16.4.35:9200"]
        }

        # 輸出到控制臺(tái)黎棠,在測(cè)試的時(shí)候可以配置
        stdout {
        codec => rubydebug
        }

}

vim /elk/logstash-6.6.1/etc/conf.d/filter.conf

filter {
  # 過(guò)濾日志中不包含指定信息:【lph123578】的日志輸出到es中晋渺,看需求配置
   if ([message] =~  "^(?!.*?lph123578).*$") {
       drop {}
   }

    # 通過(guò)grok 正則的方式進(jìn)行日志內(nèi)容的提取并賦值到指定的字段上,如下面的messageDate脓斩、appName等
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:messageDate} \[(?<appName>[^\[\]]+)\] \[(?<thread>[^\[\]]+)\] \[(?<tradeId>[^\[\]]+)\] \[(?<keyword1>[^\[\]]*)\] %{LOGLEVEL:level}" }
    }
}

具體grok配置可以參考下面網(wǎng)頁(yè)
http://www.reibang.com/p/443f1ea7b640
http://www.reibang.com/p/5df5055070b2?utm_campaign=maleskine&utm_content=note&utm_medium=seo_notes&utm_source=recommendation


(3) 啟動(dòng)

cd /elk/logstash-6.6.1

nohup bin/logstash -f etc/conf.d/ --config.reload.automatic &

(4) 報(bào)錯(cuò)信息
啟動(dòng)logstash有如下錯(cuò)誤時(shí)可以按照下面的方式嘗試解決木西,沒(méi)有報(bào)錯(cuò)既可以跳過(guò)。

logstash報(bào)錯(cuò)信息

解決方案


解決方案

解決方案

Filebeat安裝部署

為什么用Filebeat,而不用原來(lái)的Logstash呢?原因很簡(jiǎn)單随静,每個(gè)服務(wù)上按照一個(gè)logstash對(duì)資源消耗比較大八千。

解壓

tar -zxvf #{安裝包位置} -C #{指定解壓到哪個(gè)位置}

cd /elk/filebeat-6.6.1-linux-x86_64

修改配置

修改filebeat配置,支持收集本地目錄日志,并輸出日志到kafka中。

cd /elk/filebeat
// 之前的filebeat.yml可以備份下
mv filebeat.yml filebeat.yml.bak
vim filebeat.yml

filebeat.prospectors:
- input_type: log
  paths:
    - /usr/local/easymainance/easymaintenance-log-manager-1.0-SNAPSHOT/D:/log/emp-log-manager/config-service.log

 # 解析的是text文本內(nèi)容燎猛,不規(guī)則的
  multiline:
     # 指的是每行以時(shí)間格式開(kāi)始的字符串
     pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})' 
     negate: true
     match: after
     max_lines: 2000
     timeout: 2s

  # 解析的是json格式的內(nèi)容
  #json.keys_under_root: true
  # json.add_error_key: true
output.kafka:
  enabled: true
  hosts: ["172.16.4.35:9092"]
  topic: 'emp_log_topic'

Filebeat6.0之后一些配置參數(shù)變動(dòng)比較大恋捆,比如document_type就不支持,需要用fields來(lái)代替等等重绷。

配置項(xiàng)聲明

參數(shù) 描述
json.keys_under_root 可以讓字段位于根節(jié)點(diǎn)沸停,默認(rèn)為false
json.overwrite_keys 對(duì)于同名的key,覆蓋原有key值
json.message_key -
json.add_error_key 將錯(cuò)誤消息記錄存儲(chǔ)在error.message字段中

檢查配置是否正確

cd /elk/filebeat
./filebeat -c filebeat.yml -configtest

啟動(dòng)

cd /elk/filebeat

nohup ./filebeat -e -c filebeat.yml &

日志收集

日志格式

%date{yyyy-MM-dd HH:mm:ss.SSS} [%X{appname:-}] [%thread] [%X{traceId:-}] [%X{keyword1:-}] %-5level %logger{50}:%L - %msg%n

格式說(shuō)明

  • %date{yyyy-MM-dd HH:mm:ss.SSS}:日志的時(shí)間2020-11-16 17:14:58.661
  • %X{appname:-} : 應(yīng)用服務(wù)名稱
  • %thread: 輸出產(chǎn)生該日志事件的線程名
  • %X{traceId:-} : 自定義流程追蹤Id
  • %X{keyword1:-}: 關(guān)鍵字1
  • %-5level: 輸出日志級(jí)別昭卓,-5表示左對(duì)齊并且固定輸出5個(gè)字符如果不足在右邊補(bǔ)空格愤钾,類型有DEBUG,INFO,WARN,ERROR
  • %logger{50}: 為輸出日志的class命名 限定長(zhǎng)度50
  • %msg%n:輸出的日志內(nèi)容

日志格式輸出內(nèi)容展示

2020-11-16 17:14:58.661 [emp-system-manager] [main] [tradeId] [keyword1] INFO com.hzsun.emp.config.ConfigApplication:61 - Started ConfigApplication in 8.757 seconds (JVM running for 9.172)

日志埋點(diǎn)

單服務(wù)\單節(jié)點(diǎn)日志埋點(diǎn)

package com.hzsun.emp.aop;

import com.hzsun.emp.constant.FileLogContants;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.UUID;

// 處理的單服務(wù)日志埋點(diǎn)
@Aspect
@Component
public class FileLogAspect {

    public static final String exepress = "execution(public * com.hzsun.emp.*.controllers.*.*(..))";

    @Value("${spring.application.name}")
    private String applicationName;

    @Pointcut(exepress)
    public void setUpBuriedPoint() {
    }

    @Around("setUpBuriedPoint()")
    public Object markFileLog(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
        System.err.println("服務(wù)名稱:"+ applicationName);
        // 日志埋點(diǎn)
        insertMDC();
        Object result = proceedingJoinPoint.proceed();
        MDC.clear();

        return result;
    }

    private void insertMDC() {
        String traceId = UUID.randomUUID().toString().replace("-", "");
        MDC.put(FileLogContants.TRADE_ID, traceId + FileLogContants.FILTER_FIELD);
        MDC.put(FileLogContants.SERVICE_NAME, applicationName);
        MDC.put(FileLogContants.APP_NAME, FileLogContants.EASY_MAINTENANCE);
    }
}

分布式服務(wù)\多服務(wù)調(diào)用日志埋點(diǎn)

LogTrackInterceptorConfig.class
LogTrackInterceptor.class
EmpFeignConfig.class
LogService.class
package com.hzsun.emp.jwt;

import interceptor.LogTrackInterceptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

/**
 * 添加攔截器
 * @author lph
 */
@Configuration
public class LogTrackInterceptorConfig implements WebMvcConfigurer {

   @Bean
   public LogTrackInterceptor getLogTrackInterceptor() {
       return new LogTrackInterceptor();
   }

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        /**
         *  根據(jù)具體情況設(shè)置
         *  LoginInterceptor是自定義的攔截器
         *  addPathPatterns參數(shù)/**是通配符表示攔截所有的請(qǐng)求
         *  excludePathPatterns方法的參數(shù)是可變參數(shù),可以輸n個(gè)字符串類型的參數(shù),用來(lái)添加白名單
         */
        String[] excludePath = {
                "/doc.html",
                "/swagger-resources/**",
                "/webjars/**",
                "/v2/**",
                "/swagger-ui.html/**"
        };
        registry.addInterceptor(getLogTrackInterceptor()).addPathPatterns("/**").excludePathPatterns(excludePath);
    }
}

package interceptor;

import com.hzsun.emp.constant.FileLogContants;
import org.slf4j.MDC;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.HandlerInterceptor;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.Optional;
import java.util.UUID;


/**
 * 日志追蹤攔截器瘟滨,feign服務(wù)間調(diào)用確保traceId一致
 *
 * @version V1.0
 * @author: lph
 */
@Component
public class LogTrackInterceptor implements HandlerInterceptor {
    private static final ThreadLocal<String> TRACE_ID_THREAD_LOCAL = new ThreadLocal<>();

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        // 請(qǐng)求頭中不存在則生成
        String traceId = Optional.ofNullable(request.getHeader(FileLogContants.TRACE_ID)).orElse(UUID.randomUUID().toString().replaceAll("-", ""));

        TRACE_ID_THREAD_LOCAL.set(traceId);
        MDC.put(FileLogContants.TRACE_ID, getTraceId());
        return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        // 完成后移除,防止內(nèi)存占用過(guò)大
        TRACE_ID_THREAD_LOCAL.remove();
        MDC.clear();
    }

    public static String getTraceId() {
        return TRACE_ID_THREAD_LOCAL.get();
    }

    public static void setTraceId(String traceId) {
        TRACE_ID_THREAD_LOCAL.set(traceId);
    }
}

package com.hzsun.emp.configuration;

import com.hzsun.emp.constant.FileLogContants;
import feign.RequestInterceptor;
import feign.RequestTemplate;
import interceptor.LogTrackInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.http.HttpServletRequest;
import java.util.Optional;
import java.util.UUID;

/**
 * Feign調(diào)用時(shí)添加追蹤id到請(qǐng)求頭
 *
 * @author :lph
 * @version: 1.0
 */
@Configuration
public class EmpFeignConfig implements RequestInterceptor {

    @Override
    public void apply(RequestTemplate requestTemplate) {
        ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        // 非空校驗(yàn)的目的是防止在啟動(dòng)類中通過(guò)feign調(diào)用第三方接口時(shí)拋出空指針異常
        if (attributes != null) {
            // 將traceId傳遞到下個(gè)服務(wù)
            String traceId = Optional.ofNullable(LogTrackInterceptor.getTraceId()).orElse(UUID.randomUUID().toString().replaceAll("-",""));
            requestTemplate.header(FileLogContants.TRACE_ID, traceId);
        }
    }
}

package com.hzsun.emp.serviceapi;

import com.hzsun.emp.configuration.EmpFeignConfig;
import com.hzsun.emp.pojo.po.ScOperationLogsPO;
import com.hzsun.emp.response.HZSunResponse;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;

/**
 * feign調(diào)用時(shí)在注解中配置
 * value: 指的是被調(diào)用方項(xiàng)目在nacos唯一標(biāo)識(shí)  configuration:指的是feign請(qǐng)求的攔截器
 * @FeignClient(value = "emp-log-manager", configuration = EmpFeignConfig.class)
 * @author :lph
 * @version: 1.0
 */
@FeignClient(value = "emp-log-manager", configuration = EmpFeignConfig.class)
public interface LogService {

    /**
     * 獲取數(shù)據(jù)并插入消息記錄表
     *
     * @param logsPo 封裝請(qǐng)求參數(shù)對(duì)象
     */
    @PostMapping("/logmanager/log/insert")
    HZSunResponse insertLog(ScOperationLogsPO logsPo);
}

grok提取日志內(nèi)容

在vim /elk/logstash-6.6.1/etc/conf.d/filter.conf配置

方式一:
處理的日志格式:
%date{yyyy-MM-dd HH:mm:ss.SSS} [%X{appName:-${app_name}}] [%X{serviceName:-${processName}}] [%thread|%X{traceId:-}] [%X{key1:-}] [%X{key2:-}] %-5level %logger{50}:%L - %msg%n

grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:messageDate} \[(?<appName>[^\[\]]+)\] \[(?<thread>[^\[\]]+)\] \[(?<tradeId>[^\[\]]+)\] \[(?<keyword1>[^\[\]]*)\] %{LOGLEVEL:level}" }
    }
方式二:
處理的日志格式:
%date{yyyy-MM-dd HH:mm:ss.SSS} [%X{appName:-${app_name}}|%X{serviceName:-${processName}}|%thread|%X{traceId:-}|%X{key1:-}|%X{key2:-}] %-5level %logger{50}:%L - %msg%n

grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:messageDate} \[(?<appName>[^\[\]]*)\|(?<serviceName>[^\[\]]*)\|(?<thread>[^\[\]]*)\|(?<traceId>[^\[\]]*)\|(?<key1>[^\[\]]*)\|(?<key2>[^\[\]]*)\] %{LOGLEVEL:level}"}
    }

# 可以選擇性的使用remove_field去掉原始內(nèi)容message這一行的信息能颁。
mutate{
            remove_field => ["message","timestamp"]
        }
kibana中對(duì)grok中filter的測(cè)試
es中日志在kibana中展示

springboot整合elasticsearch

導(dǎo)入依賴

 <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
                <version>6.6.1</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.elasticsearch.client/transport -->
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>transport</artifactId>
                <version>6.6.1</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.elasticsearch.plugin/transport-netty4-client-->
            <dependency>
                <groupId>org.elasticsearch.plugin</groupId>
                <artifactId>transport-netty4-client</artifactId>
                <version>6.6.1</version>
            </dependency>

常量類

public class FileLogContants {
    public static final String APP_NAME = "appName";
    public static final String SERVICE_NAME = "serviceName";
    public static final String TRADE_ID = "tradeId";
    public static final String KEY_WORD_1 = "keyword1";
    public static final String EASY_MAINTENANCE = "易維";
    public static final String FILTER_FIELD = "lph123578";
    public static final String MESSAGEDATE = "messageDate";

    // 通配符
    public static final String STARS_SYMBOL = "*";

    // 排序
    public static final String TIMESTAMP = "@timestamp";

    // total用于es記錄總數(shù)的查詢
    public static final Integer TOTAL = 0;

    // 關(guān)鍵字標(biāo)識(shí)室奏,es字符串凡是標(biāo)識(shí)為關(guān)鍵字則不會(huì)進(jìn)行分詞處理
    public static final String ES_KEYWORD_SYMBOL = ".keyword";

}

elasticsearch配置類

package com.hzsun.emp.log.config;

import com.hzsun.emp.utils.StringUtil;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import java.net.InetAddress;
import java.net.UnknownHostException;

@Configuration
public class ElasticSearchConfig {

    public static final Logger logger = LoggerFactory.getLogger(ElasticSearchConfig.class);

    @Value("${elasticsearch.host}")
    private String host;

    @Value("${elasticsearch.port}")
    private Integer port;

    @Value("${elasticsearch.cluster-name}")
    private String clusterName;

    private TransportClient transportClient;

    @Bean
    public TransportClient transportClient(){
        Settings settings = Settings.EMPTY;
        if(!StringUtil.isEmpty(clusterName)){
            settings = Settings.builder()
                    .put("cluster.name", clusterName)
                    .build();
        }
        try {
            transportClient = new PreBuiltTransportClient(settings)
                    .addTransportAddress(new TransportAddress(InetAddress.getByName(host), port));
            // 集群環(huán)境可以嘗試用addTransportAddresses()方法
        } catch (UnknownHostException e) {
            logger.error("創(chuàng)建elasticsearch客戶端失敗");
            e.printStackTrace();
        }
        logger.info("創(chuàng)建elasticsearch客戶端成功");
        return transportClient;
    }


    @Bean
    public BulkProcessor bulkProcessor() throws UnknownHostException {

        Settings settings = Settings.EMPTY;
        if(!StringUtil.isEmpty(clusterName)){
            settings = Settings.builder()
                    .put("cluster.name", clusterName)
                    .build();
        }

        TransportClient transportClient = new PreBuiltTransportClient(settings)
                .addTransportAddress(new TransportAddress(InetAddress.getByName(host), port));

        return BulkProcessor.builder(transportClient, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long l, BulkRequest bulkRequest) {

            }

            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {

            }

            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
                logger.error("{} data bulk failed,reason :{}", bulkRequest.numberOfActions(), throwable);
            }

        }).setBulkActions(1000)//分批,每10000條請(qǐng)求當(dāng)成一批請(qǐng)求劲装。默認(rèn)值為1000
                .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))//每次5MB,刷新一次bulk昌简。默認(rèn)為5m
                .setFlushInterval(TimeValue.timeValueSeconds(5))//每5秒一定執(zhí)行占业,不管已經(jīng)隊(duì)列積累了多少。默認(rèn)不設(shè)置這個(gè)值
                .setConcurrentRequests(1)//設(shè)置并發(fā)請(qǐng)求數(shù)纯赎,如果是0谦疾,那表示只有一個(gè)請(qǐng)求就可以被執(zhí)行,如果為1犬金,則可以積累并被執(zhí)行念恍。默認(rèn)為1.
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))//這里有個(gè)backoff策略,最初等待100ms,然后按照指數(shù)增長(zhǎng)并重試3次晚顷。每當(dāng)一個(gè)或者多個(gè)bulk請(qǐng)求失敗,并出現(xiàn)EsRejectedExecutionException異常時(shí).就會(huì)嘗試重試峰伙。這個(gè)異常表示用于處理請(qǐng)求的可用計(jì)算資源太少。如果要禁用這個(gè)backoff策略该默,需要用backoff.nobackoff()瞳氓。
                .build();
    }

    @PostConstruct
    void init() {
        System.setProperty("es.set.netty.runtime.available.processors", "false");
    }

}

elasticsearch檢索

package com.hzsun.emp.log.service.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.hzsun.emp.constant.FileLogContants;
import com.hzsun.emp.log.bean.FileLogDTO;
import com.hzsun.emp.log.bean.FileLogQO;
import com.hzsun.emp.log.service.IFileLogService;
import com.hzsun.emp.utils.DateUtil;
import com.hzsun.emp.utils.ObjectMapperUtil;
import com.hzsun.emp.utils.StringUtil;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;

@Service
public class FileLogServiceImpl implements IFileLogService {

    public static final Logger logger = LoggerFactory.getLogger(FileLogServiceImpl.class);

    @Autowired
    private TransportClient transportClient;

    @Value("${elasticsearch.index}")
    private String elasticIndex;

    @Value("${elasticsearch.type}")
    private String elasticType;

    @Override
    public JSONObject selectLogFromElastic(FileLogQO logQo) {
        logger.info("進(jìn)行日志埋點(diǎn)");
        // 進(jìn)行日志埋點(diǎn)操作
        MDC.put(FileLogContants.KEY_WORD_1, "關(guān)鍵字");

        // 根據(jù)傳入的參數(shù)進(jìn)行查詢條件的拼接
        BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
        if (!StringUtil.isEmpty(logQo.getAppName())) {
            // termQuery中參數(shù)加上FileLogContants.ES_KEYWORD_SYMBOL是防止es分詞器進(jìn)行分詞影響查詢
            queryBuilder.must(QueryBuilders.termQuery(FileLogContants.APP_NAME + FileLogContants.ES_KEYWORD_SYMBOL, logQo.getAppName()));
        }
        if (!StringUtil.isEmpty(logQo.getServiceName())) {
            queryBuilder.must(QueryBuilders.termQuery(FileLogContants.SERVICE_NAME + FileLogContants.ES_KEYWORD_SYMBOL, logQo.getServiceName()));
        }
        if (!StringUtil.isEmpty(logQo.getKeyword1())) {
            queryBuilder.must(QueryBuilders.wildcardQuery(FileLogContants.KEY_WORD_1, FileLogContants.STARS_SYMBOL + logQo.getKeyword1() + FileLogContants.STARS_SYMBOL));
        }
        if (!StringUtil.isEmpty(logQo.getTradeId())) {
            queryBuilder.must(QueryBuilders.wildcardQuery(FileLogContants.TRADE_ID, FileLogContants.STARS_SYMBOL + logQo.getTradeId() + FileLogContants.STARS_SYMBOL));
        }
        if (!StringUtil.isEmpty(logQo.getBeginTime()) && !StringUtil.isEmpty(logQo.getEndTime())) {
            // rangeQuery中參數(shù)加上FileLogContants.ES_KEYWORD_SYMBOL是防止es分詞器進(jìn)行分詞影響時(shí)間范圍查詢
            queryBuilder.must(QueryBuilders.rangeQuery(FileLogContants.MESSAGEDATE + FileLogContants.ES_KEYWORD_SYMBOL).from(logQo.getBeginTime()).to(logQo.getEndTime()));
        }

        int offset = logQo.getOffset();
        SearchRequestBuilder builder = transportClient.prepareSearch(elasticIndex).setTypes(elasticType)
                .setQuery(queryBuilder)
                .setFrom(offset)
                .setSize(logQo.getPageSize())
                // 根據(jù)指定字段進(jìn)行排序
                .addSort(FileLogContants.TIMESTAMP, SortOrder.DESC)
                //explain為true表示根據(jù)數(shù)據(jù)相關(guān)度排序,和關(guān)鍵字匹配最高的排在前面
                .setExplain(true);


        List<FileLogDTO> result = new ArrayList<>();
        SearchResponse searchResponse = builder.get();

        for (SearchHit hit : searchResponse.getHits()) {
            String messageJson = ObjectMapperUtil.writeValueAsString(hit.getSourceAsMap());
            FileLogDTO fileLogDTO = ObjectMapperUtil.readValue(messageJson, FileLogDTO.class);
            assert fileLogDTO != null;
            fileLogDTO.setId(hit.getId());
            result.add(fileLogDTO);
        }

        JSONObject object = new JSONObject();
        object.put("file_log_list", result);
        return object;
    }

    @Override
    public long getTotalSize() {
        // 查詢總條數(shù)
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        SearchHits hits = transportClient.prepareSearch(elasticIndex).setTypes(elasticType)
                .setQuery(boolQueryBuilder)
                // 設(shè)置為0的時(shí)候即查詢所有數(shù)據(jù)總數(shù)
                .setSize(FileLogContants.TOTAL)
                .get().getHits();

        return hits.getTotalHits();
    }
}

備注

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末栓袖,一起剝皮案震驚了整個(gè)濱河市匣摘,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌裹刮,老刑警劉巖音榜,帶你破解...
    沈念sama閱讀 217,406評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異捧弃,居然都是意外死亡赠叼,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門(mén)违霞,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)梅割,“玉大人,你說(shuō)我怎么就攤上這事葛家』Т牵” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 163,711評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵癞谒,是天一觀的道長(zhǎng)底燎。 經(jīng)常有香客問(wèn)我刃榨,道長(zhǎng),這世上最難降的妖魔是什么双仍? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,380評(píng)論 1 293
  • 正文 為了忘掉前任枢希,我火速辦了婚禮,結(jié)果婚禮上朱沃,老公的妹妹穿的比我還像新娘苞轿。我一直安慰自己,他們只是感情好逗物,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,432評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布搬卒。 她就那樣靜靜地躺著,像睡著了一般翎卓。 火紅的嫁衣襯著肌膚如雪契邀。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,301評(píng)論 1 301
  • 那天失暴,我揣著相機(jī)與錄音坯门,去河邊找鬼。 笑死逗扒,一個(gè)胖子當(dāng)著我的面吹牛古戴,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播矩肩,決...
    沈念sama閱讀 40,145評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼允瞧,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了蛮拔?” 一聲冷哼從身側(cè)響起述暂,我...
    開(kāi)封第一講書(shū)人閱讀 39,008評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎建炫,沒(méi)想到半個(gè)月后畦韭,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,443評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡肛跌,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,649評(píng)論 3 334
  • 正文 我和宋清朗相戀三年艺配,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片衍慎。...
    茶點(diǎn)故事閱讀 39,795評(píng)論 1 347
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡转唉,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情,我是刑警寧澤对雪,帶...
    沈念sama閱讀 35,501評(píng)論 5 345
  • 正文 年R本政府宣布馏鹤,位于F島的核電站砖织,受9級(jí)特大地震影響款侵,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜侧纯,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,119評(píng)論 3 328
  • 文/蒙蒙 一新锈、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧眶熬,春花似錦妹笆、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,731評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至牍白,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間抖棘,已是汗流浹背茂腥。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,865評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留切省,地道東北人最岗。 一個(gè)月前我還...
    沈念sama閱讀 47,899評(píng)論 2 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像朝捆,于是被迫代替她去往敵國(guó)和親般渡。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,724評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容