一、概念
單進(jìn)程版與Storm版的流計(jì)算實(shí)現(xiàn)有許多相似的概念,其中最重要的包括:Topology、Spout梗夸、Bolt。Topology是一個(gè)由Spout節(jié)點(diǎn)和Bolt節(jié)點(diǎn)組成的有向無(wú)環(huán)圖(或稱(chēng)有方向的樹(shù))号醉,一個(gè)輕應(yīng)用可以有一個(gè)或多個(gè)Topology反症。Spout是數(shù)據(jù)的來(lái)源,它是Topology的根節(jié)點(diǎn)畔派,每個(gè)Topology只有一個(gè)Spout铅碍。Bolt是真正負(fù)責(zé)處理業(yè)務(wù)邏輯的節(jié)點(diǎn)。
Storm將父節(jié)點(diǎn)傳輸給子節(jié)點(diǎn)的數(shù)據(jù)稱(chēng)為T(mén)uple线椰,單進(jìn)程版的流計(jì)算與此對(duì)應(yīng)的概念是BoltParameter胞谈。一組Tuple或BoltParameter組成Stream,Stream是一個(gè)抽象概念憨愉,沒(méi)有具體的實(shí)現(xiàn)呜魄。
單進(jìn)程版相當(dāng)于Storm版的簡(jiǎn)化,有一些Storm版的概念這里不會(huì)有莱衩,比如:
- 分組(grouping)
- Worker
- Task
- Reliability
上面列出的概念基本都與Storm的并行處理有關(guān),單進(jìn)程版的流計(jì)算不存在并發(fā)問(wèn)題娇澎,也就沒(méi)有這些概念笨蚁。
在編寫(xiě)Storm版的流計(jì)算程序時(shí),很多非業(yè)務(wù)邏輯的功能是Storm負(fù)責(zé)維護(hù)和處理的,而單進(jìn)程版的程序需要自己來(lái)處理括细,這引入了一些新的概念:
- SpoutProcessor伪很,這個(gè)類(lèi)負(fù)責(zé)維護(hù)Spout節(jié)點(diǎn)
- BoltProcessor,這個(gè)類(lèi)負(fù)責(zé)維護(hù)Bolt節(jié)點(diǎn)
下文將詳細(xì)解釋SpoutProcessor和BoltProcessor奋单。
二锉试、Topology的實(shí)現(xiàn)
之前提到Topology樹(shù)是由Spout和Bolt組成,實(shí)際上Spout和Bolt里邊是業(yè)務(wù)邏輯相關(guān)的定義览濒,真正讓它們組成一棵樹(shù)的是SpoutProcessor和BoltProcessor呆盖,這兩個(gè)類(lèi)都實(shí)現(xiàn)了ProcessNode接口,該接口定義如下:
public interface ProcessNode {
/**
* 提取本處理結(jié)點(diǎn)的名字
*/
String getName();
/**
* 提取本結(jié)點(diǎn)的Id贷笛,Id用來(lái)記錄節(jié)點(diǎn)間的關(guān)系应又,全局唯一
*/
String getId();
/**
* 向本處理結(jié)點(diǎn)增加一個(gè)兒子結(jié)點(diǎn)
*/
void addchild(ProcessNode child);
/**
* 調(diào)用本處理結(jié)點(diǎn)的處理邏輯對(duì)數(shù)據(jù)進(jìn)行處理
*/
void run(Object param);
}
SpoutProcesser的定義:
public class SpoutProcessor implements ProcessNode {
private static Logger logger = LoggerFactory.getLogger(SpoutProcessor.class);
/**
* 本拓?fù)浯幚淼臄?shù)據(jù)隊(duì)列
*/
KafkaDataQueue queue = new KafkaDataQueue();
/**
* 數(shù)據(jù)生成器
*/
private StreamSpout spout;
/**
* 本處理器對(duì)應(yīng)的下一代處理器
*/
private LinkedList<ProcessNode> childrens;
//后面的代碼省略
SpoutProcesser的第一個(gè)變量是KafkaDataQueue,這個(gè)數(shù)據(jù)隊(duì)列由KafkaThread類(lèi)負(fù)責(zé)寫(xiě)入數(shù)據(jù)(它的數(shù)據(jù)來(lái)源是Kafka隊(duì)列)乏苦,由ProcessThread負(fù)責(zé)讀取數(shù)據(jù)并處理株扛。關(guān)于ProcessThread等線(xiàn)程類(lèi)后邊詳細(xì)介紹。
SpoutProcesser的第二個(gè)變量是StreamSpout汇荐,這就是Storm中Spout的對(duì)應(yīng)實(shí)現(xiàn)洞就,是拓?fù)錁?shù)的根節(jié)點(diǎn)。
SpoutProcesser第三個(gè)變量:private LinkedList<ProcessNode> childrens;
掀淘,這個(gè)變量記錄了它的下一級(jí)節(jié)點(diǎn)有哪些旬蟋,這是組成Topology樹(shù)的關(guān)鍵。
public class BoltProcessor implements ProcessNode {
/**
* 本處理器對(duì)應(yīng)的Bolt
*/
private Bolt bolt;
/**
* 本處理器對(duì)應(yīng)的下一代處理器
*/
private LinkedList<ProcessNode> childrens;
//后面的代碼省略
BoltProcessor也是一樣繁疤,維護(hù)了Bolt的下一級(jí)節(jié)點(diǎn)列表咖为。
三、線(xiàn)程
單進(jìn)程版流計(jì)算有四個(gè)線(xiàn)程:
- KafkaThread稠腊,如前所述躁染,它負(fù)責(zé)讀取Kafka隊(duì)列,并把數(shù)據(jù)放到SpoutProcesser的KafkaDataQueue架忌。每個(gè)SpoutProcesser都會(huì)收到一份完全相同的數(shù)據(jù)的拷貝吞彤,有點(diǎn)類(lèi)似于Storm的AllGrouping分組方式。全局只有一個(gè)KafkaThread叹放。
- ProcessThread饰恕,它通過(guò)調(diào)用SpoutProcesser的實(shí)例來(lái)處理KafkaDataQueue中的數(shù)據(jù),每個(gè)Topology對(duì)應(yīng)一個(gè)ProcessThread井仰。
- OutputThread埋嵌,負(fù)責(zé)把流計(jì)算的結(jié)果存儲(chǔ)到MongoDB,全局只有一個(gè)OutputThread俱恶。
- ShutdownHookThread雹嗦,當(dāng)進(jìn)程退出時(shí)范舀,如kill -15 程序退出時(shí),把未處理的kafka數(shù)據(jù)退回kafka隊(duì)列了罪,把已經(jīng)處理生成的結(jié)果存入mongo锭环,減少數(shù)據(jù)丟失。ShutdownListener類(lèi)實(shí)現(xiàn)了ApplicationListener接口泊藕,當(dāng)監(jiān)聽(tīng)到ContextClosedEvent事件時(shí)啟動(dòng)ShutdownHookThread辅辩。
四、流計(jì)算業(yè)務(wù)邏輯的實(shí)現(xiàn)
關(guān)于Groovy腳本是如何在Java程序中運(yùn)行的娃圆,可以參考《Groovy腳本使用方法》玫锋、《baas系統(tǒng)腳本說(shuō)明》。這篇文檔主要介紹StreamSpout踊餐、ConvertBolt和StatBolt的實(shí)現(xiàn)(AlarmBolt與ConvertBolt類(lèi)似景醇,不再重復(fù)介紹)。
(一)StreamSpout
StreamSpout是根節(jié)點(diǎn)的實(shí)現(xiàn)吝岭,它實(shí)現(xiàn)了Spout接口:
public interface Spout {
/**
* 得到本Spout的名字
*/
String getName();
/**
* 得到本Spout的Id
*/
String getId();
/**
* 準(zhǔn)備本Bolt
*/
void prepare();
/**
* 提取設(shè)備檔案操作對(duì)象
*/
IArchives getArchives();
/**
* 設(shè)置設(shè)備檔案操作對(duì)象
*/
void setArchives(IArchives archives);
/**
* 執(zhí)行Spout內(nèi)部的判斷邏輯三痰,判別是否應(yīng)該交由本Spout進(jìn)行處理
*/
BoltParameter execute(SpoutParameter spoutParameter);
}
prepare方法只在初始化的時(shí)候執(zhí)行一次,它負(fù)責(zé)做一些準(zhǔn)備工作窜管。
execute方法每收到一個(gè)數(shù)據(jù)就會(huì)運(yùn)行一次散劫,處理真正的業(yè)務(wù)邏輯,數(shù)據(jù)通過(guò)參數(shù)BoltParameter傳遞進(jìn)來(lái)幕帆,通過(guò)返回BoltParameter傳遞給下一級(jí)節(jié)點(diǎn)获搏。
getArchives方法返回一個(gè)IArchives接口,通過(guò)這個(gè)接口提供的方法可以獲取設(shè)備檔案失乾。
StreamSpout的定義(核心片段常熙,非完整代碼):
public class StreamSpout implements Spout {
/**
* 本Spout的配置
*/
private SpoutConfig config;
/**
* 訪(fǎng)問(wèn)設(shè)備檔案的對(duì)象
*/
private Archives archives;
@Override
public void prepare() {
}
@Override
public BoltParameter execute(SpoutParameter spoutParameter) {
//省略具體實(shí)現(xiàn)
}
//省略后面的代碼
第一個(gè)成員變量是SpoutConfig,這就是用戶(hù)在輕應(yīng)用平臺(tái)上所做的配置碱茁,由單進(jìn)程流計(jì)算的入口類(lèi)Executor從數(shù)據(jù)庫(kù)中讀取填充裸卫。
第二個(gè)成員變量Archives,這個(gè)對(duì)象包含一個(gè)deviceId成員變量纽竣,當(dāng)StreamSpout收到數(shù)據(jù)時(shí)墓贿,execute方法會(huì)給deviceId賦值,在后面的節(jié)點(diǎn)中將用來(lái)獲取檔案信息蜓氨。
StreamSpout的prepare方法目前為空聋袋,沒(méi)有任何準(zhǔn)備工作要做。
execute方法首先判斷數(shù)據(jù)流名稱(chēng)是否和用戶(hù)配置的一致穴吹,然后構(gòu)造BoltParameter對(duì)象(由流計(jì)算的上下文幽勒、內(nèi)置輸入對(duì)象、檔案操作對(duì)象組成)港令,返回該對(duì)象給下一級(jí)節(jié)點(diǎn)代嗤。
(二)ConvertBolt
ConvertBolt的定義(核心片段棘钞,非完整代碼):
public class ConvertBolt extends BaseBolt {
/**
* 本Bolt的配置
*/
private ConvertBoltConfig config;
/**
* 腳本對(duì)象
*/
private IConvertProcess process;
/**
* 把數(shù)據(jù)保存到Mongo的隊(duì)列
*/
private OutputQueue outputQueue;
//省略非業(yè)務(wù)邏輯私有變量
@Override
public void prepare() {
//省略具體實(shí)現(xiàn)
}
@Override
public List<BoltParameter> execute(BoltParameter parameter) {
//省略具體實(shí)現(xiàn)
}
}
ConvertBolt繼承了BaseBolt,后者非常簡(jiǎn)單干毅,不影響整體理解,細(xì)節(jié)請(qǐng)閱讀源代碼泼返。
第一個(gè)成員變量是ConvertBoltConfig硝逢,和SpoutConfig一樣,這是用戶(hù)在輕應(yīng)用平臺(tái)上所做的配置绅喉,由單進(jìn)程流計(jì)算的入口類(lèi)Executor從數(shù)據(jù)庫(kù)中讀取填充渠鸽。
第二個(gè)成員變量IConvertProcess丸凭,用來(lái)引用Groovy腳本的實(shí)例衙传,執(zhí)行Groovy腳本的時(shí)候用到。
第三個(gè)成員變量OutputQueue驱富,OutputThread會(huì)將這個(gè)隊(duì)列的數(shù)據(jù)存儲(chǔ)到MongoDB革屠。
prepare方法只在初始化的時(shí)候執(zhí)行一次凿试,它負(fù)責(zé)做一些準(zhǔn)備工作,例如解析ConvertBoltConfig并加載Groovy腳本那婉。
execute方法每收到一個(gè)數(shù)據(jù)就會(huì)運(yùn)行一次,處理真正的業(yè)務(wù)邏輯党瓮,數(shù)據(jù)通過(guò)參數(shù)BoltParameter傳遞進(jìn)來(lái)详炬,通過(guò)返回List<BoltParameter>傳遞給下一級(jí)節(jié)點(diǎn)。
(三)StatBolt
StatBolt的定義(核心片段寞奸,非完整代碼):
public class StatBolt extends BaseBolt {
/**
* 統(tǒng)計(jì)單元的配置
*/
private StaticsBoltConfig config;
/**
* 統(tǒng)計(jì)腳本對(duì)象
*/
private IStatProcess statProcessor;
/**
* 統(tǒng)計(jì)緩存對(duì)象
*/
private Caches caches;
/**
* 統(tǒng)計(jì)過(guò)程中訪(fǎng)問(wèn)Redis的對(duì)象呛谜,用于保存和提取中間結(jié)果
*/
private RedisClient redisClient;
/**
* 把數(shù)據(jù)存儲(chǔ)到Mongo中的隊(duì)列
*/
private OutputQueue outputQueue;
@Override
public void prepare() {
//省略具體實(shí)現(xiàn)
}
@Override
public List<BoltParameter> execute(BoltParameter parameter) {
//省略具體實(shí)現(xiàn)
}
}
StatBolt和ConvertBolt結(jié)構(gòu)基本一致,相同的部分不再重復(fù)說(shuō)明枪萄。
成員變量Caches是內(nèi)置的統(tǒng)計(jì)計(jì)算所需的緩存類(lèi)隐岛,在prepare方法中初始化,它實(shí)現(xiàn)了ICaches接口呻引,包括單個(gè)設(shè)備統(tǒng)計(jì)用到的group函數(shù)和全局統(tǒng)計(jì)用到的group(Object group)礼仗。雖然名為Caches,但它更多的是作為計(jì)算所需的內(nèi)置對(duì)象逻悠,實(shí)際的緩存功能是由它內(nèi)部的RedisClient類(lèi)實(shí)現(xiàn)元践。
execute方法判斷是否到達(dá)輸出時(shí)間(上一次輸出時(shí)間可通過(guò)Caches獲取)童谒,如果到達(dá)則執(zhí)行輸出腳本单旁,如果沒(méi)有到達(dá)則執(zhí)行計(jì)算腳本。
五饥伊、緩存的實(shí)現(xiàn)
采用了兩級(jí)緩存機(jī)制:Redis和Ehcache象浑,前者是遠(yuǎn)程緩存蔫饰,后者是本地緩存。事實(shí)上正是由于遠(yuǎn)程緩存性能不夠好才引入了本地緩存愉豺,但另一方面篓吁,如果只使用本地緩存,程序意外終止時(shí)會(huì)丟失數(shù)據(jù)蚪拦,所以?xún)烧呓Y(jié)合使用杖剪。
CacheUtils類(lèi)提供了Ehcache的訪(fǎng)問(wèn),RedisClient類(lèi)除了提供了Redis的訪(fǎng)問(wèn)驰贷,還包含CacheUtils的調(diào)用并提供其他類(lèi)所需的接口方法盛嘿。
緩存的細(xì)節(jié)信息見(jiàn)下表:
緩存類(lèi)型 | 最大數(shù)量級(jí) | key值構(gòu)成 | Ehcache緩存名字 | 備注 |
---|---|---|---|---|
設(shè)備檔案 | 設(shè)備數(shù)量 | arch-、archiveId括袒、deviceId | archiveCache | |
單個(gè)統(tǒng)計(jì)中間結(jié)果 | 設(shè)備數(shù)量*統(tǒng)計(jì)節(jié)點(diǎn)數(shù)量 | cache-次兆、statId、deviceId | midStatCache | 單個(gè)統(tǒng)計(jì)時(shí)緩存數(shù)量遠(yuǎn)大于全局統(tǒng)計(jì) |
單個(gè)統(tǒng)計(jì)的上一次輸出時(shí)間 | 設(shè)備數(shù)量*統(tǒng)計(jì)節(jié)點(diǎn)數(shù)量 | stat_last_time-锹锰、statId芥炭、deviceId | lastStatOutputCache | 單個(gè)統(tǒng)計(jì)時(shí)緩存數(shù)量遠(yuǎn)大于全局統(tǒng)計(jì) |
全局統(tǒng)計(jì)中間結(jié)果 | 檔案字段數(shù)量*統(tǒng)計(jì)節(jié)點(diǎn)數(shù)量 | cache-、statId城须、group | midStatCache | group是檔案字段的值 |
全局統(tǒng)計(jì)的上一次輸出時(shí)間 | 檔案字段數(shù)量*統(tǒng)計(jì)節(jié)點(diǎn)數(shù)量 | stat_last_time-蚤认、statId、group | lastStatOutputCache | group是檔案字段的值 |
上一次告警輸出的時(shí)間(數(shù)據(jù)來(lái)源為非統(tǒng)計(jì)節(jié)點(diǎn)) | 設(shè)備數(shù)量*告警節(jié)點(diǎn)數(shù)量 | alarm_last-糕伐、alarmId砰琢、deviceId | lastAlarmCache | |
上一次告警輸出的時(shí)間(數(shù)據(jù)來(lái)源為單個(gè)統(tǒng)計(jì)節(jié)點(diǎn)) | 設(shè)備數(shù)量*告警節(jié)點(diǎn)數(shù)量 | alarm_last-、alarmId良瞧、deviceId | lastAlarmCache | key值看起來(lái)和上一行相同陪汽,但alarmId實(shí)際會(huì)不一樣。 |
上一次告警輸出的時(shí)間(數(shù)據(jù)來(lái)源為全局統(tǒng)計(jì)節(jié)點(diǎn)) | 檔案字段數(shù)量*告警節(jié)點(diǎn)數(shù)量 | alarm_last-褥蚯、alarmId挚冤、group | lastAlarmCache | group是檔案字段的值 |
全局統(tǒng)計(jì)的維度值列表 | 檔案字段數(shù)量*統(tǒng)計(jì)節(jié)點(diǎn)數(shù)量 | statId | statDimensionsCache | value是Set<String>,Set里邊放的group赞庶;單線(xiàn)程版本的流計(jì)算是存儲(chǔ)在Redis |
六训挡、程序的入口:Executor類(lèi)
Executor類(lèi)的init方法是單進(jìn)程版流計(jì)算程序的入口,init方法主要做了兩件事:創(chuàng)建Topology樹(shù)和啟動(dòng)各個(gè)線(xiàn)程歧强,當(dāng)init方法執(zhí)行完畢之后澜薄,單進(jìn)程流計(jì)算程序就開(kāi)始讀取Kafka隊(duì)列中的數(shù)據(jù)并處理,這個(gè)過(guò)程將會(huì)一直運(yùn)行下去摊册,只能用戶(hù)手動(dòng)停止肤京。