Flink增量讀取文件模擬實(shí)時(shí)數(shù)據(jù)流

代碼

public class SourceFromFile extends RichSourceFunction<String> {
    private volatile Boolean isRunning = true;

    @Override
    public void run(SourceContext ctx) throws Exception {
        BufferedReader bufferedReader = new BufferedReader(new FileReader("E:\\documents\\test.txt"));
        while (isRunning) {
            String line = bufferedReader.readLine();
            if (StringUtils.isBlank(line)) {
                continue;
            }
            ctx.collect(line);
            TimeUnit.SECONDS.sleep(10);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
public class WindowWordCount {
    public static void main(String[] args) throws Exception {

        final ParameterTool parameters = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(parameters);

        env.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);

        DataStreamSource<String> dataStream = env.addSource(new SourceFromFile()).setParallelism(1);

        dataStream
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                    String[] arr = value.split(",");
                    for (String item : arr) {
                        out.collect(new Tuple2<>(item, 1));
                    }
                }
            })
            .keyBy(0)
            .timeWindow(Time.minutes(1))
            .sum(1)
            .print();

        env.execute("WindowWordCount");
    }
}

測(cè)試

文件中輸入:
aaa,bbb,ccc

結(jié)果:
2> (bbb,1)
3> (aaa,1)
4> (ccc,1)


增加輸入:
bbb,ccc,ddd

結(jié)果:
4> (ccc,1)
2> (bbb,1)
4> (ddd,1)


增加輸入:
aaa,aaa,aaa

結(jié)果:
3> (aaa,3)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末奖蔓,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子僧著,更是在濱河造成了極大的恐慌,老刑警劉巖荚孵,帶你破解...
    沈念sama閱讀 217,657評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件咬展,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡笋额,警方通過(guò)查閱死者的電腦和手機(jī)岸梨,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,889評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門喜颁,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人曹阔,你說(shuō)我怎么就攤上這事半开。” “怎么了赃份?”我有些...
    開(kāi)封第一講書人閱讀 164,057評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵寂拆,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我抓韩,道長(zhǎng)纠永,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書人閱讀 58,509評(píng)論 1 293
  • 正文 為了忘掉前任谒拴,我火速辦了婚禮尝江,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘英上。我一直安慰自己炭序,他們只是感情好啤覆,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,562評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著惭聂,像睡著了一般窗声。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上辜纲,一...
    開(kāi)封第一講書人閱讀 51,443評(píng)論 1 302
  • 那天笨觅,我揣著相機(jī)與錄音,去河邊找鬼耕腾。 笑死屋摇,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的幽邓。 我是一名探鬼主播,決...
    沈念sama閱讀 40,251評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼火脉,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼牵舵!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起倦挂,我...
    開(kāi)封第一講書人閱讀 39,129評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤畸颅,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后方援,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體没炒,經(jīng)...
    沈念sama閱讀 45,561評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,779評(píng)論 3 335
  • 正文 我和宋清朗相戀三年犯戏,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了送火。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,902評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡先匪,死狀恐怖种吸,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情呀非,我是刑警寧澤坚俗,帶...
    沈念sama閱讀 35,621評(píng)論 5 345
  • 正文 年R本政府宣布,位于F島的核電站岸裙,受9級(jí)特大地震影響猖败,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜降允,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,220評(píng)論 3 328
  • 文/蒙蒙 一恩闻、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧拟糕,春花似錦判呕、人聲如沸倦踢。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 31,838評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)辱挥。三九已至,卻和暖如春边涕,著一層夾襖步出監(jiān)牢的瞬間晤碘,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 32,971評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工功蜓, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留园爷,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,025評(píng)論 2 370
  • 正文 我出身青樓式撼,卻偏偏與公主長(zhǎng)得像童社,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子著隆,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,843評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容

  • 2019.7.17永遠(yuǎn)都不會(huì)忘記的一天扰楼,在這一天我失去了肚里將近五個(gè)月的寶寶。本來(lái)我滿懷期待他出生以后的...
    一萱昵閱讀 340評(píng)論 0 0
  • 難得有幾天美浦,我可以享受短暫一個(gè)人的生活弦赖。老公進(jìn)課室,媽媽和兒子去重慶了浦辨。 想唱就唱蹬竖,就是年輕的狀態(tài) ...
    精尚閱讀 535評(píng)論 0 0
  • 再寫一篇小文送給我家那個(gè)寶貝閨女,懷念曾經(jīng)呀呀學(xué)語(yǔ)的她A鞒辍1也蕖! 首先芽腾,恕懷雙打擾劈榨,推薦我家閨女簡(jiǎn)書的第一篇小文,嘻嘻...
    懷雙閱讀 412評(píng)論 26 30
  • 過(guò)往時(shí)光不在倒轉(zhuǎn) 往昔都在笑談中淡忘 青春韶華也慢慢褪色 猶如黑白電影 在腦海播放 色調(diào)單一卻值得回味 過(guò)往錯(cuò)過(guò)與...
    燒火一條柴閱讀 157評(píng)論 0 1