kafka stream 內(nèi)容過(guò)濾 demo

寫個(gè)demo 練練手 椿争, 結(jié)論: 思維必須成流式 柳击,不要以數(shù)據(jù)庫(kù)的方式去看待流式聚合 橄杨, 流式的聚合七芭,在time window 中 也會(huì)產(chǎn)生很多事件 . 最后一點(diǎn) 素挽,kafka 提供著數(shù)據(jù)庫(kù)存儲(chǔ)能力的ktable。
也就是說(shuō) 狸驳,你可以 發(fā)請(qǐng)求給instance 预明,獲取ktable 的聚合數(shù)據(jù) 。
而不是 自己寫個(gè)服務(wù)作為消費(fèi)者然后去實(shí)現(xiàn)ktable的聚合 耙箍,這點(diǎn)有點(diǎn)別扭
建議大家看看 https://github.com/confluentinc/kafka-streams-examples.git

網(wǎng)狀網(wǎng)絡(luò)拓?fù)浣Y(jié)構(gòu) (2).png

10s 超過(guò)5次 評(píng)論 代碼

        SpecificAvroSerde<Content> contentSpecificAvroSerde = new SpecificAvroSerde<>();
        SpecificAvroSerde<ContentbyUserId> userContentSpecificAvroSerde = new SpecificAvroSerde<>();
        contentSpecificAvroSerde.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"), false);
        userContentSpecificAvroSerde.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"), false);
        Properties properties = config("contentId1", "localhost:9092", "/tmp/filter1");
        // key 為 contentId  value content
        final StreamsBuilder builder = new StreamsBuilder();
        // key userId , value content
        KStream<String, Content> kStream = builder.stream("content2", Consumed.with(Serdes.String(), contentSpecificAvroSerde))
                .selectKey((k, v) -> v.getUserId());
        KGroupedStream<String, Content> stringContentKGroupedStream = kStream.groupByKey();
        KStream<Windowed<String>, ContentbyUserId> k1ResultStrem = k1Result
                .toStream()
                .filter((k, v) -> {
            return null != v && v.getCount() > 5;
        });
        k1ResultStrem.print(Printed.toSysOut());

10s 內(nèi) 輸入事件間隔小于1s的事件數(shù) >5

  // key 為 contentId  value content
        final StreamsBuilder builder = new StreamsBuilder();
        // key userId , value content
        KStream<String, Content> kStream = builder.stream("content2", Consumed.with(Serdes.String(), contentSpecificAvroSerde))
                .selectKey((k, v) -> v.getUserId());
        KStream<Windowed<String>,Long> kStreamResult2 = stringContentKGroupedStream.windowedBy(SessionWindows.with(1 * 1000))
                .count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("result2")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.Long()))
                .toStream()
                .filter((e, v) -> {
                            // session 窗口時(shí)間 大于 10 s 且 數(shù)量大于5
                            if (v != null && e.window().end() - e.window().start() > 10 * 1000 && v.longValue() > 5) {
                                return true;
                            }
                            return false;

                        }
                );

        kStreamResult2.print(Printed.toSysOut());

基礎(chǔ)配置

    public static Properties config(String appliactionId, String bootstrapServers, String stateDir) {
        final Properties streamsConfiguration = new Properties();
        // Give the Streams application a unique name.  The name must be unique in the Kafka cluster
        // against which the application is run.
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appliactionId);
        // Where to find Kafka broker(s).
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
        streamsConfiguration.put("schema.registry.url", "http://localhost:8081");


        // Provide the details of our embedded http service that we'll use to connect to this streams
        // instance and discover locations of stores.
//        streamsConfiguration.put(StreamsConfig.APPLICATION_SERVER_CONFIG, host + ":" + applicationServerPort);
        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
        // Set to earliest so we don't miss any data that arrived in the topics before the process
        // started
        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Set the commit interval to 500ms so that any changes are flushed frequently and the top five
        // charts are updated with low latency.
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 500);
        // Allow the user to fine-tune the `metadata.max.age.ms` via Java system properties from the CLI.
        // Lowering this parameter from its default of 5 minutes to a few seconds is helpful in
        // situations where the input topic was not pre-created before running the application because
        // the application will discover a newly created topic faster.  In production, you would
        // typically not change this parameter from its default.
        String metadataMaxAgeMs = System.getProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG);
        if (metadataMaxAgeMs != null) {
            try {
                int value = Integer.parseInt(metadataMaxAgeMs);
                streamsConfiguration.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, value);
                System.out.println("Set consumer configuration " + ConsumerConfig.METADATA_MAX_AGE_CONFIG +
                        " to " + value);
            } catch (NumberFormatException ignored) {
            }
        }
        return streamsConfiguration;
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末撰糠,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子辩昆,更是在濱河造成了極大的恐慌阅酪,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,590評(píng)論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件汁针,死亡現(xiàn)場(chǎng)離奇詭異术辐,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)施无,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,157評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門术吗,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人帆精,你說(shuō)我怎么就攤上這事较屿∷砥牵” “怎么了?”我有些...
    開封第一講書人閱讀 169,301評(píng)論 0 362
  • 文/不壞的土叔 我叫張陵隘蝎,是天一觀的道長(zhǎng)购啄。 經(jīng)常有香客問(wèn)我,道長(zhǎng)嘱么,這世上最難降的妖魔是什么狮含? 我笑而不...
    開封第一講書人閱讀 60,078評(píng)論 1 300
  • 正文 為了忘掉前任,我火速辦了婚禮曼振,結(jié)果婚禮上几迄,老公的妹妹穿的比我還像新娘。我一直安慰自己冰评,他們只是感情好映胁,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,082評(píng)論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著甲雅,像睡著了一般解孙。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上抛人,一...
    開封第一講書人閱讀 52,682評(píng)論 1 312
  • 那天弛姜,我揣著相機(jī)與錄音,去河邊找鬼妖枚。 笑死廷臼,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的绝页。 我是一名探鬼主播中剩,決...
    沈念sama閱讀 41,155評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼抒寂!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起掠剑,我...
    開封第一講書人閱讀 40,098評(píng)論 0 277
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤屈芜,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后朴译,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體井佑,經(jīng)...
    沈念sama閱讀 46,638評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,701評(píng)論 3 342
  • 正文 我和宋清朗相戀三年眠寿,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了躬翁。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片怒见。...
    茶點(diǎn)故事閱讀 40,852評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡印颤,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出玛迄,到底是詐尸還是另有隱情,我是刑警寧澤宁舰,帶...
    沈念sama閱讀 36,520評(píng)論 5 351
  • 正文 年R本政府宣布拼卵,位于F島的核電站,受9級(jí)特大地震影響蛮艰,放射性物質(zhì)發(fā)生泄漏腋腮。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,181評(píng)論 3 335
  • 文/蒙蒙 一壤蚜、第九天 我趴在偏房一處隱蔽的房頂上張望即寡。 院中可真熱鬧,春花似錦袜刷、人聲如沸聪富。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,674評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)善涨。三九已至,卻和暖如春草则,著一層夾襖步出監(jiān)牢的瞬間钢拧,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,788評(píng)論 1 274
  • 我被黑心中介騙來(lái)泰國(guó)打工炕横, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留源内,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 49,279評(píng)論 3 379
  • 正文 我出身青樓份殿,卻偏偏與公主長(zhǎng)得像膜钓,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子卿嘲,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,851評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理颂斜,服務(wù)發(fā)現(xiàn),斷路器拾枣,智...
    卡卡羅2017閱讀 134,715評(píng)論 18 139
  • Kafka設(shè)計(jì)解析(七)- Kafka Stream 原創(chuàng)文章沃疮,轉(zhuǎn)載請(qǐng)務(wù)必將下面這段話置于文章開頭處。本文轉(zhuǎn)發(fā)自技...
    小小少年Boy閱讀 5,255評(píng)論 0 32
  • Kafka官網(wǎng):http://kafka.apache.org/入門1.1 介紹Kafka? 是一個(gè)分布式流處理系...
    it_zzy閱讀 3,901評(píng)論 3 53
  • 前言:前段時(shí)間接觸過(guò)一個(gè)流式計(jì)算的任務(wù)梅肤,使用了阿里巴巴集團(tuán)的JStorm司蔬,發(fā)現(xiàn)這個(gè)領(lǐng)域值得探索,就發(fā)現(xiàn)了這篇文章—...
    程序熊大閱讀 6,301評(píng)論 5 31
  • 大學(xué)四年姨蝴,如果沒有定好目標(biāo)前行俊啼,很容易隨大流,然后慢慢退步左医,該吃吃授帕,該喝喝同木,說(shuō)玩耍就玩耍,你所以為的自由不...
    Control_280f閱讀 165評(píng)論 0 0