本文基于TalkingData 張學(xué)敏 在公司內(nèi)部KOL的分享主題《基于Spark、NoSQL實時數(shù)據(jù)處理實踐》的整理蓖柔,同時也在DTCC大會上做了同主題的分享。
主要介紹了項目的技術(shù)選型风纠、技術(shù)架構(gòu)况鸣,重點介紹下項目面臨的挑戰(zhàn)和解決辦法,還介紹了面對多維度竹观、多值镐捧、多版本等業(yè)務(wù)場景時,使用Bitmap與HBase特性解決問題方法臭增。
共分為上下兩篇懂酱,本次發(fā)布上篇,下篇敬請關(guān)注誊抛。
一列牺、數(shù)據(jù)相關(guān)情況
項目處理的數(shù)據(jù)主要來源于TalkingData的三條SASS業(yè)務(wù)線,他們主要是為移動應(yīng)用開發(fā)者提供應(yīng)用的統(tǒng)計分析拗窃、游戲運(yùn)營分析以及廣告監(jiān)測等能力瞎领。開發(fā)者使用TD的SDK將各種事件數(shù)據(jù)發(fā)送過來泌辫,然后再通過SASS平臺使用數(shù)據(jù)。
數(shù)據(jù)主要都和智能設(shè)備相關(guān)九默,包含的數(shù)據(jù)內(nèi)容主要可以分為三部分驼修,一部分是設(shè)備信息類邪锌,主要包括設(shè)備ID觅丰,比如Mac、IDFA等轻掩,還有設(shè)備的軟硬件信息聚唐,比如操作系統(tǒng)版本號,屏幕分辨率等。另一部分是業(yè)務(wù)相關(guān)信息類豫领,主要包括業(yè)務(wù)事件鼠锈,會話信息,還有行為狀態(tài)样傍。關(guān)于行為狀態(tài)撤逢,是我們在智能設(shè)備上使用算法推測終端持有者的行為狀態(tài)信息莫杈,比如靜止丁存、行走艘儒、奔跑翻斟、乘車等。第三部分是上下文信息,包括設(shè)備連接網(wǎng)絡(luò)的情況配并,使用的是蜂窩網(wǎng)絡(luò)還是WiFi等,還有設(shè)備位置相關(guān)的信息溉旋,以及其他傳感器相關(guān)的數(shù)據(jù)等低滩。
關(guān)于設(shè)備體量,目前設(shè)備日活月活分別在2.5億和6.5億以上迄委,每天的事件數(shù)在370億左右,一天數(shù)據(jù)的存儲量是在17T左右财忽。
上圖為整體的數(shù)據(jù)架構(gòu)圖倘核,數(shù)據(jù)流向是自下往上。數(shù)據(jù)采集層使用的是TalkingData自研的SDK,通過SDK將數(shù)據(jù)發(fā)往數(shù)據(jù)收集層漏益。數(shù)據(jù)收集層使用的是TalkingData自研的DataCollector酬凳,Collector會將數(shù)據(jù)發(fā)送到數(shù)據(jù)接入層的Kafka。每個業(yè)務(wù)線都有自己的Kafka集群遭庶,在Collector可以控制數(shù)據(jù)的流向宁仔,大多數(shù)據(jù)都是業(yè)務(wù)線一份,數(shù)據(jù)中心一份峦睡。數(shù)據(jù)處理層有兩部分翎苫,一部分是使用Spark core或sql的離線計算。其中Spark是on yarn模式榨了,使用yarn進(jìn)行資源管理煎谍,中間通過Alluxio進(jìn)行加速,使用Jenkins進(jìn)行作業(yè)管理和調(diào)度龙屉,主要負(fù)責(zé)為業(yè)務(wù)方提供數(shù)據(jù)集和數(shù)據(jù)服務(wù)呐粘。
另一部分是使用Spark Streaming的實時計算,主要是為TalkingData管理層提供運(yùn)營數(shù)據(jù)報表转捕。數(shù)據(jù)存儲層作岖,主要功能是存放數(shù)據(jù)處理后的結(jié)果,使用分布式文件系統(tǒng)HDFS五芝、Alluxio存放數(shù)據(jù)集痘儡,使用分布式數(shù)據(jù)庫HBase、ScyllaDB枢步,關(guān)系型數(shù)據(jù)庫MySQL以及MPP型數(shù)據(jù)庫GreenPlum存放服務(wù)相關(guān)的數(shù)據(jù)沉删。數(shù)據(jù)應(yīng)用層?xùn)|西就比較多了,有供TalkingData內(nèi)部使用的數(shù)據(jù)分析醉途、探索平臺矾瑰,也有對外內(nèi)外都可的數(shù)據(jù)服務(wù)、數(shù)據(jù)模型商城隘擎,以及智能營銷云殴穴、觀象臺等。
二嵌屎、項目面臨的業(yè)務(wù)訴求
主要的可總結(jié)為四部分:
首先是數(shù)據(jù)修正:離線計算是將數(shù)據(jù)存放在了HDFS上推正,如果數(shù)據(jù)有延遲,比如事件時間是昨天的數(shù)據(jù)今天才到宝惰,那么數(shù)據(jù)將會被錯誤的存放在今天的時間分區(qū)內(nèi)。因為HDFS不支持隨機(jī)讀寫再沧,也不好預(yù)測數(shù)據(jù)會延遲多久尼夺,所以在離線計算想要完全修正這些數(shù)據(jù),成本還是比較高的。
其次是時序數(shù)據(jù)需求:之前的業(yè)務(wù)都是以小時淤堵、天寝衫、周、月等時間周期拐邪,面向時間斷面? 的宏觀數(shù)據(jù)分析慰毅,隨著公司業(yè)務(wù)擴(kuò)展,比如營銷扎阶、風(fēng)控等行業(yè)汹胃,面向個體的微觀數(shù)據(jù)分析的需求越來越多,所以需要能夠低成本的把一個設(shè)備的相關(guān)的數(shù)據(jù)都取出來做分析东臀。而面向時間斷面的數(shù)據(jù)每天十幾T着饥,想從中抽出某些設(shè)備近1個月的數(shù)據(jù)就會涉及到500多T的數(shù)據(jù)。所以需要建立時序數(shù)據(jù)處理惰赋、查詢的能力宰掉,能方便的獲取設(shè)備歷史上所有數(shù)據(jù)。
第三是實時處理:離線計算少則延遲一個小時赁濒,多則一天或者更久轨奄,而有些行業(yè)對數(shù)據(jù)時效性要求是比較高的,比如金融拒炎、風(fēng)控等業(yè)務(wù)戚绕,所以需要實時數(shù)據(jù)處理。同時枝冀,為了更多的豐富設(shè)備位置相關(guān)數(shù)據(jù)舞丛,我們還建立了WiFi、基站等實體的位置庫果漾,所以在實時數(shù)據(jù)處理時球切,需要實時讀取這些庫為那些連接了WiFi、基站但沒位置數(shù)據(jù)的設(shè)備補(bǔ)充位置相關(guān)信息绒障。
第四是實時查詢吨凑,這里描述的是面向?qū)嶓w、多維度户辱、多值鸵钝、多版本,接下來我詳細(xì)介紹下庐镐。
我們將事件數(shù)據(jù)抽象出了各種實體恩商,比如設(shè)備、位置必逆、WiFi基站等實體怠堪,其中位置實體可以使用GeoHash或者網(wǎng)格表達(dá)揽乱。每個實體都有唯一ID以及多個維度信息,以設(shè)備實體為例粟矿,包括ID凰棉、軟硬件信息等維度。單個維度又可能會包含多個值陌粹,比如WiFi撒犀,在家我連接的是WiFi1,到公司鏈接的是WiFi2掏秩,所以WiFi維度有WiFi1和WiFi2兩個值或舞。單個值又可能有多個時間版本,比如我在家連接WiFi1可能6點被捕獲到一次哗讥,7點被捕獲到兩次嚷那。所以,最終建立可以通過指定實體ID杆煞,查詢維度魏宽、列及時間窗口獲取數(shù)據(jù)的能力。
三决乎、技術(shù)選型和架構(gòu)
數(shù)據(jù)接入層我們選擇的是Kafka队询,Kafka在大數(shù)據(jù)技術(shù)圈里出鏡率還是比較高的。Kafka是LinkedIn在2011年開源的构诚,創(chuàng)建初衷是解決系統(tǒng)間消息傳遞的問題蚌斩。傳統(tǒng)消息系統(tǒng)有兩種模型,一種是隊列模型范嘱,一種是訂閱發(fā)布模型送膳。兩者各有優(yōu)缺,比如隊列模型的消息系統(tǒng)可以支持多個客戶端同時消費(fèi)不同的數(shù)據(jù)丑蛤,也就是可以很方便的擴(kuò)展消費(fèi)端的能力叠聋,但訂閱發(fā)布模型就不好擴(kuò)展,因為它是使用的廣播模式受裹。另一個就是碌补,隊列模型的消息只能被消費(fèi)一次,一旦一個消息被某個消費(fèi)者處理了棉饶,其他消費(fèi)者將不能消費(fèi)到該消息厦章,而發(fā)布訂閱模型同一消息可以被所有消費(fèi)者消費(fèi)到。Kafka使用Topic分類數(shù)據(jù)照藻,一個Topic類似一個消息隊列袜啃。Kafka還有個概念,叫consumer?group岩梳,一個group里可以有多個消費(fèi)者囊骤,同一個topic可以被一個group內(nèi)的多個消費(fèi)者同時消費(fèi)不同的消息晃择,也就是類似隊列模型可以方便的擴(kuò)展消費(fèi)端能力冀值。一個Topic也可以被多個group消費(fèi)也物,group之間相互沒有影響,也就是類似發(fā)布訂閱模型列疗,Topic中的一條消息可以被消費(fèi)多次滑蚯。所以Kafka等于說是使用Topic和Consumer?group等概念,將隊列模型和訂閱發(fā)布模型的優(yōu)勢都糅合了進(jìn)來抵栈。
現(xiàn)在Kafka官方將Kafka的介紹做了調(diào)整告材,不再滿足大家簡單的將其定位為消息隊列,新的介紹描述是:可以被用來創(chuàng)建實時數(shù)據(jù)管道和流式應(yīng)用古劲,且具有可擴(kuò)展斥赋、高容錯,高吞吐等優(yōu)勢产艾。另外绍豁,經(jīng)過7年的發(fā)展站叼,kafka也比較成熟了,與周邊其他組件可以很方便的集成。但目前也有兩個比較明顯的劣勢英染,一個是不能保證Topic級別的數(shù)據(jù)有序,另一個是開源的管理工具不夠完善报账。
Spark現(xiàn)在聽起來不像前幾年那么性感了当宴,但因為我們離線計算使用的Spark,有一定的技術(shù)積累踱阿,所以上手比較快管钳。另外,Spark Streaming并不是真正意義上的流式處理软舌,而是微批才漆,相比Storm、Flink延遲還是比較高的葫隙,但目前也能完全滿足業(yè)務(wù)需求栽烂,另外,為了技術(shù)統(tǒng)一恋脚,資源管理和調(diào)度統(tǒng)一腺办,所以我們最終選用了Spark Streaming。
Spark Streaming是Spark核心API的擴(kuò)展糟描,可實現(xiàn)高擴(kuò)展怀喉、高吞吐、高容錯的實時流數(shù)據(jù)處理應(yīng)用船响。支持從Kafka躬拢、Flum躲履、HDFS、S3等多種數(shù)據(jù)源獲取數(shù)據(jù)聊闯,并根據(jù)一定的時間間隔拆分成一批批的數(shù)據(jù)工猜,然后可以使用map、reduce菱蔬、join篷帅、window等高級函數(shù)或者使用SQL進(jìn)行復(fù)雜的數(shù)據(jù)處理,最終得到處理后的一批批結(jié)果數(shù)據(jù)拴泌,其還可以方便的將處理結(jié)果存放到文件系統(tǒng)魏身、數(shù)據(jù)庫或者儀表盤,功能還是很完善的蚪腐。
Spark Streaming將處理的數(shù)據(jù)流抽象為Dstream箭昵,DStream本質(zhì)上表示RDD的序列,所以任何對DStream的操作都會轉(zhuǎn)變?yōu)閷Φ讓覴DD的操作回季。
HBase是以分布式文件系統(tǒng)HDSF為底層存儲的分布式列式數(shù)據(jù)庫家制,它是對Google BigTable開源的實現(xiàn),主要解決超大規(guī)模數(shù)據(jù)集的實時讀寫茧跋、隨機(jī)訪問的問題慰丛,并且具有可擴(kuò)展、高吞吐瘾杭、高容錯等優(yōu)點诅病。HBase這些優(yōu)點取決于其架構(gòu)和數(shù)據(jù)結(jié)構(gòu)的設(shè)計,他的數(shù)據(jù)寫入并不是直接寫入文件粥烁,當(dāng)然HDFS不支持隨機(jī)寫入贤笆,而是先寫入被稱作MemStore的內(nèi)存,然后再異步刷寫至HDFS,等于是將隨機(jī)寫入轉(zhuǎn)換成了順序?qū)懱肿瑁源蠖鄷r候?qū)懭胨俣雀卟⑶液芊€(wěn)定芥永。
?而讀數(shù)據(jù)快,是使用字典有序的主鍵RowKey通過Zookeeper先定位到數(shù)據(jù)可能所在的RegionServer钝吮,然后先查找RegionServer的讀緩存BlockCache埋涧,如果沒找到會再查MemStore,只有這兩個地方都找不到時奇瘦,才會加載HDFS中的內(nèi)容棘催,但因為其使用了LSM樹型結(jié)構(gòu),所以讀取耗時一般也不長耳标。還有就是醇坝,HBase還可以使用布隆過濾器通過判存提高查詢速度。
HBase的數(shù)據(jù)模型也很有意思次坡,跟關(guān)系型數(shù)據(jù)庫類似呼猪,也有表的概念画畅,也是有行有列的二維表。和關(guān)系型數(shù)據(jù)庫不一樣一個地方是他有ColumnFamily的概念宋距,并且一個ColumnFamily下可以有很多個列轴踱,這些列在建表時不用聲明,而是在寫入數(shù)據(jù)時確定乡革,也就是所謂的Free Schema寇僧。
HBase的缺點一個是運(yùn)維成本相對較高摊腋,像compact沸版、split、flush等問題處理起來都是比較棘手的兴蒸,都需要不定期的投入時間做調(diào)優(yōu)视粮。還有個缺點是延遲不穩(wěn)定,影響原因除了其copmact橙凳、flush外還有JVM的GC以及緩存命中情況蕾殴。
ScyllaDB算是個新秀,可以與Cassandra對比了解岛啸,其實它就是用C++重寫的Cassandra钓觉,客戶端完全與Cassandra兼容,其官網(wǎng)Benchmark對標(biāo)的也是Cassandra坚踩,性能有10倍以上的提升荡灾,單節(jié)點也可以每秒可以處理100萬TPS,整體性能還是比較喜人的瞬铸。與HBase批幌、Cassandra一樣也有可擴(kuò)展、高吞吐嗓节、高容錯的特點荧缘,另外他的延遲也比較低,并且比較穩(wěn)定拦宣。
他和Cassandra與HBase都可以以做到CAP理論里的P截粗,即保證分區(qū)容忍性,也就是在某個或者某些節(jié)點出現(xiàn)網(wǎng)絡(luò)故障或者系統(tǒng)故障時候鸵隧,不會影響到整個DataBase的使用绸罗。而他倆與HBase不一樣的一個地方在于分區(qū)容忍性包證的情況下,一致性與高可用的取舍掰派,也就是CAP理論里从诲,在P一定時C與A的選擇。HBase選擇的是C靡羡,即強(qiáng)一致性系洛,比如在region failover 及后續(xù)工作完成前俊性,涉及的region的數(shù)據(jù)是不能讀取的,而ScyllaDB描扯、Cassandra選擇的A定页,即高可用的,但有些情況下數(shù)據(jù)可能會不一致绽诚。所以典徊,選型時需要根據(jù)業(yè)務(wù)場景來定。
ScyllaDB的劣勢也比較明顯恩够,就是項目比較新卒落,Bug和使用的坑比較多, 我在這里就不一一去說了蜂桶。
前面分別簡單介紹了選定的技術(shù)組件儡毕,及他們的優(yōu)缺點,最終項目整體架構(gòu)如上圖所示扑媚,數(shù)據(jù)流向用灰色箭頭代表腰湾,數(shù)據(jù)采集和收集都與離線計算一樣,不同的是在Spark Streaming從Kafka消費(fèi)數(shù)據(jù)時疆股,會同時實時從ScyllaDB讀取wifi费坊、基站定位庫的數(shù)據(jù)參與位置補(bǔ)充的計算,然后將處理的結(jié)果數(shù)據(jù)寫入HBase旬痹。再往下類似Lambda架構(gòu)附井,會對HBase中的數(shù)據(jù)離線做進(jìn)一步的處理,然后再將數(shù)據(jù)離線通過Bulkload方式寫入HBase唱凯,關(guān)于其中的Bitmap應(yīng)用羡忘,后邊再聊。
架構(gòu)右邊部分是服務(wù)相關(guān)的磕昼,首先是中間件卷雕,主要屏蔽了異構(gòu)數(shù)據(jù)庫對應(yīng)用層服務(wù)的影響,再往上是規(guī)則引擎服務(wù)票从,因為我們上線在SDMK的應(yīng)用服務(wù)有100多個漫雕,導(dǎo)致服務(wù)管理成本很高,并且也不利于物理資源的合理運(yùn)用峰鄙,所以上線了規(guī)則引擎服務(wù)浸间,將所有服務(wù)的業(yè)務(wù)邏輯都通過規(guī)則表達(dá),這樣上線新服務(wù)就不需要重新申請服務(wù)器吟榴,只需要添加一條規(guī)則即可魁蒜。等于是就將一百多個服務(wù)轉(zhuǎn)換成了一個服務(wù),當(dāng)規(guī)則引擎負(fù)載較高時或者大幅降低后,可以很方便的進(jìn)行資源的擴(kuò)充和減少兜看。SDMK是TalkingData研發(fā)的類似淘寶的交易平臺锥咸,公司內(nèi)、外的數(shù)據(jù)服務(wù)细移、數(shù)據(jù)模型都可以像商品一樣在上面進(jìn)行售賣搏予。