Flume源碼分析

首先所有核心組件都會實現(xiàn)org.apache.flume.lifecycle.LifecycleAware接口:

public interface LifecycleAware {
  public void start();
  public void stop();
  public LifecycleState getLifecycleState();
}

start方法在整個Flume啟動時或者初始化組件時都會調(diào)用start方法進行組件初始化妆够,F(xiàn)lume組件出現(xiàn)異常停止時會調(diào)用stop识啦,getLifecycleState返回組件的生命周期狀態(tài),有IDLE, START, STOP, ERROR四個狀態(tài)神妹。
如果開發(fā)的組件需要配置颓哮,如設(shè)置一些屬性;可以實現(xiàn)org.apache.flume.conf.Configurable接口:

public interface Configurable {
   public void configure(Context context);
}

Flume在啟動組件之前會調(diào)用configure來初始化組件一些配置鸵荠。

1冕茅、Source

Source用于采集日志數(shù)據(jù),有兩種實現(xiàn)方式:輪詢拉取和事件驅(qū)動機制蛹找;Source接口如下:

public interface Source extends LifecycleAware, NamedComponent {
  public void setChannelProcessor(ChannelProcessor channelProcessor);
  public ChannelProcessor getChannelProcessor();
}

Source接口首先繼承了LifecycleAware接口姨伤,然后只提供了ChannelProcessor的setter和getter接口,也就是說它的的所有邏輯的實現(xiàn)應(yīng)該在LifecycleAware接口的start和stop中實現(xiàn)庸疾;ChannelProcessor之前介紹過用來進行日志流的過濾和Channel的選擇及調(diào)度乍楚。
而 Source 是通過 SourceFactory 工廠創(chuàng)建,默認提供了 DefaultSourceFactory 届慈,其首先通過 Enum 類型 org.apache.flume.conf.source.SourceType 查找默認實現(xiàn)徒溪,如 exec ,則找到 org.apache.flume.source.ExecSource 實現(xiàn)金顿,如果找不到直接 Class.forName(className) 創(chuàng)建臊泌。
Source 提供了兩種機制: PollableSource (輪詢拉取)和 EventDrivenSource(事件驅(qū)動):

PollableSource 默認提供了如下實現(xiàn):

比如 JMSSource 實現(xiàn)使用 javax.jms.MessageConsumer.receive(pollTimeout) 主動去拉取消息串绩。
EventDrivenSource 默認提供了如下實現(xiàn):

比如 NetcatSource 缺虐、 HttpSource 就是事件驅(qū)動,即被動等待礁凡;比如 HttpSource 就是內(nèi)部啟動了一個內(nèi)嵌的 Jetty 啟動了一個 Servlet 容器高氮,通過 FlumeHTTPServlet 去接收消息慧妄。
Flume 提供了 SourceRunner 用來啟動 Source 的流轉(zhuǎn):

public class EventDrivenSourceRunner extends SourceRunner {
  private LifecycleState lifecycleState;
  public EventDrivenSourceRunner() {
      lifecycleState = LifecycleState.IDLE; //啟動之前是空閑狀態(tài)
  }
 
  @Override
  public void start() {
    Source source = getSource(); //獲取Source
    ChannelProcessor cp = source.getChannelProcessor(); //Channel處理器
    cp.initialize(); //初始化Channel處理器
    source.start();  //啟動Source
    lifecycleState = LifecycleState.START; //本組件狀態(tài)改成啟動狀態(tài)
  }
  @Override
  public void stop() {
    Source source = getSource(); //先停Source
    source.stop();
    ChannelProcessor cp = source.getChannelProcessor();
    cp.close();//再停Channel處理器
    lifecycleState = LifecycleState.STOP; //本組件狀態(tài)改成停止?fàn)顟B(tài)
  }
}

從本組件也可以看出: 1 、首先要初始化 ChannelProcessor 剪芍,其實現(xiàn)時初始化過濾器鏈塞淹; 2 、接著啟動 Source 并更改本組件的狀態(tài)罪裹。

public class PollableSourceRunner extends SourceRunner {
 @Override
 public void start() {
  PollableSource source = (PollableSource) getSource();
  ChannelProcessor cp = source.getChannelProcessor();
  cp.initialize();
  source.start();
 
  runner = new PollingRunner();
  runner.source = source;
  runner.counterGroup = counterGroup;
  runner.shouldStop = shouldStop;
 
  runnerThread = new Thread(runner);
  runnerThread.setName(getClass().getSimpleName() + "-" + 
      source.getClass().getSimpleName() + "-" + source.getName());
  runnerThread.start(); 
 
  lifecycleState = LifecycleState.START;
 }
}

而 PollingRunner 首先初始化組件饱普,但是又啟動了一個線程 PollingRunner ,其作用就是輪詢拉取數(shù)據(jù):

@Override
  public void run() {
    while (!shouldStop.get()) { //如果沒有停止状共,則一直在死循環(huán)運行
      counterGroup.incrementAndGet("runner.polls");
 
      try {
        //調(diào)用PollableSource的process方法進行輪詢拉取,然后判斷是否遇到了失敗補償
        if (source.process().equals(PollableSource.Status.BACKOFF)) {/
          counterGroup.incrementAndGet("runner.backoffs");
 
          //失敗補償時暫停線程處理峡继,等待超時時間之后重試
          Thread.sleep(Math.min(
              counterGroup.incrementAndGet("runner.backoffs.consecutive")
              * source.getBackOffSleepIncrement(), source.getMaxBackOffSleepInterval()));
        } else {
          counterGroup.set("runner.backoffs.consecutive", 0L);
        }
      } catch (InterruptedException e) {
                }
      }
    }
  }
}

Flume 在啟動時會判斷 Source 是 PollableSource 還是 EventDrivenSource 來選擇使用 PollableSourceRunner 還是 EventDrivenSourceRunner 冯袍。
比如 HttpSource 實現(xiàn),其通過 FlumeHTTPServlet 接收消息然后:

List<Event> events = Collections.emptyList(); //create empty list
    //首先從請求中獲取Event
    events = handler.getEvents(request);
    //然后交給ChannelProcessor進行處理
    getChannelProcessor().processEventBatch(events);

到此基本的 Source 流程就介紹完了康愤,其作用就是監(jiān)聽日志征冷,采集检激,然后交給 ChannelProcessor 進行處理呵扛。

2 筐带、 Channel

Channel 用于連接 Source 和 Sink , Source 生產(chǎn)日志發(fā)送到 Channel 帖鸦, Sink 從 Channel 消費日志作儿;也就是說通過 Channel 實現(xiàn)了 Source 和 Sink 的解耦攻锰,可以實現(xiàn)多對多的關(guān)聯(lián)娶吞,和 Source 、 Sink 的異步化妒蛇。
之前 Source 采集到日志后會交給 ChannelProcessor 處理吏奸,那么接下來我們先從 ChannelProcessor 入手苦丁,其依賴三個組件:

private final ChannelSelector selector;  //Channel選擇器
private final InterceptorChain interceptorChain; //攔截器鏈
private ExecutorService execService; //用于實現(xiàn)可選Channel的ExecutorService,默認是單線程實現(xiàn)

接下來看下其是如何處理 Event 的:

public void processEvent(Event event) {
  event = interceptorChain.intercept(event); //首先進行攔截器鏈過濾
  if (event == null) {
    return;
  }
  List<Event> events = new ArrayList<Event>(1);
  events.add(event);
 
  //通過Channel選擇器獲取必須成功處理的Channel蛾狗,然后事務(wù)中執(zhí)行
  List<Channel> requiredChannels = selector.getRequiredChannels(event);
  for (Channel reqChannel : requiredChannels) { 
    executeChannelTransaction(reqChannel, events, false);
  }
 
  //通過Channel選擇器獲取可選的Channel留凭,這些Channel失敗是可以忽略蔼夜,不影響其他Channel的處理
  List<Channel> optionalChannels = selector.getOptionalChannels(event);
  for (Channel optChannel : optionalChannels) {
    execService.submit(new OptionalChannelTransactionRunnable(optChannel, events));
  }
}

另外內(nèi)部還提供了批處理實現(xiàn)方法 processEventBatch 匠题;對于內(nèi)部事務(wù)實現(xiàn)的話可以參考 executeChannelTransaction 方法钱磅,整體事務(wù)機制類似于 JDBC :

private static void executeChannelTransaction(Channel channel, List<Event> batch, boolean isOptional) {
  //1、獲取Channel上的事務(wù)
  Transaction tx = channel.getTransaction();
  Preconditions.checkNotNull(tx, "Transaction object must not be null");
  try {
    //2、開啟事務(wù)
    tx.begin();
    //3、在Channel上執(zhí)行批量put操作
    for (Event event : batch) {
      channel.put(event);
    }
    //4、成功后提交事務(wù)
    tx.commit();
  } catch (Throwable t) {
    //5、異常后回滾事務(wù)
    tx.rollback();
    if (t instanceof Error) {
       LOG.error("Error while writing to channel: " +
           channel, t);
       throw (Error) t;
    } else if(!isOptional) {//如果是可選的Channel工坊,異常忽略
       throw new ChannelException("Unable to put batch on required " +
             "channel: " + channel, t);
    }
  } finally {
    //最后關(guān)閉事務(wù)
    tx.close();
  }
}

Interceptor 用于過濾 Event 昭齐,即傳入一個 Event 然后進行過濾加工司浪,然后返回一個新的 Event 饮睬,接口如下:

public interface Interceptor {
    public void initialize();
    public Event intercept(Event event);
    public List<Event> intercept(List<Event> events);
    public void close();
}

可以看到其提供了 initialize 和 close 方法用于啟動和關(guān)閉割去; intercept 方法用于過濾或加工 Event 咖城。比如 HostInterceptor 攔截器用于獲取本機 IP 然后默認添加到 Event的字段為 host 的 Header 中辐董。
接下來就是 ChannelSelector 選擇器了苔严,其通過如下方式創(chuàng)建:

//獲取ChannelSelector配置悼沈,比如agent.sources.s1.selector.type = replicating
ChannelSelectorConfiguration selectorConfig = config.getSelectorConfiguration();
//使用Source關(guān)聯(lián)的Channel創(chuàng)建絮供,比如agent.sources.s1.channels = c1 c2
ChannelSelector selector = ChannelSelectorFactory.create(sourceChannels, selectorConfig);

ChannelSelector 默認提供了兩種實現(xiàn):復(fù)制和多路復(fù)用:
默認實現(xiàn)是復(fù)制選擇器 ReplicatingChannelSelector ,即把接收到的消息復(fù)制到每一個 Channel ;多路復(fù)用選擇器 MultiplexingChannelSelector 會根據(jù) Event Header中的參數(shù)進行選擇,以此來選擇使用哪個 Channel 晴圾。
而 Channel 是 Event 中轉(zhuǎn)的地方撒蟀, Source 發(fā)布 Event 到 Channel , Sink 消費 Channel 的 Event ; Channel 接口提供了如下接口用來實現(xiàn) Event 流轉(zhuǎn):

public interface Channel extends LifecycleAware, NamedComponent {
  public void put(Event event) throws ChannelException;
  public Event take() throws ChannelException;
  public Transaction getTransaction();
}

put 用于發(fā)布 Event 驻右, take 用于消費 Event 森爽, getTransaction 用于事務(wù)支持雕旨。默認提供了如下 Channel 的實現(xiàn):
對于 Channel 的實現(xiàn)我們后續(xù)單獨章節(jié)介紹。

3 可款、 Sink

Sink 從 Channel 消費 Event ,然后進行轉(zhuǎn)移到收集 / 聚合層或存儲層。 Sink 接口如下所示:

public interface Sink extends LifecycleAware, NamedComponent {
  public void setChannel(Channel channel);
  public Channel getChannel();
  public Status process() throws EventDeliveryException;
  public static enum Status {
    READY, BACKOFF
  }
}

類似于 Source 栈顷,其首先繼承了 LifecycleAware ,然后提供了 Channel 的 getter/setter 方法,并提供了 process 方法進行消費行嗤,此方法會返回消費的狀態(tài), READY 或 BACKOFF 。
Sink 也是通過 SinkFactory 工廠來創(chuàng)建鸵闪,其也提供了 DefaultSinkFactory 默認工廠,比如傳入 hdfs 苛让,會先查找 Enum org.apache.flume.conf.sink.SinkType ,然后找到相應(yīng)的默認處理類 org.apache.flume.sink.hdfs.HDFSEventSink 憔儿,如果沒找到默認處理類,直接通過 Class.forName(className) 進行反射創(chuàng)建叠艳。
我們知道 Sink 還提供了分組功能潦俺,用于把多個 Sink 聚合為一組進行使用劝堪,內(nèi)部提供了 SinkGroup 用來完成這個事情余境。此時問題來了绣张,如何去調(diào)度多個 Sink ,其內(nèi)部使用了 SinkProcessor 來完成這個事情,默認提供了故障轉(zhuǎn)移和負載均衡兩個策略。
首先 SinkGroup 就是聚合多個 Sink 為一組,然后將多個 Sink 傳給 SinkProcessorFactory 進行創(chuàng)建 SinkProcessor 狈惫,而策略是根據(jù)配置文件中配置的如 agent.sinkgroups.g1.processor.type = load_balance 來選擇的难菌。
SinkProcessor 提供了如下實現(xiàn):

DefaultSinkProcessor :默認實現(xiàn)舵匾,用于單個 Sink 的場景使用砍濒。
FailoverSinkProcessor :故障轉(zhuǎn)移實現(xiàn):

public Status process() throws EventDeliveryException {
  Long now = System.currentTimeMillis();
        //1绪囱、首先檢查失敗隊列的頭部的Sink是否已經(jīng)過了失敗補償?shù)却龝r間了
  while(!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) {
    //2琉挖、如果可以使用了羊始,則從失敗Sink隊列獲取隊列第一個Sink
    FailedSink cur = failedSinks.poll();
    Status s;
    try {
      s = cur.getSink().process(); //3、使用此Sink進行處理
      if (s  == Status.READY) { //4查描、如果處理成功
        liveSinks.put(cur.getPriority(), cur.getSink()); //4.1突委、放回存活Sink隊列
        activeSink = liveSinks.get(liveSinks.lastKey());
      } else {
        failedSinks.add(cur); //4.2、如果此時不是READY冬三,即BACKOFF期間匀油,再次放回失敗隊列
      }
      return s;
    } catch (Exception e) {
      cur.incFails(); //5、如果遇到異常了勾笆,則增加失敗次數(shù)敌蚜,并放回失敗隊列
      failedSinks.add(cur);
    }
  }
 
  Status ret = null;
  while(activeSink != null) { //6、此時失敗隊列中沒有Sink能處理了匠襟,那么需要使用存活Sink隊列進行處理
    try {
      ret = activeSink.process();
      return ret;
    } catch (Exception e) { //7钝侠、處理失敗進行轉(zhuǎn)移到失敗隊列
      activeSink = moveActiveToDeadAndGetNext();
    }
  }
 
  throw new EventDeliveryException("All sinks failed to process, " +
      "nothing left to failover to");
}

失敗隊列是一個優(yōu)先級隊列,使用 refresh 屬性排序酸舍,而 refresh 是通過如下機制計算的:

refresh = System.currentTimeMillis()
        + Math.min(maxPenalty, (1 << sequentialFailures) * FAILURE_PENALTY);

其中 maxPenalty 是最大等待時間帅韧,默認 30s ,而 (1 << sequentialFailures) * FAILURE_PENALTY) 用于實現(xiàn)指數(shù)級等待時間遞增啃勉, FAILURE_PENALTY 是 1s 忽舟。
LoadBalanceSinkProcessor :用于實現(xiàn) Sink 的負載均衡,其通過 SinkSelector 進行實現(xiàn),類似于 ChannelSelector 叮阅。 LoadBalanceSinkProcessor 在啟動時會根據(jù)配置刁品,如 agent.sinkgroups.g1.processor.selector = random 進行選擇,默認提供了兩種選擇器:

LoadBalanceSinkProcessor 使用如下機制進行負載均衡:

public Status process() throws EventDeliveryException {
  Status status = null;
  //1浩姥、使用選擇器創(chuàng)建相應(yīng)的迭代器挑随,也就是用來選擇Sink的迭代器
  Iterator<Sink> sinkIterator = selector.createSinkIterator();
  while (sinkIterator.hasNext()) {
    Sink sink = sinkIterator.next();
    try {
      //2、選擇器迭代Sink進行處理,如果成功直接break掉這次處理,此次負載均衡就算完成了
      status = sink.process();
      break;
    } catch (Exception ex) {
      //3握牧、失敗后會通知選擇器饼记,采取相應(yīng)的失敗退避補償算法進行處理
      selector.informSinkFailed(sink);
      LOGGER.warn("Sink failed to consume event. "
          + "Attempting next sink if available.", ex);
    }
  }
  if (status == null) {
    throw new EventDeliveryException("All configured sinks have failed");
  }
  return status;
}

如上的核心就是怎么創(chuàng)建迭代器颜价,如何進行失敗退避補償處理,首先我們看下 RoundRobinSinkSelector 實現(xiàn),其內(nèi)部是通過通用的 RoundRobinOrderSelector 選擇器實現(xiàn):

public Iterator<T> createIterator() {
  //1、獲取存活的Sink索引噪舀,
  List<Integer> activeIndices = getIndexList();
  int size = activeIndices.size();
  //2、如果上次記錄的下一個存活Sink的位置超過了size飘诗,那么從隊列頭重新開始計數(shù)
  if (nextHead >= size) {
    nextHead = 0;
  }
  //3与倡、獲取本次使用的起始位置
  int begin = nextHead++;
  if (nextHead == activeIndices.size()) {
    nextHead = 0;
  }
  //4、從該位置開始迭代疚察,其實現(xiàn)類似于環(huán)形隊列蒸走,比如整個隊列是5,起始位置是3貌嫡,則按照 3、4该溯、0岛抄、1、2的順序進行輪詢狈茉,實現(xiàn)了輪詢算法 
  int[] indexOrder = new int[size];
  for (int i = 0; i < size; i++) {
    indexOrder[i] = activeIndices.get((begin + i) % size);
  }
  //indexOrder是迭代順序夫椭,getObjects返回相關(guān)的Sinks;
  return new SpecificOrderIterator<T>(indexOrder, getObjects());
}

getIndexList 實現(xiàn)如下:

protected List<Integer> getIndexList() {
  long now = System.currentTimeMillis();
  List<Integer> indexList = new ArrayList<Integer>();
  int i = 0;
  for (T obj : stateMap.keySet()) {
    if (!isShouldBackOff() || stateMap.get(obj).restoreTime < now) {
      indexList.add(i);
    }
    i++;
  }
  return indexList;
}

isShouldBackOff() 表示是否開啟退避算法支持氯庆,如果不開啟蹭秋,則認為每個 Sink 都是存活的,每次都會重試堤撵,通過 agent.sinkgroups.g1.processor.backoff = true 配置開啟仁讨,默認 false ; restoreTime 和之前介紹的 refresh 一樣实昨,是退避補償?shù)却龝r間洞豁,算法類似,就不多介紹了。
那么什么時候調(diào)用 Sink 進行消費呢丈挟?其類似于 SourceRunner 刁卜, Sink 提供了 SinkRunner 進行輪詢拉取處理, SinkRunner 會輪詢調(diào)度 SinkProcessor 消費 Channel 的消息曙咽,然后調(diào)用 Sink 進行轉(zhuǎn)移蛔趴。 SinkProcessor 之前介紹過,其負責(zé)消息復(fù)制 / 路由例朱。
SinkRunner 實現(xiàn)如下:

public void start() {
  SinkProcessor policy = getPolicy();
  policy.start();
  runner = new PollingRunner();
  runner.policy = policy;
  runner.counterGroup = counterGroup;
  runner.shouldStop = new AtomicBoolean();
  runnerThread = new Thread(runner);
  runnerThread.setName("SinkRunner-PollingRunner-" +
      policy.getClass().getSimpleName());
  runnerThread.start();
  lifecycleState = LifecycleState.START;
}

即獲取 SinkProcessor 然后啟動它夺脾,接著啟動輪詢線程去處理。 PollingRunner 線程負責(zé)輪詢消息茉继,核心實現(xiàn)如下:

public void run() {
  while (!shouldStop.get()) { //如果沒有停止
    try {
      if (policy.process().equals(Sink.Status.BACKOFF)) {//如果處理失敗了咧叭,進行退避補償處理
        counterGroup.incrementAndGet("runner.backoffs");
        Thread.sleep(Math.min(
            counterGroup.incrementAndGet("runner.backoffs.consecutive")
            * backoffSleepIncrement, maxBackoffSleep)); //暫停退避補償設(shè)定的超時時間
      } else {
        counterGroup.set("runner.backoffs.consecutive", 0L);
      }
    } catch (Exception e) {
      try {
        Thread.sleep(maxBackoffSleep); //如果遇到異常則等待最大退避時間
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }
    }
  }
}

整體實現(xiàn)類似于 PollableSourceRunner 實現(xiàn),整體處理都是交給 SinkProcessor 完成的烁竭。 SinkProcessor 會輪詢 Sink 的 process 方法進行處理菲茬;此處以 LoggerSink 為例:

@Override
public Status process() throws EventDeliveryException {
  Status result = Status.READY;
  Channel channel = getChannel();
  //1、獲取事務(wù)
  Transaction transaction = channel.getTransaction();
  Event event = null;
 
  try {
    //2派撕、開啟事務(wù)
    transaction.begin();
    //3婉弹、從Channel獲取Event
    event = channel.take();
    if (event != null) {
      if (logger.isInfoEnabled()) {
        logger.info("Event: " + EventHelper.dumpEvent(event, maxBytesToLog));
      }
    } else {//4、如果Channel中沒有Event终吼,則默認進入故障補償機制镀赌,即防止死循環(huán)造成CPU負載高
      result = Status.BACKOFF;
    }
    //5、成功后提交事務(wù)
    transaction.commit();
  } catch (Exception ex) {
    //6际跪、失敗后回滾事務(wù)
    transaction.rollback();
    throw new EventDeliveryException("Failed to log event: " + event, ex);
  } finally {
    //7商佛、關(guān)閉事務(wù)
    transaction.close();
  }
  return result;
}

Sink 中一些實現(xiàn)是支持批處理的,比如 RollingFileSink :

//1姆打、開啟事務(wù)
//2良姆、批處理
for (int i = 0; i < batchSize; i++) {
  event = channel.take();
  if (event != null) {
    sinkCounter.incrementEventDrainAttemptCount();
    eventAttemptCounter++;
    serializer.write(event);
  }
}
//3、提交/回滾事務(wù)幔戏、關(guān)閉事務(wù)

定義一個批處理大小然后在事務(wù)中執(zhí)行批處理玛追。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市闲延,隨后出現(xiàn)的幾起案子痊剖,更是在濱河造成了極大的恐慌,老刑警劉巖垒玲,帶你破解...
    沈念sama閱讀 218,284評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件陆馁,死亡現(xiàn)場離奇詭異,居然都是意外死亡侍匙,警方通過查閱死者的電腦和手機氮惯,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評論 3 395
  • 文/潘曉璐 我一進店門叮雳,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人妇汗,你說我怎么就攤上這事帘不。” “怎么了杨箭?”我有些...
    開封第一講書人閱讀 164,614評論 0 354
  • 文/不壞的土叔 我叫張陵寞焙,是天一觀的道長。 經(jīng)常有香客問我互婿,道長捣郊,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,671評論 1 293
  • 正文 為了忘掉前任慈参,我火速辦了婚禮呛牲,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘驮配。我一直安慰自己娘扩,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,699評論 6 392
  • 文/花漫 我一把揭開白布壮锻。 她就那樣靜靜地躺著琐旁,像睡著了一般。 火紅的嫁衣襯著肌膚如雪猜绣。 梳的紋絲不亂的頭發(fā)上灰殴,一...
    開封第一講書人閱讀 51,562評論 1 305
  • 那天,我揣著相機與錄音掰邢,去河邊找鬼牺陶。 笑死,一個胖子當(dāng)著我的面吹牛尸变,可吹牛的內(nèi)容都是我干的义图。 我是一名探鬼主播,決...
    沈念sama閱讀 40,309評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼召烂,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了娃承?” 一聲冷哼從身側(cè)響起奏夫,我...
    開封第一講書人閱讀 39,223評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎历筝,沒想到半個月后酗昼,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,668評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡梳猪,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,859評論 3 336
  • 正文 我和宋清朗相戀三年麻削,在試婚紗的時候發(fā)現(xiàn)自己被綠了蒸痹。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,981評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡呛哟,死狀恐怖叠荠,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情扫责,我是刑警寧澤榛鼎,帶...
    沈念sama閱讀 35,705評論 5 347
  • 正文 年R本政府宣布,位于F島的核電站鳖孤,受9級特大地震影響者娱,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜苏揣,卻給世界環(huán)境...
    茶點故事閱讀 41,310評論 3 330
  • 文/蒙蒙 一黄鳍、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧平匈,春花似錦框沟、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,904評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至弟跑,卻和暖如春灾前,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背孟辑。 一陣腳步聲響...
    開封第一講書人閱讀 33,023評論 1 270
  • 我被黑心中介騙來泰國打工哎甲, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人饲嗽。 一個月前我還...
    沈念sama閱讀 48,146評論 3 370
  • 正文 我出身青樓炭玫,卻偏偏與公主長得像,于是被迫代替她去往敵國和親貌虾。 傳聞我的和親對象是個殘疾皇子吞加,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,933評論 2 355

推薦閱讀更多精彩內(nèi)容