1.研究的重點和方法
本文主要就是研究rdd的執(zhí)行流程媳叨,主要重點是如何形成rdd的圖譜抖所,以及如何形成遞歸的迭代器進行數(shù)據(jù)的迭代計算的饱搏;
筆者對與rdd的執(zhí)行的流程的研究了網(wǎng)絡(luò)上的相關(guān)文章叭披、相關(guān)的rdd的代碼的閱讀辽狈,簡單的樣例代碼的調(diào)試慈参,以及原理代碼的編寫;其中樣例代碼的調(diào)試個人覺得非常重要刮萌,可以清晰的實踐一下你的代碼的執(zhí)行流程懂牧。
2.RDD圖譜的形成
2.1編寫簡單的樣例代碼
因為重點是研究rdd的執(zhí)行流程,樣例代碼只設(shè)計了窄依賴的變換尊勿,因為其流程相對于款依賴相對簡單僧凤。
文件系統(tǒng)我們只讀取本地文件
集群模式為本機模式,為了調(diào)試方便元扔,只啟動一個執(zhí)行線程
樣例代碼如下:
var sparkConf = new SparkConf().setAppName("Test").setMaster("local[1]"); //啟動一個線程
var sc = new SparkContext(sparkConf);
//讀取本地文件躯保、并調(diào)用兩次簡單的窄依賴變換
sc.textFile("/home/jack/t.txt")
.map((_, 1))
.filter(_._2 == 1)
.collect(); //通過collect 提交真正的action,開始執(zhí)行任務
2.2 SparkContext.textFile
執(zhí)行代碼進行調(diào)試模式澎语,斷點進入textFile途事,來看看textFile的執(zhí)行
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}
繼續(xù)跟蹤hadoopFile方法,可以看到hadoopFile關(guān)鍵邏輯是實例化了一個hadoopRDD類擅羞,并返回尸变;
new HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minPartitions).setName(path)
基于以上,textFile的關(guān)鍵代碼是調(diào)用了hadoopFile方法减俏,hadoopFile方法返回一個HadoopRDD召烂,而hadoopRDD.map又返回了一個MapPartitionRDD,最終textFile最終返回了這個RDD。
再看hadoopRDD.map 的邏輯
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
??HadoopRDD的map方法是其父類RDD實現(xiàn)的娃承,在map方法中返回了一個新的MarPartitionRDD奏夫,并將當前RDD實例和用戶的map函數(shù)傳遞進去。
??MapPartitionRDD與HadoopRDD同樣都是繼承于RDD類历筝,MapPartitionRDD的特點是會通過傳遞進來的參數(shù)酗昼,持有父RDD和用戶的執(zhí)行函數(shù)。
??而textFile為什么沒有直接返回HadoopRDD梳猪,我覺得應該是HadoopRDD的迭代器是一個key,Value迭代器麻削,其中key為行所在的字節(jié)offset值、value是行值春弥,而我們一般來說只關(guān)心value值呛哟,所以其做了一次map變換,只返回了value的迭代惕稻。
??在RDD類中竖共,同樣有filter、flatMap俺祠、distinct 等窄依賴等實現(xiàn)公给,基本套路都是生成一個新的MapPartitionRDD借帘,并將當前的RDD作為父RDD傳遞給新的RDD。
??綜上所述淌铐, SparkContext.textFile肺然,在核心流程上既是返回了new HadoopRDD().map(),也就是一個MapPartitionRDD腿准。
2.3 RDD.map际起、filter等變換方法
??如上文所述, map吐葱、filter街望、flatmap 等方法的思路其實大同小異,都是返回一個攜帶前置父RDD實例以及用戶傳遞的具體transform方法的MapParittionRDD實例弟跑。而這里要重點強調(diào)的是三個點:
-
MarPartitionRDD要持有前置的父RDD
MapPartitionRDD會持有前置的灾前、變換前的RDD實例,這樣形成一個可以向前追溯的RDD圖譜孟辑,為后續(xù)的迭代計算提供依據(jù)哎甲。 -
MapParitionRDD要持有用戶傳遞的具體transform方法
具體算法就也會在MapPartitionRDD中存儲。這樣后續(xù)的迭代變化時能夠回溯計算饲嗽; - map炭玫、filter、flatMap的具體迭代算法實現(xiàn)在scala的 Iterator中
以map方法為例展開講述以上三點貌虾,我們看到返回的MapPartitionRDD的構(gòu)造形式是:
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
??this即當前調(diào)用map的父RDD實例吞加,這個較容易理解。
??cleanF是用戶傳遞的map具體用戶函數(shù)酝惧, 但(context,pid,iter)=>iter.map(cleanF)這個形式榴鼎,如果scala不那么熟悉的話,一眼望過去著實看得有點吃力晚唇,其實這個類似于與c語言中的函數(shù)指針、java中的函數(shù)接口盗似,相當與定義了一個方法的類型哩陕;例如在scala中 var f:(Int)=>Int,即定義了一個方法類型:入?yún)镮nt赫舒,返回值為Int悍及,翻到MapPartitionRDD的源碼,可以看到第二個參數(shù)的定義為為 f: (TaskContext, Int, Iterator[T]) => Iterator[U], 那么這個f函數(shù)變量到底干了啥呢接癌,結(jié)合map具體傳遞形式心赶,我們可以看到f就是一個函數(shù),內(nèi)部調(diào)用了第三個參數(shù)傳遞進來的迭代器(Iterator[T])缺猛, 調(diào)用到Iterator的map方法缨叫,map方法的參數(shù)是用戶的cleanF椭符,并返回一個新的迭代器(Iterator[U]); 這些形成的transform的方法最終也會形成一個調(diào)用鏈耻姥, 最終由ResultTask的runTask從rdd的最后一個節(jié)點開始調(diào)用销钝,這個下文再詳細介紹。
??關(guān)于迭代器算法的實現(xiàn)琐簇,最終都在Iterator中蒸健,這些實現(xiàn)并非Spark框架中的,而都是scala語言框架的婉商,我想這也是spark首選scala來實現(xiàn)其理念的原因吧似忧。Iterator的map、filter丈秩、flatMap 等方法同樣接收用戶的方法盯捌,并返回一個新的Iterator,持續(xù)的進行迭代癣籽,以下就是Iterator中map的實現(xiàn)挽唉,可以看出他的核心代碼就是返回一個迭代器,而next方法的返回值筷狼, 是把前置的Iterator (self)的next方法返回值傳遞給用戶定義的回調(diào)方法(f)進行返回瓶籽,從而形成了一個遞歸的迭代調(diào)用:
def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] {
def hasNext = self.hasNext
def next() = f(self.next())
}
??綜上所述,RDD通過不斷的map埂材、filter塑顺、flatMap等變換,生成了一系列保存前置rdd和用戶變換函數(shù)的新的RDD俏险, 這些rdd按照調(diào)用順序形成了一個有序的圖譜严拒,等待最后的action動作最終的執(zhí)行,如下圖所示竖独。在《spark性能調(diào)優(yōu)與原理分析》一書中裤唠,作者把該過程做了一個我認為比較形象的比喻:rdd的圖譜形成的過程就像一個一步步形成的菜譜,每次transform調(diào)用都會記錄rdd的一次轉(zhuǎn)換莹痢,菜譜會記錄做菜的先后步驟种蘸,但只有要吃這道菜的時候,才會按照菜譜進行操作竞膳。而吃菜的動作就是后續(xù)我們要講述的collect航瞭、foreach等action操作觸發(fā)的數(shù)據(jù)迭代計算的過程。
3. RDD數(shù)據(jù)的遞歸迭代計算
??在rdd的圖譜形成后坦辟,終歸我們是要調(diào)用一些action動作函數(shù)觸發(fā)數(shù)據(jù)的迭代計算的刊侯,最終返回我們的數(shù)據(jù)結(jié)果。
3.1 觸發(fā)計算的時機和位置
我們通過實際的方法調(diào)用的堆棧中最終找到锉走,觸發(fā)數(shù)據(jù)計算的位置是在 ResultTask中的runTask方法中滨彻;
override def runTask(context: TaskContext): U = {
// Deserialize the RDD and the func using the broadcast variables.
val threadMXBean = ManagementFactory.getThreadMXBean
val deserializeStartTime = System.currentTimeMillis()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
} else 0L
func(context, rdd.iterator(partition, context))
}
runTask的最后藕届,會調(diào)用最后一個子rdd的iterator方法,從而開始調(diào)用鏈條的頭端iterator疮绷。
3.2 iterator和compute方法
??iterator 方法是由RDD類實現(xiàn)的翰舌,其返回一個Iterator迭代器,iterator的內(nèi)部經(jīng)過幾次判定最終還是調(diào)用到rdd的compute方法冬骚;
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
getOrCompute(split, context)
} else {
computeOrReadCheckpoint(split, context)
}
}
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
if (isCheckpointedAndMaterialized) {
firstParent[T].iterator(split, context)
} else {
compute(split, context)
}
}
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
而翻看到compute方法椅贱,發(fā)現(xiàn)其是個抽象方法,約定了其返回值類型是Iterator只冻,其實現(xiàn)是下方給MapPartitionRDD或者 HadoopRDD來實現(xiàn)的庇麦。
-
MapPartitionRDD的 compute
我們先來看MapPartitionRDD的實現(xiàn)
override def compute(split: Partition, context: TaskContext): Iterator[U] =
f(context, split.index, firstParent[T].iterator(split, context))
??可以看到調(diào)用了其保存的函數(shù)變量f, 而重要的第三個參數(shù)是firstParent.iterator()喜德, 其實就是調(diào)用了父RDD對象的iterator()方法山橄,而父iterator的調(diào)用就是父rdd的compute,例如如果是map調(diào)用舍悯,那就是iter.map(cleanF)航棱,f(,,firstParent.iterator)的展開就是 firstParent[T].iterator().map(clieanF);這樣依次的觸發(fā)上一級rdd的iterator萌衬,形成了一個迭代式的調(diào)用饮醇,最終得到返回的數(shù)據(jù)。
-
HadoopRDD的 compute
??這時我們肯定會想秕豫,sc.TextFile第一次創(chuàng)建的HadoopRDD的compute返回的迭代器是什么呢朴艰?這個迭代器總歸要去讀取數(shù)據(jù)源了吧;我們翻開HadoopRDD的compute實現(xiàn):
override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
val iter = new NextIterator[(K, V)] {
//....................刪除了很多代碼
private var reader: RecordReader[K, V] = null
private val inputFormat = getInputFormat(jobConf)
reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()
override def getNext(): (K, V) = {
try {
finished = !reader.next(key, value)
} catch {
case e: FileNotFoundException if ignoreMissingFiles =>
logWarning(s"Skipped missing file: ${split.inputSplit}", e)
finished = true
// Throw FileNotFoundException even if `ignoreCorruptFiles` is true
case e: FileNotFoundException if !ignoreMissingFiles => throw e
case e: IOException if ignoreCorruptFiles =>
logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
finished = true
}
if (!finished) {
inputMetrics.incRecordsRead(1)
}
if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
updateBytesRead()
}
(key, value)
}
}
new InterruptibleIterator[(K, V)](context, iter)
}
??果然compute方法不一樣了混移, 代碼比較多祠墅,刪除了很多,但其核心實現(xiàn)還是返回一個Iterator歌径;而這個Iterator的getNext方法毁嗦,是利用一個LineRecorderReader,不斷的去讀取數(shù)據(jù)源中的行來實現(xiàn)的回铛。也即是迭代一次金矛,就去數(shù)據(jù)源讀取一行,直到所在分區(qū)數(shù)據(jù)結(jié)束勺届,具體的數(shù)據(jù)的行數(shù)據(jù)的迭代原理可以參看hadoop的LineRecordReader、UncompressedSplitLineReader娶耍、SplitLineReader以及LineReader等實現(xiàn)免姿。
3.2 迭代計算數(shù)據(jù)
??最后我們可以看到數(shù)據(jù)的迭代計算是與圖譜的形成反向觸發(fā)的的,圖譜形成后榕酒,action動作觸發(fā)的計算由最后的子rdd開始觸發(fā)胚膊,通過iterator和compute方法故俐,依次向前獲取數(shù)據(jù),最終到達首個hadoopRdd的compute紊婉,讀取文件中的一行药版,這樣形成一個類似迭代計算的流,如圖所示:
而迭代的計算也有一個非常巨大的優(yōu)勢,既是我們在讀取文件的時候不用一次性的把文件都加載都內(nèi)存中,而是形成一個不斷的計算流一條條的加載都许、計算逐工、保存,節(jié)省了巨量的內(nèi)存对嚼,這種思路是非常值得我們參考和學習的。
4.使用java來實現(xiàn)一個簡單的rdd迭代調(diào)用
4.1 基本思路
??在研究完spark的rdd的調(diào)用模式后,發(fā)現(xiàn)其實現(xiàn)思路還是挺不錯的剩盒,再研究過程中因為spark的rdd部分都是用scala語言編寫的,因為scala的不熟悉帶來了一些難度慨蛙,在研究的過程中其實筆者還采取了一個辦法就是把相關(guān)的代碼看的大體明白后辽聊,用java又實現(xiàn)了一個簡約模式,當java版的rdd成功運行后期贫,也對其模式調(diào)用有了一些更深的理解跟匆。
4.2 基本實現(xiàn)類
java 簡約版的rdd實現(xiàn)包括了如下的組件:
- Iterable類, 其實就是iterator唯灵,模仿了scala的Iterator贾铝,參考其實現(xiàn)了最基礎(chǔ)的map、filter和foreach
- RDD類埠帕, java版rdd垢揩,實現(xiàn)了spark中的RDD,實現(xiàn)RDD的map敛瓷、filter和foreach叁巨,實現(xiàn)iterator,就是返回compute呐籽, 留著 compute作為抽象方法
- MapRDD類锋勺, 模仿MapPartitionRDD,重寫了compute方法
- ListRDD類狡蝶, 模仿HadoopRDD庶橱,不同的是ListRDD的數(shù)據(jù)源是一個ArrayList,compute返回的迭代器是對ArrayList的迭代贪惹。
4.2 實現(xiàn)代碼
Iterable類
import java.util.NoSuchElementException;
public abstract class Iterable<T> {
@FunctionalInterface
interface MapCallback<T1, T2> {
T2 invoke(T1 num);
}
@FunctionalInterface
interface ForeachCallback<T1> {
void invoke(T1 num);
}
@FunctionalInterface
interface FilterCallback<T1> {
boolean invoke(T1 t);
}
public <RT> Iterable<RT> map(MapCallback<T, RT> callback) {
Iterable<T> self = Iterable.this;
return new Iterable<RT>() {
@Override
public boolean hasNext() {
return self.hasNext();
}
@Override
public RT next() {
return callback.invoke(self.next());
}
};
}
public Iterable<T> filter(FilterCallback<T> callback) {
return new Iterable<T>() {
Iterable<T> self = Iterable.this;
T nextValue = null;
@Override
public boolean hasNext() {
if (self.hasNext() == false) {
nextValue = null;
return false;
}
else {
nextValue = self.next();
while (callback.invoke(nextValue) == false) {
if (self.hasNext() == false) {
nextValue = null;
return false;
}
nextValue = self.next();
}
return true;
}
}
@Override
public T next() {
if (nextValue != null) {
T curr = nextValue;
nextValue = null;
return curr;
}
else if (this.hasNext()) {
T curr = nextValue;
nextValue = null;
return curr;
}
return null;
}
};
}
public void foreach(ForeachCallback<T> callback) {
while (this.hasNext()) {
callback.invoke(this.next());
}
}
public abstract boolean hasNext();
public abstract T next();
}
RDD類
public abstract class RDD<T> {
interface RDDCallback<T1, T2> {
Iterable<T2> invoke(Iterable<T1> it);
}
public Iterable<T> iterator() {
return compute();
}
public <U> RDD<U> map(Iterable.MapCallback<T, U> callback) {
return new MapRDD<U, T>(this, (it -> it.map(callback)));
}
public RDD<T> filter(Iterable.FilterCallback<T> callback) {
return new MapRDD<T, T>(this, (it -> it.filter(callback)));
}
public abstract Iterable<T> compute();
public void foreach(Iterable.ForeachCallback<T> callback) {
this.iterator().foreach(callback);
}
}
MapRDD
public class MapRDD<U, T> extends RDD<U> {
RDDCallback<T, U> f;
RDD<T> parent;
public MapRDD(RDD<T> parent, RDDCallback<T, U> f) {
this.parent = parent;
this.f = f;
}
@Override
public Iterable<U> compute() {
Iterable<U> it = f.invoke(parent.iterator());
return it;
}
}
ListRDD
import java.util.Iterator;
import java.util.List;
public class ListRDD<T> extends RDD<T> {
List<T> list;
private ListRDD(List<T> list) {
this.list = list;
}
@Override
public Iterable<T> compute() {
return new Iterable<T>() {
Iterator<T> iterator = list.iterator();
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public T next() {
return iterator.next();
}
};
}
public static <T> RDD<T> makeRDD(List<T> list) {
return new ListRDD<>(list);
}
}
ok,下面是主函數(shù)苏章,開始調(diào)用
import java.util.ArrayList;
import java.util.List;
public class JavaMain {
public static void main(String[] args) {
//創(chuàng)建一個List作為數(shù)據(jù)源
List<Integer> list = new ArrayList<>();
for(int i = 0; i <= 5; i++) list.add(i);
//通過list創(chuàng)建一個ListRDD,并依次調(diào)用map、filter和foreach
ListRDD.makeRDD(list)
.map((num)->new Pair<>(String.valueOf(num), num))
.filter(p->p.getValue() > 1)
.foreach((p)->{System.out.println(p.toString());});
}
}
運行后枫绅,輸出結(jié)果
2=2
3=3
4=4
5=5