Spark中的多任務(wù)處理
Spark的一個非常常見的用例是并行運行許多作業(yè)购桑。 構(gòu)建作業(yè)DAG后认烁,Spark將這些任務(wù)分配到多個Executor上并行處理服傍。
但這并不能幫助我們在同一個Spark應(yīng)用程序中同時運行兩個完全獨立的作業(yè)延都,例如同時從多個數(shù)據(jù)源讀取數(shù)據(jù)并將它們寫到對應(yīng)的存儲蒸甜,或同時處理多個文件等棠耕。
每個spark應(yīng)用程序都需要一個SparkSession(Context)來配置和執(zhí)行操作余佛。 SparkSession對象是線程安全的,可以根據(jù)需要傳遞給你的Spark應(yīng)用程序窍荧。
一個順序作業(yè)的例子
假設(shè)我們有一個spark 2.x應(yīng)用程序辉巡,負責將幾個數(shù)據(jù)寫入到HDFS中。
import org.apache.spark.sql.SparkSession
object FancyApp {
def appMain(args: Array[String]) = {
// configure spark
val spark = SparkSession
.builder
.appName("parjobs")
.getOrCreate()
val df = spark.sparkContext.parallelize(1 to 100).toDF
doFancyDistinct(df, "hdfs:///dis.parquet")
doFancySum(df, "hdfs:///sum.parquet")
}
def doFancyDistinct(df: DataFrame, outPath: String) = df.distinct.write.parquet(outPath)
def doFancySum(df: DataFrame, outPath: String) = df.agg(sum("value")).write.parquet(outPath)
}
這個程序看起來沒有什么問題蕊退,Spark將按順序執(zhí)行兩個動作郊楣。但這兩個動作是獨立, 我們可以同時執(zhí)行它們瓤荔。
一個有缺陷的并發(fā)作業(yè)的例子
如果你快速的在網(wǎng)上搜索一下 “scala異步編程”痢甘,你就會被引到Scala Future這個解決方案中。
例如以下為一個并行處理RDD的例子:
import scala.concurrent._
import ExecutionContext.Implicits.global
def pipeline(f: String, n: Int) = {
sqlContext
.read
.format("com.databricks.spark.csv")
.option("header", "true")
.load(f)
.repartition(n)
.groupBy(...)
.agg(...)
.cache // Cache so we can force computation later
}
val n: Int = 2
val files: Array[String] = ['/tmp/test1.csv','/tmp/test2.csv']
val rdds = files.map(f => pipeline(f, n))
def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = Future {
df.rdd.foreach(_ => ()) // Force computation
df
}
val result = Future.sequence(
rdds.map(rdd => pipelineToFuture(rdd)).toList
)
我們只要根據(jù)搜索到的文檔中提供的例子修改一下茉贡,就會得到以下類似內(nèi)容:
import org.apache.spark.sql.SparkSession
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
object FancyApp {
def appMain(args: Array[String]) = {
// configure spark
val spark = SparkSession
.builder
.appName("parjobs")
.getOrCreate()
val df = spark.sparkContext.parallelize(1 to 100).toDF
val taskA = doFancyDistinct(df, "hdfs:///dis.parquet")
val taskB = doFancySum(df, "hdfs:///sum.parquet")
// Now wait for the tasks to finish before exiting the app
Await.result(Future.sequence(Seq(taskA,taskB)), Duration(1, MINUTES))
}
def doFancyDistinct(df: DataFrame, outPath: String) = Future { df.distinct.write.parquet(outPath) }
def doFancySum(df: DataFrame, outPath: String) = Future { df.agg(sum("value")).write.parquet(outPath) }
}
ExecutionContext是用于==管理并行操作的Context==塞栅。 實際的線程模型可以由開發(fā)者明確提供,也可以使用全局默認值(這是一個 ForkJoinPool )腔丧,就像我們在上面的代碼中使用的一樣:
import scala.concurrent.ExecutionContext.Implicits.global
使用Global execution context 的問題在于它并不知道我們是在群集上啟動Spark作業(yè)放椰。 默認情況下,Global execution context 提供==與運行代碼的系統(tǒng)中的處理器相同數(shù)量的線程==愉粤。 在我們的Spark應(yīng)用程序中砾医,它將與Driver上的處理器相同數(shù)量的線程。
一個優(yōu)化過的并發(fā)作業(yè)的例子
我們需要控制我們的線程策略衣厘,更一般化地編寫我們的程序如蚜,以便可以在不同的線程模型中重用它們。
例如以下是我們從重寫的函數(shù)影暴,它將允許我們精確控制execution context 來管理調(diào)用函數(shù)時提供的線程數(shù)错邦。 例子中添加的隱式參數(shù)將允許調(diào)用的代碼指定運行函數(shù)時使用哪個ExecutionContext。
def doFancyDistinct(df: DataFrame, outPath: String)(implicit xc: ExecutionContext) = Future {
df.distinct.write.parquet(outPath)
}
現(xiàn)在讓我們提出一個比默認的Global execution context更好的策略型宙。我們希望能夠指定我們想要的并行度撬呢。
import org.apache.spark.sql.SparkSession
import import java.util.concurrent.Executors
import scala.concurrent._
import scala.concurrent.duration._
object FancyApp {
def appMain(args: Array[String]) = {
// configure spark
val spark = SparkSession
.builder
.appName("parjobs")
.getOrCreate()
// Set number of threads via a configuration property
val pool = Executors.newFixedThreadPool(5)
// create the implicit ExecutionContext based on our thread pool
implicit val xc = ExecutionContext.fromExecutorService(pool)
val df = spark.sparkContext.parallelize(1 to 100).toDF
val taskA = doFancyDistinct(df, "hdfs:///dis.parquet")
val taskB = doFancySum(df, "hdfs:///sum.parquet")
// Now wait for the tasks to finish before exiting the app
Await.result(Future.sequence(Seq(taskA,taskB)), Duration(1, MINUTES))
}
def doFancyDistinct(df: DataFrame, outPath: String)(implicit xc: ExecutionContext) = Future {
df.distinct.write.parquet(outPath)
}
def doFancySum(df: DataFrame, outPath: String)(implicit xc: ExecutionContext) = Future {
df.agg(sum("value")).write.parquet(outPath)
}
}
在這個例子中,我們定義了Execution context變量xc妆兑,含有五個線程魂拦。
參考資料
Spark Parallel Job Execution
How to run concurrent jobs(actions) in Apache Spark using single spark context
Processing multiple files as independent RDD's in parallel