30. Spring Cloud Alibaba之消息中間件 - Spring Cloud Stream

Spring Cloud Stream簡介

Spring Cloud Stream是什么:

Spring Cloud Stream是Spring Cloud的一個(gè)子項(xiàng)目骂倘,是一個(gè)能讓我們更加方便操作MQ的框架,其目的用于構(gòu)建與消息中間件連接的高度可伸縮的消息事件驅(qū)動(dòng)的微服務(wù)

簡單來說Spring Cloud Stream就是一個(gè)簡化了MQ操作的框架种柑,其架構(gòu)圖如下:

image
  • 圖片來自官方文檔厌丑,從圖中可以看到應(yīng)用通過input和output與Binder進(jìn)行交互钳恕,而Binder是一個(gè)讓我們的微服務(wù)與MQ集成的組件。圖中的Middleware即是消息中間件蹄衷,目前支持Kafka、RabbitMQ以及RocketMQ

Spring Cloud Stream編程模型:

image
  • 圖片來自官方文檔厘肮,微服務(wù)(Application)集成了Stream后愧口,Stream的Destination Binder會創(chuàng)建兩個(gè)Binding,左邊的Binding連接著RabbitMQ类茂,右邊的Binding連接著Kafka耍属。左邊的Binding從RabbitMQ消費(fèi)消息托嚣,然后經(jīng)過圖中代碼的處理后,把處理結(jié)果通過右邊的Binding投遞到Kafka厚骗。簡單來說示启,就是這個(gè)微服務(wù)消費(fèi)了RabbitMQ里的消息并對其進(jìn)行處理,最后將處理的結(jié)果投遞到Kafka中领舰。Input和Output是消息相對與微服務(wù)的走向夫嗓,input表示微服務(wù)接收消息,output表示微服務(wù)投遞消息或發(fā)送消息

關(guān)于圖中的概念:

  • Destination Binder(目標(biāo)綁定器):與消息中間件通信的組件冲秽,用于實(shí)現(xiàn)消息的消費(fèi)和投遞
  • Destination Bindings(目標(biāo)綁定):Binding是連接應(yīng)用程序跟消息中間件的橋梁舍咖,用于消息的消費(fèi)和生產(chǎn),由binder創(chuàng)建

使用Spring Cloud Stream

現(xiàn)在有一個(gè)微服務(wù)項(xiàng)目:content-center锉桑,該微服務(wù)作為生產(chǎn)者排霉,我們來為這個(gè)微服務(wù)集成Spring Cloud Stream,第一步添加stream依賴:

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>

  • Tips:該項(xiàng)目的Spring Cloud版本為:Greenwich.SR1民轴;Spring Cloud Alibaba版本為:2.1.0.RELEASE

第二步攻柠,在啟動(dòng)類上添加@EnableBinding注解,如下:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;

@EnableBinding(Source.class)
...

第三步后裸,在配置文件中瑰钮,添加與stream相關(guān)的配置項(xiàng):

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 192.168.190.129:9876
      bindings:
        # 生產(chǎn)者為output
        output:
          # 用于指定topic
          destination: stream-test-topic

完成以上步驟后,項(xiàng)目就已經(jīng)集成了Spring Cloud Stream轻抱,現(xiàn)在我們來使用Spring Cloud Stream編寫生產(chǎn)者飞涂,具體代碼如下:

package com.zj.node.contentcenter.controller.content;

import lombok.RequiredArgsConstructor;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 生產(chǎn)者
 *
 * @author 01
 * @date 2019-08-10
 **/
@RestController
@RequiredArgsConstructor
public class TestProducerController {

    private final Source source;

    @GetMapping("/test-stream")
    public String testStream(){
        Message<String> message = MessageBuilder
                .withPayload("消息體")
                .build();
        source.output()
                .send(message);

        return "send message success!";
    }
}

啟動(dòng)項(xiàng)目,測試該接口是否能成功執(zhí)行:

image

然后為另一個(gè)作為消費(fèi)者的微服務(wù)項(xiàng)目:user-center祈搜,集成Spring Cloud Stream较店,由于依賴配置是一樣的,這里就不進(jìn)行重復(fù)了容燕,但是配置和注解里的類需要更改一下梁呈。首先是配置如下:

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 192.168.190.129:9876
      bindings:
        # 消費(fèi)者為input
        input:
          # 用于指定topic
          destination: stream-test-topic
          # rocketmq必須配置group,否則啟動(dòng)會報(bào)錯(cuò)
          # 如果使用的是其他MQ蘸秘,則不是必須配置的
          group: binder-group

啟動(dòng)類的注解如下:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;

@EnableBinding(Sink.class)
...

完成集成后官卡,使用Spring Cloud Stream編寫消費(fèi)者,具體代碼如下:

package com.zj.node.usercenter.rocketmq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Service;

/**
 * 消費(fèi)者
 *
 * @author 01
 * @date 2019-08-10
 **/
@Slf4j
@Service
public class TestStreamConsumer {

    @StreamListener(Sink.INPUT)
    public void receive(String messageBody) {
        log.info("通過stream收到了消息醋虏,messageBody = {}", messageBody);
    }
}

完成代碼的編寫后啟動(dòng)項(xiàng)目寻咒,由于先前我們已經(jīng)通過生產(chǎn)者往RocketMQ投遞了消息,所以此時(shí)控制臺會輸出接收到的消息颈嚼,如下:

image

Spring Cloud Stream自定義接口

通過以上小節(jié)的學(xué)習(xí)毛秘,我們已經(jīng)了解了Spring Cloud Stream的基本使用。從以上示例可以得知,input用于綁定一個(gè)topic消費(fèi)消息叫挟,output則反之艰匙,用于綁定一個(gè)topic投遞消息。

但在實(shí)際的項(xiàng)目中抹恳,可能會有多個(gè)topic员凝,甚至在極端場景下,不同的topic可能使用不同的MQ實(shí)現(xiàn)奋献,而stream默認(rèn)提供的input和output都只能綁定一個(gè)topic健霹,所以這個(gè)時(shí)候就需要用到stream的自定義接口來實(shí)現(xiàn)多個(gè)“input”和“output”綁定不同的topic了。

在以上小節(jié)的示例中可以得知秽荞,生產(chǎn)者發(fā)送消息時(shí)使用的是Source接口里的output方法骤公,而消費(fèi)者發(fā)送消息時(shí)使用的是Sink接口里的input方法,并且都需要配置到啟動(dòng)類的@EnableBinding注解里扬跋。所以實(shí)際上我們需要自定義接口的源碼與這兩個(gè)接口的源碼幾乎一致阶捆,只是名稱有所不同而已,使用上也只是將SourceSink改為自定義的接口即可钦听。

接下來簡單演示一下如何自定義接口并使用洒试,我們基于上一小節(jié)的例子進(jìn)行改造。首先是生產(chǎn)者朴上,定義一個(gè)用于發(fā)送消息的接口垒棋,具體代碼如下:

package com.zj.node.contentcenter.rocketmq;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * 自定義發(fā)送消息接口,與stream默認(rèn)提供的Source源碼是類似的
 *
 * @author 01
 * @date 2019-08-10
 **/
public interface MySource {

    /**
     * Name of the output channel.
     */
    String MY_OUTPUT = "my-output";

    /**
     * @return output channel
     */
    @Output(MY_OUTPUT)
    MessageChannel output();
}

然后在啟動(dòng)類的@EnableBinding中痪宰,添加這個(gè)接口:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;

@EnableBinding({Source.class, MySource.class})
...

在配置文件中添加如下配置:

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 192.168.190.129:9876
      bindings:
        # 生產(chǎn)者為output
        output:
          # 用于指定topic
          destination: stream-test-topic
        # 自定義的”output“叼架,這里的名稱需要與MySource接口里的MY_OUTPUT相對應(yīng)  
        my-output:
          # 綁定不同的topic
          destination: stream-my-topic          

修改生產(chǎn)者的代碼如下即可:

package com.zj.node.contentcenter.controller.content;

import com.zj.node.contentcenter.rocketmq.MySource;
import lombok.RequiredArgsConstructor;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 生產(chǎn)者
 *
 * @author 01
 * @date 2019-08-03
 **/
@RestController
@RequiredArgsConstructor
public class TestProducerController {

    private final MySource mySource;

    @GetMapping("/test-stream")
    public String testStream(){
        Message<String> message = MessageBuilder
                .withPayload("消息體")
                .build();
        mySource.output()
                .send(message);

        return "send message success!";
    }
}

然后啟動(dòng)項(xiàng)目訪問該接口,測試消息是否能正常發(fā)送:

image

改造完生產(chǎn)者后接著改造消費(fèi)者衣撬,首先定義一個(gè)用于消費(fèi)消息的接口乖订,具體代碼如下:

package com.zj.node.usercenter.rocketmq;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

/**
 * 自定義消費(fèi)消息接口,與stream默認(rèn)提供的Sink源碼是類似的
 *
 * @author 01
 * @date 2019-08-10
 **/
public interface MySink {

    /**
     * Input channel name.
     */
    String MY_INPUT = "my-input";

    /**
     * @return input channel.
     */
    @Input(MY_INPUT)
    SubscribableChannel input();
}

同樣需要在啟動(dòng)類的@EnableBinding中具练,添加這個(gè)接口:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;

@EnableBinding({Sink.class, MySink.class})
...

在配置文件中添加如下配置:

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 192.168.190.129:9876
      bindings:
        # 消費(fèi)者為input
        input:
          # 用于指定topic
          destination: stream-test-topic
          # rocketmq必須配置group乍构,否則啟動(dòng)會報(bào)錯(cuò)
          # 如果使用的是其他MQ,則不是必須配置的
          group: binder-group
        # 自定義的”input“扛点,這里的名稱需要與MySink接口里的MY_INPUT相對應(yīng)    
        my-input:
          # 綁定不同的topic
          destination: stream-my-topic
          group: my-group

修改消費(fèi)者的代碼如下:

package com.zj.node.usercenter.rocketmq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;

/**
 * 消費(fèi)者
 *
 * @author 01
 * @date 2019-08-10
 **/
@Slf4j
@Service
public class TestStreamConsumer {

    @StreamListener(MySink.MY_INPUT)
    public void receive(String messageBody) {
        log.info("自定義接口 - 通過stream收到了消息哥遮,messageBody = {}", messageBody);
    }
}

啟動(dòng)項(xiàng)目,由于先前我們已經(jīng)通過生產(chǎn)者往RocketMQ投遞了消息陵究,所以此時(shí)控制臺會輸出接收到的消息眠饮,如下:

image

Spring Cloud Stream的監(jiān)控

我們都知道Spring Boot Actuator組件用于暴露監(jiān)控端點(diǎn),很多監(jiān)控工具都需要依賴該組件的監(jiān)控端點(diǎn)實(shí)現(xiàn)監(jiān)控铜邮。而項(xiàng)目集成了Stream及Actuator后也會暴露相應(yīng)的監(jiān)控端點(diǎn)君仆,首先需要在項(xiàng)目里集成Actuator,添加依賴如下:

<!-- actuator -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

在配置文件中添加如下配置:

management:
  endpoints:
    web:
      exposure:
        # 暴露所有監(jiān)控端點(diǎn)
        include: '*'
  endpoint:
    health:
      # 顯示健康檢測詳情
      show-details: always

訪問http://127.0.0.1:{項(xiàng)目端口}/actuator可以獲取所有暴露出來的監(jiān)控端點(diǎn),Stream的相關(guān)監(jiān)控端點(diǎn)也在其列返咱,如下圖:

image

/actuator/bindings端點(diǎn)可以用于查看bindings相關(guān)信息:

image

/actuator/channels端點(diǎn)用于查看channels的相關(guān)信息,而“input”和“output”就是所謂的channel牍鞠,可以認(rèn)為這些channel是topic的抽象:

image

/actuator/health端點(diǎn)中可以查看binder及RocketMQ的狀態(tài)咖摹,主要是用于查看MQ的連接情況,如果連接不上其status則為DOWN:

image

Spring Cloud Stream + RocketMQ實(shí)現(xiàn)事務(wù)消息

先前在Spring Cloud Alibaba RocketMQ - 構(gòu)建異步通信的微服務(wù)一文的末尾中难述,我們介紹了RocketMQ的事務(wù)消息并且也演示了如何編碼實(shí)現(xiàn)萤晴。在本文學(xué)習(xí)了Spring Cloud Stream之后,我們來結(jié)合Stream對之前實(shí)現(xiàn)事務(wù)消息的代碼進(jìn)行重構(gòu)胁后。

首先修改配置文件如下:

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 192.168.190.129:9876
        bindings:
          output:
            producer:
              # 開啟事務(wù)消息店读,這樣通過output這個(gè)channel發(fā)送的消息都是半消息
              transactional: true
              # 生產(chǎn)者所在的事務(wù)組名稱
              group: tx-test-producer-group
      bindings:
        # 生產(chǎn)者為output
        output:
          # 用于指定topic
          destination: stream-test-topic

然后重構(gòu)TestProducerService,具體代碼如下:

package com.zj.node.contentcenter.service.test;

import com.alibaba.fastjson.JSON;
import com.zj.node.contentcenter.dao.content.NoticeMapper;
import com.zj.node.contentcenter.dao.log.RocketmqTransactionLogMapper;
import com.zj.node.contentcenter.domain.entity.content.Notice;
import com.zj.node.contentcenter.domain.entity.log.RocketmqTransactionLog;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.UUID;

/**
 * @author 01
 * @date 2019-08-08
 **/
@Service
@RequiredArgsConstructor
public class TestProducerService {

    private final NoticeMapper noticeMapper;
    private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
    private final Source source;

    public String testSendMsg(Notice notice) {
        // 生成事務(wù)id
        String transactionId = UUID.randomUUID().toString();
        // 通過stream發(fā)送消息攀芯,這里實(shí)際發(fā)送的就是半消息
        source.output().send(
                MessageBuilder.withPayload("消息體")
                        // header是消息的頭部分屯断,可以用作傳參
                        .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                        .setHeader("notice_id", notice.getId())
                        // 對象需要轉(zhuǎn)換成json,否則默認(rèn)是調(diào)用對象的toString方法轉(zhuǎn)換為字符串
                        .setHeader("notice", JSON.toJSONString(notice))
                        .build()
        );

        return "send message success";
    }

    @Transactional(rollbackFor = Exception.class)
    public void updateNotice(Integer noticeId, Notice notice) {
        Notice newNotice = new Notice();
        newNotice.setId(noticeId);
        newNotice.setContent(notice.getContent());

        noticeMapper.updateByPrimaryKeySelective(newNotice);
    }

    @Transactional(rollbackFor = Exception.class)
    public void updateNoticeWithRocketMQLog(Integer noticeId, Notice notice, String transactionId) {
        updateNotice(noticeId, notice);
        // 寫入事務(wù)日志
        rocketmqTransactionLogMapper.insertSelective(
                RocketmqTransactionLog.builder()
                        .transactionId(transactionId)
                        .log("updateNotice")
                        .build()
        );
    }
}

最后是重構(gòu)TestTransactionListener侣诺,具體代碼如下:

package com.zj.node.contentcenter.rocketmq;

import com.alibaba.fastjson.JSON;
import com.zj.node.contentcenter.dao.log.RocketmqTransactionLogMapper;
import com.zj.node.contentcenter.domain.entity.content.Notice;
import com.zj.node.contentcenter.domain.entity.log.RocketmqTransactionLog;
import com.zj.node.contentcenter.service.test.TestProducerService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

/**
 * 本地事務(wù)監(jiān)聽器
 *
 * @author 01
 * @date 2019-08-08
 **/
@Slf4j
@RequiredArgsConstructor
// 這里的txProducerGroup需要與配置文件里配置的一致
@RocketMQTransactionListener(txProducerGroup = "tx-test-producer-group")
public class TestTransactionListener implements RocketMQLocalTransactionListener {

    private final TestProducerService service;
    private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

    /**
     * 用于執(zhí)行本地事務(wù)的方法
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        MessageHeaders headers = msg.getHeaders();
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        log.info("執(zhí)行本地事務(wù)方法. 事務(wù)id: {}", transactionId);
        Integer noticeId = Integer.parseInt((String) headers.get("notice_id"));
        // 由于從header里獲取的對象是json格式所以需要進(jìn)行轉(zhuǎn)換
        Notice notice = JSON.parseObject((String) headers.get("notice"), Notice.class);

        try {
            // 執(zhí)行帶有事務(wù)注解的方法
            service.updateNoticeWithRocketMQLog(noticeId, notice, transactionId);
            // 正常執(zhí)行向MQ Server發(fā)送commit消息
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.error("本地事務(wù)方法發(fā)生異常殖演,消息將被回滾", e);
            // 發(fā)生異常向MQ Server發(fā)送rollback消息
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * 用于回查本地事務(wù)的執(zhí)行結(jié)果
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        MessageHeaders headers = msg.getHeaders();
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        log.warn("回查本地事務(wù)狀態(tài). 事務(wù)id: {}", transactionId);

        // 按事務(wù)id查詢?nèi)罩緮?shù)據(jù)
        RocketmqTransactionLog transactionLog = rocketmqTransactionLogMapper.selectOne(
                RocketmqTransactionLog.builder()
                        .transactionId(transactionId)
                        .build()
        );

        // 如果能按事務(wù)id查詢出來數(shù)據(jù)表示本地事務(wù)執(zhí)行成功,沒有數(shù)據(jù)則表示本地事務(wù)執(zhí)行失敗
        if (transactionLog == null) {
            log.warn("本地事務(wù)執(zhí)行失敗年鸳,事務(wù)日志不存在趴久,消息將被回滾. 事務(wù)id: {}", transactionId);
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }
}

原文:https://blog.51cto.com/zero01/2428507

鏈接:http://www.reibang.com/p/aa1ded0e50c0

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市搔确,隨后出現(xiàn)的幾起案子彼棍,更是在濱河造成了極大的恐慌,老刑警劉巖膳算,帶你破解...
    沈念sama閱讀 212,718評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件座硕,死亡現(xiàn)場離奇詭異,居然都是意外死亡畦幢,警方通過查閱死者的電腦和手機(jī)坎吻,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,683評論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來宇葱,“玉大人瘦真,你說我怎么就攤上這事∈蚯疲” “怎么了诸尽?”我有些...
    開封第一講書人閱讀 158,207評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長印颤。 經(jīng)常有香客問我您机,道長,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,755評論 1 284
  • 正文 為了忘掉前任际看,我火速辦了婚禮咸产,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘仲闽。我一直安慰自己脑溢,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,862評論 6 386
  • 文/花漫 我一把揭開白布赖欣。 她就那樣靜靜地躺著屑彻,像睡著了一般。 火紅的嫁衣襯著肌膚如雪顶吮。 梳的紋絲不亂的頭發(fā)上社牲,一...
    開封第一講書人閱讀 50,050評論 1 291
  • 那天,我揣著相機(jī)與錄音悴了,去河邊找鬼搏恤。 笑死,一個(gè)胖子當(dāng)著我的面吹牛让禀,可吹牛的內(nèi)容都是我干的挑社。 我是一名探鬼主播,決...
    沈念sama閱讀 39,136評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼巡揍,長吁一口氣:“原來是場噩夢啊……” “哼痛阻!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起腮敌,我...
    開封第一講書人閱讀 37,882評論 0 268
  • 序言:老撾萬榮一對情侶失蹤阱当,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后糜工,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體弊添,經(jīng)...
    沈念sama閱讀 44,330評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,651評論 2 327
  • 正文 我和宋清朗相戀三年捌木,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了油坝。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,789評論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡刨裆,死狀恐怖澈圈,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情帆啃,我是刑警寧澤瞬女,帶...
    沈念sama閱讀 34,477評論 4 333
  • 正文 年R本政府宣布,位于F島的核電站努潘,受9級特大地震影響诽偷,放射性物質(zhì)發(fā)生泄漏坤学。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,135評論 3 317
  • 文/蒙蒙 一报慕、第九天 我趴在偏房一處隱蔽的房頂上張望深浮。 院中可真熱鬧,春花似錦卖子、人聲如沸略号。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,864評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至突梦,卻和暖如春诫舅,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背宫患。 一陣腳步聲響...
    開封第一講書人閱讀 32,099評論 1 267
  • 我被黑心中介騙來泰國打工刊懈, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人娃闲。 一個(gè)月前我還...
    沈念sama閱讀 46,598評論 2 362
  • 正文 我出身青樓虚汛,卻偏偏與公主長得像,于是被迫代替她去往敵國和親皇帮。 傳聞我的和親對象是個(gè)殘疾皇子卷哩,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,697評論 2 351