企業(yè)級大數(shù)據(jù)技術體系概述
大數(shù)據(jù)架構的6層
大數(shù)據(jù)從數(shù)據(jù)源開始,經(jīng)過分析撩扒、挖掘到最終獲得價值一般需要經(jīng)過6個主要環(huán)節(jié):
包括數(shù)據(jù)收集、數(shù)據(jù)存儲、資源管理與服務協(xié)調(diào)、計算引擎、數(shù)據(jù)分析和數(shù)據(jù)可視化
Hadoop與Spark開源大數(shù)據(jù)技術棧:
大數(shù)據(jù)架構:Lambda Architecture
Hadoop MapReduce這樣的批處理系統(tǒng)猾编,可靠性高,而實時性差;Storm這樣的流式處理系統(tǒng)來說,則情況正好相反佩谣。
通過結合這兩類計算技術安皱,LA可以在延遲腾窝、吞吐量和容錯之間找到平衡點驴娃。
LA主要思想是將數(shù)據(jù)處理流程分解成三層:批處理層暇榴、流式處理層和服務層婆硬。
一個經(jīng)典的LA應用案例是推薦系統(tǒng):推薦系統(tǒng)的設計目的是根據(jù)用戶的興趣特點和購買行為谐区,向用戶推薦感興趣的信息和商品。
推薦系統(tǒng)最核心的模塊是推薦算法,推薦算法通常會根據(jù)用戶的興趣特點和歷史行為數(shù)據(jù)構建推薦模型坤邪,以預測用戶可能感興趣的信息和商品,進而推薦給用戶。
典型的推薦系統(tǒng)架構
數(shù)據(jù)首先流入Kafka或粮,之后按照不同時間粒度導入批處理和流式處理兩個系統(tǒng)中硝岗。
批處理層擁有所有歷史數(shù)據(jù)(通常保存到HDFS/HBase中),通常用以實現(xiàn)推薦模型,它以當前數(shù)據(jù)(比如最近一小時數(shù)據(jù))和歷史數(shù)據(jù)為輸入,通過特征工程、模型構建(通常是迭代算法,使用MapReduce/Spark實現(xiàn))及模型評估等計算環(huán)節(jié)后,最終獲得最優(yōu)的模型并將產(chǎn)生的推薦結果存儲(比如Redis)起來始藕,整個過程延遲較大(分鐘甚至小時級別)伍派;
為了解決推薦系統(tǒng)中的冷啟動問題(新用戶推薦)诉植,往往會引入流式處理層:它會實時收集用戶的行為,并基于這些行為數(shù)據(jù)通過簡單的推薦算法(通常使用Storm/SparkStreaming實現(xiàn))快速產(chǎn)生推薦結果并存儲起來舌稀。為了便于其他系統(tǒng)獲取推薦結果壁查,推薦系統(tǒng)往往通過服務層對外提供訪問接口睡腿,比如網(wǎng)站后臺在渲染某個訪問頁面時,可能從廣告系統(tǒng)应闯、推薦系統(tǒng)以及內(nèi)容存儲系統(tǒng)中獲取對應的結果,并返回給客戶端骨田。
備注:用戶冷啟動盛撑,指“產(chǎn)品初期捧搞,從目標用戶轉化為種子用戶的過程”
數(shù)據(jù)收集
關系型數(shù)據(jù)的收集
Sqoop(SQL to Hadoop)
為了能夠利用大數(shù)據(jù)技術處理和存儲這些關系型數(shù)據(jù)胎撇,首先需將數(shù)據(jù)導入到像HDFS晚树、HBase這樣的大數(shù)據(jù)存儲系統(tǒng)中爵憎,以便使用MapReduce、Spark這樣的分布式計算技術進行高效分析和處理刑棵。另一方面蛉签,為了便于與前端的數(shù)據(jù)可視化系統(tǒng)對接沥寥,我們通常需要將Hadoop大數(shù)據(jù)系統(tǒng)分析產(chǎn)生的結果(比如報表邑雅,通常數(shù)據(jù)量不會太大)導回到關系型數(shù)據(jù)庫中淮野。為了解決上述問題狂塘,高效地實現(xiàn)關系型數(shù)據(jù)庫與Hadoop之間的數(shù)據(jù)導入導出,Hadoop生態(tài)系統(tǒng)提供了工具Sqoop(SQL toHadoop)
Sqoop采用MapReduce可進行全量關系型數(shù)據(jù)的收集了嚎。
數(shù)據(jù)增量收集CDC
除了收集數(shù)據(jù)庫全量數(shù)據(jù)外歪泳,我們還希望只獲取增量數(shù)據(jù)露筒,即MySQL某個表從某個時刻開始修改/插入/刪除的數(shù)據(jù)慎式。捕獲數(shù)據(jù)源中數(shù)據(jù)的更新瘪吏,進而獲取增量數(shù)據(jù)的過程掌眠,被稱為CDC(“Change Data Capture”)。
CDC幾種應用場景
- 異地機房同步级遭。實現(xiàn)數(shù)據(jù)異地機房容災挫鸽。
- 數(shù)據(jù)庫實時備份掠兄。類似于master/slave架構锌雀,實時對數(shù)據(jù)庫進行備份腋逆。
- 業(yè)務Cache刷新惩歉。更新數(shù)據(jù)庫成功的同時,刷新cache中的值上遥。
- 數(shù)據(jù)全庫遷移粉楚。創(chuàng)建任務隊列表模软,逐步完成全庫所有表的遷移燃异。
Alibaba Canal 組件
Canal的主要定位是基于數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱和消費逛腿,目前主要支持了MySQL關系型數(shù)據(jù)庫鳄逾。Canal的主要原理是雕凹,模擬數(shù)據(jù)庫的主備復制協(xié)議枚抵,接收主數(shù)據(jù)庫產(chǎn)生的binary log(簡稱“binlog”)汽摹,進而捕獲更新數(shù)據(jù)苦锨。
步驟1:Canal實現(xiàn)MySQL主備復制協(xié)議舟舒,向MySQL Server發(fā)送dump協(xié)議秃励。
步驟2:MySQL收到dump請求夺鲜,開始推送binlog給Canal。
步驟3:Canal解析binlog對象珊拼,并發(fā)送給各個消費者砌们。
Databus
相比于阿里巴巴的Canal系統(tǒng)浪感,LinkedIn的Databus更加強大影兽,包括支持更多數(shù)據(jù)源(Oracle和MySQL等)、擴展性更優(yōu)的架構(比如高擴展的架構允許保存更長時間的更新數(shù)據(jù))等
多機房數(shù)據(jù)同步系統(tǒng)Otter
Otter基于Canal開源產(chǎn)品莱革,獲取數(shù)據(jù)庫增量日志數(shù)據(jù)峻堰,本身采用典型的管理系統(tǒng)架構:Manager(Web管理)+Node(工作節(jié)點)
- Manager負責發(fā)布同步任務配置,接收同步任務反饋的狀態(tài)信息等盅视。
- 工作節(jié)點負責執(zhí)行同步任務捐名,并將同步狀態(tài)反饋給Manager。
為了解決分布式狀態(tài)調(diào)度闹击,允許多Node節(jié)點之間協(xié)同工作镶蹋,Otter采用了開源分布式協(xié)調(diào)組件ZooKeeper赏半。
為了讓系統(tǒng)具有良好的擴展性和靈活性贺归,Otter將整個同步流程抽象為Select(與數(shù)據(jù)源對接的階段,為解決數(shù)據(jù)來源的差異性而引入)断箫、Extract拂酣、Transform、Load(簡稱S仲义、E婶熬、T、L)四個階段(類似于數(shù)據(jù)倉庫的ETL模型埃撵,即數(shù)據(jù)提取赵颅、數(shù)據(jù)轉換和數(shù)據(jù)載入三個階段)
Otter跨機房數(shù)據(jù)同步
數(shù)據(jù)涉及網(wǎng)絡傳輸,S盯另、E性含、T、L幾個階段會分散在2個或者更多Node節(jié)點上鸳惯,多個Node之間通過ZooKeeper進行協(xié)同工作(一般是Select和Extract在一個機房的Node, Transform/Load落在另一個機房的Node)商蕴。
非關系型數(shù)據(jù)的收集
在現(xiàn)實世界中叠萍,非關系型數(shù)據(jù)量遠大于關系型數(shù)據(jù)。非關系型數(shù)據(jù)種類繁多绪商,包括網(wǎng)頁苛谷、視頻、圖片格郁、用戶行為日志腹殿、機器日志等,其中日志類數(shù)據(jù)直接反映了(日志)生產(chǎn)者的現(xiàn)狀和行為特征例书,通常會用在行為分析系統(tǒng)锣尉、推薦系統(tǒng)、廣告系統(tǒng)中决采。日志數(shù)據(jù)具有流式自沧、數(shù)據(jù)量大等特點,通常分散在各種設備上树瞭,由不同服務和組件產(chǎn)生拇厢,為了高效地收集這些流式日志,需要采用具有良好擴展性晒喷、伸縮性和容錯性的分布式系統(tǒng)孝偎。
日志收集面臨以下問題:
? 數(shù)據(jù)源種類繁多:各種服務均會產(chǎn)生日志,這些日志格式不同凉敲,產(chǎn)生日志的方式也不同(有的寫到本地日志文件中衣盾,有的通過HTTP發(fā)到遠端等)。
? 數(shù)據(jù)源是物理分布的:各種服務運行在不同機器上荡陷,有的甚至是跨機房的雨效。設計日志收集系統(tǒng)時需考慮這種天然的分布式特征。
? 流式的废赞,不間斷產(chǎn)生:日志是實時產(chǎn)生的碑幅,需要實時或近實時收集到脉执,以便于后端的分析和挖掘姥芥。
? 對可靠性有一定要求:日志收集過程中幼驶,希望能做到不丟數(shù)據(jù)(比如銀行用戶轉賬日志),或只丟失可控的少量數(shù)據(jù)(比如用戶搜索日志)耘沼。
Flume
Cloudera公司開源的Flume系統(tǒng)便是解決以上這些流式數(shù)據(jù)收集問題的极颓,它是一個通用的流式數(shù)據(jù)收集系統(tǒng),可以將不同數(shù)據(jù)源產(chǎn)生的流式數(shù)據(jù)近實時地發(fā)送到后端中心化的存儲系統(tǒng)中群嗤,具有分布式菠隆、良好的可靠性以及可用性等優(yōu)點。
Flume NG基本架構
Flume的數(shù)據(jù)流是通過一系列稱為Agent的組件構成的,如圖3-2所示骇径,一個Agent可從客戶端或前一個Agent接收數(shù)據(jù)躯肌,經(jīng)過過濾(可選)、路由等操作后破衔,傳遞給下一個或多個Agent(完全分布式)清女,直到抵達指定的目標系統(tǒng)。
Agent內(nèi)部主要由三個組件構成晰筛,分別是Source, Channel和Sink:
- Source:
Flume將數(shù)據(jù)流水線中傳遞的數(shù)據(jù)稱為“Event”
Flume數(shù)據(jù)流中接收Event的組件嫡丙,通常從Client程序或上一個Agent接收數(shù)據(jù) - Channel:
Channel是一個緩存區(qū),它暫存Source寫入的Event读第,直到被Sink發(fā)送出去曙博。 - Sink:
Sink負責從Channel中讀取數(shù)據(jù),并發(fā)送給下一個Agent(的Source)卦方。
Flume NG高級組件
除了Source羊瘩、Channel和Sink外,F(xiàn)lume Agent還允許用戶設置其他組件更靈活地控制數(shù)據(jù)流盼砍,包括Interceptor, Channel Selector和Sink Processor等
- Interceptor:
Interceptor組件允許用戶修改(Timestamp Interceptor/Host Interceptor/UUID Interceptor)或丟棄(Regex Filtering Interceptor/Regex Extractor Interceptor)傳輸過程中的Event。 - Channel Selector:
Channel Selector允許Flume Source選擇一個或多個目標Channel逝她,并將當前Event寫入這些Channel浇坐。 - Sink Processor:
Flume允許將多個Sink組裝在一起形成一個邏輯實體(稱為“Sink Group”),而SinkProcessor則在Sink Group基礎上提供負載均衡以及容錯的功能(當一個Sink掛掉了黔宛,可由另一個Sink接替)近刘。
分布式消息隊列Kafka
在實際應用中,不同服務器(數(shù)據(jù)生產(chǎn)者)產(chǎn)生的日志臀晃,比如指標監(jiān)控數(shù)據(jù)觉渴、用戶搜索日志、用點擊日志等徽惋,需要同時傳送到多個系統(tǒng)中以便進行相應的邏輯處理和挖掘案淋,比如指標監(jiān)控數(shù)據(jù)可能被同時寫入Hadoop和Storm集群(數(shù)據(jù)消費者)進行離線和實時分析。為了降低數(shù)據(jù)生產(chǎn)者和消費者之間的耦合性险绘、平衡兩者處理能力的不對等踢京,消息隊列出現(xiàn)了。消息隊列是位于生產(chǎn)者和消費者之間的“中間件”宦棺,它解除了生產(chǎn)者和消費者的直接依賴關系瓣距,使得軟件架構更容易擴展和伸縮;它能夠緩沖生產(chǎn)者產(chǎn)生的數(shù)據(jù)代咸,防止消費者無法及時處理生產(chǎn)者產(chǎn)生的數(shù)據(jù)蹈丸。
Kafka設計架構
Kafka架構由Producer、Broker和Consumer三類組件構成,其中Producer將數(shù)據(jù)寫入Broker, Consumer則從Broker上讀取數(shù)據(jù)進行處理逻杖,而Broker構成了連接Producer和Consumer的“緩沖區(qū)”奋岁。Broker和Consumer通過ZooKeeper做協(xié)調(diào)和服務發(fā)現(xiàn)[插圖]。多個Broker構成一個可靠的分布式消息存儲系統(tǒng)弧腥,避免數(shù)據(jù)丟失厦取。Broker中的消息被劃分成若干個topic,同屬一個topic的所有數(shù)據(jù)按照某種策略被分成多個partition管搪,以實現(xiàn)負載分攤和數(shù)據(jù)并行處理虾攻。
Kafka各組件詳解
Kafka Producer
Kafka Producer是由用戶使用Kafka提供的SDK開發(fā)的,Producer將數(shù)據(jù)轉化成“消息”更鲁,并通過網(wǎng)絡發(fā)送給Broker霎箍。在Kafka中,每條數(shù)據(jù)被稱為“消息”澡为,每條消息表示為一個三元組:
<topic漂坏,Key,Message>
? topic:表示該條消息所屬的topic媒至。topic是劃分消息的邏輯概念顶别,一個topic可以分布到多個不同的broker上。
? key:表示該條消息的主鍵拒啰。Kafka會根據(jù)主鍵將同一個topic下的消息劃分成不同的分區(qū)(partition)驯绎,默認是基于哈希取模的算法,用戶也可以根據(jù)自己需要設計分區(qū)算法谋旦。
? message:表示該條消息的值剩失。該數(shù)值的類型為字節(jié)數(shù)組,可以是普通字符串册着、JSON對象拴孤,或者經(jīng)JSON, Avro, Thrift或Protobuf等序列化框架序列化后的對象。
Kafka Broker
在Kafka中甲捏,Broker一般有多個演熟,它們組成一個分布式高容錯的集群。Broker的主要職責是接受Producer和Consumer的請求摊鸡,并把消息持久化到本地磁盤绽媒。如圖所示,Broker以topic為單位將消息分成不同的分區(qū)(partition)免猾,每個分區(qū)可以有多個副本是辕,通過數(shù)據(jù)冗余的方式實現(xiàn)容錯。
當partition存在多個副本時猎提,其中有一個是leader获三,對外提供讀寫請求旁蔼,其他均是follower,不對外提供讀寫服務疙教,只是同步leader中的數(shù)據(jù)棺聊,并在leader出現(xiàn)問題時,通過選舉算法將其中的某一個提升為leader贞谓。
Kafka Broker能夠保證同一topic下同一partition內(nèi)部的消息是有序的限佩,但無法保證partition之間的消息全局有序,這意味著一個Consumer讀取某個topic下(多個分區(qū)中裸弦,如下圖所示)的消息時祟同,可能得到跟寫入順序不一致的消息序列。但在實際應用中理疙,合理利用分區(qū)內(nèi)部有序這一特征即可完成時序相關的需求晕城。
Kafka Broker以追加的方式將消息寫到磁盤文件中,且每個分區(qū)中的消息被賦予了唯一整數(shù)標識窖贤,稱之為“offset”(偏移量)砖顷,如圖上所示,Broker僅提供基于offset的讀取方式赃梧,不會維護各個Consumer當前已消費消息的offset值滤蝠,而是由Consumer各自維護當前讀取的進度。 Consumer讀取數(shù)據(jù)時告訴Broker請求消息的起始offset值授嘀,Broker將之后的消息流式發(fā)送過去几睛。Broker中保存的數(shù)據(jù)是有有效期的,比如7天粤攒,一旦超過了有效期,對應的數(shù)據(jù)將被移除以釋放磁盤空間囱持。只要數(shù)據(jù)在有效期內(nèi)夯接,Consumer可以重復讀取而不受限制。
Kafka Consumer
Kafka Consumer主動從Kafka Broker拉取消息進行處理纷妆。每個Kafka Consumer自己維護最后一個已讀取消息的offset盔几,并在下次請求從這個offset開始的消息,這一點不同于ZeroMQ掩幢、RabbitMQ等其他消息隊列逊拍,這種基于pull的機制大大降低了Broker的壓力,使得Kafka Broker的吞吐率很高际邻。
Kafka允許多個Consumer構成一個Consumer Group芯丧,共同讀取同一topic中的數(shù)據(jù),提高數(shù)據(jù)讀取效率世曾。Kafka可自動為同一Group中的Consumer分攤負載缨恒,從而實現(xiàn)消息的并發(fā)讀取,并在某個Consumer發(fā)生故障時,自動將它處理的partition轉移給同Group中其他Consumer處理骗露。
ZooKeeper
在一個Kafka集群中岭佳,ZooKeeper擔任分布式服務協(xié)調(diào)的作用,Broker和Consumer直接依賴于ZooKeeper才能正常工作:
? Broker與ZooKeeper:所有Broker會向ZooKeeper注冊萧锉,將自己的位置珊随、健康狀態(tài)、維護的topic柿隙、partition等信息寫入ZooKeeper叶洞,以便于其他Consumer可以發(fā)現(xiàn)和獲取這些數(shù)據(jù),當一個Consumer宕掉后优俘,其他Consumer會通過ZooKeeper發(fā)現(xiàn)這一故障京办,并自動分攤該Consumer的負載,進而觸發(fā)相應的容錯機制帆焕。
? Consumer與ZooKeeper: Consumer Group通過ZooKeeper保證內(nèi)部各個Consumer的負載均衡惭婿,并在某個Consumer或Broker出現(xiàn)故障時,重新分攤負載叶雹;Consumer(僅限于high-level API财饥,如果是low-level API,用戶需自己保存和恢復offset)會將最近所獲取消息的offset寫入ZooKeeper折晦,以便出現(xiàn)故障重啟后钥星,能夠接著故障前的斷點繼續(xù)讀取數(shù)據(jù)。