當(dāng)前Spark Streaming-Streaming Join只支持:
- InnerJoin;
- LeftJoin;
- RightJoin;
整體思路
-
將Join的條件分為:
-
preJoinFilter
- leftSideOnly: 只依賴左表的過濾條件,針對(duì)左表input輸入,先校驗(yàn)該條件刻蚯,如果不滿足該條件一定不會(huì)關(guān)聯(lián)上库继;
- rightSideOnly:只依賴右表的過濾條件殉挽,針對(duì)右表input輸入感耙,先校驗(yàn)該條件就缆,如果不滿足該條件一定不會(huì)關(guān)聯(lián)上扰路;
postJoinFilter:同時(shí)依賴左右表的過濾條件尤溜,在滿足preJoinFilter并同另外一側(cè)表關(guān)聯(lián)后進(jìn)行該過濾;
-
-
將滿足過濾條件的新增左表數(shù)據(jù)跟右表狀態(tài)數(shù)據(jù)做Join(詳細(xì)見代碼)汗唱,同時(shí)更新所有的新增left表數(shù)據(jù)到狀態(tài) 宫莱,將結(jié)果輸出至leftOutputIter,結(jié)果分為兩部分:
關(guān)聯(lián)上并滿足過濾條件的數(shù)據(jù):這部分?jǐn)?shù)據(jù)Inner/Left/Right都是相同的哩罪;
-
關(guān)聯(lián)不上的數(shù)據(jù)授霸,分為兩種情況:
暫時(shí)沒有關(guān)聯(lián)上的巡验,這個(gè)時(shí)候僅僅將左流新增數(shù)據(jù)寫入左表狀態(tài)表,不會(huì)emit數(shù)據(jù)碘耳;
-
永不會(huì)關(guān)聯(lián)上的(不滿足leftSideOnly)显设,針對(duì)Inner/Left/Right產(chǎn)生不同的結(jié)果:
a. LeftJoin: 輸出join with null;
b. Inner/RightJoin: 輸出空;
將滿足過濾條件的新增右表數(shù)據(jù)跟左表狀態(tài)數(shù)據(jù)做Join辛辨,同時(shí)更新所有的新增right表數(shù)據(jù)到狀態(tài)捕捂,同時(shí)會(huì)生成新增左表跟新增右表的關(guān)聯(lián)數(shù)據(jù)(因?yàn)樽蟊淼男略鰯?shù)據(jù)已經(jīng)保留到了左表的狀態(tài)數(shù)據(jù)中),結(jié)果輸出至rightOutputIter(類似于上步驟)斗搞;
針對(duì)InnerJoin的輸出即為leftOutputIter + rightOutputIter;
-
針對(duì)LeftJoin同時(shí)還需要考左表已經(jīng)過期的數(shù)據(jù)指攒,這些數(shù)據(jù)分兩種情況:
- 同右表狀態(tài)數(shù)據(jù)(包括當(dāng)前批次)沒有關(guān)聯(lián):這些數(shù)據(jù)應(yīng)該join with null;
- 同右表狀態(tài)數(shù)據(jù)(包括當(dāng)前批次)有關(guān)聯(lián): 應(yīng)該忽略掉僻焚,因?yàn)檫@些數(shù)據(jù)理論上在某些時(shí)間點(diǎn)上會(huì)Join上允悦,所以不能join with null;
- 同時(shí)這些過期數(shù)據(jù)在該批次會(huì)被清理掉;
針對(duì)RightJoin同時(shí)還需要考左表已經(jīng)過期的數(shù)據(jù)溅呢,類似LeftOuterJoin澡屡;
左右表關(guān)聯(lián)代碼實(shí)現(xiàn)
?
def storeAndJoinWithOtherSide(
otherSideJoiner: OneSideHashJoiner)(
generateJoinedRow: (InternalRow, InternalRow) => JoinedRow):
Iterator[InternalRow] = {
// Step1: 過濾不符合watermark要求的數(shù)據(jù)
val watermarkAttribute = inputAttributes.find(_.metadata.contains(delayKey))
val nonLateRows =
WatermarkSupport.watermarkExpression(watermarkAttribute, eventTimeWatermark) match {
case Some(watermarkExpr) =>
val predicate = newPredicate(watermarkExpr, inputAttributes)
inputIter.filter { row => !predicate.eval(row) }
case None =>
inputIter
}
nonLateRows.flatMap { row =>
val thisRow = row.asInstanceOf[UnsafeRow]
// Step2: 如果輸入不滿足preJoinFilter條件,即針對(duì)Left表不滿足只依賴左表的Join條件時(shí):
// - 這種場景下該Row不會(huì)滿足Join的條件咐旧,所以不會(huì)保存到狀態(tài)數(shù)據(jù)中驶鹉;
// - 同時(shí)根據(jù)Join的類型生成關(guān)聯(lián)不上后的數(shù)據(jù);
if (preJoinFilter(thisRow)) {
val key = keyGenerator(thisRow)
// Step3: 從另外一個(gè)狀態(tài)表中獲取關(guān)聯(lián)數(shù)據(jù)進(jìn)行postJoinFilter過濾后铣墨,作為結(jié)果輸出
val outputIter = otherSideJoiner.joinStateManager.get(key).map { thatRow =>
generateJoinedRow(thisRow, thatRow)
}.filter(postJoinFilter)
// Step4: 將滿足條件的狀態(tài)數(shù)據(jù)更新至狀態(tài)結(jié)果中室埋;
val shouldAddToState = // add only if both removal predicates do not match
!stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow)
if (shouldAddToState) {
joinStateManager.append(key, thisRow)
updatedStateRowsCount += 1
}
outputIter
} else {
// 無法關(guān)聯(lián)的數(shù)據(jù),根據(jù)Join的類型生成相應(yīng)的數(shù)據(jù)
joinSide match {
case LeftSide if joinType == LeftOuter =>
Iterator(generateJoinedRow(thisRow, nullRight))
case RightSide if joinType == RightOuter =>
Iterator(generateJoinedRow(thisRow, nullLeft))
case _ => Iterator()
}
}
}
}