1、項目依賴:
<!-- Shared build settings: source encoding, the Java language level, and one Siddhi version -->
<!-- that every ${siddhi.version} reference in this POM resolves to. -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<siddhi.version>5.1.11</siddhi.version>
<!-- Compiler source/target both follow java.version so they cannot drift apart. -->
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<!-- Libraries used by the examples: Siddhi (stream processing), kafka-clients (producer) -->
<!-- and fastjson (JSON serialization); JUnit is test-only. -->
<dependencies>
<!-- NOTE(review): no io.siddhi.query.api class is imported by the example code directly; -->
<!-- presumably needed by siddhi-core at runtime. Confirm before removing. -->
<dependency>
<groupId>io.siddhi</groupId>
<artifactId>siddhi-query-api</artifactId>
<version>${siddhi.version}</version>
</dependency>
<!-- Provides the org.apache.kafka.clients.producer classes used by KafkaSink. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
<!-- Provides JSON.toJSONString, used to serialize results before they are sent or printed. -->
<!-- NOTE(review): 1.2.61 is an old fastjson release; check security advisories before reuse. -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.61</version>
</dependency>
<!-- Provides SiddhiManager, SiddhiAppRuntime, Event, InputHandler and StreamCallback. -->
<dependency>
<groupId>io.siddhi</groupId>
<artifactId>siddhi-core</artifactId>
<version>${siddhi.version}</version>
</dependency>
<!-- NOTE(review): presumably parses the Siddhi app string handed to createSiddhiAppRuntime; -->
<!-- not referenced directly by the example code. Confirm. -->
<dependency>
<groupId>io.siddhi</groupId>
<artifactId>siddhi-query-compiler</artifactId>
<version>${siddhi.version}</version>
</dependency>
<!-- Test scope only; no test code is shown alongside these examples. -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
<version>4.12</version>
</dependency>
</dependencies>
2、示例項目:
import io.siddhi.core.SiddhiAppRuntime;
import io.siddhi.core.SiddhiManager;
import io.siddhi.core.event.Event;
import io.siddhi.core.stream.input.InputHandler;
import io.siddhi.core.stream.output.StreamCallback;
/**
 * Minimal Siddhi example: defines an input stream, runs two external-time batch
 * queries over it and prints every event emitted on {@code outputStream}.
 */
public class HelloWorld {

    /**
     * Builds the Siddhi app, feeds it a fixed set of sample events and prints the results.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while sending an event
     */
    public static void main(String[] args) throws InterruptedException {
        SiddhiManager siddhiManager = new SiddhiManager();
        // Both queries batch on the event's own "time" attribute (60 second batches),
        // count events per id and write to the same output stream; they differ only
        // in the "having" threshold.
        String siddhiApp =
                "define stream cseEventStream (id string,symbol string, price float, volume int,time long); " +
                "from cseEventStream#window.externalTimeBatch(time,60 sec) select volume,symbol,count(id) as count group by id having count>=2 insert into outputStream;"
                + "from cseEventStream#window.externalTimeBatch(time,60 sec) select volume,symbol,count(id) as count group by id having count>=3 insert into outputStream;";
        try {
            // Generating runtime
            SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
            // Register the callback before starting the runtime so that it is in place
            // before the first event can be processed.
            siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
                @Override
                public void receive(Event[] events) {
                    for (Event event : events) {
                        System.out.println(event);
                    }
                }
            });
            InputHandler inputHandler = siddhiAppRuntime.getInputHandler("cseEventStream");
            siddhiAppRuntime.start();
            try {
                // Sending events to Siddhi (wall-clock times in the comments are UTC+8)
                inputHandler.send(new Object[]{"3", "Welcome3", 700f, 800, 1575011611229L}); // 11-29 15:13:31
                inputHandler.send(new Object[]{"3", "Welcome3", 700f, 100, 1575011612229L}); // 11-29 15:13:32
                inputHandler.send(new Object[]{"3", "Welcome3", 700f, 100, 1575011612229L}); // 11-29 15:13:32
                inputHandler.send(new Object[]{"3", "Welcome3", 700f, 900, 1575015212000L}); // 11-29 16:13:32
                inputHandler.send(new Object[]{"3", "Welcome3", 700f, 900, 1575015212000L}); // 11-29 16:13:32
                inputHandler.send(new Object[]{"3", "Welcome3", 700f, 900, 1575111612330L}); // 11-30 19:00:12
                inputHandler.send(new Object[]{"4", "to", 50f, 30, 1575011639158L});
                inputHandler.send(new Object[]{"5", "IBM", 76.6f, 400, 1575011639158L});
                inputHandler.send(new Object[]{"6", "siddhi!", 45.6f, 200, 1575011639158L});
            } finally {
                // Always stop the runtime, even when sending an event fails.
                siddhiAppRuntime.shutdown();
            }
        } finally {
            siddhiManager.shutdown();
        }
    }
}
3、在回調函數中將event封裝成固定對象輸出到Kafka。此處需要修改addCallback中的函數。著急的可以直接先看第四點后記,防止像我一樣繞彎。
封裝的對象Result:
/**
 * Plain data holder for one event of the output stream, whose attributes are
 * read by position: volume (0), symbol (1) and count (2).
 */
public class Result implements Serializable {
    private int volume;
    private String symbol;
    private int count;

    /**
     * Copies the first three attributes of {@code event} into this object.
     *
     * <p>Numeric attributes are converted through {@link Number#intValue()} rather than an
     * {@code (int)} cast. The cast only works when the boxed value is an {@code Integer};
     * an aggregate such as {@code count()} is delivered as a {@code Long}, and casting that
     * to {@code int} throws {@link ClassCastException}.
     *
     * @param event the event to read; attribute 0 and 2 must be numeric, attribute 1 a string
     */
    public Result(Event event) {
        this.volume = ((Number) event.getData(0)).intValue();
        this.symbol = (String) event.getData(1);
        this.count = ((Number) event.getData(2)).intValue();
    }

    /** Creates an empty result whose fields are filled in through the setters. */
    public Result() {
    }

    public int getVolume() {
        return volume;
    }

    public void setVolume(int volume) {
        this.volume = volume;
    }

    public String getSymbol() {
        return symbol;
    }

    public void setSymbol(String symbol) {
        this.symbol = symbol;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    /** Returns all three fields in the form {@code Result{volume=..., symbol='...', count=...}}. */
    @Override
    public String toString() {
        return "Result{" +
                "volume=" + volume +
                ", symbol='" + symbol + '\'' +
                ", count=" + count +
                '}';
    }
}
在測試時將addCallback改成
siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
    // Wraps every received event in a Result via its Event constructor and prints it.
    @Override
    public void receive(Event[] events) {
        for (int i = 0; i < events.length; i++) {
            Result wrapped = new Result(events[i]);
            System.out.println(wrapped.toString());
        }
    }
});
本來想著直接new一個Result對象還比較快。不用一個個去set。結果什么也沒有輸出,也沒有報錯。很納悶。
然后只好試試set。代碼改成:
siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
    // Fills a Result field by field from the event's positional attributes and prints it.
    // The (int) casts are kept exactly as in this step of the walkthrough; the text that
    // follows reports that the cast on attribute 2 is what prevents any output.
    @Override
    public void receive(Event[] events) {
        for (int i = 0; i < events.length; i++) {
            Event current = events[i];
            Result filled = new Result();
            filled.setVolume((int) current.getData(0));
            filled.setSymbol((String) current.getData(1));
            filled.setCount((int) current.getData(2));
            System.out.println(filled.toString());
        }
    }
});
結果還是沒有輸出,這就很奇怪了。
測試了很久發現,這個回調函數的event類型很嚴格,long類型不能通過加(int)設置成int(count()的結果是long,getData(2)返回的實際是Long對象,把它強轉成int會拋出ClassCastException)。必須通過 Integer.parseInt(event.getData(2).toString())這樣才行啊。累死終于找到問題了。
最后轉化為json寫入Kafka即可。參考代碼:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
 * Static helper that publishes string messages to the Kafka topic {@code test}
 * through one shared producer.
 */
public class KafkaSink {
    // One producer for the whole process, created when the class is loaded.
    private static final Producer<String, String> producer = new KafkaProducer<String, String>(getProperties());

    /**
     * Builds the producer configuration: a local broker, string keys and values,
     * leader acknowledgement and up to three retries.
     *
     * @return a new, mutable {@link Properties} instance
     */
    public static Properties getProperties() {
        Properties properties = new Properties();
        properties.put("retries", 3);
        // With acks=0 the producer never learns about failures, so "retries" has no effect
        // and waiting on the send future confirms nothing. Require the leader's acknowledgement.
        properties.put("acks", "1");
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Values are strings as well
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }

    /**
     * Sends {@code data} to the {@code test} topic and blocks until the send completes.
     *
     * @param data the message value; no key is set
     * @throws ExecutionException if the send fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static void send(String data) throws ExecutionException, InterruptedException {
        producer.send(new ProducerRecord<>("test", data)).get();
    }
}
===========================
回調函數改成:
siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
    // Converts every received event to a Result, serializes it to JSON and sends it to Kafka.
    @Override
    public void receive(Event[] events) {
        for (Event event : events) {
            Result result = new Result(event);
            try {
                KafkaSink.send(JSON.toJSONString(result));
            } catch (ExecutionException e) {
                // This record could not be sent; report it and carry on with the next event.
                e.printStackTrace();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the owning thread can react to it,
                // and stop publishing the rest of this batch.
                Thread.currentThread().interrupt();
                e.printStackTrace();
                return;
            }
        }
    }
});
4、后記:
后來發現回調類里面有轉換為map的函數(toMap),map值的類型也不會亂,可以直接轉換為map后轉化為json。如下:
siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
    // Prints each event as JSON, using the callback's own toMap conversion instead of a
    // hand-written holder class.
    @Override
    public void receive(Event[] events) {
        for (int i = 0; i < events.length; i++) {
            String json = JSON.toJSONString(toMap(events[i]));
            System.out.println(json);
        }
    }
});