利用Akka獲取Spark任務的返回結果

通過spark-submit提交的任務都需要指定Main類作為程序的入口屿良,Main類執(zhí)行結束即Spark任務終結偎箫。如果需要通過外部程序實時向Spark任務提交數(shù)據并獲取結果又該如何呢窒升?

思路很簡單蓝晒,讓Spark任務的Main方法不終止声功,外部程序與Spark任務進行通信,交互數(shù)據仔引。

通信方式很多,比如Socket褐奥,netty或者內置Tomcat,Jetty等咖耘,不過考慮編碼的快捷,通過Akka是比較不錯的選擇撬码。

開發(fā)分為2部分儿倒。1.編寫Spark任務,該部分會提交到Spark集群中蚕泽。2.外部調用代碼吊骤,該部分模擬客戶端代碼丽惭。2者食用Akka Actor進行通信行您。

先看Spark任務部分

SparkConfig 定義SparkContext對象

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object SparkConfig {
  val conf = new SparkConf().setAppName("testSpark")
  val sc = new SparkContext(conf)
}

DataService 作為調用Spark RDD操作的業(yè)務類瞧挤。

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import com.sam.spark.demo.data.config.SparkConfig
import scala.collection.mutable.ArrayBuffer


class DataService {

  def handler(list: ArrayBuffer[String]) : String = {
    val array = SparkConfig.sc.parallelize(list).max()
    array
  }
}

Worker Akka的Actor對象龄砰,接收外部入參肛宋,調用DataService對象恋捆,并返回結果

import akka.actor.Actor
import org.slf4j.LoggerFactory
import com.sam.spark.demo.data.service.DataService
import com.sam.spark.demo.akka.msg.TextMessage
import java.util.UUID
import scala.collection.mutable.ArrayBuffer

class Worker extends Actor {
  
  val dataService = new DataService()
  
  def receive = {
    case x: ArrayBuffer[String] => {
      val tm = new TextMessage()
      tm.msg = dataService.handler(x)
      sender ! tm
    }
  }
}

TextMessage 作為返回的消息對象

class TextMessage extends Serializable {
  
  var msg : String = null
}

AkkaConfig Actor配置類,創(chuàng)建Worker對象

import akka.actor.ActorSystem
import akka.actor.Props
import com.typesafe.config.ConfigFactory

object AkkaConfig {

  val system = ActorSystem("ReactiveEnterprise",ConfigFactory.load().getConfig("serverSystem"))

  val workerRef = system.actorOf(Props[Worker], "worker")
}

程序入口類

import com.sam.spark.demo.akka.AkkaConfig
import com.sam.spark.demo.data.config.SparkConfig
import scala.concurrent.duration.Duration
import scala.concurrent.Await
import java.util.concurrent.TimeUnit

object AppStart {
  def main(args: Array[String]): Unit = {
    SparkConfig
    AkkaConfig
  }
}

pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.3</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.10.5</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
</dependencies>

    <build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.0</version>
            <configuration>
                <source>1.7</source>
                <target>1.7</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.0.0</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>Main類名</mainClass>
                            </transformer>
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>reference.conf</resource>
                            </transformer>
                        </transformers>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Akka配置文件

serverSystem {
    akka {
        actor {
            provider = "akka.remote.RemoteActorRefProvider"
            default-dispatcher {
                throughput = 2
            }
            
            serializers {
                java = "akka.serialization.JavaSerializer"
            }
            
            serialization-bindings {
                "需要序列化的消息類名" = java
            }
        }
        remote { 
            enabled-transports = ["akka.remote.netty.tcp"] 
            netty.tcp { 
                hostname = "Akka Remote服務地址" 
                port = Akka Remote端口
            } 
        }
    }
}

打包

mvn clean scala:compile package -DskipTests=true

發(fā)布到Spark集群

./spark-1.6.3-bin-hadoop2.6/bin/spark-submit --class Main類名 --master spark://Spark Master地址 ./spark.demo-0.0.1-SNAPSHOT.jar
再看客戶端實現(xiàn)

本地Actor 獲取遠程ActorRef并發(fā)送消息

import akka.actor.Actor
import akka.actor.ActorSelection
import com.sam.spark.demo.akka.msg.TextMessage
import scala.collection.mutable.ArrayBuffer

class Client extends Actor {
  
  var remoteActor : ActorSelection = context.actorSelection("akka.tcp://ReactiveEnterprise@10.16.64.146:2555/user/processManagers/worker")

  override def receive: Receive = {
    case msg: ArrayBuffer[String] => {
      remoteActor ! msg
    }
    case msg: TextMessage => {
      println(msg.msg)
    }
  }
}

本地Main類 模擬向遠端Actor發(fā)送消息

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
import akka.actor.Props
import akka.pattern.Patterns
import scala.concurrent.duration.Duration
import scala.concurrent.Await
import akka.util.Timeout
import java.util.concurrent.TimeUnit
import java.util.UUID
import scala.collection.mutable.ArrayBuffer

object ClientStart {
  def main(args: Array[String]): Unit = {

    val serverSystem = ActorSystem("clientSystem", ConfigFactory.load().getConfig("clientSystem"))
    val clientRef = serverSystem.actorOf(Props[Client], "client")
    

    while (true) {
      var list = new ArrayBuffer[String]
      for (i <- 1 to 100) {
        list += UUID.randomUUID().toString()
      }
      clientRef ! list
      Thread.sleep(500)
    }

    //    val future = Patterns.ask(clientRef, "world", Timeout.apply(10L, TimeUnit.SECONDS));
    //    val result = Await.result(future, Duration.create(10, TimeUnit.SECONDS));
    //    println(result)
  }
}

Akka配置文件

clientSystem {
    akka {
        actor {
            provider = "akka.remote.RemoteActorRefProvider"
            default-dispatcher {
                throughput = 2
            }
            
            serializers {
                java = "akka.serialization.JavaSerializer"
            }
            
            serialization-bindings {
                "需要序列化的消息類名" = java
            }   
        }
    }
}
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末溉瓶,一起剝皮案震驚了整個濱河市急鳄,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌堰酿,老刑警劉巖疾宏,帶你破解...
    沈念sama閱讀 216,372評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異触创,居然都是意外死亡坎藐,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評論 3 392
  • 文/潘曉璐 我一進店門哼绑,熙熙樓的掌柜王于貴愁眉苦臉地迎上來岩馍,“玉大人,你說我怎么就攤上這事抖韩≈鳎” “怎么了?”我有些...
    開封第一講書人閱讀 162,415評論 0 353
  • 文/不壞的土叔 我叫張陵茂浮,是天一觀的道長双谆。 經常有香客問我,道長席揽,這世上最難降的妖魔是什么顽馋? 我笑而不...
    開封第一講書人閱讀 58,157評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮幌羞,結果婚禮上寸谜,老公的妹妹穿的比我還像新娘。我一直安慰自己属桦,他們只是感情好熊痴,可當我...
    茶點故事閱讀 67,171評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著聂宾,像睡著了一般愁拭。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上亏吝,一...
    開封第一講書人閱讀 51,125評論 1 297
  • 那天岭埠,我揣著相機與錄音,去河邊找鬼。 笑死惜论,一個胖子當著我的面吹牛许赃,可吹牛的內容都是我干的。 我是一名探鬼主播馆类,決...
    沈念sama閱讀 40,028評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼混聊,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了乾巧?” 一聲冷哼從身側響起句喜,我...
    開封第一講書人閱讀 38,887評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎沟于,沒想到半個月后咳胃,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經...
    沈念sama閱讀 45,310評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡旷太,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,533評論 2 332
  • 正文 我和宋清朗相戀三年展懈,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片供璧。...
    茶點故事閱讀 39,690評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡存崖,死狀恐怖,靈堂內的尸體忽然破棺而出睡毒,到底是詐尸還是另有隱情来惧,我是刑警寧澤,帶...
    沈念sama閱讀 35,411評論 5 343
  • 正文 年R本政府宣布演顾,位于F島的核電站供搀,受9級特大地震影響,放射性物質發(fā)生泄漏偶房。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,004評論 3 325
  • 文/蒙蒙 一军浆、第九天 我趴在偏房一處隱蔽的房頂上張望棕洋。 院中可真熱鬧,春花似錦乒融、人聲如沸掰盘。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽愧捕。三九已至,卻和暖如春申钩,著一層夾襖步出監(jiān)牢的瞬間次绘,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評論 1 268
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留邮偎,地道東北人管跺。 一個月前我還...
    沈念sama閱讀 47,693評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像禾进,于是被迫代替她去往敵國和親豁跑。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,577評論 2 353

推薦閱讀更多精彩內容