【Spark 精選】源碼閱讀 — Scala 高級語法

1.case 模式匹配

case 模式匹配的使用樣例

# 1. 匹配特定的數(shù)據(jù)類型
def processValue(value: Any): String = value match {
  case s: String => s"String: $s"
  case i: Int => s"Int: $i"
  case d: Double => s"Double: $d"
  case _ => "Other"

val result1 = processValue("Hello") // 輸出 "String: Hello"
val result2 = processValue(123) // 輸出 "Int: 123"
val result3 = processValue(3.14) // 輸出 "Double: 3.14"
val result4 = processValue(true) // 輸出 "Other"

# 2.根據(jù)不同的輸入執(zhí)行不同的邏輯
def processInput(input: Any): String = input match {
  case 1 => "One"
  case "two" => "Two"
  case _: Double => "A Double"
  case _ => "Other"

val result1 = processInput(1) // 輸出 "One"
val result2 = processInput("two") // 輸出 "Two"
val result3 = processInput(3.14) // 輸出 "A Double"
val result4 = processInput(true) // 輸出 "Other"

# 3.解構(gòu)數(shù)據(jù)結(jié)構(gòu)
val tuple = (1, "two", 3.14)
val (a, b, c) = tuple // 解構(gòu)元組
println(a) // 輸出 1
println(b) // 輸出 "two"
println(c) // 輸出 3.14

val list = List(1, 2, 3, 4, 5)
val result = list.map {
  case x if x % 2 == 0 => "Even"
  case _ => "Odd"
println(result) // 輸出 List("Odd", "Even", "Odd", "Even", "Odd")

spark-sql 源碼中的 case 模式匹配AnalyzerResolveRelations

  object ResolveRelations extends Rule[LogicalPlan] {
    def resolveViews(plan: LogicalPlan): LogicalPlan = plan match {
      case view @ View(desc, isTempView, _, child) if !child.resolved =>
          // 省略 ...
      case p @ SubqueryAlias(_, view: View) =>
        p.copy(child = resolveViews(view))
      case _ => plan

2.case 類

case 類的使用場景

  • 數(shù)據(jù)傳遞case class可以用于封裝一組相關(guān)的數(shù)據(jù)拔鹰,并且很容易進行復(fù)制和傳遞惠豺。
  • 模式匹配case class可以與模式匹配結(jié)合使用,方便地根據(jù)不同的數(shù)據(jù)類型進行處理搀暑。
// 定義一個 case class
case class Person(name: String, age: Int)

// 創(chuàng)建一個 Person 對象
val person1 = Person("Alice", 25)

// 復(fù)制一個 Person 對象,并修改部分屬性
val person2 = person1.copy(age = 30)

// 打印 person1 和 person2
println(person1) // 輸出 Person(Alice,25)
println(person2) // 輸出 Person(Alice,30)

// 模式匹配
def processPerson(person: Person): String = person match {
  case Person(name, age) if age < 30 => s"$name is young"
  case Person(name, age) if age >= 30 => s"$name is old"

val result1 = processPerson(person1) // 輸出 "Alice is young"
val result2 = processPerson(person2) // 輸出 "Alice is old"

spark-sql 源碼中的 case 類LogicalPlan 的子類 Filter

// case 類 Filter
case class Filter(condition: Expression, child: LogicalPlan)
  extends OrderPreservingUnaryNode with PredicateHelper {
  override def output: Seq[Attribute] = child.output

  override def maxRows: Option[Long] = child.maxRows

  override protected lazy val validConstraints: ExpressionSet = {
    val predicates = splitConjunctivePredicates(condition)

// case 模式匹配
// EliminateOuterJoin的apply
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _, _)) =>
      val newJoinType = buildNewJoinType(f, j)
      if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))


spark-sql 源碼中的嵌套函數(shù)QueryPlan 的嵌套函數(shù) mapExpressions

  def mapExpressions(f: Expression => Expression): this.type = {
    var changed = false

     // 嵌套函數(shù)A
    @inline def transformExpression(e: Expression): Expression = {
      val newE = CurrentOrigin.withOrigin(e.origin) {
      if (newE.fastEquals(e)) {
      } else {
        changed = true
    // 嵌套函數(shù)B
    def recursiveTransform(arg: Any): AnyRef = arg match {
      case e: Expression => transformExpression(e)    // 執(zhí)行嵌套函數(shù)B
      case Some(value) => Some(recursiveTransform(value))
      case m: Map[_, _] => m
      case d: DataType => d // Avoid unpacking Structs
      case stream: Stream[_] => stream.map(recursiveTransform).force
      case seq: Iterable[_] => seq.map(recursiveTransform)
      case other: AnyRef => other
      case null => null
    // 執(zhí)行嵌套函數(shù)A
    val newArgs = mapProductIterator(recursiveTransform)

    if (changed) makeCopy(newArgs).asInstanceOf[this.type] else this



  • 過濾和轉(zhuǎn)換數(shù)據(jù):偏函數(shù)可以用于過濾和轉(zhuǎn)換數(shù)據(jù)膀哲。通過定義適當(dāng)?shù)臈l件啃沪,可以使用偏函數(shù)來過濾掉不需要的數(shù)據(jù)或者將數(shù)據(jù)進行轉(zhuǎn)換。
  • 對特定輸入進行處理:偏函數(shù)可以對特定類型或特定條件的輸入進行處理憔晒,而對其他輸入不進行處理。
// 定義一個偏函數(shù)蔑舞,只處理正整數(shù)和字符串類型的輸入
val partialFunc: PartialFunction[Any, String] = {
  case i: Int if i > 0 => s"Positive integer: $i"
  case s: String => s"String: $s"

// applyOrElse接口接受兩個參數(shù):輸入值和默認值拒担。如果偏函數(shù)對輸入值進行定義,則返回偏函數(shù)的結(jié)果攻询;如果偏函數(shù)沒有對輸入值進行定義从撼,則返回默認值。
println(partialFunc.applyOrElse(10, (x: Any) => "Not defined")) // 輸出:Positive integer: 10
println(partialFunc.applyOrElse(-5, (x: Any) => "Not defined")) // 輸出:Not defined
println(partialFunc.applyOrElse("hello", (x: Any) => "Not defined")) // 輸出:String: hello
println(partialFunc.applyOrElse(3.14, (x: Any) => "Not defined")) // 輸出:Not define

spark-sql 源碼中的偏函數(shù)
AnalysisHelper 的函數(shù) resolveOperatorsUp

  // 跳過已經(jīng)分析過的rule钧栖,并遞歸獲取子節(jié)點
  def resolveOperatorsUp(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
    if (!analyzed) {
      AnalysisHelper.allowInvokingTransformsInAnalyzer {
        val afterRuleOnChildren = mapChildren(_.resolveOperatorsUp(rule))
        if (self fastEquals afterRuleOnChildren) {
          CurrentOrigin.withOrigin(origin) {
            // rule是偏函數(shù)陨溅,applyOrElse會執(zhí)行這個函數(shù)
           // 如果偏函數(shù)對輸入值進行定義仆救,則返回偏函數(shù)的結(jié)果爸业;如果偏函數(shù)沒有對輸入值進行定義窒盐,則返回默認值。
            rule.applyOrElse(self, identity[LogicalPlan])
        } else {
          CurrentOrigin.withOrigin(origin) {
            val afterRule = rule.applyOrElse(afterRuleOnChildren, identity[LogicalPlan])
    } else {

Optimizer 的函數(shù) ConvertToLocalRelation

// Optimizer 
object ConvertToLocalRelation extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {   // 下面函數(shù)的內(nèi)容整體是偏函數(shù)潭陪,作為transform的入?yún)?    case Project(projectList, LocalRelation(output, data, isStreaming))
        if !projectList.exists(hasUnevaluableExpr) =>
      val projection = new InterpretedMutableProjection(projectList, output)
      LocalRelation(projectList.map(_.toAttribute), data.map(projection(_).copy()), isStreaming)

    case Limit(IntegerLiteral(limit), LocalRelation(output, data, isStreaming)) =>
      LocalRelation(output, data.take(limit), isStreaming)

    case Filter(condition, LocalRelation(output, data, isStreaming))
        if !hasUnevaluableExpr(condition) =>
      val predicate = Predicate.create(condition, output)
      LocalRelation(output, data.filter(row => predicate.eval(row)), isStreaming)
 // 省略...

  // TreeNode
  def transform(rule: PartialFunction[BaseType, BaseType]): BaseType = {

  def transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType = {
    val afterRule = CurrentOrigin.withOrigin(origin) {
      // rule是偏函數(shù)雄妥,applyOrElse會執(zhí)行這個函數(shù)
      // 如果偏函數(shù)對輸入值進行定義最蕾,則返回偏函數(shù)的結(jié)果;如果偏函數(shù)沒有對輸入值進行定義老厌,則返回默認值瘟则。
      rule.applyOrElse(this, identity[BaseType])

    if (this fastEquals afterRule) {
      // 獲取子節(jié)點,遞歸執(zhí)行transformDown
    } else {



  • 部分應(yīng)用:柯里化函數(shù)可以通過部分應(yīng)用的方式枝秤,先給函數(shù)提供部分參數(shù)壹粟,然后返回一個接受剩余參數(shù)的新函數(shù)。這樣可以在不同的上下文中復(fù)用同一個函數(shù)宿百。
  • 函數(shù)組合:柯里化函數(shù)可以方便地進行函數(shù)組合,將多個函數(shù)串聯(lián)起來洪添。通過將一個函數(shù)的輸出作為另一個函數(shù)的輸入垦页,可以構(gòu)建更復(fù)雜的函數(shù)邏輯。

如下案例中干奢,add是一個柯里化函數(shù)痊焊,它接受兩個參數(shù) xy。通過部分應(yīng)用的方式忿峻,我們先給 add 函數(shù)提供一個參數(shù) 1薄啥,然后返回一個新的函數(shù) addOne,這個新函數(shù)只接受一個參數(shù) y逛尚。最后垄惧,我們調(diào)用 addOne 函數(shù),傳遞剩余參數(shù) 2绰寞,得到結(jié)果 3

def add(x:Int)(y:Int): Int = x + y

val addOne = add(1) _ // 部分應(yīng)用到逊,返回一個接受一個參數(shù)的新函數(shù)

val result = addOne(2) // 調(diào)用新函數(shù),傳遞剩余參數(shù)

println(result) // 輸出 3

spark-sql 源碼中的柯里化函數(shù)ParseDriver 的方法 parse

  // 參數(shù)1是command滤钱,參數(shù)2是toResult
  protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
    logDebug(s"Parsing command: $command")
    // 使用參數(shù)command
    val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))

    val tokenStream = new CommonTokenStream(lexer)
    val parser = new SqlBaseParser(tokenStream)
    // 使用參數(shù)command 
    parser.addParseListener(UnclosedCommentProcessor(command, tokenStream))
    // 省略 ...
    try {
      try {
        // first, try parsing with potentially faster SLL mode
       // 使用參數(shù)toResult
       // parser里面包含參數(shù)command觉壶,parser再作為toResult函數(shù)的入?yún)?        toResult(parser)
      catch {
        case e: ParseCancellationException =>
          // 省略...
          // 使用參數(shù)toResult
          // parser里面包含參數(shù)command,parser再作為toResult函數(shù)的入?yún)?          toResult(parser)
    // 省略 ...

6.基于 Product 實現(xiàn)的 TreeNode

Product 的使用樣例

  • 元組操作Product 接口為元組類提供了一些常用的方法件缸,如 productElement 用于獲取元素值铜靶,productArity 用于獲取元素數(shù)量。
  • 模式匹配Product 接口可以與模式匹配結(jié)合使用他炊,方便地對元組進行解構(gòu)和處理争剿。
// 導(dǎo)入Product接口
import scala.Product

// 定義一個元組類,繼承自Product接口
class MyTuple(val first: Int, val second: String) extends Product {
  // 實現(xiàn)Product接口的抽象方法
  def productElement(n: Int): Any = n match {
    case 0 => first
    case 1 => second
    case _ => throw new IndexOutOfBoundsException(s"Tuple index out of range: $n")

  // 實現(xiàn)Product接口的抽象方法
  def productArity: Int = 2

  // 重寫toString方法
  override def toString: String = s"MyTuple($first, $second)"

// 創(chuàng)建一個MyTuple對象
val tuple = new MyTuple(42, "Hello")

// 獲取元素值
val firstElement = tuple.productElement(0)
val secondElement = tuple.productElement(1)

// 獲取元素數(shù)量
val arity = tuple.productArity

// 打印結(jié)果
println(firstElement)  // 輸出 42
println(secondElement) // 輸出 "Hello"
println(arity)         // 輸出 2
println(tuple)         // 輸出 "MyTuple(42, Hello)"

spark-sql 源碼中的 Product:基于 Product 實現(xiàn)的 TreeNode

abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
  // 省略 ...
  protected def mapProductIterator[B: ClassTag](f: Any => B): Array[B] = {    // 入?yún)⑹菂?shù)類型為B的scala function痊末,返回值是Array[B]
    val arr = Array.ofDim[B](productArity)
    var i = 0
    while (i < arr.length) {
      arr(i) = f(productElement(i))    // productElement會執(zhí)行傳入的函數(shù)mp秒梅,然后f會執(zhí)行apply
      i += 1
  // 省略 ...

  private def mapChildren(
      f: BaseType => BaseType,
      forceCopy: Boolean): BaseType = {
    // 省略...

    val newArgs = mapProductIterator {    // mapProductIterator 的入?yún)⑹窍旅娴暮瘮?shù)mp
      case arg: TreeNode[_] if containsChild(arg) =>
        // 省略...
      case Some(arg: TreeNode[_]) if containsChild(arg) =>
        // 省略...
      case m: Map[_, _] => m.mapValues {
        // 省略...
      }.view.force.toMap // `mapValues` is lazy and we need to force it to materialize
      case d: DataType => d // Avoid unpacking Structs
      case args: Stream[_] => args.map(mapChild).force // Force materialization on stream
      case args: Iterable[_] => args.map(mapChild)
      case nonChild: AnyRef => nonChild
      case null => null
    // 省略...
