siddhi事件封裝存入Kafka

1、項目依賴:

 <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <siddhi.version>5.1.11</siddhi.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>io.siddhi</groupId>
            <artifactId>siddhi-query-api</artifactId>
            <version>${siddhi.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.61</version>
        </dependency>
        <dependency>
            <groupId>io.siddhi</groupId>
            <artifactId>siddhi-core</artifactId>
            <version>${siddhi.version}</version>
        </dependency>
        <dependency>
            <groupId>io.siddhi</groupId>
            <artifactId>siddhi-query-compiler</artifactId>
            <version>${siddhi.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
            <version>4.12</version>
        </dependency>
    </dependencies>

2、示列項目:

import io.siddhi.core.SiddhiAppRuntime;
import io.siddhi.core.SiddhiManager;
import io.siddhi.core.event.Event;
import io.siddhi.core.stream.input.InputHandler;
import io.siddhi.core.stream.output.StreamCallback;
public class HelloWorld {
    public static void main(String[] args) throws InterruptedException {
        SiddhiManager siddhiManager = new SiddhiManager();
        String siddhiApp =
                "define stream cseEventStream (id string,symbol string, price float, volume int,time long); " +
                        "from cseEventStream#window.externalTimeBatch(time,60 sec) select volume,symbol,count(id) as count  group by id having count>=2 insert into outputStream;"
                        + "from cseEventStream#window.externalTimeBatch(time,60 sec) select volume,symbol,count(id) as count  group by id having count>=3 insert into outputStream;";
        ;
        // Generating runtime
        SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);

        InputHandler inputHandler = siddhiAppRuntime.getInputHandler("cseEventStream");
        siddhiAppRuntime.start();
        siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
            @Override
            public void receive(Event[] events) {
                for (Event event : events) {
                    System.out.println(event);
                }
            }
        });
        // Sending events to Siddhi
        inputHandler.send(new Object[]{"3", "Welcome3", 700f, 800, 1575011611229L}); //11-29 15:13:31
        inputHandler.send(new Object[]{"3", "Welcome3", 700f, 100, 1575011612229L});//11-29 15:13:32
        inputHandler.send(new Object[]{"3", "Welcome3", 700f, 100, 1575011612229L});//11-29 15:13:32
        inputHandler.send(new Object[]{"3", "Welcome3", 700f, 900, 1575015212000L});//11-29 16:00:12
        inputHandler.send(new Object[]{"3", "Welcome3", 700f, 900, 1575015212000L});//11-29 16:00:12
        inputHandler.send(new Object[]{"3", "Welcome3", 700f, 900, 1575111612330L});//11-30 19:00:12

        inputHandler.send(new Object[]{"4", "to", 50f, 30, 1575011639158L});
        inputHandler.send(new Object[]{"5", "IBM", 76.6f, 400, 1575011639158L});
        inputHandler.send(new Object[]{"6", "siddhi!", 45.6f, 200, 1575011639158L});

        siddhiAppRuntime.shutdown();
        siddhiManager.shutdown();
    }
}

3咽块、在回調函數(shù)中將event封裝成固定對象輸出到Kafka侈沪。此處需要修改addCallback中的函數(shù)晚凿。著急的可以直接先看第四點后記,防止向我一樣繞彎应役。

封裝的對象Result:

public class Result implements Serializable {
    private int volume;
    private String symbol;
    private int count;

    public Result(Event event) {
        this.volume = (int) event.getData(0);
        this.symbol = (String) event.getData(1);
        this.count = (int) event.getData(2);
    }
     public Result() {
    }

    public int getVolume() {
        return volume;
    }

    public void setVolume(int volume) {
        this.volume = volume;
    }

    public String getSymbol() {
        return symbol;
    }

    public void setSymbol(String symbol) {
        this.symbol = symbol;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }
  @Override
    public String toString() {
        return "Result{" +
                "volume=" + volume +
                ", symbol='" + symbol + '\'' +
                ", count=" + count +
                '}';
    }
}

在測試時將addCallback改成

siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
            @Override
            public void receive(Event[] events) {
                for (Event event : events) {
                    Result result = new Result(event);
                    System.out.println(result.toString());
                }
            }
        });

本來想著直接new一個Result對象還比較快箩祥。不用一個個去set袍祖。結果什么也沒有輸出谢揪,也沒有報錯捐凭。很納悶柑营。
然后之好試試set村视。代碼改成:

  siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
            @Override
            public void receive(Event[] events) {
                for (Event event : events) {
                    Result result = new Result();
                    result.setVolume((int) event.getData(0));
                    result.setSymbol((String) event.getData(1));
                    result.setCount((int) event.getData(2));
                    System.out.println(result.toString());
                }
            }
        });

結果還是沒有輸出,這就很奇怪了奶赔。

測試了很久發(fā)現(xiàn)杠氢。這個回調函數(shù)的even類型很嚴格,long類型不能通過加(int)設置成int绞旅。必須通過 Integer.parseInt(event.getData(2).toString())這樣才行啊温艇。累死終于找到問題了。

最后轉化為json寫入Kafka即可晃琳。參考代碼:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaSink {
    private static Producer<String, String> producer = new KafkaProducer<String, String>(getProperties());

    public static Properties getProperties() {
        Properties properties = new Properties();
        properties.put("retries", 3);
        properties.put("acks", "0");
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //值為字符串類型
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }

    public static void send(String data) throws ExecutionException, InterruptedException {
        producer.send(new ProducerRecord<>("test", data)).get();
    }
}



===========================
回調函數(shù)改成:
  siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
            @Override
            public void receive(Event[] events) {
                for (Event event : events) {
                    Result result = new Result(event);
                    try {
                        KafkaSink.send(JSON.toJSONString(result));
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

4卫旱、后記:

后來發(fā)現(xiàn)回調類里面有轉換為map的函數(shù)(toMap)顾翼,map值的類型也不會亂奈泪,可以直接轉換為map后轉化為json。如下:

 siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
            @Override
            public void receive(Event[] events) {
                for (Event event : events) {
                    System.out.println(JSON.toJSONString(toMap(event)));
                }
            }
        });
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末取逾,一起剝皮案震驚了整個濱河市苹支,隨后出現(xiàn)的幾起案子债蜜,更是在濱河造成了極大的恐慌,老刑警劉巖寻定,帶你破解...
    沈念sama閱讀 212,718評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件狼速,死亡現(xiàn)場離奇詭異,居然都是意外死亡向胡,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,683評論 3 385
  • 文/潘曉璐 我一進店門处硬,熙熙樓的掌柜王于貴愁眉苦臉地迎上來荷辕,“玉大人件豌,你說我怎么就攤上這事〖胪” “怎么了?”我有些...
    開封第一講書人閱讀 158,207評論 0 348
  • 文/不壞的土叔 我叫張陵蟆盐,是天一觀的道長石挂。 經(jīng)常有香客問我险污,道長,這世上最難降的妖魔是什么蛔糯? 我笑而不...
    開封第一講書人閱讀 56,755評論 1 284
  • 正文 為了忘掉前任蚁飒,我火速辦了婚禮,結果婚禮上琼懊,老公的妹妹穿的比我還像新娘。我一直安慰自己哼丈,他們只是感情好,可當我...
    茶點故事閱讀 65,862評論 6 386
  • 文/花漫 我一把揭開白布饶米。 她就那樣靜靜地躺著车胡,像睡著了一般。 火紅的嫁衣襯著肌膚如雪褪猛。 梳的紋絲不亂的頭發(fā)上羹饰,一...
    開封第一講書人閱讀 50,050評論 1 291
  • 那天,我揣著相機與錄音笑旺,去河邊找鬼馍资。 笑死,一個胖子當著我的面吹牛鸟蟹,可吹牛的內容都是我干的。 我是一名探鬼主播藤韵,決...
    沈念sama閱讀 39,136評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼泽艘,長吁一口氣:“原來是場噩夢啊……” “哼镐依!你這毒婦竟也來了?” 一聲冷哼從身側響起槐壳,我...
    開封第一講書人閱讀 37,882評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后绍哎,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,330評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡沃于,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,651評論 2 327
  • 正文 我和宋清朗相戀三年繁莹,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片咨演。...
    茶點故事閱讀 38,789評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡薄风,死狀恐怖,靈堂內的尸體忽然破棺而出遭赂,到底是詐尸還是另有隱情横辆,我是刑警寧澤,帶...
    沈念sama閱讀 34,477評論 4 333
  • 正文 年R本政府宣布困肩,位于F島的核電站脆侮,受9級特大地震影響,放射性物質發(fā)生泄漏他嚷。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 40,135評論 3 317
  • 文/蒙蒙 一卸耘、第九天 我趴在偏房一處隱蔽的房頂上張望粘咖。 院中可真熱鬧,春花似錦、人聲如沸钝域。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,864評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽织咧。三九已至漠秋,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間庆锦,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,099評論 1 267
  • 我被黑心中介騙來泰國打工绿渣, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留燕耿,地道東北人。 一個月前我還...
    沈念sama閱讀 46,598評論 2 362
  • 正文 我出身青樓淀散,卻偏偏與公主長得像蚜锨,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子亚再,可洞房花燭夜當晚...
    茶點故事閱讀 43,697評論 2 351

推薦閱讀更多精彩內容