RocketMq 重置消費(fèi)位點(diǎn)邏輯

系列


開篇

  • 這篇文章的主要目的是分析RocketMq根據(jù)時間戳重置某topic在某consumeGroup下的消費(fèi)位點(diǎn)狼犯。

  • 重置位點(diǎn)的執(zhí)行順序按照admin 到 broker 到 consumer的順序依次觸發(fā)蔑匣,admin負(fù)責(zé)構(gòu)建參數(shù)通知broker碟狞,broker負(fù)責(zé)查詢consumeQueue的具體位移河咽,broker負(fù)責(zé)通知consumer進(jìn)行位移重置。

  • 根據(jù)時間戳查找consumeQueue對應(yīng)的位移,然后由broker通知consumer來持久化消費(fèi)位移,最終會持久化到broker的消費(fèi)位移遵班。

  • 重置位點(diǎn)操作本質(zhì)上是在consumer端執(zhí)行,consumer端負(fù)責(zé)持久化新的消費(fèi)位移然后由定時任務(wù)通知broker更新消費(fèi)位移潮改。

  • consumer在整個位移重置過程中會設(shè)置ProcessQueue的狀態(tài)為Dropped狭郑,從而阻斷消息拉取任務(wù)ConsumeRequest的執(zhí)行阻斷消息拉取,其次會在consumer側(cè)修改消費(fèi)位移通過心跳通知broker修改consumer的消費(fèi)位移汇在,最后通過重新的rebalance過程開始重新消費(fèi)消息翰萨。


重置命令

public class ResetOffsetByTimeCommand implements SubCommand {

    @Override
    public String commandName() {
        return "resetOffsetByTime";
    }

    @Override
    public Options buildCommandlineOptions(Options options) {
        Option opt = new Option("g", "group", true, "set the consumer group");
        opt.setRequired(true);
        options.addOption(opt);

        opt = new Option("t", "topic", true, "set the topic");
        opt.setRequired(true);
        options.addOption(opt);

        opt = new Option("s", "timestamp", true, "set the timestamp[now|currentTimeMillis|yyyy-MM-dd#HH:mm:ss:SSS]");
        opt.setRequired(true);
        options.addOption(opt);

        opt = new Option("f", "force", true, "set the force rollback by timestamp switch[true|false]");
        opt.setRequired(false);
        options.addOption(opt);

        opt = new Option("c", "cplus", false, "reset c++ client offset");
        opt.setRequired(false);
        options.addOption(opt);
        return options;
    }

    @Override
    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
        try {
            String group = commandLine.getOptionValue("g").trim();
            String topic = commandLine.getOptionValue("t").trim();
            String timeStampStr = commandLine.getOptionValue("s").trim();
            long timestamp = timeStampStr.equals("now") ? System.currentTimeMillis() : 0;

            try {
                if (timestamp == 0) {
                    timestamp = Long.parseLong(timeStampStr);
                }
            } catch (NumberFormatException e) {

                timestamp = UtilAll.parseDate(timeStampStr, UtilAll.YYYY_MM_DD_HH_MM_SS_SSS).getTime();
            }

            boolean force = true;
            if (commandLine.hasOption('f')) {
                force = Boolean.valueOf(commandLine.getOptionValue("f").trim());
            }

            boolean isC = false;
            if (commandLine.hasOption('c')) {
                isC = true;
            }

            defaultMQAdminExt.start();
            Map<MessageQueue, Long> offsetTable;
            try {
                // 通過defaultMQAdminExt#resetOffsetByTimestamp來執(zhí)行
                offsetTable = defaultMQAdminExt.resetOffsetByTimestamp(topic, group, timestamp, force, isC);
            } catch (MQClientException e) {
                if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
                    ResetOffsetByTimeOldCommand.resetOffset(defaultMQAdminExt, group, topic, timestamp, force, timeStampStr);
                    return;
                }
                throw e;
            }
        } catch (Exception e) {
            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
        } finally {
            defaultMQAdminExt.shutdown();
        }
    }
}
  • ResetOffsetByTimeCommand的命令格式如上代碼所示,核心調(diào)用DefaultMQAdminExt#resetOffsetByTimestamp來重置趾疚。


admin

public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {

    public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce,
        boolean isC)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        // 通過defaultMQAdminExtImpl#resetOffsetByTimestamp重置位移
        return defaultMQAdminExtImpl.resetOffsetByTimestamp(topic, group, timestamp, isForce, isC);
    }
}


public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {

    public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce,
        boolean isC)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {

        // 1缨历、獲取topic對應(yīng)的TopicRouteData信息,進(jìn)而獲取對應(yīng)的BrokerData信息
        TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
        List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
        Map<MessageQueue, Long> allOffsetTable = new HashMap<MessageQueue, Long>();
        if (brokerDatas != null) {
            // 2糙麦、遍歷topic所在的broker節(jié)點(diǎn)信息
            for (BrokerData brokerData : brokerDatas) {
                String addr = brokerData.selectBrokerAddr();
                if (addr != null) {
                    // 2辛孵、遠(yuǎn)程調(diào)用broker來執(zhí)行重置操作
                    Map<MessageQueue, Long> offsetTable =
                        this.mqClientInstance.getMQClientAPIImpl().invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce,
                            timeoutMillis, isC);
                    if (offsetTable != null) {
                        allOffsetTable.putAll(offsetTable);
                    }
                }
            }
        }
        return allOffsetTable;
    }
}
  • 獲取topic對應(yīng)的TopicRouteData信息,進(jìn)而獲取對應(yīng)的BrokerData信息赡磅。
  • 遍歷topic所在的broker節(jié)點(diǎn)信息魄缚,遠(yuǎn)程調(diào)用broker來執(zhí)行重置操作。
public class MQClientAPIImpl {

    public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group,
        final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC)
        throws RemotingException, MQClientException, InterruptedException {
        ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
        requestHeader.setTopic(topic);
        requestHeader.setGroup(group);
        requestHeader.setTimestamp(timestamp);
        requestHeader.setForce(isForce);

        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, requestHeader);
        if (isC) {
            request.setLanguage(LanguageCode.CPP);
        }

        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
            request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                if (response.getBody() != null) {
                    ResetOffsetBody body = ResetOffsetBody.decode(response.getBody(), ResetOffsetBody.class);
                    return body.getOffsetTable();
                }
            }
            default:
                break;
        }

        throw new MQClientException(response.getCode(), response.getRemark());
    }
}
  • 遠(yuǎn)程調(diào)用的RequestCode為INVOKE_BROKER_TO_RESET_OFFSET。
  • 通過RequestCode可以方便查找BrokerProcessor便于分析冶匹。


broker

public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {

    public RemotingCommand resetOffset(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        // 解析requestHeader的信息
        final ResetOffsetRequestHeader requestHeader =
            (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);

        boolean isC = false;
        LanguageCode language = request.getLanguage();
        switch (language) {
            case CPP:
                isC = true;
                break;
        }
        // 通過Broker2Client#resetOffset重置位移
        return this.brokerController.getBroker2Client().resetOffset(requestHeader.getTopic(), requestHeader.getGroup(),
            requestHeader.getTimestamp(), requestHeader.isForce(), isC);
    }
}

public class Broker2Client {

    public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce,
                                       boolean isC) {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        // 獲取TopicConfig相關(guān)信息
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
        if (null == topicConfig) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("[reset-offset] reset offset failed, no topic in this broker. topic=" + topic);
            return response;
        }

        Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
        // 遍歷所有的寫隊(duì)列并獲取每個MessageQueue的消費(fèi)位移
        for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
            MessageQueue mq = new MessageQueue();
            mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
            mq.setTopic(topic);
            mq.setQueueId(i);
            // 查詢每個messageQueue查詢對應(yīng)的consumeQueue的消費(fèi)位移consumerOffset
            long consumerOffset =
                this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i);
            if (-1 == consumerOffset) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark(String.format("THe consumer group <%s> not exist", group));
                return response;
            }
            
            // 根據(jù)時間戳查詢consumeQueue的位移
            long timeStampOffset;
            if (timeStamp == -1) {
                // 沒時間戳就獲取consumeQueue的最大位移
                timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
            } else {
                // 根據(jù)時間戳查找consumeQueue的最大位移
                timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
            }

            if (timeStampOffset < 0) {
                log.warn("reset offset is invalid. topic={}, queueId={}, timeStampOffset={}", topic, i, timeStampOffset);
                timeStampOffset = 0;
            }

            if (isForce || timeStampOffset < consumerOffset) {
                offsetTable.put(mq, timeStampOffset);
            } else {
                offsetTable.put(mq, consumerOffset);
            }
        }
        // RequestCode 為 RESET_CONSUMER_CLIENT_OFFSET
        ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
        requestHeader.setTopic(topic);
        requestHeader.setGroup(group);
        requestHeader.setTimestamp(timeStamp);
        RemotingCommand request =
            RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader);
        if (isC) {
            // c++ language
            ResetOffsetBodyForC body = new ResetOffsetBodyForC();
            List<MessageQueueForC> offsetList = convertOffsetTable2OffsetList(offsetTable);
            body.setOffsetTable(offsetList);
            request.setBody(body.encode());
        } else {
            // other language
            ResetOffsetBody body = new ResetOffsetBody();
            body.setOffsetTable(offsetTable);
            request.setBody(body.encode());
        }
        // 獲取consumeGroup的所有consumer信息并通知位移重置
        ConsumerGroupInfo consumerGroupInfo =
            this.brokerController.getConsumerManager().getConsumerGroupInfo(group);

        if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) {
            ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
                consumerGroupInfo.getChannelInfoTable();
            for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
                int version = entry.getValue().getVersion();
                if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
                    try {
                        this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
                    } catch (Exception e) {
                    }
                } else {
                    response.setCode(ResponseCode.SYSTEM_ERROR);
                    response.setRemark("the client does not support this feature. version="
                        + MQVersion.getVersionDesc(version));
                    
                    return response;
                }
            }
        } else {
            String errorInfo =
                String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d",
                    requestHeader.getGroup(),
                    requestHeader.getTopic(),
                    requestHeader.getTimestamp());
            log.error(errorInfo);
            response.setCode(ResponseCode.CONSUMER_NOT_ONLINE);
            response.setRemark(errorInfo);
            return response;
        }
        response.setCode(ResponseCode.SUCCESS);
        ResetOffsetBody resBody = new ResetOffsetBody();
        resBody.setOffsetTable(offsetTable);
        response.setBody(resBody.encode());
        return response;
    }
}
  • Broker2Client#resetOffset負(fù)責(zé)執(zhí)行broker側(cè)的消費(fèi)位點(diǎn)的流程习劫。
  • 獲取topic對應(yīng)TopicConfig,進(jìn)而獲取topic的所有寫隊(duì)列嚼隘,
  • 遍歷所有的寫隊(duì)列诽里,結(jié)合consumeGroup和topic獲取該topic在consumeGroup下所有的隊(duì)列的根據(jù)時間戳查找的消費(fèi)位移。
  • 獲取consumeGroup下所有的client端信息飞蛹,依次調(diào)用client通知消費(fèi)位移谤狡。


public class DefaultMessageStore implements MessageStore {

    public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) {
        // 先根據(jù)topic和queueId查找對應(yīng)的ConsumeQueue對象
        ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
        if (logic != null) {
            // 針對ConsumeQueue對象根據(jù)時間戳進(jìn)行查找返回對應(yīng)的位移
            return logic.getOffsetInQueueByTime(timestamp);
        }

        return 0;
    }

    public ConsumeQueue findConsumeQueue(String topic, int queueId) {
        ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
        if (null == map) {
            ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
            ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
            if (oldMap != null) {
                map = oldMap;
            } else {
                map = newMap;
            }
        }

        ConsumeQueue logic = map.get(queueId);
        if (null == logic) {
            ConsumeQueue newLogic = new ConsumeQueue(
                topic,
                queueId,
                StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
                this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
                this);
            ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
            if (oldLogic != null) {
                logic = oldLogic;
            } else {
                logic = newLogic;
            }
        }

        return logic;
    }
}
  • 根據(jù)topic和queueId查找對應(yīng)的ConsumeQueue對象。
public class ConsumeQueue {

    public long getOffsetInQueueByTime(final long timestamp) {
        // 根據(jù)timestamp查找ConsumeQueue卧檐,文件最后更新時間戳大于該timestamp的文件
        MappedFile mappedFile = this.mappedFileQueue.getMappedFileByTime(timestamp);
        if (mappedFile != null) {
            long offset = 0;
            int low = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0;
            int high = 0;
            int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1;
            long leftIndexValue = -1L, rightIndexValue = -1L;

            // 獲取commitLog的最小物理偏移量
            long minPhysicOffset = this.defaultMessageStore.getMinPhyOffset();
            SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0);
            if (null != sbr) {
                ByteBuffer byteBuffer = sbr.getByteBuffer();
                high = byteBuffer.limit() - CQ_STORE_UNIT_SIZE;
                try {
                    // 二分查找確定時間戳對應(yīng)的位移
                    while (high >= low) {
                        // 二分查找的midOffset及對應(yīng)的commitLog的phyOffset
                        midOffset = (low + high) / (2 * CQ_STORE_UNIT_SIZE) * CQ_STORE_UNIT_SIZE;
                        byteBuffer.position(midOffset);
                        long phyOffset = byteBuffer.getLong();
                        int size = byteBuffer.getInt();
                        // 比較二分查找中間值的位移和minPhysicOffset進(jìn)行比較
                        // 如果中間值偏小那么就從中間值往右進(jìn)行查找
                        if (phyOffset < minPhysicOffset) {
                            low = midOffset + CQ_STORE_UNIT_SIZE;
                            leftOffset = midOffset;
                            continue;
                        }
                        // 根據(jù)consumeQueue的記錄的commitLog的消息位移和文件大小查找對應(yīng)的消息存儲時間
                        long storeTime =
                            this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
                        // 判斷storeTime的時間和timestamp進(jìn)行下比較
                        if (storeTime < 0) {
                            return 0;
                        } else if (storeTime == timestamp) {
                            // 查找到目的commitLog的位移
                            targetOffset = midOffset;
                            break;
                        } else if (storeTime > timestamp) {
                            // 查找到消息的存儲時間比當(dāng)前查找的timestamp大
                            high = midOffset - CQ_STORE_UNIT_SIZE;
                            rightOffset = midOffset;
                            rightIndexValue = storeTime;
                        } else {
                            // 查找到消息的存儲時間比當(dāng)前查找的timestamp小
                            low = midOffset + CQ_STORE_UNIT_SIZE;
                            leftOffset = midOffset;
                            leftIndexValue = storeTime;
                        }
                    }

                    if (targetOffset != -1) {
                        offset = targetOffset;
                    } else {
                        if (leftIndexValue == -1) {
                            offset = rightOffset;
                        } else if (rightIndexValue == -1) {
                            offset = leftOffset;
                        } else {
                            offset =
                                Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp
                                    - rightIndexValue) ? rightOffset : leftOffset;
                        }
                    }

                    return (mappedFile.getFileFromOffset() + offset) / CQ_STORE_UNIT_SIZE;
                } finally {
                    sbr.release();
                }
            }
        }
        return 0;
    }
}
  • 通過二分查找方法來定位重置時間戳對應(yīng)的consumeQueue的位移墓懂。


consumer

public class ClientRemotingProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {

    public RemotingCommand resetOffset(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        // 1、解析ResetOffsetRequestHeader對象
        final ResetOffsetRequestHeader requestHeader =
            (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
        // 2霉囚、解析重置的offsetTable
        Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
        if (request.getBody() != null) {
            ResetOffsetBody body = ResetOffsetBody.decode(request.getBody(), ResetOffsetBody.class);
            offsetTable = body.getOffsetTable();
        }

        // 3捕仔、通過offsetTable來重置client的消費(fèi)位移
        this.mqClientFactory.resetOffset(requestHeader.getTopic(), requestHeader.getGroup(), offsetTable);
        return null;
    }
}
  • ClientRemotingProcessor#resetOffset負(fù)責(zé)執(zhí)行broker通知client的重置位移命令。
public class MQClientInstance {

    public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
        DefaultMQPushConsumerImpl consumer = null;
        try {
            MQConsumerInner impl = this.consumerTable.get(group);
            if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
                consumer = (DefaultMQPushConsumerImpl) impl;
            } else {
                log.info("[reset-offset] consumer dose not exist. group={}", group);
                return;
            }
            consumer.suspend();

            ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
            for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {
                MessageQueue mq = entry.getKey();
                // 重置所有的ProcessQueue標(biāo)記為dropped
                if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {
                    ProcessQueue pq = entry.getValue();
                    pq.setDropped(true);
                    pq.clear();
                }
            }

            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
            }

            Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator();
            while (iterator.hasNext()) {
                MessageQueue mq = iterator.next();
                Long offset = offsetTable.get(mq);
                if (topic.equals(mq.getTopic()) && offset != null) {
                    try {
                        consumer.updateConsumeOffset(mq, offset);
                        consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq));
                        iterator.remove();
                    } catch (Exception e) {
                        log.warn("reset offset failed. group={}, {}", group, mq, e);
                    }
                }
            }
        } finally {
            if (consumer != null) {
                consumer.resume();
            }
        }
    }
}
  • consumer會匹配所有重置位移的MessageQueue盈罐,然后設(shè)置對應(yīng)的ProcessQueue的狀態(tài)為Dropped榜跌。
  • consumer會針對所有的MessageQueue持久化對應(yīng)的重置消費(fèi)位移。
  • consumer會移除processQueueTable所有相關(guān)的MessageQueue對象暖呕。
  • ConsumeRequest在ProcessQueue的狀態(tài)為Dropped會停止拉取消息斜做。
  • consumer在定時rebalance過程中會重新生成ConsumeRequest并重新開始拉取消息。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末湾揽,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子笼吟,更是在濱河造成了極大的恐慌库物,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,000評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件贷帮,死亡現(xiàn)場離奇詭異戚揭,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)撵枢,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,745評論 3 399
  • 文/潘曉璐 我一進(jìn)店門民晒,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人锄禽,你說我怎么就攤上這事潜必。” “怎么了沃但?”我有些...
    開封第一講書人閱讀 168,561評論 0 360
  • 文/不壞的土叔 我叫張陵磁滚,是天一觀的道長。 經(jīng)常有香客問我,道長垂攘,這世上最難降的妖魔是什么维雇? 我笑而不...
    開封第一講書人閱讀 59,782評論 1 298
  • 正文 為了忘掉前任,我火速辦了婚禮晒他,結(jié)果婚禮上吱型,老公的妹妹穿的比我還像新娘。我一直安慰自己陨仅,他們只是感情好唁影,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,798評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著掂名,像睡著了一般据沈。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上饺蔑,一...
    開封第一講書人閱讀 52,394評論 1 310
  • 那天锌介,我揣著相機(jī)與錄音,去河邊找鬼猾警。 笑死孔祸,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的发皿。 我是一名探鬼主播崔慧,決...
    沈念sama閱讀 40,952評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼穴墅!你這毒婦竟也來了惶室?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,852評論 0 276
  • 序言:老撾萬榮一對情侶失蹤玄货,失蹤者是張志新(化名)和其女友劉穎皇钞,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體松捉,經(jīng)...
    沈念sama閱讀 46,409評論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡贡蓖,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,483評論 3 341
  • 正文 我和宋清朗相戀三年耳舅,在試婚紗的時候發(fā)現(xiàn)自己被綠了伞芹。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片怯邪。...
    茶點(diǎn)故事閱讀 40,615評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖丙者,靈堂內(nèi)的尸體忽然破棺而出复斥,到底是詐尸還是另有隱情,我是刑警寧澤蔓钟,帶...
    沈念sama閱讀 36,303評論 5 350
  • 正文 年R本政府宣布永票,位于F島的核電站,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏侣集。R本人自食惡果不足惜键俱,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,979評論 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望世分。 院中可真熱鬧编振,春花似錦、人聲如沸臭埋。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,470評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽瓢阴。三九已至畅蹂,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間荣恐,已是汗流浹背液斜。 一陣腳步聲響...
    開封第一講書人閱讀 33,571評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留叠穆,地道東北人少漆。 一個月前我還...
    沈念sama閱讀 49,041評論 3 377
  • 正文 我出身青樓,卻偏偏與公主長得像硼被,于是被迫代替她去往敵國和親示损。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,630評論 2 359