flink學(xué)習(xí)之七-map蚤认、fliter、flatmap

看完了Flink的datasource糕伐、sink烙懦,也就把一頭一尾給看完了,從數(shù)據(jù)流入到數(shù)據(jù)流出赤炒,缺少了中間的處理環(huán)節(jié)。

而flink的大頭恰恰是只在這個(gè)中間環(huán)節(jié)亏较,如下圖:

source-transform-sink-update.png

中間的處理環(huán)節(jié)比較復(fù)雜莺褒,現(xiàn)在也就看了其中一部分,這里先開始講其中最簡(jiǎn)單 也最常用的map雪情、flatmap及filter遵岩。

map

flink中dataSourceStream和java8中的map很類似,都是用來做轉(zhuǎn)換處理的巡通,看下map的實(shí)現(xiàn):

public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
        TypeInformation<R> outType = TypeExtractor.getMapReturnTypes((MapFunction)this.clean(mapper), this.getType(), Utils.getCallLocationName(), true);
    
        return this.transform("Map", outType, new StreamMap((MapFunction)this.clean(mapper)));
    
}

可以看到:

1尘执、返回的是SingleOutputStreamOperator泛型,這是個(gè)基礎(chǔ)的類型宴凉,好多DataStream的方法都返回它誊锭,比如map、flapmap弥锄、filter丧靡、process等

2蟆沫、最終是調(diào)用transform方法來實(shí)現(xiàn)的,看下transfrom的實(shí)現(xiàn):

    @PublicEvolving
    public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
        
        this.transformation.getOutputType();
        
        OneInputTransformation<T, R> resultTransform = new OneInputTransformation(this.transformation, operatorName, operator, outTypeInfo, this.environment.getParallelism());
        
        SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(this.environment, resultTransform);
        
        this.getExecutionEnvironment().addOperator(resultTransform);
        
        return returnStream;
    }

額温治,好像還不如不看饭庞,直接看怎么用吧!

@Slf4j
public class KafkaUrlSinkJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("zookeeper.connect", "localhost:2181");
        properties.put("group.id", "metric-group");
        properties.put("auto.offset.reset", "latest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        SingleOutputStreamOperator<UrlInfo> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<String>(
                        "testjin",// topic
                        new SimpleStringSchema(),
                        properties
                )
        ).setParallelism(1)
                // map操作熬荆,轉(zhuǎn)換舟山,從一個(gè)數(shù)據(jù)流轉(zhuǎn)換成另一個(gè)數(shù)據(jù)流,這里是從string-->UrlInfo
                .map(string -> JSON.parseObject(string, UrlInfo.class))
           
       
    }

可以看到卤恳,kafka中傳遞的是String類型累盗,在這里通過map轉(zhuǎn)換后,變SingleOutputStreamOperator<UrlInfo> 類型纬黎,否則就是SingleOutputStreamOperator<String> 幅骄。

map方法不允許缺少數(shù)據(jù),也就是原來多少條數(shù)據(jù)本今,處理后依然是多少條數(shù)據(jù)拆座,只是用來做轉(zhuǎn)換。

flatmap

flatmap冠息,也就是將嵌套集合轉(zhuǎn)換并平鋪成非嵌套集合挪凑。看個(gè)例子逛艰,還是用上面的kafka datasource:

        // 構(gòu)造一個(gè)嵌套的數(shù)據(jù)
        SingleOutputStreamOperator<List<UrlInfo>> listDataStreaamSource = dataStreamSource
                .map(urlInfo -> {
                    List<UrlInfo> list = Lists.newArrayList();

                    list.add(urlInfo);

                    UrlInfo urlInfo1 = new UrlInfo();
                    urlInfo1.setUrl(urlInfo.getUrl() + "-copy");
                    urlInfo1.setHash(DigestUtils.md5Hex(urlInfo1.getUrl()));

                    list.add(urlInfo1);

                    return list;
                }).returns(new ListTypeInfo(UrlInfo.class));

        listDataStreaamSource.addSink(new PrintSinkFunction<>());

說明:

1躏碳、注意這里的returns方法,如果不指定散怖,會(huì)在運(yùn)行時(shí)報(bào)錯(cuò)

/*I think the short description of the error message is quite good, but let me expand it a bit.
In order to execute a program, Flink needs to know the type of the values that are processed because it needs to serialize and deserialize them. Flink's type system is based on TypeInformation which describes a data type. When you specify a function, Flink tries to infer the return type of that function. In case of the FlatMapFunction of your example the type of the objects that are passed to the Collector.
Unfortunately, some Lambda functions lose this information due to type erasure such that Flink cannot automatically infer the type. Therefore, you have to explicitly declare the return type.

如果直接上面這樣轉(zhuǎn)換菇绵,因?yàn)閘ambda表達(dá)式會(huì)丟失部分信息,會(huì)報(bào)如下異常:
org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.
*/

不過由于返回的是一個(gè)List<Ojbect>,不可能直接用 List<Object>.class镇眷,沒這種寫法咬最。而flink則

提供了更多選項(xiàng),這里使用的是

public SingleOutputStreamOperator<T> returns(TypeInformation<T> typeInfo){}

這個(gè)構(gòu)造函數(shù)欠动,而ListTypeInfo則是繼承TypeInfomation抽象類的一個(gè)List實(shí)現(xiàn)永乌。

和上文的KafkaSender一起運(yùn)行,會(huì)有如下結(jié)果:

kafkaSender:

2019-01-15 20:21:46.650 [main] INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : e89bffd6b2eff799
2019-01-15 20:21:46.653 [main] INFO  myflink.KafkaSender - send msg:{"domain":"so.com","id":0,"url":"http://so.com/1547554906650"}

KafkaUrlSinkJob

[UrlInfo(id=0, url=http://so.com/1547554906650, hash=null), UrlInfo(id=0, url=http://so.com/1547554906650-copy, hash=efb0862d481297743b08126b2cda602e)]

也就是一個(gè)UrlInfo 擴(kuò)展成了 一個(gè)List<UrlInfo>

下面看看怎么使用flatmap

...
    
SingleOutputStreamOperator<UrlInfo> flatSource = listDataStreaamSource.flatMap(new FlatMapFunction<List<UrlInfo>, UrlInfo>() {
            @Override
            public void flatMap(List<UrlInfo> urlInfos, Collector<UrlInfo> collector) throws Exception {
                urlInfos.parallelStream().forEach(urlInfo -> collector.collect(urlInfo));
            }
        });

flatSource.addSink(new PrintSinkFunction<>());

...

當(dāng)然可以寫成lambda表達(dá)式:(注意lambda表達(dá)式需要顯式指定return type)

SingleOutputStreamOperator<UrlInfo> flatSource = listDataStreaamSource.flatMap(
                (FlatMapFunction<List<UrlInfo>, UrlInfo>) (urlInfos, collector) ->
                        urlInfos.parallelStream().forEach(urlInfo -> collector.collect(urlInfo))).returns(UrlInfo.class);

看看打印出來的結(jié)果:

2> [UrlInfo(id=0, url=http://so.com/1547554906650, hash=null), UrlInfo(id=0, url=http://so.com/1547554906650-copy, hash=efb0862d481297743b08126b2cda602e)]
1> [UrlInfo(id=0, url=http://so.com/1547554903640, hash=null), UrlInfo(id=0, url=http://so.com/1547554903640-copy, hash=138f79ecc92744a65b03132959da2f73)]

1> UrlInfo(id=0, url=http://so.com/1547554903640-copy, hash=138f79ecc92744a65b03132959da2f73)
1> UrlInfo(id=0, url=http://so.com/1547554903640, hash=null)
2> UrlInfo(id=0, url=http://so.com/1547554906650, hash=null)
2> UrlInfo(id=0, url=http://so.com/1547554906650-copy, hash=efb0862d481297743b08126b2cda602e)

也就是說具伍,flatmap方法最終返回的是一個(gè)collector翅雏,而這個(gè)collector只有一層,當(dāng)輸入數(shù)據(jù)有嵌套的情況下人芽,可以將數(shù)據(jù)平鋪處理望几。

當(dāng)然,不只是針對(duì)嵌套集合啼肩,由于flatmap返回的數(shù)據(jù)條數(shù)并不會(huì)做限制橄妆,也就可以做一些擴(kuò)展數(shù)據(jù)處理的情況衙伶,如下:

dataStream.flatMap((FlatMapFunction<String, String>) (value, out) -> {
            for (String word : value.split(" ")) {
                out.collect(word);
            }
        });

這里就是將string使用空格切割后,組成一個(gè)新的dataStream.

filter

顧名思義害碾,filter用于過濾數(shù)據(jù)矢劲,繼續(xù)在上面代碼的基礎(chǔ)上寫測(cè)試。為了避免干擾慌随,將上面兩個(gè)dataSourceStream.addSink注釋掉芬沉,添加以下代碼:

// 根據(jù)domain字段,過濾數(shù)據(jù)阁猜,只保留BAIDU的domain
SingleOutputStreamOperator<UrlInfo> filterSource = flatSource.filter(urlInfo -> {
            if(StringUtils.equals(UrlInfo.BAIDU,urlInfo.getDomain())){
                return true;
            }
            return false;
        });

        filterSource.addSink(new PrintSinkFunction<>());

這里排除別的domain數(shù)據(jù)丸逸,只保留BAIDU的數(shù)據(jù),運(yùn)行結(jié)果就不貼出來了剃袍,驗(yàn)證了filter的效果黄刚。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市民效,隨后出現(xiàn)的幾起案子憔维,更是在濱河造成了極大的恐慌,老刑警劉巖畏邢,帶你破解...
    沈念sama閱讀 218,755評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件业扒,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡舒萎,警方通過查閱死者的電腦和手機(jī)程储,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來臂寝,“玉大人章鲤,你說我怎么就攤上這事∨乇幔” “怎么了咏窿?”我有些...
    開封第一講書人閱讀 165,138評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵,是天一觀的道長素征。 經(jīng)常有香客問我,道長萝挤,這世上最難降的妖魔是什么御毅? 我笑而不...
    開封第一講書人閱讀 58,791評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮怜珍,結(jié)果婚禮上端蛆,老公的妹妹穿的比我還像新娘。我一直安慰自己酥泛,他們只是感情好今豆,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,794評(píng)論 6 392
  • 文/花漫 我一把揭開白布嫌拣。 她就那樣靜靜地躺著,像睡著了一般呆躲。 火紅的嫁衣襯著肌膚如雪异逐。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,631評(píng)論 1 305
  • 那天插掂,我揣著相機(jī)與錄音灰瞻,去河邊找鬼。 笑死辅甥,一個(gè)胖子當(dāng)著我的面吹牛酝润,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播璃弄,決...
    沈念sama閱讀 40,362評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼要销,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了夏块?” 一聲冷哼從身側(cè)響起绣张,我...
    開封第一講書人閱讀 39,264評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎奴潘,沒想到半個(gè)月后呼巴,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,724評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡患民,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年缩举,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片匹颤。...
    茶點(diǎn)故事閱讀 40,040評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡仅孩,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出印蓖,到底是詐尸還是另有隱情辽慕,我是刑警寧澤,帶...
    沈念sama閱讀 35,742評(píng)論 5 346
  • 正文 年R本政府宣布赦肃,位于F島的核電站溅蛉,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏他宛。R本人自食惡果不足惜船侧,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,364評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望厅各。 院中可真熱鬧镜撩,春花似錦、人聲如沸队塘。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至遮怜,卻和暖如春淋袖,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背奈泪。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評(píng)論 1 270
  • 我被黑心中介騙來泰國打工适贸, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人涝桅。 一個(gè)月前我還...
    沈念sama閱讀 48,247評(píng)論 3 371
  • 正文 我出身青樓拜姿,卻偏偏與公主長得像,于是被迫代替她去往敵國和親冯遂。 傳聞我的和親對(duì)象是個(gè)殘疾皇子蕊肥,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,979評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • rljs by sennchi Timeline of History Part One The Cognitiv...
    sennchi閱讀 7,332評(píng)論 0 10
  • 爬滿青苔的墻,綠了誰家的窗蛤肌?門前風(fēng)掠過壁却,鈴微響,那古鎮(zhèn)小巷裸准,有我成長的痕跡展东。門前柳,有風(fēng)剪過的痕跡炒俱,敢問誰能奈何...
    Dominatetheworl閱讀 534評(píng)論 1 3
  • 老公盐肃,多么熟悉而又陌生的字眼。曾幾何時(shí)老公這兩個(gè)字對(duì)我而言也就只是兩個(gè)漢字权悟,一個(gè)詞語而已…… 老公:我的男...
    俺是河南妞兒閱讀 811評(píng)論 0 1
  • “如果三十歲我還未嫁峦阁,你還未娶谦铃,我就嫁給你” “好” …… 他,生在東邊榔昔,她驹闰,生在西邊;他來自農(nóng)村撒会,她生在城市疮方,如...
    書齋校尉閱讀 1,044評(píng)論 0 3
  • 姓名:呂海森 學(xué)號(hào):17011210545 轉(zhuǎn)載自:http://tech.sina.com.cn/i/2017-...
    淺藍(lán)色的臆想閱讀 118評(píng)論 0 0