kafka丟數(shù)據(jù)問(wèn)題分析

大致可以通過(guò)上述情況進(jìn)行排除

1.?kafka服務(wù)器問(wèn)題

查看日志是否有報(bào)錯(cuò),網(wǎng)絡(luò)訪問(wèn)問(wèn)題等纽绍。

2. kafka producter 發(fā)送問(wèn)題

程序通過(guò)消息回調(diào)查詢發(fā)送狀態(tài)粘衬,是否有異常信息

import org.apache.kafka.clients.producer.*;

import org.apache.kafka.common.serialization.StringSerializer;

import org.apache.log4j.Logger;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.Bean;

import org.springframework.stereotype.Service;

import java.util.Properties;

import java.util.concurrent.Future;

/**

* KAFKA Producer

*/

public class ApmKafkaProducer {

private static Loggerlogger = Logger.getLogger("ApmKafkaProducer.class");

? ? ? ? public? static Producerproducer;

? ? @Autowired

? ? public DisConfigconfig;

? ? public ApmKafkaProducer(String broker) {

Properties props =new Properties();

? ? ? ? props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,broker);

? ? ? ? props.put(ProducerConfig.ACKS_CONFIG, "all");

? ? ? ? props.put(ProducerConfig.RETRIES_CONFIG, 10);

? ? ? ? props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 10000);

? ? ? ? //成批處理

? ? ? ? props.put(ProducerConfig.BATCH_SIZE_CONFIG, 250);

? ? ? ? props.put(ProducerConfig.LINGER_MS_CONFIG, 60000);

? ? ? ? props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

? ? ? ? props.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 3600000);

? ? ? ? props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);

? ? ? ? props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 2097152);

? ? ? ? props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"gzip");

? ? ? ? props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"5");

? ? ? ? props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

? ? ? ? props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

? ? ? ? producer =new KafkaProducer<>(props);

? ? }

/**

*? 發(fā)送 KAFKA 方法

? ? * @param

? ? * @param

? ? */

? ? public void sendMsg(String topic,String key, String value) {

if(DisConfig.isDebug) {

logger.info("發(fā)送:topic:" + topic +" key:" + key +" value:" + value);

? ? ? ? }

long startTime = System.currentTimeMillis();

? ? ? ? try {

logger.info("producer:"+producer);

? ? ? ? ? ? //異步處理

? ? ? ? ? ? producer.send(new ProducerRecord(topic, key, value),new ProducerAckCallback(startTime,key,value,topic));

? ? ? ? ? ? if(DisConfig.isDebug) {

logger.info("發(fā)送成功:" + topic +" key:" + key +" value:" + value);

? ? ? ? ? ? }

}catch (Throwable ex) {

/**

* 當(dāng)生產(chǎn)者出現(xiàn)異常時(shí)

*/

? ? ? ? ? ? producer.close();

? ? ? ? ? ? logger.error("發(fā)送異常:topic:" + topic +" key:" + key +" value:" + value);

? ? ? ? ? ? logger.error(ex);

? ? ? ? ? ? ex.printStackTrace();

? ? ? ? }

}

static class?ProducerAckCallbackimplements?Callback {

private final LongstartTime;

? ? ? ? private final Stringtopic;

? ? ? ? private final Stringkey;

? ? ? ? private final Stringvalue;

? ? ? ? private Stringmessage ="";

? ? ? ? public ProducerAckCallback(Long startTime, String key, String value, String topic) {

this.startTime = startTime;

? ? ? ? ? ? this.key = key;

? ? ? ? ? ? this.value = value;

? ? ? ? ? ? this.topic = topic;

? ? ? ? }

public?ProducerAckCallback(Long startTime, String key, String value, String topic,String message) {

this.startTime = startTime;

? ? ? ? ? ? this.key = key;

? ? ? ? ? ? this.value = value;

? ? ? ? ? ? this.topic = topic;

? ? ? ? ? ? this.message = message;

? ? ? ? }

@Override

public void?onCompletion(RecordMetadata metadata, Exception e) {

long spendTime = System.currentTimeMillis() -startTime;

? ? ? ? ? ? if (null != metadata) {

logger.info("消息回調(diào)花費(fèi)時(shí)間 (" +key +"," +value +")" +

"send to topic (" +topic +

") and partition (" + metadata.partition()

+") and offset (" + metadata.offset() +") and spend? (" + spendTime +" ms)" +

(message.equals("") ?"":"retry")

);

? ? ? ? ? ? }

if(e!=null) {

logger.error("嘗試重發(fā)------------------------------------------->");

? ? ? ? ? ? ? ? producer.send(new ProducerRecord(topic, key, value),new ProducerAckCallback(startTime,key,value,topic,"retry"));

? ? ? ? ? ? ? ? logger.error("消息嘗試重發(fā)花費(fèi)時(shí)間 (" +key +"," +value +")" +

"send to topic (" +topic +

") and partition (" + metadata.partition()

+") and offset (" + metadata.offset() +") and spend? (" + spendTime +" ms)"

? ? ? ? ? ? ? ? );

? ? ? ? ? ? ? ? logger.error("回調(diào)異常:" + e);

? ? ? ? ? ? }

}

}

}

3. 配置問(wèn)題

配置問(wèn)題有很多橄仆,服務(wù)器問(wèn)題和發(fā)送參數(shù)問(wèn)題摧玫,配置可以參考:

http://orchome.com/451

4. 消息長(zhǎng)度問(wèn)題

當(dāng)消息發(fā)送的時(shí)候如果key和value的長(zhǎng)度過(guò)長(zhǎng)也會(huì)導(dǎo)致消息的丟失耳奕,

大家如果有消息大量丟失的話,可以通過(guò)程序發(fā)送kafka測(cè)試诬像。

5. topic參數(shù)

topic創(chuàng)建的時(shí)候會(huì)有兩個(gè)參數(shù)比較重要參數(shù)

partitions(3)屋群,?replication(> 2)

響應(yīng)的配置請(qǐng)根據(jù)集群的實(shí)際情況進(jìn)行配置

如果有消息異常的話可以參考topic信息

./kafka-topics.sh desc --zookeeper localhost:2181看看kafka的topic信息有沒(méi)有異常

差不多就是kafka丟失消息的大部分原因,希望對(duì)分析問(wèn)題有所幫助颅停。


-----------------------------------------------華麗的分割線----------------------------------------

大致了解kafka相關(guān)

kafka入門:簡(jiǎn)介谓晌、使用場(chǎng)景、設(shè)計(jì)原理癞揉、主要配置及集群搭建(轉(zhuǎn))

一、入門

? ? 1溺欧、簡(jiǎn)介

Kafka is a distributed,partitioned,replicated commit logservice喊熟。它提供了類似于JMS的特性,但是在設(shè)計(jì)實(shí)現(xiàn)上完全不同姐刁,此外它并不是JMS規(guī)范的實(shí)現(xiàn)芥牌。kafka對(duì)消息保存時(shí)根據(jù)Topic進(jìn)行歸類,發(fā)送消息者成為Producer,消息接受者成為Consumer,此外kafka集群有多個(gè)kafka實(shí)例組成聂使,每個(gè)實(shí)例(server)成為broker壁拉。無(wú)論是kafka集群谬俄,還是producer和consumer都依賴于zookeeper來(lái)保證系統(tǒng)可用性集群保存一些meta信息。


? ?2弃理、Topics/logs

? ? 一個(gè)Topic可以認(rèn)為是一類消息溃论,每個(gè)topic將被分成多個(gè)partition(區(qū)),每個(gè)partition在存儲(chǔ)層面是append log文件。任何發(fā)布到此partition的消息都會(huì)被直接追加到log文件的尾部痘昌,每條消息在文件中的位置稱為offset(偏移量)钥勋,offset為一個(gè)long型數(shù)字,它是唯一標(biāo)記一條消息辆苔。它唯一的標(biāo)記一條消息算灸。kafka并沒(méi)有提供其他額外的索引機(jī)制來(lái)存儲(chǔ)offset,因?yàn)樵趉afka中幾乎不允許對(duì)消息進(jìn)行“隨機(jī)讀寫”驻啤。



? ? kafka和JMS(Java Message Service)實(shí)現(xiàn)(activeMQ)不同的是:即使消息被消費(fèi),消息仍然不會(huì)被立即刪除.日志文件將會(huì)根據(jù)broker中的配置要求,保留一定的時(shí)間之后刪除;比如log文件保留2天,那么兩天后,文件會(huì)被清除,無(wú)論其中的消息是否被消費(fèi).kafka通過(guò)這種簡(jiǎn)單的手段,來(lái)釋放磁盤空間,以及減少消息消費(fèi)之后對(duì)文件內(nèi)容改動(dòng)的磁盤IO開支.


? ? 對(duì)于consumer而言,它需要保存消費(fèi)消息的offset,對(duì)于offset的保存和使用,有consumer來(lái)控制;當(dāng)consumer正常消費(fèi)消息時(shí),offset將會(huì)"線性"的向前驅(qū)動(dòng),即消息將依次順序被消費(fèi).事實(shí)上consumer可以使用任意順序消費(fèi)消息,它只需要將offset重置為任意值..(offset將會(huì)保存在zookeeper中,參見(jiàn)下文)


kafka集群幾乎不需要維護(hù)任何consumer和producer狀態(tài)信息,這些信息有zookeeper保存;因此producer和consumer的客戶端實(shí)現(xiàn)非常輕量級(jí),它們可以隨意離開,而不會(huì)對(duì)集群造成額外的影響.


partitions的設(shè)計(jì)目的有多個(gè).最根本原因是kafka基于文件存儲(chǔ).通過(guò)分區(qū),可以將日志內(nèi)容分散到多個(gè)server上,來(lái)避免文件尺寸達(dá)到單機(jī)磁盤的上限,每個(gè)partiton都會(huì)被當(dāng)前server(kafka實(shí)例)保存;可以將一個(gè)topic切分多任意多個(gè)partitions,來(lái)消息保存/消費(fèi)的效率.此外越多的partitions意味著可以容納更多的consumer,有效提升并發(fā)消費(fèi)的能力.(具體原理參見(jiàn)下文).


? ? 3菲驴、Distribution

? ? 一個(gè)Topic的多個(gè)partitions,被分布在kafka集群中的多個(gè)server上;每個(gè)server(kafka實(shí)例)負(fù)責(zé)partitions中消息的讀寫操作;此外kafka還可以配置partitions需要備份的個(gè)數(shù)(replicas),每個(gè)partition將會(huì)被備份到多臺(tái)機(jī)器上,以提高可用性.


基于replicated方案,那么就意味著需要對(duì)多個(gè)備份進(jìn)行調(diào)度;每個(gè)partition都有一個(gè)server為"leader";leader負(fù)責(zé)所有的讀寫操作,如果leader失效,那么將會(huì)有其他follower來(lái)接管(成為新的leader);follower只是單調(diào)的和leader跟進(jìn),同步消息即可..由此可見(jiàn)作為leader的server承載了全部的請(qǐng)求壓力,因此從集群的整體考慮,有多少個(gè)partitions就意味著有多少個(gè)"leader",kafka會(huì)將"leader"均衡的分散在每個(gè)實(shí)例上,來(lái)確保整體的性能穩(wěn)定.


? ??Producers

? ? Producer將消息發(fā)布到指定的Topic中,同時(shí)Producer也能決定將此消息歸屬于哪個(gè)partition;比如基于"round-robin"方式或者通過(guò)其他的一些算法等.


? ??Consumers

? ? 本質(zhì)上kafka只支持Topic.每個(gè)consumer屬于一個(gè)consumer group;反過(guò)來(lái)說(shuō),每個(gè)group中可以有多個(gè)consumer.發(fā)送到Topic的消息,只會(huì)被訂閱此Topic的每個(gè)group中的一個(gè)consumer消費(fèi).


? ? 如果所有的consumer都具有相同的group,這種情況和queue模式很像;消息將會(huì)在consumers之間負(fù)載均衡.

? ? 如果所有的consumer都具有不同的group,那這就是"發(fā)布-訂閱";消息將會(huì)廣播給所有的消費(fèi)者.

? ? 在kafka中,一個(gè)partition中的消息只會(huì)被group中的一個(gè)consumer消費(fèi);每個(gè)group中consumer消息消費(fèi)互相獨(dú)立;我們可以認(rèn)為一個(gè)group是一個(gè)"訂閱"者,一個(gè)Topic中的每個(gè)partions,只會(huì)被一個(gè)"訂閱者"中的一個(gè)consumer消費(fèi),不過(guò)一個(gè)consumer可以消費(fèi)多個(gè)partitions中的消息.kafka只能保證一個(gè)partition中的消息被某個(gè)consumer消費(fèi)時(shí),消息是順序的.事實(shí)上,從Topic角度來(lái)說(shuō),消息仍不是有序的.


kafka的設(shè)計(jì)原理決定,對(duì)于一個(gè)topic,同一個(gè)group中不能有多于partitions個(gè)數(shù)的consumer同時(shí)消費(fèi),否則將意味著某些consumer將無(wú)法得到消息.


? ??Guarantees

? ? 1) 發(fā)送到partitions中的消息將會(huì)按照它接收的順序追加到日志中

? ? 2) 對(duì)于消費(fèi)者而言,它們消費(fèi)消息的順序和日志中消息順序一致.

? ? 3) 如果Topic的"replicationfactor"為N,那么允許N-1個(gè)kafka實(shí)例失效.


二、使用場(chǎng)景


? ? 1骑冗、Messaging? ?

? ? 對(duì)于一些常規(guī)的消息系統(tǒng),kafka是個(gè)不錯(cuò)的選擇;partitons/replication和容錯(cuò),可以使kafka具有良好的擴(kuò)展性和性能優(yōu)勢(shì).不過(guò)到目前為止,我們應(yīng)該很清楚認(rèn)識(shí)到,kafka并沒(méi)有提供JMS中的"事務(wù)性""消息傳輸擔(dān)保(消息確認(rèn)機(jī)制)""消息分組"等企業(yè)級(jí)特性;kafka只能使用作為"常規(guī)"的消息系統(tǒng),在一定程度上,尚未確保消息的發(fā)送與接收絕對(duì)可靠(比如,消息重發(fā),消息發(fā)送丟失等)


? ? 2赊瞬、Websit activity tracking

? ? kafka可以作為"網(wǎng)站活性跟蹤"的最佳工具;可以將網(wǎng)頁(yè)/用戶操作等信息發(fā)送到kafka中.并實(shí)時(shí)監(jiān)控,或者離線統(tǒng)計(jì)分析等


? ? 3、Log Aggregation

? ? kafka的特性決定它非常適合作為"日志收集中心";application可以將操作日志"批量""異步"的發(fā)送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/壓縮消息等,這對(duì)producer端而言,幾乎感覺(jué)不到性能的開支.此時(shí)consumer端可以使hadoop等其他系統(tǒng)化的存儲(chǔ)和分析系統(tǒng).


三沐旨、設(shè)計(jì)原理


kafka的設(shè)計(jì)初衷是希望作為一個(gè)統(tǒng)一的信息收集平臺(tái),能夠?qū)崟r(shí)的收集反饋信息,并需要能夠支撐較大的數(shù)據(jù)量,且具備良好的容錯(cuò)能力.


? ? 1森逮、持久性

? ? kafka使用文件存儲(chǔ)消息,這就直接決定kafka在性能上嚴(yán)重依賴文件系統(tǒng)的本身特性.且無(wú)論任何OS下,對(duì)文件系統(tǒng)本身的優(yōu)化幾乎沒(méi)有可能.文件緩存/直接內(nèi)存映射等是常用的手段.因?yàn)閗afka是對(duì)日志文件進(jìn)行append操作,因此磁盤檢索的開支是較小的;同時(shí)為了減少磁盤寫入的次數(shù),broker會(huì)將消息暫時(shí)buffer起來(lái),當(dāng)消息的個(gè)數(shù)(或尺寸)達(dá)到一定閥值時(shí),再flush到磁盤,這樣減少了磁盤IO調(diào)用的次數(shù).

2、性能

? ? 需要考慮的影響性能點(diǎn)很多,除磁盤IO之外,我們還需要考慮網(wǎng)絡(luò)IO,這直接關(guān)系到kafka的吞吐量問(wèn)題.kafka并沒(méi)有提供太多高超的技巧;對(duì)于producer端,可以將消息buffer起來(lái),當(dāng)消息的條數(shù)達(dá)到一定閥值時(shí),批量發(fā)送給broker;對(duì)于consumer端也是一樣,批量fetch多條消息.不過(guò)消息量的大小可以通過(guò)配置文件來(lái)指定.對(duì)于kafka broker端,似乎有個(gè)sendfile系統(tǒng)調(diào)用可以潛在的提升網(wǎng)絡(luò)IO的性能:將文件的數(shù)據(jù)映射到系統(tǒng)內(nèi)存中,socket直接讀取相應(yīng)的內(nèi)存區(qū)域即可,而無(wú)需進(jìn)程再次copy和交換. 其實(shí)對(duì)于producer/consumer/broker三者而言,CPU的開支應(yīng)該都不大,因此啟用消息壓縮機(jī)制是一個(gè)良好的策略;壓縮需要消耗少量的CPU資源,不過(guò)對(duì)于kafka而言,網(wǎng)絡(luò)IO更應(yīng)該需要考慮.可以將任何在網(wǎng)絡(luò)上傳輸?shù)南⒍冀?jīng)過(guò)壓縮.kafka支持gzip/snappy等多種壓縮方式.


? ? 3磁携、生產(chǎn)者

負(fù)載均衡: producer將會(huì)和Topic下所有partition leader保持socket連接;消息由producer直接通過(guò)socket發(fā)送到broker,中間不會(huì)經(jīng)過(guò)任何"路由層".事實(shí)上,消息被路由到哪個(gè)partition上,有producer客戶端決定.比如可以采用"random""key-hash""輪詢"等,如果一個(gè)topic中有多個(gè)partitions,那么在producer端實(shí)現(xiàn)"消息均衡分發(fā)"是必要的.


? ? 其中partition leader的位置(host:port)注冊(cè)在zookeeper中,producer作為zookeeper client,已經(jīng)注冊(cè)了watch用來(lái)監(jiān)聽(tīng)partition leader的變更事件.

? ? 異步發(fā)送:將多條消息暫且在客戶端buffer起來(lái)褒侧,并將他們批量的發(fā)送到broker,小數(shù)據(jù)IO太多谊迄,會(huì)拖慢整體的網(wǎng)絡(luò)延遲闷供,批量延遲發(fā)送事實(shí)上提升了網(wǎng)絡(luò)效率。不過(guò)這也有一定的隱患统诺,比如說(shuō)當(dāng)producer失效時(shí)歪脏,那些尚未發(fā)送的消息將會(huì)丟失。


? ? 4粮呢、消費(fèi)者

? ? consumer端向broker發(fā)送"fetch"請(qǐng)求,并告知其獲取消息的offset;此后consumer將會(huì)獲得一定條數(shù)的消息;consumer端也可以重置offset來(lái)重新消費(fèi)消息.


? ? 在JMS實(shí)現(xiàn)中,Topic模型基于push方式,即broker將消息推送給consumer端.不過(guò)在kafka中,采用了pull方式,即consumer在和broker建立連接之后,主動(dòng)去pull(或者說(shuō)fetch)消息;這中模式有些優(yōu)點(diǎn),首先consumer端可以根據(jù)自己的消費(fèi)能力適時(shí)的去fetch消息并處理,且可以控制消息消費(fèi)的進(jìn)度(offset);此外,消費(fèi)者可以良好的控制消息消費(fèi)的數(shù)量,batch fetch.


其他JMS實(shí)現(xiàn),消息消費(fèi)的位置是有prodiver保留,以便避免重復(fù)發(fā)送消息或者將沒(méi)有消費(fèi)成功的消息重發(fā)等,同時(shí)還要控制消息的狀態(tài).這就要求JMS broker需要太多額外的工作.在kafka中,partition中的消息只有一個(gè)consumer在消費(fèi),且不存在消息狀態(tài)的控制,也沒(méi)有復(fù)雜的消息確認(rèn)機(jī)制,可見(jiàn)kafka broker端是相當(dāng)輕量級(jí)的.當(dāng)消息被consumer接收之后,consumer可以在本地保存最后消息的offset,并間歇性的向zookeeper注冊(cè)offset.由此可見(jiàn),consumer客戶端也很輕量級(jí).


? ? 5婿失、消息傳送機(jī)制

? ? 對(duì)于JMS實(shí)現(xiàn),消息傳輸擔(dān)保非常直接:有且只有一次(exactly once).在kafka中稍有不同:

? ? 1) at most once: 最多一次,這個(gè)和JMS中"非持久化"消息類似.發(fā)送一次,無(wú)論成敗,將不會(huì)重發(fā).

? ? 2) at least once: 消息至少發(fā)送一次,如果消息未能接受成功,可能會(huì)重發(fā),直到接收成功.

? ? 3) exactly once: 消息只會(huì)發(fā)送一次.

? ? at most once: 消費(fèi)者fetch消息,然后保存offset,然后處理消息;當(dāng)client保存offset之后,但是在消息處理過(guò)程中出現(xiàn)了異常,導(dǎo)致部分消息未能繼續(xù)處理.那么此后"未處理"的消息將不能被fetch到,這就是"at most once".

? ? at least once: 消費(fèi)者fetch消息,然后處理消息,然后保存offset.如果消息處理成功之后,但是在保存offset階段zookeeper異常導(dǎo)致保存操作未能執(zhí)行成功,這就導(dǎo)致接下來(lái)再次fetch時(shí)可能獲得上次已經(jīng)處理過(guò)的消息,這就是"at least once",原因offset沒(méi)有及時(shí)的提交給zookeeper啄寡,zookeeper恢復(fù)正常還是之前offset狀態(tài).

? ? exactly once: kafka中并沒(méi)有嚴(yán)格的去實(shí)現(xiàn)(基于2階段提交,事務(wù)),我們認(rèn)為這種策略在kafka中是沒(méi)有必要的.

? ? 通常情況下"at-least-once"是我們搜選.(相比at most once而言,重復(fù)接收數(shù)據(jù)總比丟失數(shù)據(jù)要好).


? ? 6豪硅、復(fù)制備份

? ? kafka將每個(gè)partition數(shù)據(jù)復(fù)制到多個(gè)server上,任何一個(gè)partition有一個(gè)leader和多個(gè)follower(可以沒(méi)有);備份的個(gè)數(shù)可以通過(guò)broker配置文件來(lái)設(shè)定.leader處理所有的read-write請(qǐng)求,follower需要和leader保持同步.Follower和consumer一樣,消費(fèi)消息并保存在本地日志中;leader負(fù)責(zé)跟蹤所有的follower狀態(tài),如果follower"落后"太多或者失效,leader將會(huì)把它從replicas同步列表中刪除.當(dāng)所有的follower都將一條消息保存成功,此消息才被認(rèn)為是"committed",那么此時(shí)consumer才能消費(fèi)它.即使只有一個(gè)replicas實(shí)例存活,仍然可以保證消息的正常發(fā)送和接收,只要zookeeper集群存活即可.(不同于其他分布式存儲(chǔ),比如hbase需要"多數(shù)派"存活才行)

當(dāng)leader失效時(shí),需在followers中選取出新的leader,可能此時(shí)follower落后于leader,因此需要選擇一個(gè)"up-to-date"的follower.選擇follower時(shí)需要兼顧一個(gè)問(wèn)題,就是新leaderserver上所已經(jīng)承載的partition leader的個(gè)數(shù),如果一個(gè)server上有過(guò)多的partition leader,意味著此server將承受著更多的IO壓力.在選舉新leader,需要考慮到"負(fù)載均衡".


? ? 7.日志

? ? 如果一個(gè)topic的名稱為"my_topic",它有2個(gè)partitions,那么日志將會(huì)保存在my_topic_0和my_topic_1兩個(gè)目錄中;日志文件中保存了一序列"log entries"(日志條目),每個(gè)log entry格式為"4個(gè)字節(jié)的數(shù)字N表示消息的長(zhǎng)度" + "N個(gè)字節(jié)的消息內(nèi)容";每個(gè)日志都有一個(gè)offset來(lái)唯一的標(biāo)記一條消息,offset的值為8個(gè)字節(jié)的數(shù)字,表示此消息在此partition中所處的起始位置..每個(gè)partition在物理存儲(chǔ)層面,有多個(gè)log file組成(稱為segment).segmentfile的命名為"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.

? ? 其中每個(gè)partiton中所持有的segments列表信息會(huì)存儲(chǔ)在zookeeper中.

當(dāng)segment文件尺寸達(dá)到一定閥值時(shí)(可以通過(guò)配置文件設(shè)定,默認(rèn)1G),將會(huì)創(chuàng)建一個(gè)新的文件;當(dāng)buffer中消息的條數(shù)達(dá)到閥值時(shí)將會(huì)觸發(fā)日志信息flush到日志文件中,同時(shí)如果"距離最近一次flush的時(shí)間差"達(dá)到閥值時(shí),也會(huì)觸發(fā)flush到日志文件.如果broker失效,極有可能會(huì)丟失那些尚未flush到文件的消息.因?yàn)?a target="_blank" rel="nofollow">server意外實(shí)現(xiàn),仍然會(huì)導(dǎo)致log文件格式的破壞(文件尾部),那么就要求當(dāng)server啟東是需要檢測(cè)最后一個(gè)segment的文件結(jié)構(gòu)是否合法并進(jìn)行必要的修復(fù).

? ? 獲取消息時(shí),需要指定offset和最大chunk尺寸,offset用來(lái)表示消息的起始位置,chunk size用來(lái)表示最大獲取消息的總長(zhǎng)度(間接的表示消息的條數(shù)).根據(jù)offset,可以找到此消息所在segment文件,然后根據(jù)segment的最小offset取差值,得到它在file中的相對(duì)位置,直接讀取輸出即可.

? ? 日志文件的刪除策略非常簡(jiǎn)單:啟動(dòng)一個(gè)后臺(tái)線程定期掃描log file列表,把保存時(shí)間超過(guò)閥值的文件直接刪除(根據(jù)文件的創(chuàng)建時(shí)間).為了避免刪除文件時(shí)仍然有read操作(consumer消費(fèi)),采取copy-on-write方式.


? ? 8、分配

? ? kafka使用zookeeper來(lái)存儲(chǔ)一些meta信息,并使用了zookeeper watch機(jī)制來(lái)發(fā)現(xiàn)meta信息的變更并作出相應(yīng)的動(dòng)作(比如consumer失效,觸發(fā)負(fù)載均衡等)

? ? 1) Broker node registry: 當(dāng)一個(gè)kafkabroker啟動(dòng)后,首先會(huì)向zookeeper注冊(cè)自己的節(jié)點(diǎn)信息(臨時(shí)znode),同時(shí)當(dāng)broker和zookeeper斷開連接時(shí),此znode也會(huì)被刪除.

? ? 格式: /broker/ids/[0...N]? ?-->host:port;其中[0..N]表示broker id,每個(gè)broker的配置文件中都需要指定一個(gè)數(shù)字類型的id(全局不可重復(fù)),znode的值為此broker的host:port信息.

? ? 2) Broker Topic Registry: 當(dāng)一個(gè)broker啟動(dòng)時(shí),會(huì)向zookeeper注冊(cè)自己持有的topic和partitions信息,仍然是一個(gè)臨時(shí)znode.

? ? 格式: /broker/topics/[topic]/[0...N]??其中[0..N]表示partition索引號(hào).

3) Consumer and Consumer group: 每個(gè)consumer客戶端被創(chuàng)建時(shí),會(huì)向zookeeper注冊(cè)自己的信息;此作用主要是為了"負(fù)載均衡".

? ? 一個(gè)group中的多個(gè)consumer可以交錯(cuò)的消費(fèi)一個(gè)topic的所有partitions;簡(jiǎn)而言之,保證此topic的所有partitions都能被此group所消費(fèi),且消費(fèi)時(shí)為了性能考慮,讓partition相對(duì)均衡的分散到每個(gè)consumer上.

? ? 4) Consumer id Registry: 每個(gè)consumer都有一個(gè)唯一的ID(host:uuid,可以通過(guò)配置文件指定,也可以由系統(tǒng)生成),此id用來(lái)標(biāo)記消費(fèi)者信息.

? ? 格式:/consumers/[group_id]/ids/[consumer_id]

? ? 仍然是一個(gè)臨時(shí)的znode,此節(jié)點(diǎn)的值為{"topic_name":#streams...},即表示此consumer目前所消費(fèi)的topic + partitions列表.

? ? 5) Consumer offset Tracking: 用來(lái)跟蹤每個(gè)consumer目前所消費(fèi)的partition中最大的offset.

? ? 格式:/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]-->offset_value

? ? 此znode為持久節(jié)點(diǎn),可以看出offset跟group_id有關(guān),以表明當(dāng)group中一個(gè)消費(fèi)者失效,其他consumer可以繼續(xù)消費(fèi).

? ? 6) Partition Owner registry: 用來(lái)標(biāo)記partition被哪個(gè)consumer消費(fèi).臨時(shí)znode

? ? 格式:/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]-->consumer_node_id當(dāng)consumer啟動(dòng)時(shí),所觸發(fā)的操作:

? ? A) 首先進(jìn)行"Consumer id Registry";

? ? B) 然后在"Consumer id Registry"節(jié)點(diǎn)下注冊(cè)一個(gè)watch用來(lái)監(jiān)聽(tīng)當(dāng)前group中其他consumer的"leave"和"join";只要此znode path下節(jié)點(diǎn)列表變更,都會(huì)觸發(fā)此group下consumer的負(fù)載均衡.(比如一個(gè)consumer失效,那么其他consumer接管partitions).

? ? C) 在"Broker id registry"節(jié)點(diǎn)下,注冊(cè)一個(gè)watch用來(lái)監(jiān)聽(tīng)broker的存活情況;如果broker列表變更,將會(huì)觸發(fā)所有的groups下的consumer重新balance.

? ? 1) Producer端使用zookeeper用來(lái)"發(fā)現(xiàn)"broker列表,以及和Topic下每個(gè)partition leader建立socket連接并發(fā)送消息.

? ? 2) Broker端使用zookeeper用來(lái)注冊(cè)broker信息,已經(jīng)監(jiān)測(cè)partitionleader存活性.

? ? 3) Consumer端使用zookeeper用來(lái)注冊(cè)consumer信息,其中包括consumer消費(fèi)的partition列表等,同時(shí)也用來(lái)發(fā)現(xiàn)broker列表,并和partition leader建立socket連接,并獲取消息.


四挺物、主要配置


? ? 1懒浮、Broker配置



? ? 2.Consumer主要配置



3.Producer主要配置




以上是關(guān)于kafka一些基礎(chǔ)說(shuō)明,在其中我們知道如果要kafka正常運(yùn)行识藤,必須配置zookeeper砚著,否則無(wú)論是kafka集群還是客戶端的生存者和消費(fèi)者都無(wú)法正常的工作的次伶,以下是對(duì)zookeeper進(jìn)行一些簡(jiǎn)單的介紹:


五、zookeeper集群

zookeeper是一個(gè)為分布式應(yīng)用提供一致性服務(wù)的軟件稽穆,它是開源的Hadoop項(xiàng)目的一個(gè)子項(xiàng)目冠王,并根據(jù)google發(fā)表的一篇論文來(lái)實(shí)現(xiàn)的。zookeeper為分布式系統(tǒng)提供了高笑且易于使用的協(xié)同服務(wù)秧骑,它可以為分布式應(yīng)用提供相當(dāng)多的服務(wù)版确,諸如統(tǒng)一命名服務(wù),配置管理乎折,狀態(tài)同步和組服務(wù)等绒疗。zookeeper接口簡(jiǎn)單,我們不必過(guò)多地糾結(jié)在分布式系統(tǒng)編程難于處理的同步和一致性問(wèn)題上骂澄,你可以使用zookeeper提供的現(xiàn)成(off-the-shelf)服務(wù)來(lái)實(shí)現(xiàn)來(lái)實(shí)現(xiàn)分布式系統(tǒng)額配置管理吓蘑,組管理,Leader選舉等功能坟冲。

? ? zookeeper集群的安裝,準(zhǔn)備三臺(tái)服務(wù)器server1:192.168.0.1,server2:192.168.0.2,

? ? server3:192.168.0.3.

? ? 1)下載zookeeper

http://zookeeper.apache.org/releases.html去下載最新版本Zookeeper-3.4.5的安裝包zookeeper-3.4.5.tar.gz.將文件保存server1的~目錄下

? ? 2)安裝zookeeper

先在服務(wù)器server分別執(zhí)行a-c步驟

? ? a)解壓??

? ? tar -zxvf zookeeper-3.4.5.tar.gz

? ? 解壓完成后在目錄~下會(huì)發(fā)現(xiàn)多出一個(gè)目錄zookeeper-3.4.5,重新命令為zookeeper

? ? b)配置

? ? 將conf/zoo_sample.cfg拷貝一份命名為zoo.cfg磨镶,也放在conf目錄下。然后按照如下值修改其中的配置:


? ? # The number of milliseconds of each tick

? ? tickTime=2000

? ? # The number of ticks that the initial

? ? # synchronization phase can take

? ? initLimit=10

? ? # The number of ticks that can pass between

? ? # sending a request and getting an acknowledgement

? ? syncLimit=5

? ? # the directory where the snapshot is stored.

? ? # do not use /tmp for storage, /tmp here is just

? ? # example sakes.

? ? dataDir=/home/wwb/zookeeper /data

? ? dataLogDir=/home/wwb/zookeeper/logs

? ? # the port at which the clients will connect

? ? clientPort=2181

? ? #

? ? # Be sure to read the maintenance section of the

? ? # administrator guide before turning on autopurge.

#http://zookeeper.apache.org/doc/ ... html#sc_maintenance

? ? #

? ? # The number of snapshots to retain in dataDir

? ? #autopurge.snapRetainCount=3

? ? # Purge task interval in hours

? ? # Set to "0" to disable auto purge feature

? ? #autopurge.purgeInterval=1

? ? server.1=192.168.0.1:3888:4888

? ? server.2=192.168.0.2:3888:4888

server.3=192.168.0.3:3888:4888

? ? tickTime:這個(gè)時(shí)間是作為 Zookeeper 服務(wù)器之間或客戶端與服務(wù)器之間維持心跳的時(shí)間間隔健提,也就是每個(gè) tickTime 時(shí)間就會(huì)發(fā)送一個(gè)心跳琳猫。

? ? dataDir:顧名思義就是 Zookeeper 保存數(shù)據(jù)的目錄,默認(rèn)情況下私痹,Zookeeper 將寫數(shù)據(jù)的日志文件也保存在這個(gè)目錄里脐嫂。

? ? clientPort:這個(gè)端口就是客戶端連接 Zookeeper 服務(wù)器的端口,Zookeeper 會(huì)監(jiān)聽(tīng)這個(gè)端口紊遵,接受客戶端的訪問(wèn)請(qǐng)求账千。

initLimit:這個(gè)配置項(xiàng)是用來(lái)配置 Zookeeper 接受客戶端(這里所說(shuō)的客戶端不是用戶連接 Zookeeper 服務(wù)器的客戶端,而是 Zookeeper 服務(wù)器集群中連接到 Leader 的 Follower 服務(wù)器)初始化連接時(shí)最長(zhǎng)能忍受多少個(gè)心跳時(shí)間間隔數(shù)暗膜。當(dāng)已經(jīng)超過(guò) 5個(gè)心跳的時(shí)間(也就是 tickTime)長(zhǎng)度后 Zookeeper 服務(wù)器還沒(méi)有收到客戶端的返回信息匀奏,那么表明這個(gè)客戶端連接失敗⊙眩總的時(shí)間長(zhǎng)度就是 5*2000=10 秒

? ? syncLimit:這個(gè)配置項(xiàng)標(biāo)識(shí) Leader 與Follower 之間發(fā)送消息娃善,請(qǐng)求和應(yīng)答時(shí)間長(zhǎng)度,最長(zhǎng)不能超過(guò)多少個(gè) tickTime 的時(shí)間長(zhǎng)度瑞佩,總的時(shí)間長(zhǎng)度就是2*2000=4 秒

? ? server.A=B:C:D:其中 A 是一個(gè)數(shù)字会放,表示這個(gè)是第幾號(hào)服務(wù)器;B 是這個(gè)服務(wù)器的 ip 地址钉凌;C 表示的是這個(gè)服務(wù)器與集群中的 Leader 服務(wù)器交換信息的端口;D 表示的是萬(wàn)一集群中的 Leader 服務(wù)器掛了捂人,需要一個(gè)端口來(lái)重新進(jìn)行選舉御雕,選出一個(gè)新的 Leader矢沿,而這個(gè)端口就是用來(lái)執(zhí)行選舉時(shí)服務(wù)器相互通信的端口。如果是偽集群的配置方式酸纲,由于 B 都是一樣捣鲸,所以不同的 Zookeeper 實(shí)例通信端口號(hào)不能一樣,所以要給它們分配不同的端口號(hào)

注意:dataDir,dataLogDir中的wwb是當(dāng)前登錄用戶名闽坡,data栽惶,logs目錄開始是不存在,需要使用mkdir命令創(chuàng)建相應(yīng)的目錄疾嗅。并且在該目錄下創(chuàng)建文件myid,serve1,server2,server3該文件內(nèi)容分別為1,2,3外厂。

針對(duì)服務(wù)器server2,server3可以將server1復(fù)制到相應(yīng)的目錄,不過(guò)需要注意dataDir,dataLogDir目錄,并且文件myid內(nèi)容分別為2,3

3)依次啟動(dòng)server1代承,server2,server3的zookeeper.

? ? /home/wwb/zookeeper/bin/zkServer.sh start,出現(xiàn)類似以下內(nèi)容

? ? JMX enabled by default

? ? Using config: /home/wwb/zookeeper/bin/../conf/zoo.cfg

? ? Starting zookeeper ... STARTED

? ?4) 測(cè)試zookeeper是否正常工作汁蝶,在server1上執(zhí)行以下命令

? ? /home/wwb/zookeeper/bin/zkCli.sh -server192.168.0.2:2181,出現(xiàn)類似以下內(nèi)容

? ? JLine support is enabled

2013-11-27 19:59:40,560 - INFO? ?? ?[main-SendThread(localhost.localdomain:2181):ClientCnxn$SendThread@736]- Session? ?establishmentcomplete onserverlocalhost.localdomain/127.0.0.1:2181, sessionid =? ? 0x1429cdb49220000, negotiatedtimeout = 30000


? ? WATCHER::


? ? WatchedEvent state:SyncConnected type:None path:null

? ? [zk: 127.0.0.1:2181(CONNECTED) 0] [root@localhostzookeeper2]#??

? ? 即代表集群構(gòu)建成功了,如果出現(xiàn)錯(cuò)誤那應(yīng)該是第三部時(shí)沒(méi)有啟動(dòng)好集群,

運(yùn)行论悴,先利用

? ? ps aux | grep zookeeper查看是否有相應(yīng)的進(jìn)程的掖棉,沒(méi)有話,說(shuō)明集群?jiǎn)?dòng)出現(xiàn)問(wèn)題膀估,可以在每個(gè)服務(wù)器上使用

? ? ./home/wwb/zookeeper/bin/zkServer.sh stop幔亥。再依次使用./home/wwb/zookeeper/binzkServer.sh start,這時(shí)在執(zhí)行4一般是沒(méi)有問(wèn)題察纯,如果還是有問(wèn)題帕棉,那么先stop再到bin的上級(jí)目錄執(zhí)行./bin/zkServer.shstart試試。


注意:zookeeper集群時(shí),zookeeper要求半數(shù)以上的機(jī)器可用畜疾,zookeeper才能提供服務(wù)捆憎。


六、kafka集群

(利用上面server1,server2,server3,下面以server1為實(shí)例)

1)下載kafka0.8(http://kafka.apache.org/downloads.html),保存到服務(wù)器/home/wwb目錄下kafka-0.8.0-beta1-src.tgz(kafka_2.8.0-0.8.0-beta1.tgz)

? ? 2)解壓 tar -zxvf kafka-0.8.0-beta1-src.tgz,產(chǎn)生文件夾kafka-0.8.0-beta1-src更改為kafka01? ?

3)配置

修改kafka01/config/server.properties,其中broker.id,log.dirs,zookeeper.connect必須根據(jù)實(shí)際情況進(jìn)行修改瞒窒,其他項(xiàng)根據(jù)需要自行斟酌。大致如下:

? ???broker.id=1??

? ???port=9091??

? ???num.network.threads=2??

? ???num.io.threads=2??

? ???socket.send.buffer.bytes=1048576??

? ? socket.receive.buffer.bytes=1048576??

? ???socket.request.max.bytes=104857600??

? ? log.dir=./logs??

? ? num.partitions=2??

? ? log.flush.interval.messages=10000??

? ? log.flush.interval.ms=1000??

? ? log.retention.hours=168??

? ? #log.retention.bytes=1073741824??

? ? log.segment.bytes=536870912??

? ? num.replica.fetchers=2??

? ? log.cleanup.interval.mins=10??

? ? zookeeper.connect=192.168.0.1:2181,192.168.0.2:2182,192.168.0.3:2183??

? ? zookeeper.connection.timeout.ms=1000000??

? ? kafka.metrics.polling.interval.secs=5??

? ? kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter??

? ? kafka.csv.metrics.dir=/tmp/kafka_metrics??

? ? kafka.csv.metrics.reporter.enabled=false


4)初始化因?yàn)閗afka用scala語(yǔ)言編寫乡洼,因此運(yùn)行kafka需要首先準(zhǔn)備scala相關(guān)環(huán)境崇裁。

? ? > cd kafka01??

? ? > ./sbt update??

? ? > ./sbt package??

? ? > ./sbt assembly-package-dependency

在第二個(gè)命令時(shí)可能需要一定時(shí)間,由于要下載更新一些依賴包束昵。所以請(qǐng)大家 耐心點(diǎn)拔稳。

5) 啟動(dòng)kafka01

>JMX_PORT=9997 bin/kafka-server-start.sh config/server.properties &

a)kafka02操作步驟與kafka01雷同,不同的地方如下

? ? 修改kafka02/config/server.properties

? ? broker.id=2

? ? port=9092

? ? ##其他配置和kafka-0保持一致

? ? 啟動(dòng)kafka02

? ? JMX_PORT=9998 bin/kafka-server-start.shconfig/server.properties &??

b)kafka03操作步驟與kafka01雷同锹雏,不同的地方如下

? ? 修改kafka03/config/server.properties

? ? broker.id=3

? ? port=9093

? ? ##其他配置和kafka-0保持一致

? ? 啟動(dòng)kafka02

JMX_PORT=9999 bin/kafka-server-start.shconfig/server.properties &

6)創(chuàng)建Topic(包含一個(gè)分區(qū)巴比,三個(gè)副本)

? ? >bin/kafka-create-topic.sh--zookeeper 192.168.0.1:2181 --replica 3 --partition 1 --topicmy-replicated-topic

7)查看topic情況

? ? >bin/kafka-list-top.sh --zookeeper 192.168.0.1:2181

? ? topic: my-replicated-topic??partition: 0 leader: 1??replicas: 1,2,0??isr: 1,2,0

8)創(chuàng)建發(fā)送者

? ?>bin/kafka-console-producer.sh--broker-list 192.168.0.1:9091 --topic my-replicated-topic

? ? my test message1

? ? my test message2

? ? ^C

9)創(chuàng)建消費(fèi)者

? ? >bin/kafka-console-consumer.sh --zookeeper127.0.0.1:2181 --from-beginning --topic my-replicated-topic

? ? ...

? ? my test message1

? ? my test message2

^C

10)殺掉server1上的broker

>pkill -9 -f config/server.properties

11)查看topic

??>bin/kafka-list-top.sh --zookeeper192.168.0.1:2181

??topic: my-replicated-topic??partition: 0 leader: 1??replicas: 1,2,0??isr: 1,2,0

發(fā)現(xiàn)topic還正常的存在

11)創(chuàng)建消費(fèi)者,看是否能查詢到消息

? ? >bin/kafka-console-consumer.sh --zookeeper192.168.0.1:2181 --from-beginning --topic my-replicated-topic

? ? ...

? ? my test message 1

? ? my test message 2

? ? ^C

說(shuō)明一切都是正常的。


OK,以上就是對(duì)Kafka個(gè)人的理解轻绞,不對(duì)之處請(qǐng)大家及時(shí)指出采记。



補(bǔ)充說(shuō)明:

1、public Map>> createMessageStreams(Map topicCountMap)政勃,其中該方法的參數(shù)Map的key為topic名稱唧龄,value為topic對(duì)應(yīng)的分區(qū)數(shù),譬如說(shuō)如果在kafka中不存在相應(yīng)的topic時(shí)奸远,則會(huì)創(chuàng)建一個(gè)topic既棺,分區(qū)數(shù)為value,如果存在的話懒叛,該處的value則不起什么作用


2丸冕、關(guān)于生產(chǎn)者向指定的分區(qū)發(fā)送數(shù)據(jù),通過(guò)設(shè)置partitioner.class的屬性來(lái)指定向那個(gè)分區(qū)發(fā)送數(shù)據(jù)芍瑞,如果自己指定必須編寫相應(yīng)的程序晨仑,默認(rèn)是kafka.producer.DefaultPartitioner,分區(qū)程序是基于散列的鍵。


3拆檬、在多個(gè)消費(fèi)者讀取同一個(gè)topic的數(shù)據(jù)洪己,為了保證每個(gè)消費(fèi)者讀取數(shù)據(jù)的唯一性,必須將這些消費(fèi)者group_id定義為同一個(gè)值竟贯,這樣就構(gòu)建了一個(gè)類似隊(duì)列的數(shù)據(jù)結(jié)構(gòu)答捕,如果定義不同,則類似一種廣播結(jié)構(gòu)的屑那。


4拱镐、在consumerapi中,參數(shù)設(shè)計(jì)到數(shù)字部分持际,類似Map,

numStream,指的都是在topic不存在的時(shí)沃琅,會(huì)創(chuàng)建一個(gè)topic,并且分區(qū)個(gè)數(shù)為Integer,numStream,注意如果數(shù)字大于broker的配置中num.partitions屬性蜘欲,會(huì)以num.partitions為依據(jù)創(chuàng)建分區(qū)個(gè)數(shù)的益眉。


5、producerapi姥份,調(diào)用send時(shí)郭脂,如果不存在topic,也會(huì)創(chuàng)建topic澈歉,在該方法中沒(méi)有提供分區(qū)個(gè)數(shù)的參數(shù)展鸡,在這里分區(qū)個(gè)數(shù)是由服務(wù)端broker的配置中num.partitions屬性決定的


環(huán)境出現(xiàn)問(wèn)題:

由于項(xiàng)目當(dāng)中運(yùn)用到相關(guān)kafka大量消息發(fā)送,但是在消息發(fā)送的時(shí)候總是有消息丟失的情況

問(wèn)題分析:

1.kafka服務(wù)器問(wèn)題

2. kafka producter 發(fā)送問(wèn)題

3. 配置問(wèn)題

4. 消息長(zhǎng)度問(wèn)題

5. topic參數(shù)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末埃难,一起剝皮案震驚了整個(gè)濱河市莹弊,隨后出現(xiàn)的幾起案子涤久,更是在濱河造成了極大的恐慌,老刑警劉巖箱硕,帶你破解...
    沈念sama閱讀 216,324評(píng)論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件拴竹,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡剧罩,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,356評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門座泳,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)惠昔,“玉大人,你說(shuō)我怎么就攤上這事挑势≌蚍溃” “怎么了?”我有些...
    開封第一講書人閱讀 162,328評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵潮饱,是天一觀的道長(zhǎng)来氧。 經(jīng)常有香客問(wèn)我,道長(zhǎng)香拉,這世上最難降的妖魔是什么啦扬? 我笑而不...
    開封第一講書人閱讀 58,147評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮凫碌,結(jié)果婚禮上扑毡,老公的妹妹穿的比我還像新娘。我一直安慰自己盛险,他們只是感情好瞄摊,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,160評(píng)論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著苦掘,像睡著了一般换帜。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上鹤啡,一...
    開封第一講書人閱讀 51,115評(píng)論 1 296
  • 那天惯驼,我揣著相機(jī)與錄音,去河邊找鬼揉忘。 笑死跳座,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的泣矛。 我是一名探鬼主播疲眷,決...
    沈念sama閱讀 40,025評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼您朽!你這毒婦竟也來(lái)了狂丝?” 一聲冷哼從身側(cè)響起换淆,我...
    開封第一講書人閱讀 38,867評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎几颜,沒(méi)想到半個(gè)月后倍试,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,307評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡蛋哭,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,528評(píng)論 2 332
  • 正文 我和宋清朗相戀三年县习,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片谆趾。...
    茶點(diǎn)故事閱讀 39,688評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡躁愿,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出沪蓬,到底是詐尸還是另有隱情彤钟,我是刑警寧澤,帶...
    沈念sama閱讀 35,409評(píng)論 5 343
  • 正文 年R本政府宣布跷叉,位于F島的核電站逸雹,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏云挟。R本人自食惡果不足惜梆砸,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,001評(píng)論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望植锉。 院中可真熱鬧辫樱,春花似錦、人聲如沸俊庇。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,657評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)辉饱。三九已至搬男,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間彭沼,已是汗流浹背缔逛。 一陣腳步聲響...
    開封第一講書人閱讀 32,811評(píng)論 1 268
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留姓惑,地道東北人褐奴。 一個(gè)月前我還...
    沈念sama閱讀 47,685評(píng)論 2 368
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像于毙,于是被迫代替她去往敵國(guó)和親敦冬。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,573評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容