Kafka源碼分析-序列2 -Producer -Metadata的數(shù)據(jù)結(jié)構(gòu)與讀取、更新策略

在上一篇盖腕,我們從使用方式和策略上赫冬,對(duì)消息隊(duì)列做了一個(gè)宏觀描述浓镜。從本篇開始,我們將深入到源碼內(nèi)部劲厌,仔細(xì)分析Kafka到底是如何實(shí)現(xiàn)一個(gè)分布式消息隊(duì)列竖哩。我們的分析將從Producer端開始。

從Kafka 0.8.2開始脊僚,發(fā)布了一套新的Java版的client api, KafkaProducer/KafkaConsumer相叁,替代之前的scala版的api。本系列的分析將只針對(duì)這套Java版的api辽幌。

多線程異步發(fā)送模型

下圖是經(jīng)過源碼分析之后增淹,整理出來的Producer端的架構(gòu)圖:


1.png

在上一篇我們講過,Producer有同步發(fā)送和異步發(fā)送2種策略乌企。在以前的Kafka client api實(shí)現(xiàn)中虑润,同步和異步是分開實(shí)現(xiàn)的。而在0.9中加酵,同步發(fā)送其實(shí)是通過異步發(fā)送間接實(shí)現(xiàn)拳喻,其接口如下:

public class KafkaProducer<K, V> implements Producer<K, V> {  
...  
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)  //異步發(fā)送接口  
     {  
     ...  
     }  
}

要實(shí)現(xiàn)同步發(fā)送,只要在拿到返回的Future對(duì)象之后猪腕,直接調(diào)用get()就可以了冗澈。

基本思路

從上圖我們可以看出,異步發(fā)送的基本思路就是:send的時(shí)候陋葡,KafkaProducer把消息放到本地的消息隊(duì)列RecordAccumulator亚亲,然后一個(gè)后臺(tái)線程Sender不斷循環(huán),把消息發(fā)給Kafka集群腐缤。

要實(shí)現(xiàn)這個(gè)捌归,還得有一個(gè)前提條件:就是KafkaProducer/Sender都需要獲取集群的配置信息Metadata。所謂Metadata岭粤,也就是在上一篇所講的惜索,Topic/Partion與broker的映射關(guān)系:每一個(gè)Topic的每一個(gè)Partion,得知道其對(duì)應(yīng)的broker列表是什么剃浇,其中l(wèi)eader是誰巾兆,follower是誰。

2個(gè)數(shù)據(jù)流

所以在上圖中偿渡,有2個(gè)數(shù)據(jù)流:
Metadata流(A1,A2,A3):Sender從集群獲取信息臼寄,然后更新Metadata; KafkaProducer先讀取Metadata溜宽,然后把消息放入隊(duì)列。

消息流(B1, B2, B3):這個(gè)很好理解质帅,不再詳述适揉。

本篇著重講述Metadata流留攒,消息流,將在后續(xù)詳細(xì)講述嫉嘀。

Metadata的線程安全性

從上圖可以看出炼邀,Metadata是多個(gè)producer線程讀,一個(gè)sender線程更新剪侮,因此它必須是線程安全的拭宁。

Kafka的官方文檔上也有說明,KafkaProducer是線程安全的瓣俯,可以在多線程中調(diào)用:

The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.

從下面代碼也可以看出杰标,它的所有public方法都是synchronized:

public final class Metadata {  
  。彩匕。腔剂。  
    public synchronized Cluster fetch() {  
        return this.cluster;  
    }  
    public synchronized long timeToNextUpdate(long nowMs) {  
       。驼仪。掸犬。  
    }  
    public synchronized int requestUpdate() {  
      。绪爸。湾碎。  
    }  
    。奠货。胜茧。      
}

Metadata的數(shù)據(jù)結(jié)構(gòu)

下面代碼列舉了Metadata的主要數(shù)據(jù)結(jié)構(gòu):一個(gè)Cluster對(duì)象 + 1堆狀態(tài)變量。前者記錄了集群的配置信息仇味,后者用于控制Metadata的更新策略呻顽。

public final class Metadata {  
...  
    private final long refreshBackoffMs;  //更新失敗的情況下,下1次更新的補(bǔ)償時(shí)間(這個(gè)變量在代碼中意義不是太大)  
    private final long metadataExpireMs; //關(guān)鍵值:每隔多久丹墨,更新一次廊遍。缺省是600*1000,也就是10分種  
    private int version;         //每更新成功1次贩挣,version遞增1喉前。這個(gè)變量主要用于在while循環(huán),wait的時(shí)候王财,作為循環(huán)判斷條件  
    private long lastRefreshMs;  //上一次更新時(shí)間(也包含更新失敗的情況)  
    private long lastSuccessfulRefreshMs; //上一次成功更新的時(shí)間(如果每次都成功的話卵迂,則2者相等。否則绒净,lastSuccessulRefreshMs < lastRefreshMs)  
    private Cluster cluster;   //集群配置信息  
    private boolean needUpdate;  //是否強(qiáng)制刷新  
见咒、  
  ...  
}  
  
public final class Cluster {  
...  
    private final List<Node> nodes;   //Node也就是Broker  
    private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;  //Topic/Partion和broker list的映射關(guān)系  
    private final Map<String, List<PartitionInfo>> partitionsByTopic;  
    private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;  
    private final Map<Integer, List<PartitionInfo>> partitionsByNode;  
    private final Map<Integer, Node> nodesById;  
}  
  
public class PartitionInfo {  
    private final String topic;  
    private final int partition;  
    private final Node leader;  
    private final Node[] replicas;  
    private final Node[] inSyncReplicas;  
}

producer讀取Metadata

下面是send函數(shù)的源碼,可以看到挂疆,在send之前改览,會(huì)先讀取metadata下翎。如果metadata讀不到,會(huì)一直阻塞在那宝当,直到超時(shí)视事,拋出TimeoutException

//KafkaProducer  
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {  
        try {  
     long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);  //拿不到topic的配置信息,會(huì)一直阻塞在這庆揩,直到拋異常  
  
     ... //拿到了俐东,執(zhí)行下面的send邏輯  
     } catch()  
     {}  
 }  
  
//KafkaProducer  
    private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {  
        if (!this.metadata.containsTopic(topic))  
            this.metadata.add(topic);  
  
        if (metadata.fetch().partitionsForTopic(topic) != null)  
            return 0;   //取到topic的配置信息,直接返回  
  
        long begin = time.milliseconds();  
        long remainingWaitMs = maxWaitMs;  
        while (metadata.fetch().partitionsForTopic(topic) == null) { //取不到topic的配置信息订晌,一直死循環(huán)wait虏辫,直到超時(shí),拋TimeoutException  
            log.trace("Requesting metadata update for topic {}.", topic);  
            int version = metadata.requestUpdate(); //把needUpdate置為true  
            sender.wakeup(); //喚起sender  
  
            metadata.awaitUpdate(version, remainingWaitMs); //metadata的關(guān)鍵函數(shù)  
            long elapsed = time.milliseconds() - begin;  
            if (elapsed >= maxWaitMs)  
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");  
            if (metadata.fetch().unauthorizedTopics().contains(topic))  
                throw new TopicAuthorizationException(topic);  
            remainingWaitMs = maxWaitMs - elapsed;  
        }  
        return time.milliseconds() - begin;  
    }  
  
//Metadata  
    public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {  
        if (maxWaitMs < 0) {  
            throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");  
        }  
        long begin = System.currentTimeMillis();  
        long remainingWaitMs = maxWaitMs;  
        while (this.version <= lastVersion) {  //當(dāng)Sender成功更新meatadata之后腾仅,version加1乒裆。否則會(huì)循環(huán),一直wait  
            if (remainingWaitMs != 0  
                wait(remainingWaitMs);  //線程的wait機(jī)制推励,wait和synchronized的配合使用  
            long elapsed = System.currentTimeMillis() - begin;  
            if (elapsed >= maxWaitMs)  //wait時(shí)間超出了最長等待時(shí)間  
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");  
            remainingWaitMs = maxWaitMs - elapsed;  
        }  
    }

總結(jié):從上面代碼可以看出鹤耍,producer wait metadata的時(shí)候,有2個(gè)條件:
(1) while (metadata.fetch().partitionsForTopic(topic) == null)
(2)while (this.version <= lastVersion)

有wait就會(huì)有notify验辞,notify在Sender更新Metadata的時(shí)候發(fā)出稿黄。

Sender的創(chuàng)建

下面是KafkaProducer的構(gòu)造函數(shù),從代碼可以看出跌造,Sender就是KafkaProducer中創(chuàng)建的一個(gè)Thread.

private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {  
        try {  
        ...  
                    this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG)); //構(gòu)造metadata  
  
this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); //往metadata中杆怕,填入初始的,配置的node列表  
  
            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());  
  
            NetworkClient client = new NetworkClient(  
                    new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", metricTags, channelBuilder),  
                    this.metadata,  
                    clientId,  
                    config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),  
                    config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),  
                    config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),  
                    config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),  
  
  
            this.sender = new Sender(client,  //構(gòu)造一個(gè)sender壳贪。sender本身實(shí)現(xiàn)的是Runnable接口  
                    this.metadata,  
                    this.accumulator,  
                    config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),  
                    (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),  
                    config.getInt(ProducerConfig.RETRIES_CONFIG),  
                    this.metrics,  
                    new SystemTime(),  
                    clientId,  
                    this.requestTimeoutMs);  
  
            String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");  
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);  
            this.ioThread.start();  //一個(gè)線程陵珍,開啟sender

Sender poll()更新Metadata

    public void run() {  
        // main loop, runs until close is called  
        while (running) {  
            try {  
                run(time.milliseconds());  
            } catch (Exception e) {  
                log.error("Uncaught error in kafka producer I/O thread: ", e);  
            }  
        }  
       。违施。互纯。  
    }  
  
    public void run(long now) {  
        Cluster cluster = metadata.fetch();  
。磕蒲。留潦。  
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);   //遍歷消息隊(duì)列中所有的消息,找出對(duì)應(yīng)的辣往,已經(jīng)ready的Node  
  
        if (result.unknownLeadersExist)  //如果一個(gè)ready的node都沒有兔院,請(qǐng)求更新metadata  
            this.metadata.requestUpdate();  
  
  。站削。坊萝。  
  
     //client的2個(gè)關(guān)鍵函數(shù),一個(gè)發(fā)送ClientRequest,一個(gè)接收ClientResponse屹堰。底層調(diào)用的是NIO的poll肛冶。關(guān)于nio, 后面會(huì)詳細(xì)介紹  
        for (ClientRequest request : requests)  
            client.send(request, now);  
  
        this.client.poll(pollTimeout, now);  
    }  
  
//NetworkClient  
    public List<ClientResponse> poll(long timeout, long now) {  
        long metadataTimeout = metadataUpdater.maybeUpdate(now); //關(guān)鍵點(diǎn):每次poll的時(shí)候判斷是否要更新metadata  
  
        try {  
            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));  
        } catch (IOException e) {  
            log.error("Unexpected error during I/O", e);  
        }  
  
        // process completed actions  
        long updatedNow = this.time.milliseconds();  
        List<ClientResponse> responses = new ArrayList<>();  
        handleCompletedSends(responses, updatedNow);  
        handleCompletedReceives(responses, updatedNow);   //在返回的handler中街氢,會(huì)處理metadata的更新  
        handleDisconnections(responses, updatedNow);  
        handleConnections();  
        handleTimedOutRequests(responses, updatedNow);  
  
        // invoke callbacks  
        for (ClientResponse response : responses) {  
            if (response.request().hasCallback()) {  
                try {  
                    response.request().callback().onComplete(response);  
                } catch (Exception e) {  
                    log.error("Uncaught error in request completion:", e);  
                }  
            }  
        }  
  
        return responses;  
    }  
  
 //DefaultMetadataUpdater  
         @Override  
        public long maybeUpdate(long now) {  
            // should we update our metadata?  
            long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);  
            long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);  
            long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;  
            // if there is no node available to connect, back off refreshing metadata  
            long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),  
                    waitForMetadataFetch);  
  
            if (metadataTimeout == 0) {  
                // highly dependent on the behavior of leastLoadedNode.  
                Node node = leastLoadedNode(now);  //找到負(fù)載最小的Node  
                maybeUpdate(now, node); //把更新Metadata的請(qǐng)求扯键,發(fā)給這個(gè)Node  
            }  
  
            return metadataTimeout;  
        }  
  
        private void maybeUpdate(long now, Node node) {  
            if (node == null) {  
                log.debug("Give up sending metadata request since no node is available");  
                // mark the timestamp for no node available to connect  
                this.lastNoNodeAvailableMs = now;  
                return;  
            }  
            String nodeConnectionId = node.idString();  
  
            if (canSendRequest(nodeConnectionId)) {  
                Set<String> topics = metadata.needMetadataForAllTopics() ? new HashSet<String>() : metadata.topics();  
                this.metadataFetchInProgress = true;  
                ClientRequest metadataRequest = request(now, nodeConnectionId, topics);  //關(guān)鍵點(diǎn):發(fā)送更新Metadata的Request  
                log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());  
                doSend(metadataRequest, now); //這里只是異步發(fā)送,返回的response在上面的handleCompletedReceives里面處理  
            } else if (connectionStates.canConnect(nodeConnectionId, now)) {  
                log.debug("Initialize connection to node {} for sending metadata request", node.id());  
                initiateConnect(node, now);  
  
            } else { // connected, but can't send more OR connecting  
                this.lastNoNodeAvailableMs = now;  
            }  
        }  
  
     private void handleCompletedReceives(List<ClientResponse> responses, long now) {  
        for (NetworkReceive receive : this.selector.completedReceives()) {  
            String source = receive.source();  
            ClientRequest req = inFlightRequests.completeNext(source);  
            ResponseHeader header = ResponseHeader.parse(receive.payload());  
            // Always expect the response version id to be the same as the request version id  
            short apiKey = req.request().header().apiKey();  
            short apiVer = req.request().header().apiVersion();  
            Struct body = (Struct) ProtoUtils.responseSchema(apiKey, apiVer).read(receive.payload());  
            correlate(req.request().header(), header);  
            if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))  
                responses.add(new ClientResponse(req, now, false, body));  
        }  
    }  
  
  
        @Override  
        public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {  
            short apiKey = req.request().header().apiKey();  
            if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {  
                handleResponse(req.request().header(), body, now);  
                return true;  
            }  
            return false;  
        }  
  
//關(guān)鍵函數(shù)  
        private void handleResponse(RequestHeader header, Struct body, long now) {  
            this.metadataFetchInProgress = false;  
            MetadataResponse response = new MetadataResponse(body);  
            Cluster cluster = response.cluster();   //從response中珊肃,拿到一個(gè)新的cluster對(duì)象  
            if (response.errors().size() > 0) {  
                log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), response.errors());  
            }  
  
            if (cluster.nodes().size() > 0) {  
                this.metadata.update(cluster, now);   //更新metadata荣刑,用新的cluster覆蓋舊的cluster  
            } else {  
                log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());  
                this.metadata.failedUpdate(now);  //更新metadata失敗,做失敗處理邏輯  
            }  
        }  
  
  
//更新成功伦乔,version+1, 同時(shí)更新其它字段  
    public synchronized void update(Cluster cluster, long now) {  
        this.needUpdate = false;  
        this.lastRefreshMs = now;  
        this.lastSuccessfulRefreshMs = now;  
        this.version += 1;  
  
        for (Listener listener: listeners)  
            listener.onMetadataUpdate(cluster);  //如果有人監(jiān)聽了metadata的更新厉亏,通知他們  
  
        this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;    //新的cluster覆蓋舊的cluster  
  
        notifyAll();  //通知所有的阻塞的producer線程  
  
        log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);  
    }  
  
//更新失敗,只更新lastRefreshMs  
    public synchronized void failedUpdate(long now) {  
        this.lastRefreshMs = now;  
    }

從上面可以看出烈和,Metadata的更新爱只,是在while循環(huán),每次調(diào)用client.poll()的時(shí)候更新的招刹。

更新機(jī)制又有以下2種:

Metadata的2種更新機(jī)制

(1)周期性的更新: 每隔一段時(shí)間更新一次恬试,這個(gè)通過 Metadata的lastRefreshMs, lastSuccessfulRefreshMs 這2個(gè)字段來實(shí)現(xiàn)

對(duì)應(yīng)的ProducerConfig配置項(xiàng)為:
metadata.max.age.ms //缺省300000,即10分鐘1次

(2) 失效檢測(cè)疯暑,強(qiáng)制更新:檢查到metadata失效以后训柴,調(diào)用metadata.requestUpdate()強(qiáng)制更新。 requestUpdate()函數(shù)里面其實(shí)什么都沒做妇拯,就是把needUpdate置成了false

每次poll的時(shí)候幻馁,都檢查這2種更新機(jī)制,達(dá)到了越锈,就觸發(fā)更新仗嗦。

那如何判定Metadata失效了呢?這個(gè)在代碼中很分散甘凭,有很多地方稀拐,會(huì)判定Metadata失效。

Metadata失效檢測(cè)

條件1:initConnect的時(shí)候

private void initiateConnect(Node node, long now) {  
    String nodeConnectionId = node.idString();  
    try {  
        log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());  
        this.connectionStates.connecting(nodeConnectionId, now);  
        selector.connect(nodeConnectionId,  
                         new InetSocketAddress(node.host(), node.port()),  
                         this.socketSendBuffer,  
                         this.socketReceiveBuffer);  
    } catch (IOException e) {  
        connectionStates.disconnected(nodeConnectionId, now);  
        metadataUpdater.requestUpdate(); //判定metadata失效  
        log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);  
    }  
}

條件2:poll里面IO的時(shí)候对蒲,連接斷掉了

private void handleDisconnections(List<ClientResponse> responses, long now) {  
    for (String node : this.selector.disconnected()) {  
        log.debug("Node {} disconnected.", node);  
        processDisconnection(responses, node, now);  
    }  
    if (this.selector.disconnected().size() > 0)  
        metadataUpdater.requestUpdate();  //判定metadata失效  
}

條件3:有請(qǐng)求超時(shí)

private void handleTimedOutRequests(List<ClientResponse> responses, long now) {  
    List<String> nodeIds = this.inFlightRequests.getNodesWithTimedOutRequests(now, this.requestTimeoutMs);  
    for (String nodeId : nodeIds) {  
        this.selector.close(nodeId);  
        log.debug("Disconnecting from node {} due to request timeout.", nodeId);  
        processDisconnection(responses, nodeId, now);  
    }  
  
    if (nodeIds.size() > 0)  
        metadataUpdater.requestUpdate();  //判定metadata失效  
}

條件4:發(fā)消息的時(shí)候钩蚊,有partition的leader沒找到

public void run(long now) {  
    Cluster cluster = metadata.fetch();  
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);  
  
    if (result.unknownLeadersExist)  
        this.metadata.requestUpdate();

條件5:返回的response和請(qǐng)求對(duì)不上的時(shí)候

private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches, long now) {  
    int correlationId = response.request().request().header().correlationId();  
    if (response.wasDisconnected()) {  
        log.trace("Cancelled request {} due to node {} being disconnected", response, response.request()  
                                                                                              .request()  
                                                                                              .destination());  
        for (RecordBatch batch : batches.values())  
            completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlationId, now);

總之1句話:發(fā)生各式各樣的異常,數(shù)據(jù)不同步蹈矮,都認(rèn)為metadata可能出問題了砰逻,要求更新。

Metadata其他的更新策略

除了上面所述泛鸟,Metadata的更新蝠咆,還有以下幾個(gè)特點(diǎn):

1.更新請(qǐng)求MetadataRequest是nio異步發(fā)送的,在poll的返回中,處理MetadataResponse的時(shí)候刚操,才真正更新Metadata闸翅。

這里有個(gè)關(guān)鍵點(diǎn):Metadata的cluster對(duì)象,每次是整個(gè)覆蓋的菊霜,而不是局部更新坚冀。所以cluster內(nèi)部不用加鎖。

2.更新的時(shí)候鉴逞,是從metadata保存的所有Node记某,或者說Broker中,選負(fù)載最小的那個(gè)构捡,也就是當(dāng)前接收請(qǐng)求最少的那個(gè)液南。向其發(fā)送MetadataRequest請(qǐng)求,獲取新的Cluster對(duì)象勾徽。

歡迎加入QQ群:104286694

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末滑凉,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子喘帚,更是在濱河造成了極大的恐慌畅姊,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,509評(píng)論 6 504
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件啥辨,死亡現(xiàn)場(chǎng)離奇詭異涡匀,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)溉知,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,806評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門陨瘩,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人级乍,你說我怎么就攤上這事舌劳。” “怎么了玫荣?”我有些...
    開封第一講書人閱讀 163,875評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵甚淡,是天一觀的道長。 經(jīng)常有香客問我捅厂,道長贯卦,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,441評(píng)論 1 293
  • 正文 為了忘掉前任焙贷,我火速辦了婚禮撵割,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘辙芍。我一直安慰自己啡彬,他們只是感情好羹与,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,488評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著庶灿,像睡著了一般纵搁。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上往踢,一...
    開封第一講書人閱讀 51,365評(píng)論 1 302
  • 那天腾誉,我揣著相機(jī)與錄音,去河邊找鬼菲语。 笑死妄辩,一個(gè)胖子當(dāng)著我的面吹牛惑灵,可吹牛的內(nèi)容都是我干的山上。 我是一名探鬼主播,決...
    沈念sama閱讀 40,190評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼英支,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼佩憾!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起干花,我...
    開封第一講書人閱讀 39,062評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤妄帘,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后池凄,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體抡驼,經(jīng)...
    沈念sama閱讀 45,500評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,706評(píng)論 3 335
  • 正文 我和宋清朗相戀三年肿仑,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了致盟。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,834評(píng)論 1 347
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡尤慰,死狀恐怖馏锡,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情伟端,我是刑警寧澤杯道,帶...
    沈念sama閱讀 35,559評(píng)論 5 345
  • 正文 年R本政府宣布,位于F島的核電站责蝠,受9級(jí)特大地震影響党巾,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜霜医,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,167評(píng)論 3 328
  • 文/蒙蒙 一齿拂、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧支子,春花似錦创肥、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,779評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽巩搏。三九已至,卻和暖如春趾代,著一層夾襖步出監(jiān)牢的瞬間贯底,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,912評(píng)論 1 269
  • 我被黑心中介騙來泰國打工撒强, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留禽捆,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,958評(píng)論 2 370
  • 正文 我出身青樓飘哨,卻偏偏與公主長得像胚想,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子芽隆,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,779評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理浊服,服務(wù)發(fā)現(xiàn),斷路器胚吁,智...
    卡卡羅2017閱讀 134,654評(píng)論 18 139
  • kafka的定義:是一個(gè)分布式消息系統(tǒng)牙躺,由LinkedIn使用Scala編寫,用作LinkedIn的活動(dòng)流(Act...
    時(shí)待吾閱讀 5,320評(píng)論 1 15
  • 轉(zhuǎn)帖:原文地址http://www.infoq.com/cn/articles/depth-interpretat...
    端木軒閱讀 2,324評(píng)論 0 19
  • 本文轉(zhuǎn)載自http://dataunion.org/?p=9307 背景介紹Kafka簡介Kafka是一種分布式的...
    Bottle丶Fish閱讀 5,469評(píng)論 0 34
  • 背景介紹 Kafka簡介 Kafka是一種分布式的腕扶,基于發(fā)布/訂閱的消息系統(tǒng)孽拷。主要設(shè)計(jì)目標(biāo)如下: 以時(shí)間復(fù)雜度為O...
    高廣超閱讀 12,833評(píng)論 8 167