1. Add the dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.10.2.0</version>
</dependency>
2. Implementation
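import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueMapper;

public class kafkaStreamDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount");
        // Broker list: entries must be separated by commas
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:6667,node2:6667,node3:6667");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> source = builder.stream("kafkaStream");

        // Operate on the value: a ValueMapper splits each record into words
        final KTable<String, Long> counts = source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                // Data format: java,scala,python,c
                // Split on commas and convert the result to a list
                return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(","));
            }
        }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
            @Override
            public KeyValue<String, String> apply(String key, String value) {
                // Keep only the value and use the word as the key, so records are grouped by word
                return new KeyValue<>(value, value);
            }
        }).groupByKey().count("countstore");
        counts.print();

        final KafkaStreams streams = new KafkaStreams(builder, props);

        // Start-up and shutdown: the latch keeps the main thread alive while the stream runs
        final CountDownLatch latch = new CountDownLatch(1);

        // Close the stream when the JVM shuts down
        Runtime.getRuntime().addShutdownHook(new Thread("word-count") {
            @Override
            public void run() {
                streams.close();
                latch.countDown(); // once the stream is closed, the latch count drops to 0
            }
        });

        try {
            streams.start();
            latch.await(); // the main thread blocks here until the latch count reaches 0
        } catch (IllegalStateException e) {
            e.printStackTrace();
        } catch (StreamsException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}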
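The start/await/close sequence at the end of the program can be tried out without a broker. The following is only a minimal sketch of that pattern: `FakeStreams` is a hypothetical stand-in for `KafkaStreams` (it is not part of any library), and the "hook" thread is started directly instead of being registered with `Runtime.getRuntime().addShutdownHook`, so that the example terminates on its own.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class LatchShutdownSketch {

    /** Hypothetical stand-in for KafkaStreams: it only records whether close() was called. */
    static final class FakeStreams {
        final AtomicBoolean closed = new AtomicBoolean(false);

        void start() {
            // A real KafkaStreams instance would start its processing threads here.
        }

        void close() {
            closed.set(true);
        }
    }

    /** Runs the sequence once and reports whether close() had run by the time await() returned. */
    static boolean runOnce() {
        final FakeStreams streams = new FakeStreams();
        final CountDownLatch latch = new CountDownLatch(1);

        // Plays the role of the shutdown hook: close the stream, then release the latch.
        Thread hook = new Thread(() -> {
            streams.close();
            latch.countDown();
        }, "word-count-shutdown");

        streams.start();
        hook.start(); // assumption: triggered directly here; the JVM would run a real hook at exit
        try {
            latch.await(); // blocks until the hook has counted the latch down to 0
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return streams.closed.get();
    }

    public static void main(String[] args) {
        System.out.println("closed before exit: " + runOnce());
    }
}
```

Because `close()` is called before `countDown()`, the main thread cannot leave `await()` until the stream has been closed, which is the ordering the demo relies on.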
3. Result
Input data format:
java,scala,python,c
java,java,c
scala,java,java
java,scala,python,c
java,scala,python,c
.....
Output:
[KSTREAM-AGGREGATE-0000000003]: java , (9<-null)
[KSTREAM-AGGREGATE-0000000003]: scala , (7<-null)
[KSTREAM-AGGREGATE-0000000003]: python , (5<-null)
[KSTREAM-AGGREGATE-0000000003]: c , (5<-null)
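To see where such counts come from, the same split, re-key, and count steps can be reproduced with plain Java collections. This is a sketch only, not the Kafka Streams implementation: `WordCountSketch` and `countWords` are hypothetical names, and `Locale.ROOT` is used here in place of the demo's `Locale.getDefault()`. It is run on the five sample lines shown above only; the real input continues past the `.....`, so the totals are smaller than the ones printed by the stream.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

public class WordCountSketch {

    /** Splits every line on commas and counts how often each lower-cased word occurs. */
    static Map<String, Long> countWords(List<String> lines) {
        // LinkedHashMap keeps the words in first-seen order, which makes the output easy to read.
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            // Corresponds to flatMapValues: one record becomes several words.
            for (String word : line.toLowerCase(Locale.ROOT).split(",")) {
                // Corresponds to map + groupByKey + count: the word is the key, the value is a running total.
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
                "java,scala,python,c",
                "java,java,c",
                "scala,java,java",
                "java,scala,python,c",
                "java,scala,python,c");
        System.out.println(countWords(sample)); // prints {java=7, scala=4, python=3, c=4}
    }
}
```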
As a lightweight stream-processing library, Kafka Streams is a good fit for simple streaming workloads such as log monitoring and basic metric monitoring.