半個小時搭建自己的實時監(jiān)控系統(tǒng)

首先給直觀的看看監(jiān)控效果圖:


image.png

數(shù)據(jù)流架構(gòu)如下所示芒帕,通過Flume采集日志數(shù)據(jù)余佃,并寫入到kafka中堤舒,F(xiàn)link讀取kafka數(shù)據(jù)經(jīng)過處理后再次放入到kafka中商模,監(jiān)控頁面通過websocket監(jiān)聽kafka中數(shù)據(jù)實現(xiàn)實時的數(shù)據(jù)顯示。


image.png

整體技術(shù)框架基于ruoyi單機(jī)版本搭建
新增加的文件如下:


image.png

第一步先啟動Flume厦幅,F(xiàn)lume監(jiān)聽文件,我這里通過tail命令監(jiān)聽文件新寫入的內(nèi)容

./flume-ng agent -c /Users/dbq/Documents/middleware/flume/master/apache-flume-1.9.0-bin/conf -f /Users/dbq/Documents/middleware/flume/master/apache-flume-1.9.0-bin/conf/kafka.conf -n a1 -Dflume.root.logger=INFO,console

配置文件如下慨飘,F(xiàn)lume實時監(jiān)控文件數(shù)據(jù)确憨,并寫入到kafka test 主題中

a1.sources = r1
a1.sinks = k1
a1.channels = c1
#對于source的配置描述 監(jiān)聽文件中的新增數(shù)據(jù) exec
a1.sources.r1.type = exec
a1.sources.r1.command  = tail -F /Users/d/Documents/middleware/flume/data/log.00
a1.sources.ri.shell = /bin/sh -c
#對于sink的配置描述 使用kafka做數(shù)據(jù)的消費
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = test
a1.sinks.k1.brokerList = 127.0.0.1:9092,127.0.0.1:9092,127.0.0.1:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 5
#對于channel的配置描述 使用內(nèi)存緩沖區(qū)域做數(shù)據(jù)的臨時緩存
a1.channels.c1.type = memory

#通過channel c1將source r1和sink k1關(guān)聯(lián)起來
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

第二步:系統(tǒng)啟動的時候監(jiān)聽kafka topic,并通過Flink進(jìn)行流式計算瓤的,Sink負(fù)責(zé)將處理后的數(shù)據(jù)輸出到外部系統(tǒng)中休弃。

@Component
public class Runner implements CommandLineRunner {
    @Override
    public void run(String... args) throws Exception {
        System.out.println("--------------");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), getProperties());
        DataStream<String> dataStream = env.addSource(consumer);

        //模擬業(yè)務(wù)過程流式處理
        DataStream<String> after = dataStream.map((MapFunction<String, String>) s -> {
            MonitorObject mo = getMonitorObject(s);
            return JSON.toJSONString(mo);
        });
        after.addSink(new MySink());
        env.execute("spring flink demo");
    }

    /**
     * 模擬二次處理
     */
    private static MonitorObject getMonitorObject(String s) {
        MonitorObject mo = JSON.toJavaObject(JSON.parseObject(s), MonitorObject.class);
        mo.setOil(mo.getOil() % 2);
        mo.setSpeed(mo.getSpeed()%2);
        return mo;
    }

    private Properties getProperties() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:3181");
        properties.setProperty("group.id", "flink-group");
        properties.setProperty("auto.offset.reset", "latest");
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return properties;

    }
}

第三步:sink中講數(shù)據(jù)重新寫入到kafka中,這里重寫寫入到kafka目的是起到平滑推送數(shù)據(jù)到前端頁面的效果圈膏,也方便以廣播的方式推送到其他業(yè)務(wù)系統(tǒng)塔猾,其他業(yè)務(wù)系統(tǒng)只需要訂閱test_after主題,就可以獲得Flink處理之后的數(shù)據(jù)

@Slf4j
@Component
public class MySink extends RichSinkFunction<String> {
    private AnnotationConfigApplicationContext ctx;

    private final static String topic = "test_after";

    public MySink() {
        log.info("mySink new");
    }

    @Override
    public void open(Configuration paramters) {
        this.ctx = new AnnotationConfigApplicationContext(Config.class);
        log.info("my sink open");
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        log.info("[flink監(jiān)控kafka數(shù)據(jù)]:{}", value);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        List<String> data = new ArrayList<>();
        data.add(value);

        DataStreamSource<String> source = env.fromCollection(data);
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                topic,
                (KafkaSerializationSchema<String>) (element, timestamp) -> new ProducerRecord<>(
                        topic,
                        element.getBytes()
                ),
                getProperties(),
                FlinkKafkaProducer.Semantic.NONE
        );
        //重新寫入到kafka
        source.addSink(producer);
        env.execute();
    }

    @Override
    public void close() {
        ctx.close();
        log.info("my sink close");
    }

    private Properties getProperties() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:3181");
        properties.setProperty("group.id", "flink-group");
        properties.setProperty("auto.offset.reset", "latest");
        return properties;

    }

第四步:監(jiān)聽kafka稽坤,并通過websocket推送到前端頁面

@Component
@Slf4j
public class KafkaConsumer {

    @KafkaListener(groupId = "3",topics = "test_after")
    public void listen(String msg){
        System.out.println("====================> " + msg);
        MonitorObject mo = JSON.toJavaObject(JSON.parseObject(msg), MonitorObject.class);
        WebSocketUsers.sendMessageToUsersByText(mo);
    }
}

其他代碼
WebSocketConfig.java

@Configuration
public class WebSocketConfig
{
    @Bean
    public ServerEndpointExporter serverEndpointExporter()
    {
        return new ServerEndpointExporter();
    }
}

WebSocketServer.java

@Component
@ServerEndpoint("/websocket/message")
public class WebSocketServer
{
    /**
     * WebSocketServer 日志控制器
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServer.class);

    /**
     * 默認(rèn)最多允許同時在線人數(shù)100
     */
    public static int socketMaxOnlineCount = 100;

    private static Semaphore socketSemaphore = new Semaphore(socketMaxOnlineCount);

    /**
     * 連接建立成功調(diào)用的方法
     */
    @OnOpen
    public void onOpen(Session session) throws Exception
    {
        boolean semaphoreFlag = false;
        // 嘗試獲取信號量
        semaphoreFlag = SemaphoreUtils.tryAcquire(socketSemaphore);
        if (!semaphoreFlag)
        {
            // 未獲取到信號量
            LOGGER.error("\n 當(dāng)前在線人數(shù)超過限制數(shù)- {}", socketMaxOnlineCount);
//            WebSocketUsers.sendMessageToUserByText(session, "當(dāng)前在線人數(shù)超過限制數(shù):" + socketMaxOnlineCount);
            session.close();
        }
        else
        {
            // 添加用戶
            WebSocketUsers.put(session.getId(), session);
            LOGGER.info("\n 建立連接 - {}", session);
            LOGGER.info("\n 當(dāng)前人數(shù) - {}", WebSocketUsers.getUsers().size());
//            WebSocketUsers.sendMessageToUserByText(session, "連接成功");
        }
    }

    /**
     * 連接關(guān)閉時處理
     */
    @OnClose
    public void onClose(Session session)
    {
        LOGGER.info("\n 關(guān)閉連接 - {}", session);
        // 移除用戶
        WebSocketUsers.remove(session.getId());
        // 獲取到信號量則需釋放
        SemaphoreUtils.release(socketSemaphore);
    }

    /**
     * 拋出異常時處理
     */
    @OnError
    public void onError(Session session, Throwable exception) throws Exception
    {
        if (session.isOpen())
        {
            // 關(guān)閉連接
            session.close();
        }
        String sessionId = session.getId();
        LOGGER.info("\n 連接異常 - {}", sessionId);
        LOGGER.info("\n 異常信息 - {}", exception);
        // 移出用戶
        WebSocketUsers.remove(sessionId);
        // 獲取到信號量則需釋放
        SemaphoreUtils.release(socketSemaphore);
    }

    /**
     * 服務(wù)器接收到客戶端消息時調(diào)用的方法
     */
    @OnMessage
    public void onMessage(String message, Session session)
    {
        String msg = message.replace("你", "我").replace("嗎", "");
        WebSocketUsers.sendMessageToUserByText(session, msg);
    }
}

WebSocketUsers.java

/**
 * websocket 客戶端用戶集
 * 
 * @author ruoyi
 */
public class WebSocketUsers
{
    /**
     * WebSocketUsers 日志控制器
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketUsers.class);

    /**
     * 用戶集
     */
    private static Map<String, Session> USERS = new ConcurrentHashMap<String, Session>();

    /**
     * 存儲用戶
     *
     * @param key 唯一鍵
     * @param session 用戶信息
     */
    public static void put(String key, Session session)
    {
        USERS.put(key, session);
    }

    /**
     * 移除用戶
     *
     * @param session 用戶信息
     *
     * @return 移除結(jié)果
     */
    public static boolean remove(Session session)
    {
        String key = null;
        boolean flag = USERS.containsValue(session);
        if (flag)
        {
            Set<Map.Entry<String, Session>> entries = USERS.entrySet();
            for (Map.Entry<String, Session> entry : entries)
            {
                Session value = entry.getValue();
                if (value.equals(session))
                {
                    key = entry.getKey();
                    break;
                }
            }
        }
        else
        {
            return true;
        }
        return remove(key);
    }

    /**
     * 移出用戶
     *
     * @param key 鍵
     */
    public static boolean remove(String key)
    {
        LOGGER.info("\n 正在移出用戶 - {}", key);
        Session remove = USERS.remove(key);
        if (remove != null)
        {
            boolean containsValue = USERS.containsValue(remove);
            LOGGER.info("\n 移出結(jié)果 - {}", containsValue ? "失敗" : "成功");
            return containsValue;
        }
        else
        {
            return true;
        }
    }

    /**
     * 獲取在線用戶列表
     *
     * @return 返回用戶集合
     */
    public static Map<String, Session> getUsers()
    {
        return USERS;
    }

    /**
     * 群發(fā)消息文本消息
     *
     * @param message 消息內(nèi)容
     */
    public static void sendMessageToUsersByText(Object message)
    {
        Collection<Session> values = USERS.values();
        for (Session value : values)
        {
            sendMessageToUserByText(value, message);
        }
    }

    /**
     * 發(fā)送文本消息
     *
     * @param session 自己的用戶名
     * @param message 消息內(nèi)容
     */
    public static void sendMessageToUserByText(Session session, Object message)
    {
        if (session != null)
        {
            try
            {
                session.getBasicRemote().sendText(JSON.toJSONString(message));
            }
            catch (IOException e)
            {
                LOGGER.error("\n[發(fā)送消息異常]", e);
            }
        }
        else
        {
            LOGGER.info("\n[你已離線]");
        }
    }
}

前端代碼

var url = "ws://127.0.0.1:80/websocket/message";
        var ws = new WebSocket(url);
        ws.onopen = function() {
            $('#text_content').append('已經(jīng)打開連接!' + '\n');
        }
        ws.onmessage = function(event) {
            console.log(event.data)
            var obj = JSON.parse(event.data);
            gaugeChart.setOption({
                series : [
                    {
                        name: '速度',
                        data: [{value: obj.speed}]
                    },
                    {
                        name: '轉(zhuǎn)速',
                        data: [{value: obj.rotate_speed}]
                    },
                    {
                        name: '油耗',
                        data: [{value: obj.oil}]
                    }
                ]
            })
        }

模擬數(shù)據(jù)的持續(xù)生成丈甸,這里每秒鐘生成一條數(shù)據(jù),以json格式寫入到日志文件中

while true
do
 echo "{\"speed\":$((RANDOM %220)),\"rotate_speed\":$((RANDOM %7)),\"oil\":$((RANDOM %3))}" >> log.00
 sleep 1
done
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末尿褪,一起剝皮案震驚了整個濱河市睦擂,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌杖玲,老刑警劉巖顿仇,帶你破解...
    沈念sama閱讀 218,858評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異摆马,居然都是意外死亡臼闻,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,372評論 3 395
  • 文/潘曉璐 我一進(jìn)店門囤采,熙熙樓的掌柜王于貴愁眉苦臉地迎上來述呐,“玉大人,你說我怎么就攤上這事斑唬∈新瘢” “怎么了黎泣?”我有些...
    開封第一講書人閱讀 165,282評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長缤谎。 經(jīng)常有香客問我抒倚,道長,這世上最難降的妖魔是什么坷澡? 我笑而不...
    開封第一講書人閱讀 58,842評論 1 295
  • 正文 為了忘掉前任托呕,我火速辦了婚禮,結(jié)果婚禮上频敛,老公的妹妹穿的比我還像新娘项郊。我一直安慰自己,他們只是感情好斟赚,可當(dāng)我...
    茶點故事閱讀 67,857評論 6 392
  • 文/花漫 我一把揭開白布着降。 她就那樣靜靜地躺著,像睡著了一般拗军。 火紅的嫁衣襯著肌膚如雪任洞。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,679評論 1 305
  • 那天发侵,我揣著相機(jī)與錄音交掏,去河邊找鬼。 笑死刃鳄,一個胖子當(dāng)著我的面吹牛盅弛,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播叔锐,決...
    沈念sama閱讀 40,406評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼挪鹏,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了掌腰?” 一聲冷哼從身側(cè)響起狰住,我...
    開封第一講書人閱讀 39,311評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎齿梁,沒想到半個月后催植,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,767評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡勺择,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年创南,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片省核。...
    茶點故事閱讀 40,090評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡稿辙,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出气忠,到底是詐尸還是另有隱情邻储,我是刑警寧澤赋咽,帶...
    沈念sama閱讀 35,785評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站吨娜,受9級特大地震影響脓匿,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜宦赠,卻給世界環(huán)境...
    茶點故事閱讀 41,420評論 3 331
  • 文/蒙蒙 一陪毡、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧勾扭,春花似錦毡琉、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,988評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至身辨,卻和暖如春虱歪,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背栅表。 一陣腳步聲響...
    開封第一講書人閱讀 33,101評論 1 271
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留师枣,地道東北人怪瓶。 一個月前我還...
    沈念sama閱讀 48,298評論 3 372
  • 正文 我出身青樓,卻偏偏與公主長得像践美,于是被迫代替她去往敵國和親洗贰。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,033評論 2 355

推薦閱讀更多精彩內(nèi)容