RocketMQ為什么要保證訂閱關(guān)系的一致性?

微信公眾號(hào)「后端進(jìn)階」破婆,專注后端技術(shù)分享:Java涮总、Golang、WEB框架荠割、分布式中間件妹卿、服務(wù)治理等等。

前段時(shí)間有個(gè)朋友向我提了一個(gè)問題蔑鹦,他說在搭建 RocketMQ 集群過程中遇到了關(guān)于消費(fèi)訂閱的問題夺克,具體問題如下:

image
image

然后他發(fā)了報(bào)錯(cuò)的日志給我看:

the consumer's subscription not exist

我第一時(shí)間在源碼里找到了報(bào)錯(cuò)的位置:

org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest:

subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
if (null == subscriptionData) {
  log.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
  response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
  response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
  return response;
}

此處源碼是將該 Topic 的訂閱信息找出來,然而這里卻沒找到嚎朽,所以報(bào)了消費(fèi)訂閱不存在的錯(cuò)誤铺纽。

朋友還跟我講了他的消費(fèi)集群中,每個(gè)消費(fèi)者訂閱了自己的 Topic哟忍,他的消費(fèi)組中 有 c1 和 c2 消費(fèi)者狡门,c1 訂閱了 topicA,而 c2 訂閱了 topicB锅很。

這時(shí)我已經(jīng)知道什么原因了其馏,我先說一下消費(fèi)者的訂閱信息在 broker 中是以 group 來分組的,數(shù)據(jù)結(jié)構(gòu)如下:

org.apache.rocketmq.broker.client.ConsumerManager:

private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
  new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);

這意味著集群中的每個(gè)消費(fèi)者在向 broker 注冊(cè)訂閱信息的時(shí)候相互覆蓋掉對(duì)方的訂閱信息了爆安,這也是為什么同一個(gè)消費(fèi)組應(yīng)該擁有完全一樣的訂閱關(guān)系的原因叛复,而朋友在同一個(gè)消費(fèi)組的每個(gè)消費(fèi)者訂閱關(guān)系都不一樣,就出現(xiàn)了訂閱信息相互覆蓋的問題扔仓。

可是朋友這時(shí)又有疑惑了褐奥,他覺得每個(gè)消費(fèi)者訂閱自己的主題,貌似沒問題啊翘簇,邏輯上也行的通撬码,他不明白為什么 RocketMQ 不允許這樣做,于是秉承著老司機(jī)的職業(yè)素養(yǎng)版保,下面我會(huì)從源碼的角度深度分析 RocketMQ 消費(fèi)訂閱注冊(cè)呜笑,消息拉取夫否,消息隊(duì)列負(fù)載與重新分布機(jī)制,讓大家徹底弄清 RocketMQ 消費(fèi)訂閱機(jī)制叫胁。

消費(fèi)者訂閱信息注冊(cè)

消費(fèi)者在啟動(dòng)時(shí)會(huì)向所有 broker 注冊(cè)訂閱信息慷吊,并啟動(dòng)心跳機(jī)制,定時(shí)更新訂閱信息曹抬,每個(gè)消費(fèi)者都有一個(gè) MQClientInstance,消費(fèi)者啟動(dòng)時(shí)會(huì)啟動(dòng)這個(gè)類急鳄,啟動(dòng)方法中會(huì)啟動(dòng)一些列定時(shí)任務(wù)谤民,其中:

org.apache.rocketmq.client.impl.factory.MQClientInstance#startScheduledTask:

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  @Override
  public void run() {
    try {
      MQClientInstance.this.cleanOfflineBroker();
      MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
    } catch (Exception e) {
      log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
    }
  }
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

上面是向集群內(nèi)所有 broker 發(fā)送訂閱心跳信息的定時(shí)任務(wù),源碼繼續(xù)跟進(jìn)去疾宏,發(fā)現(xiàn)會(huì)給集群中的每個(gè) broker 都發(fā)送自己的 HeartbeatData张足,HeartbeatData 即是每個(gè)客戶端的心跳數(shù)據(jù),它包含了如下數(shù)據(jù):

// 客戶端ID
private String clientID;
// 生產(chǎn)者信息
private Set<ProducerData> producerDataSet = new HashSet<ProducerData>();
// 消費(fèi)者信息
private Set<ConsumerData> consumerDataSet = new HashSet<ConsumerData>();

其中消費(fèi)者信息包含了客戶端訂閱的主題信息坎藐。

我們繼續(xù)看看 broker 如何處理 HeartbeatData 數(shù)據(jù)为牍,客戶端發(fā)送 HeartbeatData 時(shí)的請(qǐng)求類型為 HEART_BEAT,我們直接找到 broker 處理 HEART_BEAT 請(qǐng)求類型的邏輯:

org.apache.rocketmq.broker.processor.ClientManageProcessor#heartBeat:

public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
  RemotingCommand response = RemotingCommand.createResponseCommand(null);
  // 解碼岩馍,獲取 HeartbeatData
  HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
  ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
    ctx.channel(),
    heartbeatData.getClientID(),
    request.getLanguage(),
    request.getVersion()
  );

  // 循環(huán)注冊(cè)消費(fèi)者訂閱信息
  for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
    // 按消費(fèi)組獲取訂閱配置信息
    SubscriptionGroupConfig subscriptionGroupConfig =
      this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
      data.getGroupName());
    boolean isNotifyConsumerIdsChangedEnable = true;
    if (null != subscriptionGroupConfig) {
      isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
      int topicSysFlag = 0;
      if (data.isUnitMode()) {
        topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
      }
      String newTopic = MixAll.getRetryTopic(data.getGroupName());
      this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
        newTopic,
        subscriptionGroupConfig.getRetryQueueNums(),
        PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
    }

    // 注冊(cè)消費(fèi)者訂閱信息
    boolean changed = this.brokerController.getConsumerManager().registerConsumer(
      data.getGroupName(),
      clientChannelInfo,
      data.getConsumeType(),
      data.getMessageModel(),
      data.getConsumeFromWhere(),
      data.getSubscriptionDataSet(),
      isNotifyConsumerIdsChangedEnable
    );
    // ...
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
  }

在這里我們可以看到碉咆,broker 收到 HEART_BEAT 請(qǐng)求后,將請(qǐng)求數(shù)據(jù)解壓獲取 HeartbeatData蛀恩,根據(jù) HeartbeatData 里面的消費(fèi)訂閱信息疫铜,循環(huán)進(jìn)行注冊(cè):

org.apache.rocketmq.broker.client.ConsumerManager#registerConsumer:

public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
                                ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
                                final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {

  // 獲取消費(fèi)組內(nèi)的消費(fèi)者信息
  ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
  // 如果消費(fèi)組的消費(fèi)者信息為空,則新建一個(gè)
  if (null == consumerGroupInfo) {
    ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
    ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
    consumerGroupInfo = prev != null ? prev : tmp;
  }

  boolean r1 =
    consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
                                    consumeFromWhere);
  // 更新訂閱信息双谆,訂閱信息是按照消費(fèi)組存放的壳咕,因此這步驟就會(huì)導(dǎo)致同一個(gè)消費(fèi)組內(nèi)的各個(gè)消費(fèi)者客戶端的訂閱信息相互被覆蓋
  boolean r2 = consumerGroupInfo.updateSubscription(subList);

  if (r1 || r2) {
    if (isNotifyConsumerIdsChangedEnable) {
      this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
    }
  }

  this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);

  return r1 || r2;
}

這步驟是 broker 更新消費(fèi)者訂閱信息的核心方法,如果消費(fèi)組的消費(fèi)者信息 ConsumerGroupInfo 為空顽馋,則新建一個(gè)谓厘,從名字可知道,訂閱信息是按照消費(fèi)組進(jìn)行存放的寸谜,因此在更新訂閱信息時(shí)竟稳,訂閱信息是按照消費(fèi)組存放的,這步驟就會(huì)導(dǎo)致同一個(gè)消費(fèi)組內(nèi)的各個(gè)消費(fèi)者客戶端的訂閱信息相互被覆蓋程帕。

消息拉取

在 MQClientInstance 啟動(dòng)時(shí)住练,會(huì)啟動(dòng)一條線程來處理消息拉取任務(wù):

org.apache.rocketmq.client.impl.factory.MQClientInstance#start:

// Start pull service
this.pullMessageService.start();

pullMessageService 繼承了 ServiceThread,而 ServiceThread 實(shí)現(xiàn)了 Runnable 接口愁拭,它的 run 方法實(shí)現(xiàn)如下:

org.apache.rocketmq.client.impl.consumer.PullMessageService#run:

@Override
public void run() {
  while (!this.isStopped()) {
    try {
      // 從 pullRequestQueue 中獲取拉取消息請(qǐng)求對(duì)象
      PullRequest pullRequest = this.pullRequestQueue.take();
      // 執(zhí)行消息拉取
      this.pullMessage(pullRequest);
    } catch (InterruptedException ignored) {
    } catch (Exception e) {
      log.error("Pull Message Service Run Method exception", e);
    }
  }
}

消費(fèi)端拿到 PullRequest 對(duì)象進(jìn)行拉取消息讲逛,pullRequestQueue 是一個(gè)阻塞隊(duì)列,如果 pullRequest 數(shù)據(jù)為空岭埠,執(zhí)行 take() 方法會(huì)一直阻塞盏混,直到有新的 pullRequest 拉取任務(wù)進(jìn)來蔚鸥,這里是一個(gè)很關(guān)鍵的步驟,你可能會(huì)想许赃,pullRequest 什么時(shí)候被創(chuàng)建然后放入 pullRequestQueue止喷?pullRequest 它是在RebalanceImpl 中創(chuàng)建,它是 RocketMQ 消息隊(duì)列負(fù)載與重新分布機(jī)制的實(shí)現(xiàn)混聊。

消息隊(duì)列負(fù)載與重新分布

從上面消息拉取源碼分析可知弹谁,pullMessageService 啟動(dòng)時(shí)由于 pullRequestQueue 中沒有 pullRequest 對(duì)象,會(huì)一直阻塞句喜,而在 MQClientInstance 啟動(dòng)時(shí)预愤,同樣會(huì)啟動(dòng)一條線程來處理消息隊(duì)列負(fù)載與重新分布任務(wù):

org.apache.rocketmq.client.impl.factory.MQClientInstance#start:

// Start rebalance service
this.rebalanceService.start();

rebalanceService 同樣繼承了 ServiceThread,它的 run 方法如下:

@Override
public void run() {
  while (!this.isStopped()) {
    this.waitForRunning(waitInterval);
    this.mqClientFactory.doRebalance();
  }
}

繼續(xù)跟進(jìn)去:

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance:

public void doRebalance(final boolean isOrder) {
  // 獲取消費(fèi)者所有訂閱信息
  Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
  if (subTable != null) {
    for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
      final String topic = entry.getKey();
      try {
        // 消息隊(duì)列負(fù)載與重新分布
        this.rebalanceByTopic(topic, isOrder);
      } catch (Throwable e) {
        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
          log.warn("rebalanceByTopic Exception", e);
        }
      }
    }
  }
  this.truncateMessageQueueNotMyTopic();
}

這里主要是獲取客戶端訂閱的主題咳胃,并根據(jù)主題進(jìn)行消息隊(duì)列負(fù)載與重新分布植康,subTable 存儲(chǔ)了消費(fèi)者的訂閱信息,消費(fèi)者進(jìn)行消息訂閱時(shí)會(huì)填充到里面展懈,我們接著往下:

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic:

Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

rebalanceByTopic 方法是實(shí)現(xiàn) Consumer 端負(fù)載均衡的核心销睁,我們這里以集群模式的消息隊(duì)列負(fù)載與重新分布,首先從 topicSubscribeInfoTable 中獲取訂閱主題的隊(duì)列信息存崖,接著隨機(jī)從集群中的一個(gè) broker 中獲取消費(fèi)組內(nèi)某個(gè) topic 的訂閱客戶端 ID 列表冻记,這里需要注意的是,為什么從集群內(nèi)任意一個(gè) broker 就可以獲取訂閱客戶端信息呢来惧?前面的分析也說了檩赢,消費(fèi)者客戶端啟動(dòng)時(shí)會(huì)啟動(dòng)一個(gè)線程,向所有 broker 發(fā)送心跳包违寞。

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic:

// 如果 主題訂閱信息mqSet和主題訂閱客戶端不為空贞瞒,就執(zhí)行消息隊(duì)列負(fù)載與重新分布
if (mqSet != null && cidAll != null) {
  List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
  mqAll.addAll(mqSet);

  // 排序,確保每個(gè)消息隊(duì)列只分配一個(gè)消費(fèi)者
  Collections.sort(mqAll);
  Collections.sort(cidAll);

  // 消息隊(duì)列分配算法
  AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

  // 執(zhí)行算法趁曼,并得到隊(duì)列重新分配后的結(jié)果對(duì)象allocateResult
  List<MessageQueue> allocateResult = null;
  try {
    allocateResult = strategy.allocate(
      this.consumerGroup,
      this.mQClientFactory.getClientId(),
      mqAll,
      cidAll);
  } catch (Throwable e) {
    log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
              e);
    return;
  }
  // ...
}

以上是消息負(fù)載均衡的核心邏輯军浆,RocketMQ 本身提供了 5 種負(fù)載算法,默認(rèn)使用 AllocateMessageQueueAveragely 平均分配算法挡闰,它分配算法特點(diǎn)如下:

假設(shè)有消費(fèi)組 g1乒融,有消費(fèi)者 c1 和 c2,c1 訂閱了 topicA摄悯,c2 訂閱了 topicB赞季,集群內(nèi)有 broker1 和broker2,假設(shè) topicA 有 8 個(gè)消息隊(duì)列奢驯,broker_a(q0/q1/q2/q3) 和 broker_b(q0/q1/q2/q3)申钩,前面我們知道 findConsumerIdList 方法會(huì)獲取消費(fèi)組內(nèi)所有消費(fèi)者客戶端 ID,topicA 經(jīng)過平均分配算法進(jìn)行分配之后的消費(fèi)情況如下:

c1:broker_a(q0/q1/q2/q3)

c2:broker_b(q0/q1/q2/q3)

問題就出現(xiàn)在這里瘪阁,c2 根本沒有訂閱 topicA撒遣,但根據(jù)分配算法邮偎,卻要加上 c2 進(jìn)行分配,這樣就會(huì)導(dǎo)致這種情況有一半的消息被分配到 c2 進(jìn)行消費(fèi)义黎,被分配到 c2 的消息隊(duì)列會(huì)延遲十幾秒甚至更久才會(huì)被消費(fèi)禾进,topicB 同理

下面我用圖表示 topicA 和 topicB 經(jīng)過 rebalance 之后的消費(fèi)情況:

image

至于為什么會(huì)報(bào) the consumer's subscription not exist廉涕,我們繼續(xù)往下擼:

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic:

if (mqSet != null && cidAll != null) {
  // ...
  Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
  if (allocateResult != null) {
    allocateResultSet.addAll(allocateResult);
  }
  // 用戶重新分配后的結(jié)果allocateResult來更新當(dāng)前消費(fèi)者負(fù)載的消息隊(duì)列緩存表processQueueTable泻云,并生成 pullRequestList 放入 pullRequestQueue 阻塞隊(duì)列中
  boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
  if (changed) {
    log.info(
      "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
      strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
      allocateResultSet.size(), allocateResultSet);
    this.messageQueueChanged(topic, mqSet, allocateResultSet);
  }
}

以上代碼邏輯主要是拿 mqSet 和 cidAll 進(jìn)行消息隊(duì)列負(fù)載與重新分布,得到結(jié)果 allocateResult狐蜕,它是一個(gè) MessageQueue 列表壶愤,接著用 allocateResult 更新消費(fèi)者負(fù)載的消息隊(duì)列緩存表 processQueueTable,生成 pullRequestList 放入 pullRequestQueue 阻塞隊(duì)列中:

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance:

List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
// 循環(huán)執(zhí)行馏鹤,將mqSet訂閱數(shù)據(jù)封裝成PullRequest對(duì)象,并添加到pullRequestList中
for (MessageQueue mq : mqSet) {
  // 如果緩存列表不存在該訂閱信息娇哆,說明這次消息隊(duì)列重新分配后新增加的消息隊(duì)列
  if (!this.processQueueTable.containsKey(mq)) {
    if (isOrder && !this.lock(mq)) {
      log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
      continue;
    }
    this.removeDirtyOffset(mq);
    ProcessQueue pq = new ProcessQueue();
    long nextOffset = this.computePullFromWhere(mq);
    if (nextOffset >= 0) {
      ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
      if (pre != null) {
        log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
      } else {
        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
        PullRequest pullRequest = new PullRequest();
        pullRequest.setConsumerGroup(consumerGroup);
        pullRequest.setNextOffset(nextOffset);
        pullRequest.setMessageQueue(mq);
        pullRequest.setProcessQueue(pq);
        pullRequestList.add(pullRequest);
        changed = true;
      }
    } else {
      log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
    }
  }
}
// 將pullRequestList添加到PullMessageService中的pullRequestQueue阻塞隊(duì)列中湃累,以喚醒PullMessageService線程執(zhí)行消息拉取
this.dispatchPullRequest(pullRequestList);

前面我們講到消息拉取是從 pullRequestQueue 阻塞隊(duì)列中拿 pullRequest 執(zhí)行拉取的,以上方法就是創(chuàng)建 pullRequest 的地方碍讨。

源碼分析到這里治力,就可以弄清楚為什么會(huì)報(bào) the consumer's subscription not exist 這個(gè)錯(cuò)誤了:

假設(shè)有消費(fèi)者組 g1,g1下有消費(fèi)者 c1 和消費(fèi)者 c2勃黍,c1 訂閱了 topicA宵统,c2 訂閱了 topicB,此時(shí)c2 先啟動(dòng)覆获,將 g1 的訂閱信息更新為 topicB马澈,c1 隨后啟動(dòng),將 g1 的訂閱信息覆蓋為 topicA弄息,c1 的 Rebalance 負(fù)載將 topicA 的 pullRequest 添加到 pullRequestQueue 中痊班,而恰好此時(shí) c2 心跳包又將 g1 的訂閱信息更新為 topicB,那么此時(shí) c1 的 PullMessageService 線程拿到 pullRequestQueue 中 topicA 的 pullRequest 進(jìn)行消息拉取摹量,然而在 broker 端找不到消費(fèi)者組 g1 下 topicA 的訂閱信息(因?yàn)榇藭r(shí)恰好被 c2 心跳包給覆蓋了)涤伐,就會(huì)報(bào)消費(fèi)者訂閱信息不存在的錯(cuò)誤了

公眾號(hào)「后端進(jìn)階」缨称,專注后端技術(shù)分享凝果!
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市睦尽,隨后出現(xiàn)的幾起案子器净,更是在濱河造成了極大的恐慌,老刑警劉巖当凡,帶你破解...
    沈念sama閱讀 212,383評(píng)論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件掌动,死亡現(xiàn)場(chǎng)離奇詭異四啰,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)粗恢,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,522評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門柑晒,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人眷射,你說我怎么就攤上這事匙赞。” “怎么了妖碉?”我有些...
    開封第一講書人閱讀 157,852評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵涌庭,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我欧宜,道長(zhǎng)坐榆,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,621評(píng)論 1 284
  • 正文 為了忘掉前任冗茸,我火速辦了婚禮席镀,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘夏漱。我一直安慰自己豪诲,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,741評(píng)論 6 386
  • 文/花漫 我一把揭開白布挂绰。 她就那樣靜靜地躺著屎篱,像睡著了一般。 火紅的嫁衣襯著肌膚如雪葵蒂。 梳的紋絲不亂的頭發(fā)上交播,一...
    開封第一講書人閱讀 49,929評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音践付,去河邊找鬼堪侯。 笑死,一個(gè)胖子當(dāng)著我的面吹牛荔仁,可吹牛的內(nèi)容都是我干的伍宦。 我是一名探鬼主播,決...
    沈念sama閱讀 39,076評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼乏梁,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼次洼!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起遇骑,我...
    開封第一講書人閱讀 37,803評(píng)論 0 268
  • 序言:老撾萬榮一對(duì)情侶失蹤卖毁,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體亥啦,經(jīng)...
    沈念sama閱讀 44,265評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡炭剪,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,582評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了翔脱。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片奴拦。...
    茶點(diǎn)故事閱讀 38,716評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖届吁,靈堂內(nèi)的尸體忽然破棺而出错妖,到底是詐尸還是另有隱情,我是刑警寧澤疚沐,帶...
    沈念sama閱讀 34,395評(píng)論 4 333
  • 正文 年R本政府宣布暂氯,位于F島的核電站,受9級(jí)特大地震影響亮蛔,放射性物質(zhì)發(fā)生泄漏痴施。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,039評(píng)論 3 316
  • 文/蒙蒙 一究流、第九天 我趴在偏房一處隱蔽的房頂上張望辣吃。 院中可真熱鬧,春花似錦梯嗽、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,798評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至绵估,卻和暖如春炎疆,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背国裳。 一陣腳步聲響...
    開封第一講書人閱讀 32,027評(píng)論 1 266
  • 我被黑心中介騙來泰國打工形入, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人缝左。 一個(gè)月前我還...
    沈念sama閱讀 46,488評(píng)論 2 361
  • 正文 我出身青樓亿遂,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國和親渺杉。 傳聞我的和親對(duì)象是個(gè)殘疾皇子蛇数,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,612評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容