flumenote3

源將事件寫到一個多或者多個通道中历筝。
接收器只從一個通道接收事件掖蛤。
代理可能會有多個源毡鉴、通道與接收器石挂。
flume-ng-channels
里面包含了filechannel博助,jdbcchannel,kafkachannel,memorychannel通道的實現(xiàn)痹愚。
flume-ng-clients
實現(xiàn)了log4j相關(guān)的幾個Appender富岳,使得log4j的日志輸出可以直接發(fā)送給flume-agent;其中有一個LoadBalancingLog4jAppender的實現(xiàn)拯腮,提供了多個flume-agent的load balance和ha功能城瞎,采用flume作為日志收集的可以考慮將這個appender引入內(nèi)部的log4j中。
flume-ng-configuration
這個主要就是Flume配置信息相關(guān)的類疾瓮,包括載入flume-config.properties配置文件并解析脖镀。其中包括了Source的配置,Sink的配置狼电,Channel的配置蜒灰,在閱讀源碼前推薦先梳理這部分關(guān)系再看其他部分的。
flume-ng-core
flume整個核心框架肩碟,包括了各個模塊的接口以及邏輯關(guān)系實現(xiàn)强窖。其中instrumentation是flume內(nèi)部實現(xiàn)的一套metric機(jī)制,metric的變化和維護(hù)削祈,其核心也就是在MonitoredCounterGroup中通過一個Map來實現(xiàn)metric的計量翅溺。ng-core下幾乎大部分代碼任然幾種在channel脑漫、sink、source幾個子目錄下咙崎,其他目錄基本完成一個util和輔助的功能优幸。
flume-ng-node
實現(xiàn)啟動flume的一些基本類,包括main函數(shù)的入口(Application.java中)褪猛。在理解configuration之后网杆,從application的main函數(shù)入手,可以較快的了解整個flume的代碼伊滋。

flume-ng啟動文件介紹

################################
# constants
################################

#設(shè)置常量值碳却,主要是針對不同的參數(shù)執(zhí)行相應(yīng)的類,以啟動Flume環(huán)境
FLUME_AGENT_CLASS="org.apache.flume.node.Application"
FLUME_AVRO_CLIENT_CLASS="org.apache.flume.client.avro.AvroCLIClient"
FLUME_VERSION_CLASS="org.apache.flume.tools.VersionInfo"
FLUME_TOOLS_CLASS="org.apache.flume.tools.FlumeToolsMain"

#真正啟動Flume環(huán)境的方法
run_flume() {
  local FLUME_APPLICATION_CLASS

  if [ "$#" -gt 0 ]; then
    FLUME_APPLICATION_CLASS=$1
    shift
  else
    error "Must specify flume application class" 1
  fi

  if [ ${CLEAN_FLAG} -ne 0 ]; then
    set -x
  fi

  #執(zhí)行這一行命令笑旺,執(zhí)行相應(yīng)的啟動類昼浦,比如org.apache.flume.node.Application
  $EXEC $JAVA_HOME/bin/java $JAVA_OPTS $FLUME_JAVA_OPTS "${arr_java_props[@]}" -cp "$FLUME_CLASSPATH" \
      -Djava.library.path=$FLUME_JAVA_LIBRARY_PATH "$FLUME_APPLICATION_CLASS" $*
}


################################
# main
################################

# set default params
# 在啟動的過程中使用到的參數(shù)
FLUME_CLASSPATH=""
FLUME_JAVA_LIBRARY_PATH=""
#默認(rèn)占用堆空間大小,這一塊都可以根據(jù)JVM進(jìn)行重新設(shè)置
JAVA_OPTS="-Xmx20m"
LD_LIBRARY_PATH=""

opt_conf=""
opt_classpath=""
opt_plugins_dirs=""
arr_java_props=()
arr_java_props_ct=0
opt_dryrun=""

# 根據(jù)不同的參數(shù)筒主,執(zhí)行不同的啟動類关噪,每個常量所對應(yīng)的類路徑在代碼前面有過介紹。
if [ -n "$opt_agent" ] ; then
  run_flume $FLUME_AGENT_CLASS $args
elif [ -n "$opt_avro_client" ] ; then
  run_flume $FLUME_AVRO_CLIENT_CLASS $args
elif [ -n "${opt_version}" ] ; then
  run_flume $FLUME_VERSION_CLASS $args
elif [ -n "${opt_tool}" ] ; then
  run_flume $FLUME_TOOLS_CLASS $args
else
  error "This message should never appear" 1
fi

主要是run-flume語句以及指定的啟動類
從bin/flume-ng這個shell腳本可以看到Flume的起始于org.apache.flume.node.Application類物舒,這是flume的main函數(shù)所在色洞。main方法首先會先解析shell命令戏锹,如果指定的配置文件不存在就拋出異常冠胯。
main方法首先校驗shell命令行的代碼,解析
./bin/flume-ng agent -n agent -c conf -f conf/hw.conf -Dflume.root.logger=INFO,console
語句锦针,然后加載配置文件荠察。
根據(jù)命令中含有"no-reload-conf"參數(shù),決定采用那種加載配置文件方式:
一奈搜、沒有此參數(shù)悉盆,會動態(tài)加載配置文件,默認(rèn)每30秒加載一次配置文件馋吗,因此可以動態(tài)修改配置文件焕盟;
二、有此參數(shù)宏粤,則只在啟動時加載一次配置文件脚翘。實現(xiàn)動態(tài)加載功能采用了發(fā)布訂閱模式,使用guava中的EventBus實現(xiàn)绍哎。
三来农、PropertiesFileConfigurationProvider這個類是配置文件加載類,PollingPropertiesFileConfigurationProvider類中崇堰,它實現(xiàn)了LifecycleAware接口沃于,而這個接口是掌管整個Flume生命周期的一個核心接口涩咖,LifecycleSupervisor實現(xiàn)了這個接口,通過上面代碼中application.start方法觸發(fā)LifecyleAware的start方法繁莹,LifecycleAware接口定義了start和stop方法檩互。
時序調(diào)用圖:


PollingPropertiesFileConfigurationProvider.start()方法會啟動一個單線程FileWatcherRunnable每隔30s去加載一次配置文件。
如何解析文件蒋困?
1盾似、用agent.sources.s1.command=s1來舉例:變量prefix指的是:sink,source,channel等關(guān)鍵字。
2雪标、parseConfigKey方法零院,首先根據(jù)prefix判斷prefix的后面,有少多字符村刨。比如:sources.s1.command告抄,在sources后面s1.command一共有10個字符。
3嵌牺、解析出name變量打洼,如s1,這個是自己定義的逆粹。
4募疮、解析出configKey固定關(guān)鍵字,如command僻弹,這個是系統(tǒng)定義的阿浓。
5、封裝new ComponentNameAndConfigKey(name, configKey)返回蹋绽。
6芭毙、將sources、channel卸耘、sink配置信息退敦,分別存放到sourceContextMap、channelConfigMap蚣抗、sinkConfigMap三個HashMap侈百,最后統(tǒng)一封裝到AgentConfiguration對象中,然后再把AgentConfiguration存放到agentConfigMap中翰铡,key是agentName钝域。

圖片.png

source - channelProcessor - channel - sink -sinkgroup

Source  {
    ChannelProcessor  {
             Channel  ch1
             Channel  ch2
             …
    }
} 
Sink  {
   Channel  ch; 
} 
SinkGroup {
   Channel ch;
   Sink s1两蟀;
   Sink s2网梢;
   …
}

1、Source組件
Source是數(shù)據(jù)源的總稱赂毯,設(shè)定好源后战虏,數(shù)據(jù)將源源不斷的被抓取或者被推送拣宰。常見的數(shù)據(jù)源有:ExecSource,KafkaSource烦感,HttpSource巡社,NetcatSource,JmsSource手趣,AvroSource等等晌该。所有的數(shù)據(jù)源統(tǒng)一實現(xiàn)一個接口類如下:

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Source extends LifecycleAware, NamedComponent {

  /**
   * Specifies which channel processor will handle this source's events.
   *
   * @param channelProcessor
   */
  public void setChannelProcessor(ChannelProcessor channelProcessor);

  /**
   * Returns the channel processor that will handle this source's events.
   */
  public ChannelProcessor getChannelProcessor();

}

Source提供了兩種機(jī)制: PollableSource(輪詢拉取)和EventDrivenSource(事件驅(qū)動):


上圖展示的Source繼承關(guān)系類圖绿渣。通過類圖我們可以看到NetcatSource朝群,ExecSource和HttpSource屬于事件驅(qū)動模型。KafkaSource中符,SequenceGeneratorSource和JmsSource屬于輪詢拉取模型姜胖。Source接口繼承了LifecycleAware接口,它的的所有邏輯的實現(xiàn)在接口的start和stop方法中進(jìn)行淀散。下圖是類關(guān)系方法圖:

Source接口定義的是最終的實現(xiàn)過程右莱,如抓取日志,這個抓取的過程和實際操作就是在對應(yīng)的Source實現(xiàn)中档插,比如:ExecSource慢蜓。那么這些Source實現(xiàn)由誰來驅(qū)動的呢?現(xiàn)在我們將介紹SourceRunner類郭膛〕柯眨看一下類繼承結(jié)構(gòu)圖:


我們看一下PollableSourceRunner和EventDrivenSourceRunner的具體實現(xiàn):

  @Override //PollableSourceRunner:
  public void start() {
    PollableSource source = (PollableSource) getSource();
    ChannelProcessor cp = source.getChannelProcessor();
    cp.initialize();
    source.start();

    runner = new PollingRunner();

    runner.source = source;
    runner.counterGroup = counterGroup;
    runner.shouldStop = shouldStop;

    runnerThread = new Thread(runner);
    runnerThread.setName(getClass().getSimpleName() + "-" + 
        source.getClass().getSimpleName() + "-" + source.getName());
    runnerThread.start();

    lifecycleState = LifecycleState.START;
  }

  @Override //EventDrivenSourceRunner
  public void start() {
    Source source = getSource();
    ChannelProcessor cp = source.getChannelProcessor();
    cp.initialize();
    source.start();
    lifecycleState = LifecycleState.START;
  }

注:其實所有的Source實現(xiàn)類內(nèi)部都維護(hù)著線程,執(zhí)行source.start()其實就是啟動了相應(yīng)的線程饲鄙。
2凄诞、Channel組件
Channel用于連接Source和Sink圆雁,Source將日志信息發(fā)送到Channel忍级,Sink從Channel消費日志信息;Channel是中轉(zhuǎn)日志信息的一個臨時存儲伪朽,保存有Source組件傳遞過來的日志信息轴咱。先看代碼如下:AbstractConfigurationProvider的loadSources()

          ChannelSelectorConfiguration selectorConfig =
              config.getSelectorConfiguration();

          ChannelSelector selector = ChannelSelectorFactory.create(
              sourceChannels, selectorConfig);

          ChannelProcessor channelProcessor = new ChannelProcessor(selector);
          Configurables.configure(channelProcessor, config);

          source.setChannelProcessor(channelProcessor);

ChannelSelectorFactory.create方法實現(xiàn)如下:

  public static ChannelSelector create(List<Channel> channels,
      ChannelSelectorConfiguration conf) {
    String type = ChannelSelectorType.REPLICATING.toString();
    if (conf != null) {
      type = conf.getType();
    }
    ChannelSelector selector = getSelectorForType(type);
    selector.setChannels(channels);
    Configurables.configure(selector, conf);
    return selector;
  }

其中我們看一下ChannelSelectorType這個枚舉類,包括了幾種類型:

public enum ChannelSelectorType {

  /**
   * Place holder for custom channel selectors not part of this enumeration.
   */
  OTHER(null),

  /**
   * Replicating channel selector.
   */
  REPLICATING("org.apache.flume.channel.ReplicatingChannelSelector"),

  /**
   * Multiplexing channel selector.
   */
  MULTIPLEXING("org.apache.flume.channel.MultiplexingChannelSelector");

ChannelSelector的類結(jié)構(gòu)圖如下所示:


注:RelicatingChannelSelector和MultiplexingChannelSelector是二個通道選擇器烈涮,第一個是復(fù)用型通道選擇器朴肺,也就是的默認(rèn)的方式,會把接收到的消息發(fā)送給其他每個channel坚洽。第二個是多路通道選擇器戈稿,這個會根據(jù)消息header中的參數(shù)進(jìn)行通道選擇。
Channel是什么讶舰?:

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Channel extends LifecycleAware, NamedComponent {

  /**
   * <p>Puts the given event into the channel.</p>
   * <p><strong>Note</strong>: This method must be invoked within an active
   * {@link Transaction} boundary. Failure to do so can lead to unpredictable
   * results.</p>
   * @param event the event to transport.
   * @throws ChannelException in case this operation fails.
   * @see org.apache.flume.Transaction#begin()
   */
  public void put(Event event) throws ChannelException;

  /**
   * <p>Returns the next event from the channel if available. If the channel
   * does not have any events available, this method must return {@code null}.
   * </p>
   * <p><strong>Note</strong>: This method must be invoked within an active
   * {@link Transaction} boundary. Failure to do so can lead to unpredictable
   * results.</p>
   * @return the next available event or {@code null} if no events are
   * available.
   * @throws ChannelException in case this operation fails.
   * @see org.apache.flume.Transaction#begin()
   */
  public Event take() throws ChannelException;

  /**
   * @return the transaction instance associated with this channel.
   */
  public Transaction getTransaction();
}

注:put方法是用來發(fā)送消息鞍盗,take方法是獲取消息需了,transaction是用于事務(wù)操作。類結(jié)構(gòu)圖如下:

3般甲、Sink組件
Sink負(fù)責(zé)取出Channel中的消息數(shù)據(jù)肋乍,進(jìn)行相應(yīng)的存儲文件系統(tǒng),數(shù)據(jù)庫敷存,或者提交到遠(yuǎn)程服務(wù)器墓造。Sink在設(shè)置存儲數(shù)據(jù)時,可以向文件系統(tǒng)中锚烦,數(shù)據(jù)庫中觅闽,hadoop中儲數(shù)據(jù),在日志數(shù)據(jù)較少時涮俄,可以將數(shù)據(jù)存儲在文件系中谱煤,并且設(shè)定一定的時間間隔保存數(shù)據(jù)。在日志數(shù)據(jù)較多時禽拔,可以將相應(yīng)的日志數(shù)據(jù)存儲到Hadoop中刘离,便于日后進(jìn)行相應(yīng)的數(shù)據(jù)分析。
Sink接口類內(nèi)容如下:

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Sink extends LifecycleAware, NamedComponent {
  /**
   * <p>Sets the channel the sink will consume from</p>
   * @param channel The channel to be polled
   */
  public void setChannel(Channel channel);

  /**
   * @return the channel associated with this sink
   */
  public Channel getChannel();

  /**
   * <p>Requests the sink to attempt to consume data from attached channel</p>
   * <p><strong>Note</strong>: This method should be consuming from the channel
   * within the bounds of a Transaction. On successful delivery, the transaction
   * should be committed, and on failure it should be rolled back.
   * @return READY if 1 or more Events were successfully delivered, BACKOFF if
   * no data could be retrieved from the channel feeding this sink
   * @throws EventDeliveryException In case of any kind of failure to
   * deliver data to the next hop destination.
   */
  public Status process() throws EventDeliveryException;

  public static enum Status {
    READY, BACKOFF
  }
}

Sink是通過如下代碼進(jìn)行的創(chuàng)建:
Sink sink = sinkFactory.create(comp.getComponentName(), comp.getType());

DefaultSinkFactory.create方法如下:

  @Override
  public Sink create(String name, String type) throws FlumeException {
    Preconditions.checkNotNull(name, "name");
    Preconditions.checkNotNull(type, "type");
    logger.info("Creating instance of sink: {}, type: {}", name, type);
    Class<? extends Sink> sinkClass = getClass(type);
    try {
      Sink sink = sinkClass.newInstance();
      sink.setName(name);
      return sink;
    } catch (Exception ex) {
      throw new FlumeException("Unable to create sink: " + name
          + ", type: " + type + ", class: " + sinkClass.getName(), ex);
    }
  }

**注:Sink是通過SinkFactory工廠來創(chuàng)建睹栖,提供了DefaultSinkFactory默認(rèn)工廠硫惕,程序會查找org.apache.flume.conf.sink.SinkType這個枚舉類找到相應(yīng)的Sink處理類,比如:org.apache.flume.sink.LoggerSink野来,如果沒找到對應(yīng)的處理類恼除,直接通過Class.forName(className)進(jìn)行直接查找實例化實現(xiàn)類。 **
Sink的類結(jié)構(gòu)圖如下:


與ChannelProcessor處理類對應(yīng)的是SinkProcessor曼氛,由SinkProcessorFactory工廠類負(fù)責(zé)創(chuàng)建豁辉,SinkProcessor的類型由一個枚舉類提供,看下面代碼:

public enum SinkProcessorType {
  /**
   * Place holder for custom sinks not part of this enumeration.
   */
  OTHER(null),

  /**
   * Failover processor
   *
   * @see org.apache.flume.sink.FailoverSinkProcessor
   */
  FAILOVER("org.apache.flume.sink.FailoverSinkProcessor"),

  /**
   * Standard processor
   *
   * @see org.apache.flume.sink.DefaultSinkProcessor
   */
  DEFAULT("org.apache.flume.sink.DefaultSinkProcessor"),


  /**
   * Load balancing processor
   *
   * @see org.apache.flume.sink.LoadBalancingSinkProcessor
   */
  LOAD_BALANCE("org.apache.flume.sink.LoadBalancingSinkProcessor");

SinkProcessor的類結(jié)構(gòu)圖如下:


說明:1舀患、FailoverSinkProcessor是故障轉(zhuǎn)移處理器徽级,當(dāng)sink從通道拿數(shù)據(jù)信息時出錯進(jìn)行的相關(guān)處理,代碼如下:

 @Override
  public Status process() throws EventDeliveryException {
    // Retry any failed sinks that have gone through their "cooldown" period
    Long now = System.currentTimeMillis();
    while (!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) {
      FailedSink cur = failedSinks.poll();
      Status s;
      try {
        s = cur.getSink().process();
        if (s  == Status.READY) {
          liveSinks.put(cur.getPriority(), cur.getSink());
          activeSink = liveSinks.get(liveSinks.lastKey());
          logger.debug("Sink {} was recovered from the fail list",
                  cur.getSink().getName());
        } else {
          // if it's a backoff it needn't be penalized.
          failedSinks.add(cur);
        }
        return s;
      } catch (Exception e) {
        cur.incFails();
        failedSinks.add(cur);
      }
    }

    Status ret = null;
    while (activeSink != null) {
      try {
        ret = activeSink.process();
        return ret;
      } catch (Exception e) {
        logger.warn("Sink {} failed and has been sent to failover list",
                activeSink.getName(), e);
        activeSink = moveActiveToDeadAndGetNext();
      }
    }

    throw new EventDeliveryException("All sinks failed to process, " +
        "nothing left to failover to");
  }

2聊浅、LoadBalancingSinkProcessor是負(fù)載Sink處理器首先我們和ChannelProcessor一樣餐抢,我們也要重點說明一下SinkSelector這個選擇器。先看一下LoadBalancingSinkProcessor extends AbstractSinkProcessor的configure方法的部分代碼:

  public interface SinkSelector extends Configurable, LifecycleAware {
    void setSinks(List<Sink> sinks);
    Iterator<Sink> createSinkIterator();
    void informSinkFailed(Sink failedSink);
  }

    if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_ROUND_ROBIN)) {
      selector = new RoundRobinSinkSelector(shouldBackOff);
    } else if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_RANDOM)) {
      selector = new RandomOrderSinkSelector(shouldBackOff);
    } else {
      try {
        @SuppressWarnings("unchecked")
        Class<? extends SinkSelector> klass = (Class<? extends SinkSelector>)
            Class.forName(selectorTypeName);
        selector = klass.newInstance();
      } catch (Exception ex) {
        throw new FlumeException("Unable to instantiate sink selector: "
            + selectorTypeName, ex);
      }
    }

結(jié)合上面的代碼低匙,再看類結(jié)構(gòu)圖如下:


注:RoundRobinSinkSelector是輪詢選擇器旷痕,RandomOrderSinkSelector是隨機(jī)分配選擇器。
以KafkaSink為例看一下Sink里面的具體實現(xiàn):

public Status process() throws EventDeliveryException {
    Status result = Status.READY;
    Channel channel = getChannel();
    Transaction transaction = null;
    Event event = null;
    String eventTopic = null;
    String eventKey = null;

    try {
      long processedEvents = 0;

      transaction = channel.getTransaction();
      transaction.begin();

      messageList.clear();
      for (; processedEvents < batchSize; processedEvents += 1) {
        event = channel.take();

        if (event == null) {
          // no events available in channel
          break;
        }

        byte[] eventBody = event.getBody();
        Map<String, String> headers = event.getHeaders();

        if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
          eventTopic = topic;
        }

        eventKey = headers.get(KEY_HDR);

        if (logger.isDebugEnabled()) {
          logger.debug("{Event} " + eventTopic + " : " + eventKey + " : "
            + new String(eventBody, "UTF-8"));
          logger.debug("event #{}", processedEvents);
        }

        // create a message and add to buffer
        KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>
          (eventTopic, eventKey, eventBody);
        messageList.add(data);

      }

      // publish batch and commit.
      if (processedEvents > 0) {
        long startTime = System.nanoTime();
        producer.send(messageList);
        long endTime = System.nanoTime();
        counter.addToKafkaEventSendTimer((endTime-startTime)/(1000*1000));
        counter.addToEventDrainSuccessCount(Long.valueOf(messageList.size()));
      }

      transaction.commit();

    } catch (Exception ex) {
      String errorMsg = "Failed to publish events";
      logger.error("Failed to publish events", ex);
      result = Status.BACKOFF;
      if (transaction != null) {
        try {
          transaction.rollback();
          counter.incrementRollbackCount();
        } catch (Exception e) {
          logger.error("Transaction rollback failed", e);
          throw Throwables.propagate(e);
        }
      }
      throw new EventDeliveryException(errorMsg, ex);
    } finally {
      if (transaction != null) {
        transaction.close();
      }
    }

    return result;
  }

注:方法從channel中不斷的獲取數(shù)據(jù)顽冶,然后通過Kafka的producer生產(chǎn)者將消息發(fā)送到Kafka里面

主要參考:http://www.reibang.com/p/0187459831af

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末欺抗,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子强重,更是在濱河造成了極大的恐慌绞呈,老刑警劉巖团滥,帶你破解...
    沈念sama閱讀 206,839評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異报强,居然都是意外死亡灸姊,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,543評論 2 382
  • 文/潘曉璐 我一進(jìn)店門秉溉,熙熙樓的掌柜王于貴愁眉苦臉地迎上來力惯,“玉大人,你說我怎么就攤上這事召嘶「妇В” “怎么了?”我有些...
    開封第一講書人閱讀 153,116評論 0 344
  • 文/不壞的土叔 我叫張陵弄跌,是天一觀的道長甲喝。 經(jīng)常有香客問我,道長铛只,這世上最難降的妖魔是什么埠胖? 我笑而不...
    開封第一講書人閱讀 55,371評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮淳玩,結(jié)果婚禮上直撤,老公的妹妹穿的比我還像新娘。我一直安慰自己蜕着,他們只是感情好谋竖,可當(dāng)我...
    茶點故事閱讀 64,384評論 5 374
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著承匣,像睡著了一般蓖乘。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上韧骗,一...
    開封第一講書人閱讀 49,111評論 1 285
  • 那天嘉抒,我揣著相機(jī)與錄音,去河邊找鬼宽闲。 笑死众眨,一個胖子當(dāng)著我的面吹牛握牧,可吹牛的內(nèi)容都是我干的容诬。 我是一名探鬼主播,決...
    沈念sama閱讀 38,416評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼沿腰,長吁一口氣:“原來是場噩夢啊……” “哼览徒!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起颂龙,我...
    開封第一講書人閱讀 37,053評論 0 259
  • 序言:老撾萬榮一對情侶失蹤习蓬,失蹤者是張志新(化名)和其女友劉穎纽什,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體躲叼,經(jīng)...
    沈念sama閱讀 43,558評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡芦缰,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,007評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了枫慷。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片让蕾。...
    茶點故事閱讀 38,117評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖或听,靈堂內(nèi)的尸體忽然破棺而出探孝,到底是詐尸還是另有隱情,我是刑警寧澤誉裆,帶...
    沈念sama閱讀 33,756評論 4 324
  • 正文 年R本政府宣布顿颅,位于F島的核電站,受9級特大地震影響足丢,放射性物質(zhì)發(fā)生泄漏粱腻。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,324評論 3 307
  • 文/蒙蒙 一斩跌、第九天 我趴在偏房一處隱蔽的房頂上張望栖疑。 院中可真熱鬧,春花似錦滔驶、人聲如沸遇革。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,315評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽萝快。三九已至,卻和暖如春著角,著一層夾襖步出監(jiān)牢的瞬間揪漩,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,539評論 1 262
  • 我被黑心中介騙來泰國打工吏口, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留奄容,地道東北人。 一個月前我還...
    沈念sama閱讀 45,578評論 2 355
  • 正文 我出身青樓产徊,卻偏偏與公主長得像昂勒,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子舟铜,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,877評論 2 345

推薦閱讀更多精彩內(nèi)容

  • 博客原文 翻譯作品戈盈,水平有限,如有錯誤,煩請留言指正塘娶。原文請見 官網(wǎng)英文文檔 引言 概述 Apache Flume...
    rabbitGYK閱讀 11,449評論 13 34
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理归斤,服務(wù)發(fā)現(xiàn),斷路器刁岸,智...
    卡卡羅2017閱讀 134,600評論 18 139
  • 介紹 概述 Apache Flume是為有效收集聚合和移動大量來自不同源到中心數(shù)據(jù)存儲而設(shè)計的可分布脏里,可靠的,可用...
    ximengchj閱讀 3,516評論 0 13
  • 介紹 概述 Apache Flume是一個分布式的虹曙,可靠的膝宁,高可用的系統(tǒng),用于高效地從多個不同的數(shù)據(jù)源收集根吁,匯總及...
    steanxy閱讀 1,052評論 0 1
  • //聊聊Flume和Logstash的那些事兒 - 簡書http://www.reibang.com/p/349e...
    葡萄喃喃囈語閱讀 5,377評論 1 12