首先給直觀的看看監(jiān)控效果圖:
數(shù)據(jù)流架構(gòu)如下所示,通過Flume采集日志數(shù)據(jù),并寫入到kafka中,Flink讀取kafka數(shù)據(jù)經(jīng)過處理后再次放入到kafka中,監(jiān)控頁面通過websocket監(jiān)聽kafka中數(shù)據(jù)實現(xiàn)實時的數(shù)據(jù)顯示。
整體技術(shù)框架基于ruoyi單機(jī)版本搭建
新增加的文件如下:
第一步:先啟動Flume,Flume監(jiān)聽文件,我這里通過tail命令監(jiān)聽文件新寫入的內(nèi)容
./flume-ng agent -c /Users/dbq/Documents/middleware/flume/master/apache-flume-1.9.0-bin/conf -f /Users/dbq/Documents/middleware/flume/master/apache-flume-1.9.0-bin/conf/kafka.conf -n a1 -Dflume.root.logger=INFO,console
配置文件如下,Flume實時監(jiān)控文件數(shù)據(jù),并寫入到kafka test 主題中
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Source: exec source that tails the log file for newly appended lines
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /Users/d/Documents/middleware/flume/data/log.00
# Fixed typo: the original wrote "a1.sources.ri.shell", so the shell
# setting was silently ignored (agent name component must be r1).
a1.sources.r1.shell = /bin/sh -c
# Sink: publish each event to the Kafka "test" topic
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = test
# The original listed 127.0.0.1:9092 three times; a duplicate broker
# address adds nothing — list each distinct broker once.
a1.sinks.k1.brokerList = 127.0.0.1:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 5
# Channel: in-memory buffer between source and sink
a1.channels.c1.type = memory
# Wire source r1 and sink k1 together through channel c1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
第二步:系統(tǒng)啟動的時候監(jiān)聽kafka topic,并通過Flink進(jìn)行流式計算,Sink負(fù)責(zé)將處理后的數(shù)據(jù)輸出到外部系統(tǒng)中。
/**
 * Bootstraps the Flink streaming job on Spring Boot startup: consumes raw
 * JSON records from the Kafka "test" topic, applies a mock transformation,
 * and forwards the result to {@link MySink}.
 */
@Component
public class Runner implements CommandLineRunner {

    @Override
    public void run(String... args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // SimpleStringSchema deserializes each Kafka record value to a String,
        // so no key/value deserializer properties are needed here.
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), getProperties());
        DataStream<String> dataStream = env.addSource(consumer);
        // Mock business processing: parse, tweak the values, re-serialize.
        DataStream<String> after = dataStream.map((MapFunction<String, String>) s -> {
            MonitorObject mo = getMonitorObject(s);
            return JSON.toJSONString(mo);
        });
        after.addSink(new MySink());
        env.execute("spring flink demo");
    }

    /**
     * Mock second-stage processing: parses the JSON payload into a
     * MonitorObject and reduces oil/speed modulo 2 (demo transformation).
     *
     * @param s raw JSON record read from Kafka
     * @return the transformed MonitorObject
     */
    private static MonitorObject getMonitorObject(String s) {
        MonitorObject mo = JSON.toJavaObject(JSON.parseObject(s), MonitorObject.class);
        mo.setOil(mo.getOil() % 2);
        mo.setSpeed(mo.getSpeed() % 2);
        return mo;
    }

    /**
     * Kafka consumer configuration for the Flink source. The original also
     * set producer-only "key.serializer"/"value.serializer" (meaningless for
     * a consumer) and the obsolete "zookeeper.connect" (ignored by the
     * modern Kafka consumer) — both removed.
     */
    private Properties getProperties() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "flink-group");
        properties.setProperty("auto.offset.reset", "latest");
        return properties;
    }
}
第三步:sink中將數(shù)據(jù)重新寫入到kafka中,這里重新寫入到kafka的目的是起到平滑推送數(shù)據(jù)到前端頁面的效果,也方便以廣播的方式推送到其他業(yè)務(wù)系統(tǒng),其他業(yè)務(wù)系統(tǒng)只需要訂閱test_after主題,就可以獲得Flink處理之后的數(shù)據(jù)
@Slf4j
@Component
public class MySink extends RichSinkFunction<String> {
private AnnotationConfigApplicationContext ctx;
private final static String topic = "test_after";
public MySink() {
log.info("mySink new");
}
@Override
public void open(Configuration paramters) {
this.ctx = new AnnotationConfigApplicationContext(Config.class);
log.info("my sink open");
}
@Override
public void invoke(String value, Context context) throws Exception {
log.info("[flink監(jiān)控kafka數(shù)據(jù)]:{}", value);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
List<String> data = new ArrayList<>();
data.add(value);
DataStreamSource<String> source = env.fromCollection(data);
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
topic,
(KafkaSerializationSchema<String>) (element, timestamp) -> new ProducerRecord<>(
topic,
element.getBytes()
),
getProperties(),
FlinkKafkaProducer.Semantic.NONE
);
//重新寫入到kafka
source.addSink(producer);
env.execute();
}
@Override
public void close() {
ctx.close();
log.info("my sink close");
}
private Properties getProperties() {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:3181");
properties.setProperty("group.id", "flink-group");
properties.setProperty("auto.offset.reset", "latest");
return properties;
}
第四步:監(jiān)聽kafka稽坤,并通過websocket推送到前端頁面
/**
 * Listens on the "test_after" topic (records already processed by Flink)
 * and pushes each message to every connected websocket client.
 */
@Component
@Slf4j
public class KafkaConsumer {

    @KafkaListener(groupId = "3", topics = "test_after")
    public void listen(String msg) {
        // Class is annotated @Slf4j — log through it rather than System.out.
        log.info("====================> {}", msg);
        MonitorObject mo = JSON.toJavaObject(JSON.parseObject(msg), MonitorObject.class);
        WebSocketUsers.sendMessageToUsersByText(mo);
    }
}
其他代碼
WebSocketConfig.java
/**
 * Registers the {@link ServerEndpointExporter} bean so that classes
 * annotated with {@code @ServerEndpoint} are exposed as websocket
 * endpoints by the embedded container.
 */
@Configuration
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
WebSocketServer.java
/**
 * Websocket endpoint that pushes monitoring data to connected clients.
 * Concurrent sessions are limited by a semaphore permit per connection.
 */
@Component
@ServerEndpoint("/websocket/message")
public class WebSocketServer
{
    /**
     * WebSocketServer logger.
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServer.class);

    /**
     * Maximum number of users allowed online at the same time (default 100).
     */
    public static int socketMaxOnlineCount = 100;

    private static Semaphore socketSemaphore = new Semaphore(socketMaxOnlineCount);

    /**
     * Invoked when a connection is established; rejects (closes) the session
     * when the online limit has been reached.
     */
    @OnOpen
    public void onOpen(Session session) throws Exception
    {
        boolean semaphoreFlag = false;
        // Try to acquire a permit without blocking.
        semaphoreFlag = SemaphoreUtils.tryAcquire(socketSemaphore);
        if (!semaphoreFlag)
        {
            // No permit available: over the limit.
            LOGGER.error("\n 當(dāng)前在線人數(shù)超過限制數(shù)- {}", socketMaxOnlineCount);
            // NOTE(review): close() triggers onClose(), which releases a permit
            // this session never acquired — the semaphore count can drift above
            // its initial value. Confirm, and track acquisition per session if so.
            session.close();
        }
        else
        {
            // Register the user under its session id.
            WebSocketUsers.put(session.getId(), session);
            LOGGER.info("\n 建立連接 - {}", session);
            LOGGER.info("\n 當(dāng)前人數(shù) - {}", WebSocketUsers.getUsers().size());
        }
    }

    /**
     * Invoked when a connection is closed: removes the user and releases
     * the semaphore permit.
     */
    @OnClose
    public void onClose(Session session)
    {
        LOGGER.info("\n 關(guān)閉連接 - {}", session);
        // Remove the user.
        WebSocketUsers.remove(session.getId());
        // Release the permit held by this session.
        SemaphoreUtils.release(socketSemaphore);
    }

    /**
     * Invoked when an error occurs on a session: closes it, unregisters the
     * user and releases the permit.
     */
    @OnError
    public void onError(Session session, Throwable exception) throws Exception
    {
        if (session.isOpen())
        {
            // Close the broken connection.
            session.close();
        }
        String sessionId = session.getId();
        LOGGER.info("\n 連接異常 - {}", sessionId);
        // Pass the Throwable as its own trailing argument instead of through a
        // {} placeholder: with one placeholder and one argument SLF4J formats
        // the exception via toString() and the stack trace is lost. Also use
        // error level — this is an error path.
        LOGGER.error("\n 異常信息 - ", exception);
        // Remove the user.
        WebSocketUsers.remove(sessionId);
        // NOTE(review): if close() above already triggered onClose(), the permit
        // was released there too — this second release may inflate the semaphore.
        SemaphoreUtils.release(socketSemaphore);
    }

    /**
     * Invoked when a message is received from a client; echoes a transformed
     * message back to the sender.
     */
    @OnMessage
    public void onMessage(String message, Session session)
    {
        String msg = message.replace("你", "我").replace("嗎", "");
        WebSocketUsers.sendMessageToUserByText(session, msg);
    }
}
WebSocketUsers.java
/**
 * Registry of connected websocket clients, keyed by session id.
 *
 * @author ruoyi
 */
public class WebSocketUsers
{
    /**
     * WebSocketUsers logger.
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketUsers.class);

    /**
     * Online users. Final: the map reference never changes after class init.
     */
    private static final Map<String, Session> USERS = new ConcurrentHashMap<String, Session>();

    /**
     * Register a user.
     *
     * @param key unique key (session id)
     * @param session the user's session
     */
    public static void put(String key, Session session)
    {
        USERS.put(key, session);
    }

    /**
     * Remove a user by session: looks up the key of the matching session,
     * then delegates to {@link #remove(String)}.
     *
     * @param session the session to remove
     * @return true when the session is no longer present in the registry
     */
    public static boolean remove(Session session)
    {
        String key = null;
        boolean flag = USERS.containsValue(session);
        if (flag)
        {
            Set<Map.Entry<String, Session>> entries = USERS.entrySet();
            for (Map.Entry<String, Session> entry : entries)
            {
                Session value = entry.getValue();
                if (value.equals(session))
                {
                    key = entry.getKey();
                    break;
                }
            }
        }
        else
        {
            // Session was never registered: nothing to do.
            return true;
        }
        return remove(key);
    }

    /**
     * Remove a user by key.
     *
     * @param key the key (session id)
     * @return true when no entry for the key remains
     */
    public static boolean remove(String key)
    {
        // ConcurrentHashMap.remove(null) throws NullPointerException;
        // treat a null key as "already removed".
        if (key == null)
        {
            return true;
        }
        LOGGER.info("\n 正在移出用戶 - {}", key);
        Session remove = USERS.remove(key);
        if (remove != null)
        {
            // The same session could in principle be registered under another
            // key; report success only when no value matches anymore.
            boolean containsValue = USERS.containsValue(remove);
            LOGGER.info("\n 移出結(jié)果 - {}", containsValue ? "失敗" : "成功");
            return containsValue;
        }
        else
        {
            return true;
        }
    }

    /**
     * Get the online user map.
     *
     * @return session map keyed by session id
     */
    public static Map<String, Session> getUsers()
    {
        return USERS;
    }

    /**
     * Broadcast a text message to every online user.
     *
     * @param message payload (serialized to JSON before sending)
     */
    public static void sendMessageToUsersByText(Object message)
    {
        Collection<Session> values = USERS.values();
        for (Session value : values)
        {
            sendMessageToUserByText(value, message);
        }
    }

    /**
     * Send a text message to a single user.
     *
     * @param session target session (null is tolerated and logged)
     * @param message payload (serialized to JSON before sending)
     */
    public static void sendMessageToUserByText(Session session, Object message)
    {
        if (session != null)
        {
            try
            {
                session.getBasicRemote().sendText(JSON.toJSONString(message));
            }
            catch (IOException e)
            {
                LOGGER.error("\n[發(fā)送消息異常]", e);
            }
        }
        else
        {
            LOGGER.info("\n[你已離線]");
        }
    }
}
前端代碼
// Endpoint exposed by WebSocketServer (@ServerEndpoint("/websocket/message")).
var url = "ws://127.0.0.1:80/websocket/message";
var ws = new WebSocket(url);
// Connection established: append a note to the on-page log area.
ws.onopen = function() {
$('#text_content').append('已經(jīng)打開連接!' + '\n');
}
// Each pushed message is the JSON form of a MonitorObject; feed its fields
// into the three gauges of gaugeChart (presumably an ECharts instance —
// TODO confirm).
// NOTE(review): assumes the payload carries speed / rotate_speed / oil keys —
// verify against MonitorObject's serialized field names.
ws.onmessage = function(event) {
console.log(event.data)
var obj = JSON.parse(event.data);
gaugeChart.setOption({
series : [
{
name: '速度',
data: [{value: obj.speed}]
},
{
name: '轉(zhuǎn)速',
data: [{value: obj.rotate_speed}]
},
{
name: '油耗',
data: [{value: obj.oil}]
}
]
})
}
模擬數(shù)據(jù)的持續(xù)生成,這里每秒鐘生成一條數(shù)據(jù),以json格式寫入到日志文件中
# Emit one fake telemetry record per second, appending it to log.00 (the
# file the Flume exec source tails). Field ranges: speed 0-219,
# rotate_speed 0-6, oil 0-2.
while :
do
    echo "{\"speed\":$((RANDOM %220)),\"rotate_speed\":$((RANDOM %7)),\"oil\":$((RANDOM %3))}" >> log.00
    sleep 1
done