(1) Spark Streaming ingests a real-time data stream from Kafka and the results are ultimately rendered as visualizations. Let's first look at the overall architecture:
(2) Solution overview:
1) Each business system feeds its data into Kafka, so data from every source system arrives in Kafka in real time;
2) Spark Streaming consumes the Kafka stream; we define the batch interval and window size and apply the business computation logic;
3) The computed results are written to MySQL;
4) A visualization platform connects to the MySQL database; here we use the NBI big-data visualization platform;
5) On that platform, data applications and dashboards are built by drag and drop.
(3) Code walkthrough:
Define a Kafka producer to simulate the data source:
package com.producers;

import com.alibaba.fastjson.JSONObject;
import com.pojo.WaterSensor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.Random;

/**
 * Created by lj on 2022-07-18.
 */
public class Kafka_Producer {
    public final static String bootstrapServers = "127.0.0.1:9092";

    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka broker address
        props.put("bootstrap.servers", bootstrapServers);
        // serializer class for the record key
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // serializer class for the record value
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            int i = 0;
            Random r = new Random();
            String[] lang = {"flink","spark","hadoop","hive","hbase","impala","presto","superset","nbi"};
            while (true) {
                // emit one randomly named sensor reading every 2 seconds
                Thread.sleep(2000);
                WaterSensor waterSensor = new WaterSensor(lang[r.nextInt(lang.length)] + "_kafka", i, i);
                i++;
                String msg = JSONObject.toJSONString(waterSensor);
                System.out.println(msg);
                // send synchronously; partition and key are left null
                RecordMetadata recordMetadata = producer.send(new ProducerRecord<>("kafka_data_waterSensor", null, null, msg)).get();
                // System.out.println("recordMetadata: {" + recordMetadata + "}");
            }
        } catch (Exception e) {
            System.out.println(e.getMessage());
        } finally {
            producer.close();
        }
    }
}
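Since the producer sleeps 2 seconds between sends, the data volume that each micro-batch and each window will see later is easy to predict. A quick sanity check (the `ThroughputCheck` class and its `recordsIn` helper are ours, not part of the project, and assume the 3-minute batch and 9-minute window used in the streaming job below):

```java
public class ThroughputCheck {
    // number of records produced in an interval, given one record every periodMs milliseconds
    static long recordsIn(long intervalMs, long periodMs) {
        return intervalMs / periodMs;
    }

    public static void main(String[] args) {
        long periodMs = 2_000L;          // producer sleeps 2 s between sends
        long batchMs  = 3 * 60 * 1_000L; // 3-minute micro-batch interval
        long windowMs = 9 * 60 * 1_000L; // 9-minute window
        System.out.println(recordsIn(batchMs, periodMs));  // 90 records per batch
        System.out.println(recordsIn(windowMs, periodMs)); // 270 records per window
    }
}
```

At this rate the demo stays well within a single local consumer's capacity, so `local[*]` is sufficient for testing.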
Define message objects as required by the business:
package com.pojo;

import java.io.Serializable;
import java.util.Date;

/**
 * Created by lj on 2022-07-13.
 */
public class WaterSensor implements Serializable {
    public String id;
    public long ts;
    public int vc;

    public WaterSensor() {
    }

    public WaterSensor(String id, long ts, int vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public int getVc() {
        return vc;
    }

    public void setVc(int vc) {
        this.vc = vc;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public long getTs() {
        return ts;
    }

    public void setTs(long ts) {
        this.ts = ts;
    }
}
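With these three fields, every Kafka message is a small JSON document. The sketch below builds the same payload by hand, which is handy when checking the topic with a console consumer (an assumption: fastjson's default serialization emits the bean properties `id`, `ts`, `vc` in this order; `PayloadShape` and `toJson` are illustrative names of ours):

```java
public class PayloadShape {
    // hand-rolled equivalent of JSONObject.toJSONString(new WaterSensor(id, ts, vc))
    static String toJson(String id, long ts, int vc) {
        return String.format("{\"id\":\"%s\",\"ts\":%d,\"vc\":%d}", id, ts, vc);
    }

    public static void main(String[] args) {
        // the same shape the producer above writes to kafka_data_waterSensor
        System.out.println(toJson("spark_kafka", 0L, 0));
    }
}
```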
Spark Streaming computation over the data stream:
package com.examples;

import com.alibaba.fastjson.JSONObject;
import com.pojo.WaterSensor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import java.util.*;

/**
 * Created by lj on 2022-07-18.
 */
public class SparkSql_Kafka {
    private static String appName = "spark.streaming.demo";
    private static String master = "local[*]";
    private static String topics = "kafka_data_waterSensor";
    private static String brokers = "127.0.0.1:9092";

    public static void main(String[] args) {
        // initialize SparkConf
        SparkConf sparkConf = new SparkConf().setMaster(master).setAppName(appName);
        // create the JavaStreamingContext with a 3-minute batch interval
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.minutes(3));
        // raise the log level to avoid flooding the console
        ssc.sparkContext().setLogLevel("ERROR");
        Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
        // Kafka consumer parameters -- required; the job fails without them
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("group.id", "group1");
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // obtain the Kafka stream via KafkaUtils.createDirectStream(...), configured by kafkaParams
        JavaInputDStream<ConsumerRecord<String, String>> lines = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams)
        );
        JavaDStream<WaterSensor> mapDStream = lines.map(new Function<ConsumerRecord<String, String>, WaterSensor>() {
            @Override
            public WaterSensor call(ConsumerRecord<String, String> s) throws Exception {
                return JSONObject.parseObject(s.value(), WaterSensor.class);
            }
        }).window(Durations.minutes(9), Durations.minutes(6)); // window length and slide interval must both be integer multiples of the batch interval

        mapDStream.foreachRDD(new VoidFunction2<JavaRDD<WaterSensor>, Time>() {
            @Override
            public void call(JavaRDD<WaterSensor> waterSensorJavaRDD, Time time) throws Exception {
                SparkSession spark = JavaSparkSessionSingleton.getInstance(waterSensorJavaRDD.context().getConf());
                Dataset<Row> dataFrame = spark.createDataFrame(waterSensorJavaRDD, WaterSensor.class);
                // register a temporary view
                dataFrame.createOrReplaceTempView("log");
                Dataset<Row> result = spark.sql("select * from log");
                System.out.println("========= " + time + "=========");
                // print the first 20 rows
                result.show();
                // write the results to MySQL
                writeDataToMysql(result);
            }
        });
        // start the job
        ssc.start();
        try {
            ssc.awaitTermination();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ssc.close();
        }
    }

    // write the batch result to MySQL over JDBC;
    // the URL, credentials and table name below are placeholders -- adjust them to your environment
    private static void writeDataToMysql(Dataset<Row> result) {
        Properties connProps = new Properties();
        connProps.put("user", "root");
        connProps.put("password", "123456");
        result.write()
                .mode(SaveMode.Append)
                .jdbc("jdbc:mysql://127.0.0.1:3306/demo?useSSL=false", "water_sensor", connProps);
    }
}

/** Lazily instantiated SparkSession singleton, shared across micro-batches. */
class JavaSparkSessionSingleton {
    private static transient SparkSession instance = null;

    public static SparkSession getInstance(SparkConf sparkConf) {
        if (instance == null) {
            instance = SparkSession.builder().config(sparkConf).getOrCreate();
        }
        return instance;
    }
}
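The `window(Durations.minutes(9), Durations.minutes(6))` call above means a record can be evaluated in more than one window, since consecutive windows overlap by 3 minutes. A small sketch of the arithmetic (the `WindowMembership` class is a hypothetical helper of ours; it assumes window start times are aligned to multiples of the slide interval, mirroring how Spark Streaming aligns windows to batch boundaries):

```java
import java.util.ArrayList;
import java.util.List;

public class WindowMembership {
    // start times (ms) of all windows of length windowMs, sliding every slideMs,
    // that contain the event time eventMs; starts are aligned to multiples of slideMs
    static List<Long> windowStarts(long eventMs, long windowMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = eventMs - (eventMs % slideMs); // latest aligned start <= eventMs
        for (long start = lastStart; start > eventMs - windowMs; start -= slideMs) {
            if (start >= 0) starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // with a 9-minute window sliding every 6 minutes, an event at t = 13 min
        // is covered by the windows starting at 12 min and at 6 min
        System.out.println(windowStarts(13 * minute, 9 * minute, 6 * minute));
    }
}
```

This is why the window length must be chosen deliberately: with a 9-minute window and 6-minute slide, some records are counted twice across consecutive windows, which is acceptable for trend dashboards but matters if the MySQL sink is later aggregated again.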
Connect the NBI big-data visualization platform to MySQL and build data applications on top:
NBI visualization