EMQ X Enterprise 新功能 Rule Engine 介紹
[TOC]
EMQ X Enterprise Rule Engine
Rule Engine (以下簡稱規(guī)則引擎) 用于配置 EMQ X 消息流與設(shè)備事件的處理几迄、響應(yīng)規(guī)則再姑。作為 2019 年度 EMQ X 新增重量級功能痕貌,規(guī)則引擎不僅提供了清晰滋觉、靈活的"配置式"的業(yè)務(wù)集成方案,用于簡化業(yè)務(wù)開發(fā)流程底瓣,提升用戶易用性笨触,降低業(yè)務(wù)系統(tǒng)與 EMQ X 的耦合度孝情;也為 EMQ X 的私有功能定制提供了一個更優(yōu)秀的基礎(chǔ)架構(gòu),提升開發(fā)交付速度著恩。
規(guī)則引擎開源版本提供了基礎(chǔ)處理能力院尔,已集成在 EMQ X v3.1.0 中發(fā)布。功能更靈活完備喉誊、可用性定制性更強的規(guī)則引擎正在緊密開發(fā)測試邀摆,計劃集成在下一 EMQ X 商業(yè)版中發(fā)布。
規(guī)則引擎典型應(yīng)用場景舉例:
- 動作監(jiān)聽:智慧家庭智能門鎖開發(fā)中伍茄,門鎖會因為網(wǎng)絡(luò)栋盹、電源故障、人為破壞等原因離線導(dǎo)致功能異常敷矫,使用規(guī)則引擎配置監(jiān)聽離線事件向應(yīng)用服務(wù)推送該故障信息例获,可以在接入層實現(xiàn)第一時間的故障檢測的能力;
- 數(shù)據(jù)篩選:車輛網(wǎng)的卡車車隊管理曹仗,車輛傳感器采集并上報了大量運行數(shù)據(jù)榨汤,應(yīng)用平臺僅關(guān)注車速大于 40 km/h 時的數(shù)據(jù),此場景下可以使用規(guī)則引擎對消息進行條件過濾怎茫,向業(yè)務(wù)消息隊列寫入滿足條件的數(shù)據(jù)收壕;
- 消息路由:智能計費應(yīng)用中,終端設(shè)備通過不同主題區(qū)分業(yè)務(wù)類型轨蛤,可通過配置規(guī)則引擎將計費業(yè)務(wù)的消息接入計費消息隊列并在消息抵達設(shè)備端后發(fā)送確認(rèn)通知到業(yè)務(wù)系統(tǒng)啼器,非計費信息接入其他消息隊列,實現(xiàn)業(yè)務(wù)消息路由配置俱萍;
- 消息編解碼:其他公共協(xié)議/私有 TCP 協(xié)議接入端壳、工控行業(yè)等應(yīng)用場景下,可以通過規(guī)則引擎的本地處理函數(shù)(可在 EMQ X 上定制開發(fā))做二進制/特殊格式消息體的編解碼工作枪蘑;亦可通過規(guī)則引擎的消息路由將相關(guān)消息流向外部計算資源如函數(shù)計算進行處理(可由用戶自行開發(fā)處理邏輯)损谦,將消息轉(zhuǎn)為業(yè)務(wù)易于處理的 JSON 格式岖免,簡化項目集成難度、提升應(yīng)用快速開發(fā)交付能力照捡。
Rule Engine 工作示意圖
[圖片上傳失敗...(image-f45c8b-1557715854322)]
規(guī)則引擎通過嵌入在 EMQ 的消息轉(zhuǎn)發(fā)過程中對數(shù)據(jù)進行過濾颅湘、轉(zhuǎn)換和豐富,實現(xiàn)高效的數(shù)據(jù)處理栗精。新的規(guī)則引擎涵蓋了 EMQ X 中多個插件的功能闯参,將原先插件中獨立的外部資源進行集中管理,實現(xiàn)資源復(fù)用悲立,降低管理監(jiān)控復(fù)雜度鹿寨。同時,規(guī)則引擎將大部分原先只能在應(yīng)用端進行的計算內(nèi)置到 EMQ X 中薪夕,通過計算脚草、過濾、篩選高價值數(shù)據(jù)提高消息處理效率的同時原献,精簡了業(yè)務(wù)架構(gòu)馏慨、減少數(shù)據(jù)傳遞路徑降低了消息處理時延。
規(guī)則引擎相關(guān)的功能包括:
消息規(guī)則:處理設(shè)備到 EMQ X 的消息姑隅,實現(xiàn)條件計算篩選写隶、消息結(jié)構(gòu)調(diào)整,消息重新發(fā)布讲仰、持久化與橋接樟澜;
事件規(guī)則:處理設(shè)備通信生命周期中的各個事件信息,可方便實現(xiàn)設(shè)備狀態(tài)記錄如上下線通知叮盘,認(rèn)證連接記錄秩贰、消息狀態(tài)記錄如消息計費統(tǒng)計等功能;
資源管理:集中管理外部資源柔吼,實現(xiàn)資源復(fù)用毒费,降低管理監(jiān)控復(fù)雜度。
與 EMQ X 其他功能一樣愈魏,規(guī)則引擎同樣提供了類似的 HTTP REST API 方便用戶應(yīng)用開發(fā)集成觅玻,EMQ X Dashboard ( EMQ X 管理控制臺)中亦實現(xiàn)了規(guī)則引擎的可視化創(chuàng)建、編輯培漏、管理功能溪厘。
[圖片上傳失敗...(image-461879-1557715854322)]
[圖片上傳失敗...(image-2b1791-1557715854322)]
消息規(guī)則
借助規(guī)則引擎中的消息規(guī)則,用戶可以將設(shè)備到 EMQ X 的消息路由或?qū)懭氲礁黝悢?shù)據(jù)庫牌柄、消息隊列畸悬、HTTP REST 網(wǎng)關(guān)等對象或資源中,或重新發(fā)送到設(shè)備以實現(xiàn)服務(wù)端計算功能珊佣。
規(guī)則引擎提供了基于 SQL 表達式的數(shù)據(jù)查詢蹋宦、處理功能披粟,讓您先篩選數(shù)據(jù)并轉(zhuǎn)換消息為預(yù)置格式,再配置后續(xù)處理動作冷冗。
SQL 表達式范例如下:
-- 選擇發(fā)往 "t/a" 主題的消息體中的 name 字段, 過濾條件為 name = 'EMQ'
select payload.name as name from "t/a" where name = 'EMQ'
-- 選擇發(fā)往 "command/#" 主題的消息體
select payload from "command/#"
消息規(guī)則典型功能與應(yīng)用場景如下:
按照消息的主題進行過濾守屉,指定要處理的消息,處理后重新發(fā)布到新主題蒿辙;
制定篩選條件拇泛,針對消息正文特定字段進行條件篩選,處理滿足條件的數(shù)據(jù)思灌;
將消息正文轉(zhuǎn)換為預(yù)置結(jié)構(gòu)再處理俺叭,削減內(nèi)部通信與外部存儲、計算開銷习瑰;
使用消息摘要、編碼轉(zhuǎn)換秽荤、數(shù)學(xué)運算等多種預(yù)處理方式處理消息正文或消息正文中指定字段甜奄,在 Broker 中完成簡單計算以降低操作延遲。
每條消息規(guī)則包含以下屬性:
屬性 | 說明 |
---|---|
Source | 要處理的數(shù)據(jù)流來源窃款,基于 MQTT 主題课兄,使用 SQL 中的 FROM 指令篩選 |
條件 | 針對消息正文(僅限 JSON 信息)、消息上下文信息(如 QoS晨继、Client ID烟阐、Username)的條件過濾表達式,用于確定該條規(guī)則的匹配條件紊扬、消息結(jié)構(gòu)蜒茄。使用 SQL 中的 WHERE 指令查詢 |
處理器 | 針對消息正文(僅限 JSON 信息)、消息上下文信息(如 QoS餐屎、Client ID檀葛、Username)的選擇表達式,用于選擇并預(yù)處理指定數(shù)據(jù)腹缩,規(guī)則引擎內(nèi)置多種預(yù)處理方法如消息摘要屿聋、編解碼與編碼轉(zhuǎn)換、簡單數(shù)學(xué)運算藏鹊,使用 SQL 的子句與 SQL 函數(shù)處理润讥。 |
動作 | 消息命中規(guī)則并處理成功后需要觸發(fā)的動作,指定具體的動作操作如寫入數(shù)據(jù)庫 SQL 語句盘寡,發(fā)送到消息隊列的對象楚殿、主題。一條規(guī)則可以定義一個或多個動作竿痰,實現(xiàn)規(guī)則的多端處理勒魔。 |
事件規(guī)則
借助規(guī)則引擎中的事件規(guī)則甫煞,用戶可以處理設(shè)備通信生命周期中的各個事件信息,事件規(guī)則典型功能與應(yīng)用場景如下:
- 設(shè)備各個事件動作 log:冠绢,如設(shè)備連接/斷開連接抚吠、消息發(fā)布、消息傳送/抵達/丟棄弟胀、設(shè)備訂閱/取消訂閱等事件楷力,用于設(shè)備操作記錄與行為分析;
- 設(shè)備上下線通知與記錄:監(jiān)聽
client.connected
與client.disconnected
兩個事件可以實現(xiàn)設(shè)備上下線記錄孵户; - 消息狀態(tài)記錄:監(jiān)聽
message
相關(guān)事件可以實現(xiàn)關(guān)鍵消息指令狀態(tài)監(jiān)測如下發(fā)成功/失敗回調(diào)等萧朝。
附:規(guī)則引擎功能列表
功能 | 說明 | 開源版 | 商業(yè)版 |
---|---|---|---|
基礎(chǔ)功能 | |||
條件篩選 | 通過事件名稱、上下文信息夏哭、消息內(nèi)容進行條件篩選检柬,選擇要處理的消息流 | 支持 | 支持 |
預(yù)處理 | 通過內(nèi)置處理函數(shù)實現(xiàn)消息的簡單數(shù)學(xué)計算、字符處理竖配、編解碼能力何址,輸出預(yù)置格式的消息 | 支持 | 支持 |
資源管理 | 集中管理外部資源,實現(xiàn)資源復(fù)用进胯,降低管理監(jiān)控復(fù)雜度 | 支持 | 支持 |
消息輸出 | |||
發(fā)布到指定主題 | 將規(guī)則處理后的消息重發(fā)布(republish)到指定主題進行載處理或供訂閱端使用 | 支持 | 支持 |
發(fā)送到 WebHook | 將消息發(fā)布到 HTTP API 網(wǎng)關(guān) | 支持 | 支持 |
發(fā)送到消息隊列 | 支持 Kafka用爪、RabbitMQ 等私有或公有云消息中間件 | 支持 | |
寫入到數(shù)據(jù)庫 | 包括 MySQL、PostgreSQL胁镐、MongoDB偎血、Redis 等私有或公有云數(shù)據(jù)庫 | 支持 | |
發(fā)送到另一個 MQTT Broker | 通過 MQTT 協(xié)議將消息發(fā)布到另一個 MQTT Broker 指定主題,包括但不限于 EMQ X盯漂、Azure IoT Hub颇玷、AWS IoT、阿里云物聯(lián)網(wǎng)平臺 | 支持 | |
擴展定制 | |||
處理功能定制 | 定制私有內(nèi)置處理函數(shù)就缆,靈活處理私有協(xié)議亚隙、特殊編碼消息 | 支持 | |
消息輸出定制 | 定制私有消息輸出方式,規(guī)則輸出端更靈活 | 支持 |