pyspark與py4j線程模型簡析

事由

上周工作中遇到一個bug跳芳,現(xiàn)象是一個spark streaming的job會不定期地hang住,不退出也不繼續(xù)運(yùn)行苟耻。這個job經(jīng)是用pyspark寫的吱瘩,以kafka為數(shù)據(jù)源,會在每個batch結(jié)束時將統(tǒng)計(jì)結(jié)果寫入mysql媒咳。經(jīng)過排查粹排,我們在driver進(jìn)程中發(fā)現(xiàn)有有若干線程都出于Sl狀態(tài)(睡眠狀態(tài)),進(jìn)而使用gdb調(diào)試發(fā)現(xiàn)了一處死鎖涩澡。

這是MySQLdb庫舊版本中的一處bug顽耳,在此不再贅述,有興趣的可以看這個issue妙同。不過這倒是提起了我對另外一件事的興趣射富,就是driver進(jìn)程——嚴(yán)格的說應(yīng)該是driver進(jìn)程的python子進(jìn)程——中的這些線程是從哪來的?當(dāng)然粥帚,這些線程的存在很容易理解胰耗,我們開啟了spark.streaming.concurrentJobs參數(shù),有多個batch可以同時執(zhí)行芒涡,每個線程對應(yīng)一個batch柴灯。但翻遍pyspark的python代碼,都沒有找到有相關(guān)線程啟動的地方费尽,于是簡單調(diào)研了一下pyspark到底是怎么工作的赠群,做個記錄。

本文概括

  1. Py4J的線程模型
  2. pyspark基本原理(driver端)
  3. CPython中的deque的線程安全

涉及軟件版本

  • spark: 2.1.0
  • py4j: 0.10.4

Py4J

spark是由scala語言編寫的依啰,pyspark并沒有像豆瓣開源的dpark用python復(fù)刻了spark乎串,而只是提供了一層可以與原生JVM通信的python API,Py4J就是python與JVM之間的這座橋梁。這個庫分為Java和Python兩部分叹誉,基本原理是:

  1. Java部分鸯两,通過py4j.GatewayServer監(jiān)聽一個tcp socket(記做server_socket)
  2. Python部分,所有對JVM中對象的訪問或者方法的調(diào)用长豁,都是通過py4j.JavaGateway向上面這個socket完成的钧唐。
  3. 另外,Python部分在創(chuàng)建JavaGateway對象時匠襟,可以選擇同時創(chuàng)建一個CallbackServer钝侠,它會在Python這冊監(jiān)聽一個tcp socket(記做callback_socket),用來給Java回調(diào)Python代碼提供一條渠道酸舍。
  4. Py4J提供了一套文本協(xié)議用來在tcp socket間傳遞命令帅韧。

pyspark driver工作流程

  1. 首先,一個spark job被提交后啃勉,如果被判定這是一個python的job忽舟,spark driver會找到相應(yīng)的入口,即org.apache.spark.deploy.PythonRunnermain函數(shù)淮阐,這個函數(shù)中會啟動GatewayServer
    // Launch a Py4J gateway server for the process to connect to; this will let it see our
    // Java system properties and such
    val gatewayServer = new py4j.GatewayServer(null, 0)
    val thread = new Thread(new Runnable() {
      override def run(): Unit = Utils.logUncaughtExceptions {
        gatewayServer.start()
      }
    })
    thread.setName("py4j-gateway-init")
    thread.setDaemon(true)
    thread.start()
  1. 然后叮阅,會創(chuàng)建一個Python子進(jìn)程來運(yùn)行我們提交上來的python入口文件,并把剛才GatewayServer監(jiān)聽的那個端口寫入到子進(jìn)程的環(huán)境變量中去(這樣Python才知道要通過那個端口訪問JVM)
    // Launch Python process
    val builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ otherArgs).asJava)
    val env = builder.environment()
    env.put("PYTHONPATH", pythonPath)
    // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
    env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
    env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
    // pass conf spark.pyspark.python to python process, the only way to pass info to
    // python process is through environment variable.
    sparkConf.get(PYSPARK_PYTHON).foreach(env.put("PYSPARK_PYTHON", _))
    builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
  1. Python子進(jìn)程這邊泣特,我們是通過pyspark提供的python API編寫的這個程序浩姥,在創(chuàng)建SparkContext(python)時,會初始化_gateway變量(JavaGateway對象)和_jvm變量(JVMView對象)
    @classmethod
    def _ensure_initialized(cls, instance=None, gateway=None, conf=None):
        """
        Checks whether a SparkContext is initialized or not.
        Throws error if a SparkContext is already running.
        """
        with SparkContext._lock:
            if not SparkContext._gateway:
                SparkContext._gateway = gateway or launch_gateway(conf)
                SparkContext._jvm = SparkContext._gateway.jvm

            if instance:
                if (SparkContext._active_spark_context and
                        SparkContext._active_spark_context != instance):
                    currentMaster = SparkContext._active_spark_context.master
                    currentAppName = SparkContext._active_spark_context.appName
                    callsite = SparkContext._active_spark_context._callsite

                    # Raise error if there is already a running Spark context
                    raise ValueError(
                        "Cannot run multiple SparkContexts at once; "
                        "existing SparkContext(app=%s, master=%s)"
                        " created by %s at %s:%s "
                        % (currentAppName, currentMaster,
                            callsite.function, callsite.file, callsite.linenum))
                else:
                    SparkContext._active_spark_context = instance

其中launch_gateway函數(shù)可見pyspark/java_gateway.py状您。

  1. 上面初始化的這個_jvm對象值得一說勒叠,在pyspark中很多對JVM的調(diào)用其實(shí)都是通過它來進(jìn)行的,比如很多python種對應(yīng)的spark對象都有一個_jsc變量膏孟,它是JVM中的SparkContext對象在Python中的影子缴饭,它是這么初始化的
    def _initialize_context(self, jconf):
        """
        Initialize SparkContext in function to allow subclass specific initialization
        """
        return self._jvm.JavaSparkContext(jconf)

這里_jvm為什么能直接調(diào)用JavaSparkContext這個JVM環(huán)境中的構(gòu)造函數(shù)呢?我們看JVMView中的__getattr__方法:

    def __getattr__(self, name):
        if name == UserHelpAutoCompletion.KEY:
            return UserHelpAutoCompletion()

        answer = self._gateway_client.send_command(
            proto.REFLECTION_COMMAND_NAME +
            proto.REFL_GET_UNKNOWN_SUB_COMMAND_NAME + name + "\n" + self._id +
            "\n" + proto.END_COMMAND_PART)
        if answer == proto.SUCCESS_PACKAGE:
            return JavaPackage(name, self._gateway_client, jvm_id=self._id)
        elif answer.startswith(proto.SUCCESS_CLASS):
            return JavaClass(
                answer[proto.CLASS_FQN_START:], self._gateway_client)
        else:
            raise Py4JError("{0} does not exist in the JVM".format(name))

self._gateway_client.send_command其實(shí)就是向server_socket發(fā)送訪問對象請求的命令了骆莹,最后根據(jù)響應(yīng)值生成不同類型的影子對象,針對我們這里的JavaSparkContext担猛,就是一個JavaClass對象幕垦。這個系列的類型還包括了JavaMemberJavaPackage等等傅联,他們也通過__getattr__來實(shí)現(xiàn)Java對象屬性訪問以及方法的調(diào)用先改。

  1. 我們剛才介紹Py4j時說過Python端在創(chuàng)建JavaGateway時,可以選擇同時創(chuàng)建一個CallbackClient,默認(rèn)情況下,一個普通的pyspark job是不會啟動回調(diào)服務(wù)的枫慷,因?yàn)橛貌恢捅龋械慕换ザ际?code>Python --> JVM這種模式的锌畸。那什么時候需要呢登夫?streaming job就需要(具體流程我們稍后介紹)璃诀,這就(終于T屡唷)引出了我們今天主要討論的Py4J線程模型的問題狈茉。

Py4J線程模型

我們已經(jīng)知道了Python與JVM雙方向的通信分別是通過server_socketcallack_socket來完成的夫椭,這兩個socket的處理模型都是多線程模型,即氯庆,每收到一個連接就啟動一個線程來處理蹭秋。我們只看Python --> JVM這條通路的情況,另外一邊是一樣的

Server端(Java)

    protected void processSocket(Socket socket) {
        try {
            this.lock.lock();
            if(!this.isShutdown) {
                socket.setSoTimeout(this.readTimeout);
                Py4JServerConnection gatewayConnection = this.createConnection(this.gateway, socket);
                this.connections.add(gatewayConnection);
                this.fireConnectionStarted(gatewayConnection);
            }
        } catch (Exception var6) {
            this.fireConnectionError(var6);
        } finally {
            this.lock.unlock();
        }
    }

繼續(xù)看createConnection:

    protected Py4JServerConnection createConnection(Gateway gateway, Socket socket) throws IOException {
        GatewayConnection connection = new GatewayConnection(gateway, socket, this.customCommands, this.listeners);
        connection.startConnection();
        return connection;
    }

其中connection.startConnection其實(shí)就是創(chuàng)建了一個新線程堤撵,來負(fù)責(zé)處理這個連接仁讨。

Client端(Python)

我們來看GatewayClient中的send_command方法:

    def send_command(self, command, retry=True, binary=False):
        """Sends a command to the JVM. This method is not intended to be
           called directly by Py4J users. It is usually called by
           :class:`JavaMember` instances.

        :param command: the `string` command to send to the JVM. The command
         must follow the Py4J protocol.

        :param retry: if `True`, the GatewayClient tries to resend a message
         if it fails.

        :param binary: if `True`, we won't wait for a Py4J-protocol response
         from the other end; we'll just return the raw connection to the
         caller. The caller becomes the owner of the connection, and is
         responsible for closing the connection (or returning it this
         `GatewayClient` pool using `_give_back_connection`).

        :rtype: the `string` answer received from the JVM (The answer follows
         the Py4J protocol). The guarded `GatewayConnection` is also returned
         if `binary` is `True`.
        """
        connection = self._get_connection()
        try:
            response = connection.send_command(command)
            if binary:
                return response, self._create_connection_guard(connection)
            else:
                self._give_back_connection(connection)
        except Py4JNetworkError as pne:
            if connection:
                reset = False
                if isinstance(pne.cause, socket.timeout):
                    reset = True
                connection.close(reset)
            if self._should_retry(retry, connection, pne):
                logging.info("Exception while sending command.", exc_info=True)
                response = self.send_command(command, binary=binary)
            else:
                logging.exception(
                    "Exception while sending command.")
                response = proto.ERROR

        return response

這里這個self._get_connection是這么實(shí)現(xiàn)的

    def _get_connection(self):
        if not self.is_connected:
            raise Py4JNetworkError("Gateway is not connected.")
        try:
            connection = self.deque.pop()
        except IndexError:
            connection = self._create_connection()
        return connection

這里使用了一個deque(也就是Python標(biāo)準(zhǔn)庫中的collections.deque)來維護(hù)一個連接池,如果有空閑的連接实昨,就可以直接使用洞豁,如果沒有,就新建一個連接⊥篱希現(xiàn)在問題來了族跛,如果deque不是線程安全的,那么這段代碼在多線程環(huán)境就會有問題锐墙。那么deque是不是線程安全的呢礁哄?

deque的線程安全

當(dāng)然是了,Py4J當(dāng)然不會犯這樣的低級錯誤溪北,我們看標(biāo)準(zhǔn)庫的文檔:

Deques support thread-safe, memory efficient appends and pops from either side of the deque with approximately the same O(1) performance in either direction.

是線程安全的桐绒,不過措辭有點(diǎn)模糊,沒有明確指出哪些方法是線程安全的之拨,不過可以明確的是至少append的pop都是茉继。之所以去查一下,是因?yàn)槲乙灿悬c(diǎn)含糊蚀乔,因?yàn)镻ython標(biāo)準(zhǔn)庫還有另外一個Queue.Queue烁竭,在多線程編程中經(jīng)常使用,肯定是線程安全的吉挣,于是很容易誤以為deque不是線程安全的派撕,所以我們才要一個新的Queue。這個問題睬魂,推薦閱讀stackoverflow上Jonathan的這個答案——他的回答不是被采納的最高票终吼,不過我認(rèn)為他的回答比高票更有說服力

  1. 高票答案一直強(qiáng)調(diào)說deque是線程安全的這個事實(shí)是個意外,是CPython中存在GIL造成的氯哮,其他Python解釋器就不一定遵守际跪。關(guān)于這一點(diǎn)我是不認(rèn)同的,deque在CPython中的實(shí)現(xiàn)確實(shí)依賴的GIL才變成了線程安全的,但deque的雙端append的pop是線程安全的這件事是白紙黑字寫在Python文檔中的姆打,其他虛擬機(jī)的實(shí)現(xiàn)必須遵守良姆,否則就不能稱之為合格的Python實(shí)現(xiàn)。
  2. 那為什么還要有一個內(nèi)部顯式用了鎖來做線程同步的Queue.Queue呢穴肘?Jonathan給出的回答是Queueputget可以是blocking的歇盼,而deque不行,這樣一來评抚,當(dāng)你需要在多個線程中進(jìn)行通信時(比如最簡單的一個Producer - Consumer模式的實(shí)現(xiàn))豹缀,Queue往往是最佳選擇。

關(guān)于deque是否是線程安全這個問題慨代,我將調(diào)研的結(jié)果寫在了這個知乎問題的答案下Python中的deque是線程安全的嗎?邢笙,就不在贅述了,這篇文章已經(jīng)太長了侍匙。

關(guān)于Py4J線程模型的問題氮惯,還可以參考官方文檔中的解釋

pyspark streaming與CallbackServer

剛才提到想暗,如果是streaming的job妇汗,GatewayServer在初始化時會同時創(chuàng)建一個CallbackServer,提供JVM --> Python這條通路说莫。

    @classmethod
    def _ensure_initialized(cls):
        SparkContext._ensure_initialized()
        gw = SparkContext._gateway

        java_import(gw.jvm, "org.apache.spark.streaming.*")
        java_import(gw.jvm, "org.apache.spark.streaming.api.java.*")
        java_import(gw.jvm, "org.apache.spark.streaming.api.python.*")

        # start callback server
        # getattr will fallback to JVM, so we cannot test by hasattr()
        if "_callback_server" not in gw.__dict__ or gw._callback_server is None:
            gw.callback_server_parameters.eager_load = True
            gw.callback_server_parameters.daemonize = True
            gw.callback_server_parameters.daemonize_connections = True
            gw.callback_server_parameters.port = 0
            gw.start_callback_server(gw.callback_server_parameters)
            cbport = gw._callback_server.server_socket.getsockname()[1]
            gw._callback_server.port = cbport
            # gateway with real port
            gw._python_proxy_port = gw._callback_server.port
            # get the GatewayServer object in JVM by ID
            jgws = JavaObject("GATEWAY_SERVER", gw._gateway_client)
            # update the port of CallbackClient with real port
            jgws.resetCallbackClient(jgws.getCallbackClient().getAddress(), gw._python_proxy_port)

        # register serializer for TransformFunction
        # it happens before creating SparkContext when loading from checkpointing
        cls._transformerSerializer = TransformFunctionSerializer(
            SparkContext._active_spark_context, CloudPickleSerializer(), gw)

為什么需要這樣呢杨箭?一個streaming job通常需要調(diào)用foreachRDD,并提供一個函數(shù)储狭,這個函數(shù)會在每個batch被回調(diào):

    def foreachRDD(self, func):
        """
        Apply a function to each RDD in this DStream.
        """
        if func.__code__.co_argcount == 1:
            old_func = func
            func = lambda t, rdd: old_func(rdd)
        jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer)
        api = self._ssc._jvm.PythonDStream
        api.callForeachRDD(self._jdstream, jfunc)

這里互婿,Python函數(shù)func被封裝成了一個TransformFunction對象,在scala端spark也定義了同樣接口一個trait:

/**
 * Interface for Python callback function which is used to transform RDDs
 */
private[python] trait PythonTransformFunction {
  def call(time: Long, rdds: JList[_]): JavaRDD[Array[Byte]]

  /**
   * Get the failure, if any, in the last call to `call`.
   *
   * @return the failure message if there was a failure, or `null` if there was no failure.
   */
  def getLastFailure: String
}

這樣是Py4J提供的機(jī)制辽狈,這樣就可以讓JVM通過這個影子接口回調(diào)Python中的對象了慈参,下面就是scala中的callForeachRDD函數(shù),它把PythonTransformFunction又封裝了一層成為scala中的TransformFunction刮萌, 但不管如何封裝驮配,最后都會調(diào)用PythonTransformFunction接口中的call方法完成對Python的回調(diào)。

  /**
   * helper function for DStream.foreachRDD(),
   * cannot be `foreachRDD`, it will confusing py4j
   */
  def callForeachRDD(jdstream: JavaDStream[Array[Byte]], pfunc: PythonTransformFunction) {
    val func = new TransformFunction((pfunc))
    jdstream.dstream.foreachRDD((rdd, time) => func(Some(rdd), time))
  }

所以着茸,終于要回答這個問題了僧凤,我們一開始看到的driver中的多個線程是怎么來的

  1. python調(diào)用foreachRDD提供一個TranformFunction給scala端
  2. scala端調(diào)用自己的foreachRDD進(jìn)行正常的spark streaming作業(yè)
  3. 由于我們開啟了spark.streaming.concurrentJobs元扔,多個batch可以同時運(yùn)行,這在scala端是通過線程池來進(jìn)行的旋膳,每個batch都需要回調(diào)Python中的TranformFunction澎语,而按照我們之前介紹的Py4J線程模型,多個并發(fā)的回調(diào)會發(fā)現(xiàn)沒有可用的socket連接而生成新的,而在CallbackServer(Python)這端擅羞,每個新連接都會創(chuàng)建一個新線程來處理尸变。這樣就出現(xiàn)了driver的Python進(jìn)程中出現(xiàn)多個線程的現(xiàn)象。

參考閱讀

  1. MySQLdb1中的死鎖issue
  2. queue-queue-vs-collections-deque
  3. Python中的deque是線程安全的嗎?
  4. py4j線程模型官方文檔
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末减俏,一起剝皮案震驚了整個濱河市召烂,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌娃承,老刑警劉巖奏夫,帶你破解...
    沈念sama閱讀 218,122評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異历筝,居然都是意外死亡酗昼,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評論 3 395
  • 文/潘曉璐 我一進(jìn)店門梳猪,熙熙樓的掌柜王于貴愁眉苦臉地迎上來麻削,“玉大人,你說我怎么就攤上這事春弥∏河矗” “怎么了?”我有些...
    開封第一講書人閱讀 164,491評論 0 354
  • 文/不壞的土叔 我叫張陵匿沛,是天一觀的道長扫责。 經(jīng)常有香客問我,道長俺祠,這世上最難降的妖魔是什么公给? 我笑而不...
    開封第一講書人閱讀 58,636評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮蜘渣,結(jié)果婚禮上淌铐,老公的妹妹穿的比我還像新娘。我一直安慰自己蔫缸,他們只是感情好腿准,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,676評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著拾碌,像睡著了一般吐葱。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上校翔,一...
    開封第一講書人閱讀 51,541評論 1 305
  • 那天弟跑,我揣著相機(jī)與錄音,去河邊找鬼防症。 笑死孟辑,一個胖子當(dāng)著我的面吹牛哎甲,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播饲嗽,決...
    沈念sama閱讀 40,292評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼炭玫,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了貌虾?” 一聲冷哼從身側(cè)響起吞加,我...
    開封第一講書人閱讀 39,211評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎尽狠,沒想到半個月后衔憨,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,655評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡晚唇,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,846評論 3 336
  • 正文 我和宋清朗相戀三年巫财,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片哩陕。...
    茶點(diǎn)故事閱讀 39,965評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡平项,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出悍及,到底是詐尸還是另有隱情闽瓢,我是刑警寧澤,帶...
    沈念sama閱讀 35,684評論 5 347
  • 正文 年R本政府宣布心赶,位于F島的核電站扣讼,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏缨叫。R本人自食惡果不足惜椭符,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,295評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望耻姥。 院中可真熱鬧销钝,春花似錦、人聲如沸琐簇。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽婉商。三九已至似忧,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間丈秩,已是汗流浹背盯捌。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留蘑秽,地道東北人饺著。 一個月前我還...
    沈念sama閱讀 48,126評論 3 370
  • 正文 我出身青樓滤祖,卻偏偏與公主長得像,于是被迫代替她去往敵國和親瓶籽。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,914評論 2 355

推薦閱讀更多精彩內(nèi)容