事由
上周工作中遇到一個bug跳芳,現(xiàn)象是一個spark streaming的job會不定期地hang住,不退出也不繼續(xù)運(yùn)行苟耻。這個job經(jīng)是用pyspark寫的吱瘩,以kafka為數(shù)據(jù)源,會在每個batch結(jié)束時將統(tǒng)計(jì)結(jié)果寫入mysql媒咳。經(jīng)過排查粹排,我們在driver進(jìn)程中發(fā)現(xiàn)有有若干線程都出于Sl狀態(tài)(睡眠狀態(tài)),進(jìn)而使用gdb調(diào)試發(fā)現(xiàn)了一處死鎖涩澡。
這是MySQLdb庫舊版本中的一處bug顽耳,在此不再贅述,有興趣的可以看這個issue妙同。不過這倒是提起了我對另外一件事的興趣射富,就是driver進(jìn)程——嚴(yán)格的說應(yīng)該是driver進(jìn)程的python子進(jìn)程——中的這些線程是從哪來的?當(dāng)然粥帚,這些線程的存在很容易理解胰耗,我們開啟了spark.streaming.concurrentJobs參數(shù),有多個batch可以同時執(zhí)行芒涡,每個線程對應(yīng)一個batch柴灯。但翻遍pyspark的python代碼,都沒有找到有相關(guān)線程啟動的地方费尽,于是簡單調(diào)研了一下pyspark到底是怎么工作的赠群,做個記錄。
本文概括
- Py4J的線程模型
- pyspark基本原理(driver端)
- CPython中的deque的線程安全
涉及軟件版本
- spark: 2.1.0
- py4j: 0.10.4
Py4J
spark是由scala語言編寫的依啰,pyspark并沒有像豆瓣開源的dpark用python復(fù)刻了spark乎串,而只是提供了一層可以與原生JVM通信的python API,Py4J就是python與JVM之間的這座橋梁。這個庫分為Java和Python兩部分叹誉,基本原理是:
- Java部分鸯两,通過
py4j.GatewayServer
監(jiān)聽一個tcp socket(記做server_socket) - Python部分,所有對JVM中對象的訪問或者方法的調(diào)用长豁,都是通過
py4j.JavaGateway
向上面這個socket完成的钧唐。 - 另外,Python部分在創(chuàng)建
JavaGateway
對象時匠襟,可以選擇同時創(chuàng)建一個CallbackServer
钝侠,它會在Python這冊監(jiān)聽一個tcp socket(記做callback_socket),用來給Java回調(diào)Python代碼提供一條渠道酸舍。 - Py4J提供了一套文本協(xié)議用來在tcp socket間傳遞命令帅韧。
pyspark driver工作流程
- 首先,一個spark job被提交后啃勉,如果被判定這是一個python的job忽舟,spark driver會找到相應(yīng)的入口,即
org.apache.spark.deploy.PythonRunner
的main
函數(shù)淮阐,這個函數(shù)中會啟動GatewayServer
// Launch a Py4J gateway server for the process to connect to; this will let it see our
// Java system properties and such
val gatewayServer = new py4j.GatewayServer(null, 0)
val thread = new Thread(new Runnable() {
override def run(): Unit = Utils.logUncaughtExceptions {
gatewayServer.start()
}
})
thread.setName("py4j-gateway-init")
thread.setDaemon(true)
thread.start()
- 然后叮阅,會創(chuàng)建一個Python子進(jìn)程來運(yùn)行我們提交上來的python入口文件,并把剛才
GatewayServer
監(jiān)聽的那個端口寫入到子進(jìn)程的環(huán)境變量中去(這樣Python才知道要通過那個端口訪問JVM)
// Launch Python process
val builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ otherArgs).asJava)
val env = builder.environment()
env.put("PYTHONPATH", pythonPath)
// This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
// pass conf spark.pyspark.python to python process, the only way to pass info to
// python process is through environment variable.
sparkConf.get(PYSPARK_PYTHON).foreach(env.put("PYSPARK_PYTHON", _))
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
- Python子進(jìn)程這邊泣特,我們是通過pyspark提供的python API編寫的這個程序浩姥,在創(chuàng)建
SparkContext
(python)時,會初始化_gateway
變量(JavaGateway
對象)和_jvm
變量(JVMView
對象)
@classmethod
def _ensure_initialized(cls, instance=None, gateway=None, conf=None):
"""
Checks whether a SparkContext is initialized or not.
Throws error if a SparkContext is already running.
"""
with SparkContext._lock:
if not SparkContext._gateway:
SparkContext._gateway = gateway or launch_gateway(conf)
SparkContext._jvm = SparkContext._gateway.jvm
if instance:
if (SparkContext._active_spark_context and
SparkContext._active_spark_context != instance):
currentMaster = SparkContext._active_spark_context.master
currentAppName = SparkContext._active_spark_context.appName
callsite = SparkContext._active_spark_context._callsite
# Raise error if there is already a running Spark context
raise ValueError(
"Cannot run multiple SparkContexts at once; "
"existing SparkContext(app=%s, master=%s)"
" created by %s at %s:%s "
% (currentAppName, currentMaster,
callsite.function, callsite.file, callsite.linenum))
else:
SparkContext._active_spark_context = instance
其中launch_gateway
函數(shù)可見pyspark/java_gateway.py
状您。
- 上面初始化的這個
_jvm
對象值得一說勒叠,在pyspark中很多對JVM的調(diào)用其實(shí)都是通過它來進(jìn)行的,比如很多python種對應(yīng)的spark對象都有一個_jsc
變量膏孟,它是JVM中的SparkContext對象在Python中的影子
缴饭,它是這么初始化的
def _initialize_context(self, jconf):
"""
Initialize SparkContext in function to allow subclass specific initialization
"""
return self._jvm.JavaSparkContext(jconf)
這里_jvm
為什么能直接調(diào)用JavaSparkContext
這個JVM環(huán)境中的構(gòu)造函數(shù)呢?我們看JVMView
中的__getattr__
方法:
def __getattr__(self, name):
if name == UserHelpAutoCompletion.KEY:
return UserHelpAutoCompletion()
answer = self._gateway_client.send_command(
proto.REFLECTION_COMMAND_NAME +
proto.REFL_GET_UNKNOWN_SUB_COMMAND_NAME + name + "\n" + self._id +
"\n" + proto.END_COMMAND_PART)
if answer == proto.SUCCESS_PACKAGE:
return JavaPackage(name, self._gateway_client, jvm_id=self._id)
elif answer.startswith(proto.SUCCESS_CLASS):
return JavaClass(
answer[proto.CLASS_FQN_START:], self._gateway_client)
else:
raise Py4JError("{0} does not exist in the JVM".format(name))
self._gateway_client.send_command
其實(shí)就是向server_socket
發(fā)送訪問對象請求的命令了骆莹,最后根據(jù)響應(yīng)值生成不同類型的影子
對象,針對我們這里的JavaSparkContext
担猛,就是一個JavaClass
對象幕垦。這個系列的類型還包括了JavaMember
,JavaPackage
等等傅联,他們也通過__getattr__
來實(shí)現(xiàn)Java對象屬性訪問以及方法的調(diào)用先改。
- 我們剛才介紹Py4j時說過Python端在創(chuàng)建JavaGateway時,可以選擇同時創(chuàng)建一個
CallbackClient
,默認(rèn)情況下,一個普通的pyspark job是不會啟動回調(diào)服務(wù)的枫慷,因?yàn)橛貌恢捅龋械慕换ザ际?code>Python --> JVM這種模式的锌畸。那什么時候需要呢登夫?streaming job就需要(具體流程我們稍后介紹)璃诀,這就(終于T屡唷)引出了我們今天主要討論的Py4J線程模型的問題狈茉。
Py4J線程模型
我們已經(jīng)知道了Python與JVM雙方向的通信分別是通過server_socket
和callack_socket
來完成的夫椭,這兩個socket的處理模型都是多線程模型,即氯庆,每收到一個連接就啟動一個線程來處理蹭秋。我們只看Python --> JVM
這條通路的情況,另外一邊是一樣的
Server端(Java)
protected void processSocket(Socket socket) {
try {
this.lock.lock();
if(!this.isShutdown) {
socket.setSoTimeout(this.readTimeout);
Py4JServerConnection gatewayConnection = this.createConnection(this.gateway, socket);
this.connections.add(gatewayConnection);
this.fireConnectionStarted(gatewayConnection);
}
} catch (Exception var6) {
this.fireConnectionError(var6);
} finally {
this.lock.unlock();
}
}
繼續(xù)看createConnection
:
protected Py4JServerConnection createConnection(Gateway gateway, Socket socket) throws IOException {
GatewayConnection connection = new GatewayConnection(gateway, socket, this.customCommands, this.listeners);
connection.startConnection();
return connection;
}
其中connection.startConnection
其實(shí)就是創(chuàng)建了一個新線程堤撵,來負(fù)責(zé)處理這個連接仁讨。
Client端(Python)
我們來看GatewayClient
中的send_command
方法:
def send_command(self, command, retry=True, binary=False):
"""Sends a command to the JVM. This method is not intended to be
called directly by Py4J users. It is usually called by
:class:`JavaMember` instances.
:param command: the `string` command to send to the JVM. The command
must follow the Py4J protocol.
:param retry: if `True`, the GatewayClient tries to resend a message
if it fails.
:param binary: if `True`, we won't wait for a Py4J-protocol response
from the other end; we'll just return the raw connection to the
caller. The caller becomes the owner of the connection, and is
responsible for closing the connection (or returning it this
`GatewayClient` pool using `_give_back_connection`).
:rtype: the `string` answer received from the JVM (The answer follows
the Py4J protocol). The guarded `GatewayConnection` is also returned
if `binary` is `True`.
"""
connection = self._get_connection()
try:
response = connection.send_command(command)
if binary:
return response, self._create_connection_guard(connection)
else:
self._give_back_connection(connection)
except Py4JNetworkError as pne:
if connection:
reset = False
if isinstance(pne.cause, socket.timeout):
reset = True
connection.close(reset)
if self._should_retry(retry, connection, pne):
logging.info("Exception while sending command.", exc_info=True)
response = self.send_command(command, binary=binary)
else:
logging.exception(
"Exception while sending command.")
response = proto.ERROR
return response
這里這個self._get_connection
是這么實(shí)現(xiàn)的
def _get_connection(self):
if not self.is_connected:
raise Py4JNetworkError("Gateway is not connected.")
try:
connection = self.deque.pop()
except IndexError:
connection = self._create_connection()
return connection
這里使用了一個deque
(也就是Python標(biāo)準(zhǔn)庫中的collections.deque
)來維護(hù)一個連接池,如果有空閑的連接实昨,就可以直接使用洞豁,如果沒有,就新建一個連接⊥篱希現(xiàn)在問題來了族跛,如果deque不是線程安全的,那么這段代碼在多線程環(huán)境就會有問題锐墙。那么deque是不是線程安全的呢礁哄?
deque的線程安全
當(dāng)然是了,Py4J當(dāng)然不會犯這樣的低級錯誤溪北,我們看標(biāo)準(zhǔn)庫的文檔:
Deques support thread-safe, memory efficient appends and pops from either side of the deque with approximately the same O(1) performance in either direction.
是線程安全的桐绒,不過措辭有點(diǎn)模糊,沒有明確指出哪些方法是線程安全的之拨,不過可以明確的是至少append的pop都是茉继。之所以去查一下,是因?yàn)槲乙灿悬c(diǎn)含糊蚀乔,因?yàn)镻ython標(biāo)準(zhǔn)庫還有另外一個Queue.Queue
烁竭,在多線程編程中經(jīng)常使用,肯定是線程安全的吉挣,于是很容易誤以為deque不是線程安全的派撕,所以我們才要一個新的Queue。這個問題睬魂,推薦閱讀stackoverflow上Jonathan的這個答案——他的回答不是被采納的最高票终吼,不過我認(rèn)為他的回答比高票更有說服力
- 高票答案一直強(qiáng)調(diào)說
deque是線程安全的
這個事實(shí)是個意外,是CPython中存在GIL造成的氯哮,其他Python解釋器就不一定遵守际跪。關(guān)于這一點(diǎn)我是不認(rèn)同的,deque在CPython中的實(shí)現(xiàn)確實(shí)依賴的GIL才變成了線程安全的,但deque的雙端append的pop是線程安全的
這件事是白紙黑字寫在Python文檔中的姆打,其他虛擬機(jī)的實(shí)現(xiàn)必須遵守良姆,否則就不能稱之為合格的Python實(shí)現(xiàn)。 - 那為什么還要有一個內(nèi)部顯式用了鎖來做線程同步的
Queue.Queue
呢穴肘?Jonathan給出的回答是Queue
的put
和get
可以是blocking的歇盼,而deque
不行,這樣一來评抚,當(dāng)你需要在多個線程中進(jìn)行通信時(比如最簡單的一個Producer - Consumer模式的實(shí)現(xiàn))豹缀,Queue
往往是最佳選擇。
關(guān)于deque是否是線程安全這個問題慨代,我將調(diào)研的結(jié)果寫在了這個知乎問題的答案下Python中的deque是線程安全的嗎?邢笙,就不在贅述了,這篇文章已經(jīng)太長了侍匙。
關(guān)于Py4J線程模型的問題氮惯,還可以參考官方文檔中的解釋。
pyspark streaming與CallbackServer
剛才提到想暗,如果是streaming的job妇汗,GatewayServer在初始化時會同時創(chuàng)建一個CallbackServer,提供JVM --> Python
這條通路说莫。
@classmethod
def _ensure_initialized(cls):
SparkContext._ensure_initialized()
gw = SparkContext._gateway
java_import(gw.jvm, "org.apache.spark.streaming.*")
java_import(gw.jvm, "org.apache.spark.streaming.api.java.*")
java_import(gw.jvm, "org.apache.spark.streaming.api.python.*")
# start callback server
# getattr will fallback to JVM, so we cannot test by hasattr()
if "_callback_server" not in gw.__dict__ or gw._callback_server is None:
gw.callback_server_parameters.eager_load = True
gw.callback_server_parameters.daemonize = True
gw.callback_server_parameters.daemonize_connections = True
gw.callback_server_parameters.port = 0
gw.start_callback_server(gw.callback_server_parameters)
cbport = gw._callback_server.server_socket.getsockname()[1]
gw._callback_server.port = cbport
# gateway with real port
gw._python_proxy_port = gw._callback_server.port
# get the GatewayServer object in JVM by ID
jgws = JavaObject("GATEWAY_SERVER", gw._gateway_client)
# update the port of CallbackClient with real port
jgws.resetCallbackClient(jgws.getCallbackClient().getAddress(), gw._python_proxy_port)
# register serializer for TransformFunction
# it happens before creating SparkContext when loading from checkpointing
cls._transformerSerializer = TransformFunctionSerializer(
SparkContext._active_spark_context, CloudPickleSerializer(), gw)
為什么需要這樣呢杨箭?一個streaming job通常需要調(diào)用foreachRDD
,并提供一個函數(shù)储狭,這個函數(shù)會在每個batch被回調(diào):
def foreachRDD(self, func):
"""
Apply a function to each RDD in this DStream.
"""
if func.__code__.co_argcount == 1:
old_func = func
func = lambda t, rdd: old_func(rdd)
jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer)
api = self._ssc._jvm.PythonDStream
api.callForeachRDD(self._jdstream, jfunc)
這里互婿,Python函數(shù)func
被封裝成了一個TransformFunction
對象,在scala端spark也定義了同樣接口一個trait
:
/**
* Interface for Python callback function which is used to transform RDDs
*/
private[python] trait PythonTransformFunction {
def call(time: Long, rdds: JList[_]): JavaRDD[Array[Byte]]
/**
* Get the failure, if any, in the last call to `call`.
*
* @return the failure message if there was a failure, or `null` if there was no failure.
*/
def getLastFailure: String
}
這樣是Py4J提供的機(jī)制辽狈,這樣就可以讓JVM通過這個影子接口
回調(diào)Python中的對象了慈参,下面就是scala中的callForeachRDD
函數(shù),它把PythonTransformFunction
又封裝了一層成為scala中的TransformFunction
刮萌, 但不管如何封裝驮配,最后都會調(diào)用PythonTransformFunction
接口中的call
方法完成對Python的回調(diào)。
/**
* helper function for DStream.foreachRDD(),
* cannot be `foreachRDD`, it will confusing py4j
*/
def callForeachRDD(jdstream: JavaDStream[Array[Byte]], pfunc: PythonTransformFunction) {
val func = new TransformFunction((pfunc))
jdstream.dstream.foreachRDD((rdd, time) => func(Some(rdd), time))
}
所以着茸,終于要回答這個問題了僧凤,我們一開始看到的driver中的多個線程是怎么來的?
- python調(diào)用
foreachRDD
提供一個TranformFunction
給scala端 - scala端調(diào)用自己的
foreachRDD
進(jìn)行正常的spark streaming作業(yè) - 由于我們開啟了
spark.streaming.concurrentJobs
元扔,多個batch可以同時運(yùn)行,這在scala端是通過線程池來進(jìn)行的旋膳,每個batch都需要回調(diào)Python中的TranformFunction
澎语,而按照我們之前介紹的Py4J線程模型,多個并發(fā)的回調(diào)會發(fā)現(xiàn)沒有可用的socket連接而生成新的,而在CallbackServer(Python)這端擅羞,每個新連接都會創(chuàng)建一個新線程來處理尸变。這樣就出現(xiàn)了driver的Python進(jìn)程中出現(xiàn)多個線程的現(xiàn)象。