Flink 生產(chǎn)實(shí)踐

Flink HA

Flink HA 的HighAvailabilityMode類中定義了是那種高可用性模式枚舉:

  • NONE:非HA模式
  • ZOOKEEPER:基于ZK實(shí)現(xiàn)HA
  • FACTORY_CLASS:自定義HA工廠類,實(shí)現(xiàn)HighAvailabilityServiceFactory接口。

ZooKeeperHaService主要提供了創(chuàng)建LeaderRetrievalService和LeaderElectionService等方法,并給出了各個(gè)服務(wù)組件使用的ZK節(jié)點(diǎn)名稱悲柱。

Flink Exactly-once實(shí)現(xiàn)原理解析

流處理引擎通常為用戶的應(yīng)用程序提供是那種數(shù)據(jù)處理語(yǔ)義:最多一次每窖、至少一次想许、精確一次罢缸。

  • 最多一次: 用戶數(shù)據(jù)只會(huì)被處理一次,不管成功還是失敗,不會(huì)重試也不會(huì)重發(fā)蚪腋。
  • 至少一次: 系統(tǒng)會(huì)保證數(shù)據(jù)或事件被處理一次凸克。如果中間發(fā)生錯(cuò)誤或者丟失,就會(huì)重發(fā)或者重試基跑。
  • 精確一次: 每一條數(shù)據(jù)只會(huì)被精確地處理一次,不多也不少。

Flink的快照可以到算子級(jí)別,并且對(duì)全局?jǐn)?shù)據(jù)也可以做快照篓像。
Flink分布式快照的核心元素之一是Barrier,該標(biāo)記是嚴(yán)格有序的,并隨著數(shù)據(jù)往下流動(dòng)动知。
每個(gè)流的barrier n到達(dá)時(shí)間不一致怎么辦,這是Flink采取的措施是快流等慢流。
Flink在做存儲(chǔ)時(shí),可采用異步方式,每次都是進(jìn)行的全量checkpoint,是基于上次進(jìn)行更新的员辩。

快照機(jī)制能夠保證作業(yè)出現(xiàn)fail-over后可以從最新的快照進(jìn)行恢復(fù),即分布式快照機(jī)制可以保證Flink系統(tǒng)內(nèi)部的精確一次處理盒粮。

兩階段處理繼承TwoPhaseCommitSinkFunction,需要實(shí)現(xiàn)beginTransaction、preCommit奠滑、commit丹皱、abort方法來(lái)實(shí)現(xiàn)精確一次的處理語(yǔ)義,

  • beginTransaction:在開啟事務(wù)之前,在目標(biāo)文件系統(tǒng)的臨時(shí)目錄中創(chuàng)建一個(gè)臨時(shí)文件,后面在處理數(shù)據(jù)時(shí)將數(shù)據(jù)寫入此文件。
  • preCommit:在預(yù)提交階段,刷寫文件,然后關(guān)閉文件,之后就不能寫入到文件,為屬于下一個(gè)檢查點(diǎn)的任何后續(xù)寫入啟動(dòng)新事務(wù)宋税。
  • commit:在提交階段,將預(yù)提交的文件原子性移動(dòng)到真正的目標(biāo)目錄中,這會(huì)增加輸出數(shù)據(jù)可見性的延遲摊崭。
  • abort:在終止階段,刪除臨時(shí)文件。

Kafka-Flink-Kafka過(guò)程:

  • Flink開始做checkpoint操作, 進(jìn)入pre-commit階段,同時(shí)Flink JobManager會(huì)將檢查點(diǎn)Barrier注入數(shù)據(jù)流中杰赛。
  • 當(dāng)所有barrier在算子中成功進(jìn)行一遍傳遞,并完成快照后,則pre-commit階段完成
  • 等所有的算子完成預(yù)提交,就會(huì)發(fā)起一個(gè)提交動(dòng)作,但是任何一個(gè)預(yù)提交失敗都會(huì)導(dǎo)致Flink回滾到最近的checkpoint;
  • pre-commit完成,必須要確保commit也要成功呢簸。

如何排查生產(chǎn)環(huán)境中的反壓?jiǎn)栴}

不同框架的反壓對(duì)比:

  • Storm:從1.0版本之后引入反壓,Storm會(huì)主動(dòng)監(jiān)控工作節(jié)點(diǎn),工作節(jié)點(diǎn)接收數(shù)據(jù)超過(guò)閾值,反壓信息會(huì)被發(fā)送到ZooKeeper,ZooKeeper通知所有的工作節(jié)點(diǎn)
    進(jìn)入反壓狀態(tài),最后數(shù)據(jù)的生產(chǎn)源頭會(huì)降低數(shù)據(jù)的發(fā)送速度。
  • Spark Streaming:RateController組件,利用經(jīng)典的PID算法,根據(jù)消息數(shù)量乏屯、調(diào)度時(shí)間根时、處理時(shí)間等計(jì)算出來(lái)速率,然后進(jìn)行限速。
  • Flink:利用網(wǎng)絡(luò)傳輸和動(dòng)態(tài)限流,流中的數(shù)據(jù)在算子間進(jìn)行計(jì)算和轉(zhuǎn)換時(shí),會(huì)被放入分布式的阻塞隊(duì)列中辰晕。當(dāng)消費(fèi)者的阻塞隊(duì)列滿時(shí),則會(huì)降低生產(chǎn)者的數(shù)據(jù)生產(chǎn)速度蛤迎。

Flink Web UI Back Pressure出現(xiàn)數(shù)值:

  • OK: 0<=Ratio<=0.10,正常;
  • LOW:0.10<Ratio<=0.50,一般;
  • HIGH: 0.5 < Ratio <=1,嚴(yán)重。
指標(biāo)名稱 用途 解釋
outPoolUsage 發(fā)送端緩沖池的使用率 當(dāng)前Task的數(shù)據(jù)發(fā)送率,如果數(shù)值很低,當(dāng)前節(jié)點(diǎn)有可能為反壓節(jié)點(diǎn)
inPoolUsage 接收端緩沖池的使用率 Task的接收速度,inPoolUsage很高,outPoolUsage很低,這個(gè)節(jié)點(diǎn)有可能是反壓節(jié)點(diǎn)
floatingBuffersUsage 處理節(jié)點(diǎn)緩沖池的使用率
exclusiveBuffersUsage 數(shù)據(jù)輸入方緩沖池的使用率

反壓?jiǎn)栴}處理:

  • 數(shù)據(jù)傾斜:使用類似的KeyBy等分組聚合函數(shù)導(dǎo)致,需要用戶將熱點(diǎn)key進(jìn)行預(yù)處理,降低或者消除熱點(diǎn)key的影響含友。
  • GC:使用-XX:+PrintGCDetails參數(shù)查看GC日志
  • 代碼本身:查看機(jī)器的CPU替裆、內(nèi)存使用

如何處理生產(chǎn)環(huán)境中的數(shù)據(jù)傾斜問(wèn)題

兩階段聚合解決KeyBy熱點(diǎn)

根據(jù)type進(jìn)行KeyBy時(shí),如果數(shù)據(jù)的type分布不均勻就會(huì)導(dǎo)致大量的數(shù)據(jù)分配到一個(gè)task中,發(fā)生數(shù)據(jù)傾斜。解決的思路為:

  • 首先把分組的key打散,比如添加隨機(jī)后綴;
  • 對(duì)打散后的數(shù)據(jù)進(jìn)行聚合;
  • 將打散的key還原為原先的key
  • 二次KeyBy進(jìn)行結(jié)果統(tǒng)計(jì),然后輸出窘问。

Flink消費(fèi)Kafka數(shù)據(jù)時(shí),要保證Kafka的分區(qū)數(shù)等于Flink Consumer的并行度辆童。如果不一致,需要設(shè)置Flink的Redistributing(數(shù)據(jù)充分配),
Rebalance分區(qū)策略,數(shù)據(jù)會(huì)以round-robin的方式對(duì)數(shù)據(jù)進(jìn)行再次分區(qū),可以全局負(fù)載均衡。
Rescale分區(qū)策略基于上下游的并行度,會(huì)將數(shù)據(jù)以循環(huán)的方式輸出到下游的每個(gè)實(shí)例中惠赫。

生產(chǎn)環(huán)境中的并行度和資源設(shè)置

在Flink集群中,一個(gè)TaskManager就是一個(gè)JVM進(jìn)程,并且會(huì)用獨(dú)立的線程來(lái)執(zhí)行task把鉴,slot僅僅用來(lái)做內(nèi)存的隔離,對(duì)CPU不起作用。

默認(rèn)情況下,Flink還允許同一個(gè)Job的子任務(wù)共享slot汉形。

Flink自身會(huì)把不同的算子的task連接在一起組成一個(gè)新的task纸镊。因?yàn)閠ask在同一個(gè)線程中執(zhí)行,可以有效減少線程間上下文的切換,減少序列化/反序列化帶來(lái)的資源消耗,
提高任務(wù)的吞吐量概疆。

并行度級(jí)別:算子級(jí)別逗威、環(huán)境級(jí)別、客戶端級(jí)別岔冀、集群配置級(jí)別凯旭。

在生產(chǎn)中,推薦在算子級(jí)別顯式指定各自的并行度,方便進(jìn)行顯式和精確的資源控制。
環(huán)境級(jí)別:任務(wù)中的所有算子的并行度都是指定的值,生產(chǎn)環(huán)境不推薦。

設(shè)置并行度的優(yōu)先級(jí)為:算子級(jí)別 > 環(huán)境級(jí)別 > 客戶端級(jí)別 > 集群級(jí)別配置罐呼。

Flink如何做維表關(guān)聯(lián)

業(yè)務(wù)對(duì)維表數(shù)據(jù)關(guān)聯(lián)的時(shí)效性要求,有以下幾種解決方案:

  • 實(shí)時(shí)查詢維表:用戶在Flink算子中直接訪問(wèn)外部數(shù)據(jù)庫(kù)鞠柄,這種是同步方式,數(shù)據(jù)保證是最新的。
  • 預(yù)加載全量數(shù)據(jù):每次啟動(dòng)時(shí),將維表中全部數(shù)據(jù)加載到內(nèi)存中嫉柴。
  • LRU緩存:將最近最少使用的數(shù)據(jù)則被淘汰厌杜。

實(shí)時(shí)查詢維表

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class DimSync extends RichMapFunction<String,Order> {

    private static final Logger LOGGER = LoggerFactory.getLogger(DimSync.class);

    private Connection conn = null;
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/dim?characterEncoding=UTF-8", "admin", "admin");
    }

    public Order map(String in) throws Exception {

        JSONObject jsonObject = JSONObject.parseObject(in);
        Integer cityId = jsonObject.getInteger("city_id");
        String userName = jsonObject.getString("user_name");
        String items = jsonObject.getString("items");

        //根據(jù)city_id 查詢 city_name
        PreparedStatement pst = conn.prepareStatement("select city_name from info where city_id = ?");
        pst.setInt(1,cityId);
        ResultSet resultSet = pst.executeQuery();
        String cityName = null;
        while (resultSet.next()){
            cityName = resultSet.getString(1);
        }
        pst.close();
        return new Order(cityId,userName,items,cityName);
    }

    public void close() throws Exception {
        super.close();
        conn.close();
    }

}

要保證及時(shí)關(guān)閉連接池

public class Order {
    private Integer cityId;
    private String userName;
    private String items;
    private String cityName;

    public Order(Integer cityId, String userName, String items, String cityName) {
        this.cityId = cityId;
        this.userName = userName;
        this.items = items;
        this.cityName = cityName;
    }

    public Order() {
    }

    public Integer getCityId() {
        return cityId;
    }

    public void setCityId(Integer cityId) {
        this.cityId = cityId;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getItems() {
        return items;
    }

    public void setItems(String items) {
        this.items = items;
    }

    public String getCityName() {
        return cityName;
    }

    public void setCityName(String cityName) {
        this.cityName = cityName;
    }

    @Override
    public String toString() {
        return "Order{" +
                "cityId=" + cityId +
                ", userName='" + userName + '\'' +
                ", items='" + items + '\'' +
                ", cityName='" + cityName + '\'' +
                '}';
    }
}

預(yù)加載全量數(shù)據(jù)

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class WholeLoad extends RichMapFunction<String,Order> {


    private static final Logger LOGGER = LoggerFactory.getLogger(WholeLoad.class);
    ScheduledExecutorService executor = null;
    private Map<String,String> cache;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        executor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    load();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        },5,5, TimeUnit.MINUTES);
    }

    @Override
    public Order map(String value) throws Exception {
        JSONObject jsonObject = JSONObject.parseObject(value);
        Integer cityId = jsonObject.getInteger("city_id");
        String userName = jsonObject.getString("user_name");
        String items = jsonObject.getString("items");
        String cityName = cache.get(cityId);
        return new Order(cityId,userName,items,cityName);
    }

    public void load() throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        Connection con = DriverManager.getConnection("jdbc:mysql://localhost:3306/dim?characterEncoding=UTF-8", "admin", "admin");
        PreparedStatement statement = con.prepareStatement("select city_id,city_name from info");
        ResultSet rs = statement.executeQuery();
        //全量更新維度數(shù)據(jù)到內(nèi)存
        while (rs.next()) {
            String cityId = rs.getString("city_id");
            String cityName = rs.getString("city_name");
            cache.put(cityId, cityName);
        }
        con.close();
    }
}

LRU緩存

import com.alibaba.fastjson.JSONObject;
import com.stumbleupon.async.Callback;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.hbase.async.GetRequest;
import org.hbase.async.HBaseClient;
import org.hbase.async.KeyValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

public class LRU extends RichAsyncFunction<String,Order> {

    private static final Logger LOGGER = LoggerFactory.getLogger(LRU.class);
    String table = "info";
    Cache<String, String> cache = null;
    private HBaseClient client = null;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        //創(chuàng)建hbase客戶端
        client = new HBaseClient("127.0.0.1","7071");
        cache = CacheBuilder.newBuilder()
                //最多存儲(chǔ)10000條
                .maximumSize(10000)
                //過(guò)期時(shí)間為1分鐘
                .expireAfterWrite(60, TimeUnit.SECONDS)
                .build();
    }

    @Override
    public void asyncInvoke(String input, ResultFuture<Order> resultFuture) throws Exception {

        JSONObject jsonObject = JSONObject.parseObject(input);
        Integer cityId = jsonObject.getInteger("city_id");
        String userName = jsonObject.getString("user_name");
        String items = jsonObject.getString("items");
        //讀緩存
        String cacheCityName = cache.getIfPresent(cityId);
        //如果緩存獲取失敗再?gòu)膆base獲取維度數(shù)據(jù)
        if(cacheCityName != null){
            Order order = new Order();
            order.setCityId(cityId);
            order.setItems(items);
            order.setUserName(userName);
            order.setCityName(cacheCityName);
            resultFuture.complete(Collections.singleton(order));
        }else {

            client.get(new GetRequest(table,String.valueOf(cityId))).addCallback((Callback<String, ArrayList<KeyValue>>) arg -> {
                for (KeyValue kv : arg) {
                    String value = new String(kv.value());
                    Order order = new Order();
                    order.setCityId(cityId);
                    order.setItems(items);
                    order.setUserName(userName);
                    order.setCityName(value);
                    resultFuture.complete(Collections.singleton(order));
                    cache.put(String.valueOf(cityId), value);
                }
                return null;
            });

        }
    }
}

海量數(shù)據(jù)去重

Flink中實(shí)時(shí)去重的方案:

  • 基于狀態(tài)后端
  • 基于HyperLogLog
  • 基于布隆過(guò)濾器
  • 基于BitMap
  • 基于外部數(shù)據(jù)庫(kù)

基于狀態(tài)后端

狀態(tài)后端的種類之一是RocksDBStateBackend,它會(huì)將正在云心中的狀態(tài)數(shù)據(jù)保存在RockDB數(shù)據(jù)庫(kù)中,該數(shù)據(jù)庫(kù)默認(rèn)將數(shù)據(jù)存儲(chǔ)在TaskManager運(yùn)行節(jié)點(diǎn)的數(shù)據(jù)目錄下。
計(jì)算每天每個(gè)商品的訪問(wèn)量:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class MapStateDistinctFunction extends KeyedProcessFunction<String,Tuple2<String,Integer>,Tuple2<String,Integer>> {

    private transient ValueState<Integer> counts;

    @Override
    public void open(Configuration parameters) throws Exception {
        //我們?cè)O(shè)置ValueState的TTL的生命周期為24小時(shí)计螺,到期自動(dòng)清除狀態(tài)
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(org.apache.flink.api.common.time.Time.minutes(24 * 60))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        //設(shè)置ValueState的默認(rèn)值
        ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<Integer>("skuNum", Integer.class);
        descriptor.enableTimeToLive(ttlConfig);
        counts = getRuntimeContext().getState(descriptor);
        super.open(parameters);
    }


    @Override
    public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {

        String f0 = value.f0;

        //如果不存在則新增
        if(counts.value() == null){
            counts.update(1);
        }else{
            //如果存在則加1
            counts.update(counts.value()+1);
        }

        out.collect(Tuple2.of(f0, counts.value()));

    }

}

基于HyperLogLo

HyperLogLog是一種估計(jì)統(tǒng)計(jì)算法,被用來(lái)統(tǒng)計(jì)一餓集合中不同數(shù)據(jù)的個(gè)數(shù)夯尽。

import net.agkn.hll.HLL;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

public class HyperLogLogDistinct implements AggregateFunction<Tuple2<String,Long>,HLL,Long> {


    @Override
    public HLL createAccumulator() {

        return new HLL(14, 5);
    }

    @Override
    public HLL add(Tuple2<String, Long> value, HLL accumulator) {

        //value為購(gòu)買記錄 <商品sku, 用戶id>
        accumulator.addRaw(value.f1);
        return accumulator;
    }

    @Override
    public Long getResult(HLL accumulator) {
        long cardinality = accumulator.cardinality();
        return cardinality;
    }


    @Override
    public HLL merge(HLL a, HLL b) {
        a.union(b);
        return a;
    }
}

添加相應(yīng)的pom依賴:

<dependency>
    <groupId>net.agkn</groupId>
    <artifactId>hll</artifactId>
    <version>1.6.0</version>
</dependency>

如果元素是非數(shù)值型,需要hash過(guò)后才能插入。

基于布隆過(guò)濾器

BloomFilter類似于一個(gè)HashSet,用于快速判斷某個(gè)元素是否存在與集合中,其典型的應(yīng)用場(chǎng)景就是能夠快速判斷一個(gè)key是否存在某個(gè)容器中,不存在直接返回登馒。

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class BloomFilterDistinct extends KeyedProcessFunction<Long, String, Long> {

    private transient ValueState<BloomFilter> bloomState;
    private transient ValueState<Long> countState;


    @Override
    public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {

        BloomFilter bloomFilter = bloomState.value();
        Long skuCount = countState.value();

        if(bloomFilter == null){
            BloomFilter.create(Funnels.unencodedCharsFunnel(), 10000000);
        }

        if(skuCount == null){
            skuCount = 0L;
        }

        if(!bloomFilter.mightContain(value)){
            bloomFilter.put(value);
            skuCount = skuCount + 1;
        }

        bloomState.update(bloomFilter);
        countState.update(skuCount);
        out.collect(countState.value());
    }
}

BitMap

HyperLogLog 和BloomFilter雖然減少了存儲(chǔ)但是丟失了精度匙握。
BitMap的基本思想是用一個(gè)bit位來(lái)標(biāo)記某個(gè)元素對(duì)應(yīng)的value,而key即是該元素。

import org.apache.flink.api.common.functions.AggregateFunction;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

public class BitMapDistinct implements AggregateFunction<Long, Roaring64NavigableMap,Long> {


    @Override
    public Roaring64NavigableMap createAccumulator() {
        return new Roaring64NavigableMap();
    }

    @Override
    public Roaring64NavigableMap add(Long value, Roaring64NavigableMap accumulator) {
        accumulator.add(value);
        return accumulator;
    }


    @Override
    public Long getResult(Roaring64NavigableMap accumulator) {
        return accumulator.getLongCardinality();
    }

    @Override
    public Roaring64NavigableMap merge(Roaring64NavigableMap a, Roaring64NavigableMap b) {
        return null;
    }
}

添加依賴:

<dependency>
    <groupId>org.roaringbitmap</groupId>
    <artifactId>RoaringBitmap</artifactId>
    <version>0.9.21</version>
</dependency>
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末陈轿,一起剝皮案震驚了整個(gè)濱河市圈纺,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌麦射,老刑警劉巖蛾娶,帶你破解...
    沈念sama閱讀 218,386評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異法褥,居然都是意外死亡贞瞒,警方通過(guò)查閱死者的電腦和手機(jī)墨微,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,142評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)拾氓,“玉大人呐萨,你說(shuō)我怎么就攤上這事杀饵。” “怎么了谬擦?”我有些...
    開封第一講書人閱讀 164,704評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵切距,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我惨远,道長(zhǎng)谜悟,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,702評(píng)論 1 294
  • 正文 為了忘掉前任北秽,我火速辦了婚禮葡幸,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘贺氓。我一直安慰自己蔚叨,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,716評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著蔑水,像睡著了一般邢锯。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上搀别,一...
    開封第一講書人閱讀 51,573評(píng)論 1 305
  • 那天丹擎,我揣著相機(jī)與錄音,去河邊找鬼歇父。 笑死鸥鹉,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的庶骄。 我是一名探鬼主播毁渗,決...
    沈念sama閱讀 40,314評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼单刁!你這毒婦竟也來(lái)了灸异?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,230評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤羔飞,失蹤者是張志新(化名)和其女友劉穎肺樟,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體逻淌,經(jīng)...
    沈念sama閱讀 45,680評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡么伯,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,873評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了卡儒。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片田柔。...
    茶點(diǎn)故事閱讀 39,991評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖骨望,靈堂內(nèi)的尸體忽然破棺而出硬爆,到底是詐尸還是另有隱情,我是刑警寧澤擎鸠,帶...
    沈念sama閱讀 35,706評(píng)論 5 346
  • 正文 年R本政府宣布缀磕,位于F島的核電站,受9級(jí)特大地震影響劣光,放射性物質(zhì)發(fā)生泄漏袜蚕。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,329評(píng)論 3 330
  • 文/蒙蒙 一绢涡、第九天 我趴在偏房一處隱蔽的房頂上張望牲剃。 院中可真熱鬧,春花似錦垂寥、人聲如沸颠黎。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,910評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)狭归。三九已至夭坪,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間过椎,已是汗流浹背室梅。 一陣腳步聲響...
    開封第一講書人閱讀 33,038評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留疚宇,地道東北人亡鼠。 一個(gè)月前我還...
    沈念sama閱讀 48,158評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像敷待,于是被迫代替她去往敵國(guó)和親间涵。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,941評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 一. 背景 數(shù)據(jù)準(zhǔn)實(shí)時(shí)復(fù)制(CDC)是目前行內(nèi)實(shí)時(shí)數(shù)據(jù)需求大量使用的技術(shù)榜揖,目前行內(nèi)已經(jīng)大量使用IBM CDC 軟件...
    jianwbj閱讀 3,555評(píng)論 0 2
  • 概述 2019 年是大數(shù)據(jù)實(shí)時(shí)計(jì)算領(lǐng)域最不平凡的一年勾哩,2019 年 1 月阿里巴巴 Blink (內(nèi)部的 Flin...
    Yobhel閱讀 1,844評(píng)論 0 33
  • 說(shuō)明:本文為《Flink大數(shù)據(jù)項(xiàng)目實(shí)戰(zhàn)》學(xué)習(xí)筆記,想通過(guò)視頻系統(tǒng)學(xué)習(xí)Flink這個(gè)最火爆的大數(shù)據(jù)計(jì)算框架的同學(xué)举哟,推...
    大數(shù)據(jù)研習(xí)社閱讀 1,153評(píng)論 0 0
  • 概述 2019 年是大數(shù)據(jù)實(shí)時(shí)計(jì)算領(lǐng)域最不平凡的一年思劳,2019 年 1 月阿里巴巴 Blink (內(nèi)部的 Flin...
    王知無(wú)閱讀 3,248評(píng)論 2 11
  • 前篇主要介紹流式計(jì)算相關(guān)的核心概念,這篇簡(jiǎn)要聊聊Flink總體架構(gòu)妨猩、運(yùn)行環(huán)境及其在大數(shù)據(jù)生態(tài)系統(tǒng)中的位置潜叛,讓大家先...
    data之道閱讀 1,225評(píng)論 0 6