Librdkafka對kafka topic的封裝

  • 上一節(jié)我們講了librdkakfa對topic-partition的封裝, 任何一個partition都必須要屬于一下topic;
  • 我們這節(jié)就來分析一上librdkafka對topic的封裝

rd_kafka_itopic_s
  • 所在文件: src/rdkafka_topic.h
  • 這里還有一個類型rd_kafka_topic_t,定義:typedef struct rd_kafka_topic_s rd_kafka_topic_t;,這是個空定義沒有現(xiàn)實(shí), 其實(shí)就是rd_kafka_itopic_s, 這個類型主要是面向librdkafka的使用者,sdk里稱作app topic, 它有自己的引用計數(shù). 在librdkafka內(nèi)部使用rd_kafka_itopic, 它也有自己的引用計數(shù), 有點(diǎn)羅嗦啊~
  • 定義:
struct rd_kafka_itopic_s {
        // 定義成tailq的元素
    TAILQ_ENTRY(rd_kafka_itopic_s) rkt_link;

        //  引用計數(shù)
    rd_refcnt_t        rkt_refcnt;

    rwlock_t           rkt_lock;

        // 所屬topic的名字
    rd_kafkap_str_t   *rkt_topic;

        // 表示一個未assigned的partition
    shptr_rd_kafka_toppar_t  *rkt_ua;  /* unassigned partition */

        // 擁有的partition列表
    shptr_rd_kafka_toppar_t **rkt_p;
    int32_t            rkt_partition_cnt;

        // 期望操作的partition, 但還沒有從broker獲取到其信息的partition列表
        rd_list_t          rkt_desp;              /* Desired partitions
                                                   * that are not yet seen
                                                   * in the cluster. */
        // 最近一次更新metadata的時間
    rd_ts_t            rkt_ts_metadata; /* Timestamp of last metadata
                         * update for this topic. */

        mtx_t              rkt_app_lock;    /* Protects rkt_app_* */
, 
       // 在application層一個rd_kafka_itopic_s對外表現(xiàn)為一個 rd_kafka_topic_t類型的對象, 且有自己的引用計數(shù)
        rd_kafka_topic_t *rkt_app_rkt;      /* A shared topic pointer
                                             * to be used for callbacks
                                             * to the application. */
    int               rkt_app_refcnt;   /* Number of active rkt's new()ed
                         * by application. */
        //  topic的三種狀態(tài):未知, 已存在, 不存在
    enum {
        RD_KAFKA_TOPIC_S_UNKNOWN,   /* No cluster information yet */
        RD_KAFKA_TOPIC_S_EXISTS,    /* Topic exists in cluster */
        RD_KAFKA_TOPIC_S_NOTEXISTS, /* Topic is not known in cluster */
    } rkt_state;

        int               rkt_flags;
#define RD_KAFKA_TOPIC_F_LEADER_UNAVAIL   0x1 /* Leader lost/unavailable
                                               * for at least one partition. */
        // 所屬的rd_kafka_t
    rd_kafka_t       *rkt_rk;

        shptr_rd_kafka_itopic_t *rkt_shptr_app; /* Application's topic_new() */

    rd_kafka_topic_conf_t rkt_conf;
};
  • 創(chuàng)建一個rd_kafka_itopic_s對象rd_kafka_topic_new0, 這是一個內(nèi)部調(diào)用函數(shù)
shptr_rd_kafka_itopic_t *rd_kafka_topic_new0 (rd_kafka_t *rk,
                                              const char *topic,
                                              rd_kafka_topic_conf_t *conf,
                                              int *existing,
                                              int do_lock) {
    rd_kafka_itopic_t *rkt;
        shptr_rd_kafka_itopic_t *s_rkt;
        const struct rd_kafka_metadata_cache_entry *rkmce;

       // topic名字check , 長度不能超512
    if (!topic || strlen(topic) > 512) {
        if (conf)
            rd_kafka_topic_conf_destroy(conf);
        rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
                    EINVAL);
        return NULL;
    }

    if (do_lock)
                rd_kafka_wrlock(rk);
        // 所有創(chuàng)建的rd_kafka_itopic對象都會加入到對應(yīng)的topic的rk->rk_topics中, 先從中查找, 如果找到就不再創(chuàng)建
    if ((s_rkt = rd_kafka_topic_find(rk, topic, 0/*no lock*/))) {
                if (do_lock)
                        rd_kafka_wrunlock(rk);
        if (conf)
            rd_kafka_topic_conf_destroy(conf);
                if (existing)
                        *existing = 1;
        return s_rkt;
        }

        if (existing)
                *existing = 0;

        // 分配對應(yīng)的內(nèi)存, 設(shè)置各屬性
    rkt = rd_calloc(1, sizeof(*rkt));

    rkt->rkt_topic     = rd_kafkap_str_new(topic, -1);
    rkt->rkt_rk        = rk;

    if (!conf) {
                if (rk->rk_conf.topic_conf)
                        conf = rd_kafka_topic_conf_dup(rk->rk_conf.topic_conf);
                else
                        conf = rd_kafka_topic_conf_new();
        }
    rkt->rkt_conf = *conf;
    rd_free(conf); /* explicitly not rd_kafka_topic_destroy()
                        * since we dont want to rd_free internal members,
                        * just the placeholder. The internal members
                        * were copied on the line above. */

    /* Default partitioner: consistent_random */
    if (!rkt->rkt_conf.partitioner)
        rkt->rkt_conf.partitioner = rd_kafka_msg_partitioner_consistent_random;

    if (rkt->rkt_conf.compression_codec == RD_KAFKA_COMPRESSION_INHERIT)
        rkt->rkt_conf.compression_codec = rk->rk_conf.compression_codec;

        rd_list_init(&rkt->rkt_desp, 16, NULL);
        rd_refcnt_init(&rkt->rkt_refcnt, 0);

        s_rkt = rd_kafka_topic_keep(rkt);

    rwlock_init(&rkt->rkt_lock);
        mtx_init(&rkt->rkt_app_lock, mtx_plain);

    /* Create unassigned partition */
    rkt->rkt_ua = rd_kafka_toppar_new(rkt, RD_KAFKA_PARTITION_UA);

        // 加入到對應(yīng)的rk_kafka_t中的topic列表
    TAILQ_INSERT_TAIL(&rk->rk_topics, rkt, rkt_link);
    rk->rk_topic_cnt++;

        /* Populate from metadata cache. */
        // 加入或更新到metadata cache
        if ((rkmce = rd_kafka_metadata_cache_find(rk, topic, 1/*valid*/))) {
                if (existing)
                        *existing = 1;

                rd_kafka_topic_metadata_update(rkt, &rkmce->rkmce_mtopic,
                                               rkmce->rkmce_ts_insert);
        }

        if (do_lock)
                rd_kafka_wrunlock(rk);

    return s_rkt;
  • 創(chuàng)建rd_kafka_topic_t對象, 對外的接口rd_kafka_topic_new
rd_kafka_topic_t *rd_kafka_topic_new (rd_kafka_t *rk, const char *topic,
                                      rd_kafka_topic_conf_t *conf) {
        shptr_rd_kafka_itopic_t *s_rkt;
        rd_kafka_itopic_t *rkt;
        rd_kafka_topic_t *app_rkt;
        int existing;

       // 創(chuàng)建一個`shptr_rd_kafka_itopic_t`對象
        s_rkt = rd_kafka_topic_new0(rk, topic, conf, &existing, 1/*lock*/);
        if (!s_rkt)
                return NULL;

        // 指針轉(zhuǎn)換, 從`shptr_rd_kafka_itopic_t`到`rd_kafka_itopic_t`, 引用計數(shù)不變
        rkt = rd_kafka_topic_s2i(s_rkt);

        /* Save a shared pointer to be used in callbacks. */
        // 引數(shù)計數(shù)加1, 指針轉(zhuǎn)換成一個`rd_kafka_topic_t`
        // app相對應(yīng)的引用計數(shù)也加1
    app_rkt = rd_kafka_topic_keep_app(rkt);

        /* Query for the topic leader (async) */
        if (!existing)
               // 發(fā)metadata request, 獲取leader等相關(guān)信息
                rd_kafka_topic_leader_query(rk, rkt);

        /* Drop our reference since there is already/now a rkt_app_rkt */
        rd_kafka_topic_destroy0(s_rkt);

        return app_rkt;
}
  • 獲取當(dāng)前rd_kafka_t對象持有的所有topic的名字,保存在一個rd_list
void rd_kafka_local_topics_to_list (rd_kafka_t *rk, rd_list_t *topics) {
        rd_kafka_itopic_t *rkt;

        rd_kafka_rdlock(rk);
        rd_list_grow(topics, rk->rk_topic_cnt);
        TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link)
                rd_list_add(topics, rd_strdup(rkt->rkt_topic->str));
        rd_kafka_rdunlock(rk);
}
  • 判斷parition是否是有效的,就是判斷其leader是否有效
  int rd_kafka_topic_partition_available (const rd_kafka_topic_t *app_rkt,
                    int32_t partition) {
    int avail;
    shptr_rd_kafka_toppar_t *s_rktp;
        rd_kafka_toppar_t *rktp;
        rd_kafka_broker_t *rkb;

    s_rktp = rd_kafka_toppar_get(rd_kafka_topic_a2i(app_rkt),
                                     partition, 0/*no ua-on-miss*/);
    if (unlikely(!s_rktp))
        return 0;

        rktp = rd_kafka_toppar_s2i(s_rktp);
        rkb = rd_kafka_toppar_leader(rktp, 1/*proper broker*/);
        avail = rkb ? 1 : 0;
        if (rkb)
                rd_kafka_broker_destroy(rkb);
    rd_kafka_toppar_destroy(s_rktp);
    return avail;
}
  • 掃描所有topic的patitions:
    1. 篩出 kafka message過期的, 回調(diào)application層
    2. 找出需要刷新metadata的, 發(fā)送metadata request
int rd_kafka_topic_scan_all (rd_kafka_t *rk, rd_ts_t now) {
    rd_kafka_itopic_t *rkt;
    rd_kafka_toppar_t *rktp;
        shptr_rd_kafka_toppar_t *s_rktp;
    int totcnt = 0;
        rd_list_t query_topics;

        rd_list_init(&query_topics, 0, rd_free);

    rd_kafka_rdlock(rk);
    TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
        int p;
                int cnt = 0, tpcnt = 0;
                rd_kafka_msgq_t timedout;
                int query_this = 0;

                rd_kafka_msgq_init(&timedout);

        rd_kafka_topic_wrlock(rkt);

                /* Check if metadata information has timed out. */
               // metadata cache中沒有緩存,需要query metadata
                if (rkt->rkt_state != RD_KAFKA_TOPIC_S_UNKNOWN &&
                    !rd_kafka_metadata_cache_topic_get(
                            rk, rkt->rkt_topic->str, 1/*only valid*/)) {
                        rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_UNKNOWN);

                        query_this = 1;
                }

                /* Just need a read-lock from here on. */
                rd_kafka_topic_wrunlock(rkt);
                rd_kafka_topic_rdlock(rkt);

                if (rkt->rkt_partition_cnt == 0) {
                        query_this = 1;
                }

        for (p = RD_KAFKA_PARTITION_UA ;
             p < rkt->rkt_partition_cnt ; p++) {
            int did_tmout = 0;

            if (!(s_rktp = rd_kafka_toppar_get(rkt, p, 0)))
                continue;

                        rktp = rd_kafka_toppar_s2i(s_rktp);
            rd_kafka_toppar_lock(rktp);

                        /* Check that partition has a leader that is up,
                         * else add topic to query list. */
                       // partition leader無效時, 要request metadata
                        if (p != RD_KAFKA_PARTITION_UA &&
                            (!rktp->rktp_leader ||
                             rktp->rktp_leader->rkb_source ==
                             RD_KAFKA_INTERNAL ||
                             rd_kafka_broker_get_state(rktp->rktp_leader) <
                             RD_KAFKA_BROKER_STATE_UP)) {
                                query_this = 1;
                        }

            /* Scan toppar's message queues for timeouts */
            if (rd_kafka_msgq_age_scan(&rktp->rktp_xmit_msgq,
                           &timedout, now) > 0)
                did_tmout = 1;

            if (rd_kafka_msgq_age_scan(&rktp->rktp_msgq,
                           &timedout, now) > 0)
                did_tmout = 1;

            tpcnt += did_tmout;

            rd_kafka_toppar_unlock(rktp);
            rd_kafka_toppar_destroy(s_rktp);
        }

                rd_kafka_topic_rdunlock(rkt);

                if ((cnt = rd_atomic32_get(&timedout.rkmq_msg_cnt)) > 0) {
                        totcnt += cnt;
                        // kafka mesage過期, 則需要回調(diào)到application層
                        rd_kafka_dr_msgq(rkt, &timedout,
                                         RD_KAFKA_RESP_ERR__MSG_TIMED_OUT);
                }

                /* Need to re-query this topic's leader. */
                if (query_this &&
                    !rd_list_find(&query_topics, rkt->rkt_topic->str,
                                  (void *)strcmp))
                        rd_list_add(&query_topics,
                                    rd_strdup(rkt->rkt_topic->str));

        }
        rd_kafka_rdunlock(rk);

        if (!rd_list_empty(&query_topics))
                // 發(fā)送 metadata request
                rd_kafka_metadata_refresh_topics(rk, NULL, &query_topics,
                                                 1/*force even if cached
                                                    * info exists*/,
                                                 "refresh unavailable topics");
        rd_list_destroy(&query_topics);

        return totcnt;
}
  • 更新topic的partition個數(shù), partition個數(shù)可能增加, 也可能減少rd_kafka_topic_partition_cnt_update, 簡單講:
    1. 新增的partition, 創(chuàng)建;
    2. 老的partition, 刪除;
static int rd_kafka_topic_partition_cnt_update (rd_kafka_itopic_t *rkt,
                        int32_t partition_cnt) {
    rd_kafka_t *rk = rkt->rkt_rk;
    shptr_rd_kafka_toppar_t **rktps;
    shptr_rd_kafka_toppar_t *rktp_ua;
        shptr_rd_kafka_toppar_t *s_rktp;
    rd_kafka_toppar_t *rktp;
    rd_kafka_msgq_t tmpq = RD_KAFKA_MSGQ_INITIALIZER(tmpq);
    int32_t i;

        更新前后partition數(shù)量相同的話, 不作任何處理
    if (likely(rkt->rkt_partition_cnt == partition_cnt))
        return 0; /* No change in partition count */

    /* Create and assign new partition list */
       // 創(chuàng)建新的partition list, 分配內(nèi)存
    if (partition_cnt > 0)
        rktps = rd_calloc(partition_cnt, sizeof(*rktps));
    else
        rktps = NULL;

        // 如果新個數(shù)大于老個數(shù)
    for (i = 0 ; i < partition_cnt ; i++) {
                // 多出來的都是新擴(kuò)容的partition
        if (i >= rkt->rkt_partition_cnt) {
            /* New partition. Check if its in the list of
             * desired partitions first. */
                        // 檢查是否在desired partition 列表中
                        s_rktp = rd_kafka_toppar_desired_get(rkt, i);

                        rktp = s_rktp ? rd_kafka_toppar_s2i(s_rktp) : NULL;
                        if (rktp) {
                                // 在desired partition 列表中,則移除它
                rd_kafka_toppar_lock(rktp);
                                rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_UNKNOWN;

                                /* Remove from desp list since the
                                 * partition is now known. */
                                rd_kafka_toppar_desired_unlink(rktp);
                                rd_kafka_toppar_unlock(rktp);
            } else
                s_rktp = rd_kafka_toppar_new(rkt, i);
                        // 賦值rktps[i]
            rktps[i] = s_rktp;
        } else {
                        // 如果是已經(jīng)存在的partition, 放到rktps[i], 并且作引用計數(shù)的增減
            /* Existing partition, grab our own reference. */
            rktps[i] = rd_kafka_toppar_keep(
                rd_kafka_toppar_s2i(rkt->rkt_p[i]));
            /* Loose previous ref */
            rd_kafka_toppar_destroy(rkt->rkt_p[i]);
        }
    }

    rktp_ua = rd_kafka_toppar_get(rkt, RD_KAFKA_PARTITION_UA, 0);

        /* Propagate notexist errors for desired partitions */
        // 掃描desired partition 列表中, 還余下的都是無主的, 集群中不存在的partition, 回調(diào)error
        RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, i) {
                rd_kafka_toppar_enq_error(rd_kafka_toppar_s2i(s_rktp),
                                          RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION);
        }

    /* Remove excessive partitions */
        // 處理更新后的partition個數(shù)小于更新前的情況, 需要刪除一部分partition
    for (i = partition_cnt ; i < rkt->rkt_partition_cnt ; i++) {
        s_rktp = rkt->rkt_p[i];
                rktp = rd_kafka_toppar_s2i(s_rktp);
        rd_kafka_toppar_lock(rktp);

        if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED) {
                        rd_kafka_dbg(rkt->rkt_rk, TOPIC, "DESIRED",
                                     "Topic %s [%"PRId32"] is desired "
                                     "but no longer known: "
                                     "moving back on desired list",
                                     rkt->rkt_topic->str, rktp->rktp_partition);
                        // 是DESIRED狀態(tài)的話, 再放回到desired列表
            rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_UNKNOWN;
                        rd_kafka_toppar_desired_link(rktp);

                        if (!rd_kafka_terminating(rkt->rkt_rk))
                                rd_kafka_toppar_enq_error(
                                        rktp,
                                        RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION);
                        // 解除和broker的聯(lián)系, 實(shí)際上是關(guān)聯(lián)到內(nèi)部的UA broker
            rd_kafka_toppar_broker_delegate(rktp, NULL, 0);

        } else {
            /* Tell handling broker to let go of the toppar */
            rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_REMOVE;
            rd_kafka_toppar_broker_leave_for_remove(rktp);
        }

        rd_kafka_toppar_unlock(rktp);

        rd_kafka_toppar_destroy(s_rktp);
    }


    if (rkt->rkt_p)
        rd_free(rkt->rkt_p);

    rkt->rkt_p = rktps;
    rkt->rkt_partition_cnt = partition_cnt;

    return 1;
}
  • 將在UA partition上待發(fā)送的kafka message重新分配到有效的patition上rd_kafka_topic_assign_uas:
static void rd_kafka_topic_assign_uas (rd_kafka_itopic_t *rkt,
                                       rd_kafka_resp_err_t err) {
    rd_kafka_t *rk = rkt->rkt_rk;
    shptr_rd_kafka_toppar_t *s_rktp_ua;
        rd_kafka_toppar_t *rktp_ua;
    rd_kafka_msg_t *rkm, *tmp;
    rd_kafka_msgq_t uas = RD_KAFKA_MSGQ_INITIALIZER(uas);
    rd_kafka_msgq_t failed = RD_KAFKA_MSGQ_INITIALIZER(failed);
    int cnt;

    if (rkt->rkt_rk->rk_type != RD_KAFKA_PRODUCER)
        return;

        // 沒有UA partition,就直接返回了
    s_rktp_ua = rd_kafka_toppar_get(rkt, RD_KAFKA_PARTITION_UA, 0);
    if (unlikely(!s_rktp_ua)) {
        return;
    }

        rktp_ua = rd_kafka_toppar_s2i(s_rktp_ua);

        // 將ua partition上的msg移動到臨時隊列上
    rd_kafka_toppar_lock(rktp_ua);
    rd_kafka_msgq_move(&uas, &rktp_ua->rktp_msgq);
    cnt = rd_atomic32_get(&uas.rkmq_msg_cnt);
    rd_kafka_toppar_unlock(rktp_ua);

    TAILQ_FOREACH_SAFE(rkm, &uas.rkmq_msgs, rkm_link, tmp) {
        /* Fast-path for failing messages with forced partition */
               // 無效的msg放到failed 隊列
        if (rkm->rkm_partition != RD_KAFKA_PARTITION_UA &&
            rkm->rkm_partition >= rkt->rkt_partition_cnt &&
            rkt->rkt_state != RD_KAFKA_TOPIC_S_UNKNOWN) {
            rd_kafka_msgq_enq(&failed, rkm);
            continue;
        }
                
               // 重新路由kafka message到相應(yīng)的partition, 失敗則放入failed 隊列
        if (unlikely(rd_kafka_msg_partitioner(rkt, rkm, 0) != 0)) {
            /* Desired partition not available */
            rd_kafka_msgq_enq(&failed, rkm);
        }
    }
       
        // 失敗的msg, 都回調(diào)給application層
    if (rd_atomic32_get(&failed.rkmq_msg_cnt) > 0) {
        /* Fail the messages */
        rd_kafka_dr_msgq(rkt, &failed,
                 rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS ?
                 err :
                 RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION);
    }

    rd_kafka_toppar_destroy(s_rktp_ua); /* from get() */
}
  • 關(guān)于metadata相關(guān)的操作, 我們介紹metadata時再來分析

Librdkafka源碼分析-Content Table

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末猫缭,一起剝皮案震驚了整個濱河市机久,隨后出現(xiàn)的幾起案子冀瓦,更是在濱河造成了極大的恐慌缨伊,老刑警劉巖页响,帶你破解...
    沈念sama閱讀 222,627評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件竞川,死亡現(xiàn)場離奇詭異,居然都是意外死亡呼股,警方通過查閱死者的電腦和手機(jī)旱眯,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,180評論 3 399
  • 文/潘曉璐 我一進(jìn)店門晨川,熙熙樓的掌柜王于貴愁眉苦臉地迎上來证九,“玉大人,你說我怎么就攤上這事共虑±⒘” “怎么了?”我有些...
    開封第一講書人閱讀 169,346評論 0 362
  • 文/不壞的土叔 我叫張陵妈拌,是天一觀的道長拥坛。 經(jīng)常有香客問我,道長尘分,這世上最難降的妖魔是什么渴逻? 我笑而不...
    開封第一講書人閱讀 60,097評論 1 300
  • 正文 為了忘掉前任,我火速辦了婚禮音诫,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘雪位。我一直安慰自己竭钝,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,100評論 6 398
  • 文/花漫 我一把揭開白布雹洗。 她就那樣靜靜地躺著香罐,像睡著了一般。 火紅的嫁衣襯著肌膚如雪时肿。 梳的紋絲不亂的頭發(fā)上庇茫,一...
    開封第一講書人閱讀 52,696評論 1 312
  • 那天,我揣著相機(jī)與錄音螃成,去河邊找鬼旦签。 笑死,一個胖子當(dāng)著我的面吹牛寸宏,可吹牛的內(nèi)容都是我干的宁炫。 我是一名探鬼主播,決...
    沈念sama閱讀 41,165評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼氮凝,長吁一口氣:“原來是場噩夢啊……” “哼羔巢!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起罩阵,我...
    開封第一講書人閱讀 40,108評論 0 277
  • 序言:老撾萬榮一對情侶失蹤竿秆,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后稿壁,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體幽钢,經(jīng)...
    沈念sama閱讀 46,646評論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,709評論 3 342
  • 正文 我和宋清朗相戀三年常摧,在試婚紗的時候發(fā)現(xiàn)自己被綠了搅吁。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片威创。...
    茶點(diǎn)故事閱讀 40,861評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖谎懦,靈堂內(nèi)的尸體忽然破棺而出肚豺,到底是詐尸還是另有隱情,我是刑警寧澤界拦,帶...
    沈念sama閱讀 36,527評論 5 351
  • 正文 年R本政府宣布吸申,位于F島的核電站,受9級特大地震影響享甸,放射性物質(zhì)發(fā)生泄漏截碴。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,196評論 3 336
  • 文/蒙蒙 一蛉威、第九天 我趴在偏房一處隱蔽的房頂上張望日丹。 院中可真熱鬧,春花似錦蚯嫌、人聲如沸哲虾。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,698評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽束凑。三九已至,卻和暖如春栅盲,著一層夾襖步出監(jiān)牢的瞬間汪诉,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,804評論 1 274
  • 我被黑心中介騙來泰國打工谈秫, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留扒寄,地道東北人。 一個月前我還...
    沈念sama閱讀 49,287評論 3 379
  • 正文 我出身青樓孝常,卻偏偏與公主長得像旗们,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子构灸,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,860評論 2 361

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理上渴,服務(wù)發(fā)現(xiàn),斷路器喜颁,智...
    卡卡羅2017閱讀 134,716評論 18 139
  • 背景介紹 Kafka簡介 Kafka是一種分布式的稠氮,基于發(fā)布/訂閱的消息系統(tǒng)。主要設(shè)計目標(biāo)如下: 以時間復(fù)雜度為O...
    高廣超閱讀 12,843評論 8 167
  • 姓名:周小蓬 16019110037 轉(zhuǎn)載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,729評論 13 425
  • 本文轉(zhuǎn)載自http://dataunion.org/?p=9307 背景介紹Kafka簡介Kafka是一種分布式的...
    Bottle丶Fish閱讀 5,475評論 0 34
  • Kafka入門經(jīng)典教程-Kafka-about云開發(fā) http://www.aboutyun.com/threa...
    葡萄喃喃囈語閱讀 10,842評論 4 54