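IV. Creating Threads
1. Creating the Child Thread
- Once the multithreading environment is in place, Python goes on to create a native thread of the underlying platform, which we will call the child thread.
- The story starts in the main thread, with its call to thread_PyThread_start_new_thread: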
Modules\_threadmodule.c
static PyObject *
thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs)
{
    PyObject *func, *args, *keyw = NULL;
    struct bootstate *boot;
    unsigned long ident;
    if (!PyArg_UnpackTuple(fargs, "start_new_thread", 2, 3,
                           &func, &args, &keyw))
        return NULL;
    ... ...
    boot = PyMem_NEW(struct bootstate, 1);
    if (boot == NULL)
        return PyErr_NoMemory();
    boot->interp = PyThreadState_GET()->interp;
    boot->func = func;
    boot->args = args;
    boot->keyw = keyw;
    boot->tstate = _PyThreadState_Prealloc(boot->interp);
    if (boot->tstate == NULL) {
        PyMem_DEL(boot);
        return PyErr_NoMemory();
    }
    Py_INCREF(func);
    Py_INCREF(args);
    Py_XINCREF(keyw);
    PyEval_InitThreads(); /* Start the interpreter's thread-awareness */
    ident = PyThread_start_new_thread(t_bootstrap, (void*) boot);
    if (ident == PYTHREAD_INVALID_THREAD_ID) {
        PyErr_SetString(ThreadError, "can't start new thread");
        Py_DECREF(func);
        Py_DECREF(args);
        Py_XDECREF(keyw);
        PyThreadState_Clear(boot->tstate);
        PyMem_DEL(boot);
        return NULL;
    }
    return PyLong_FromUnsignedLong(ident);
}
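- At the Python level this C function is reachable as _thread.start_new_thread. The following is a minimal sketch of such a call, assuming CPython; the names worker, seen, and done are made up for illustration and do not come from the source above.

```python
import _thread
import threading

done = threading.Event()   # lets the main thread wait for the child
seen = {}                  # filled in by the child thread


def worker(tag, suffix=""):
    # Runs in the child thread.
    seen["ident"] = _thread.get_ident()
    seen["msg"] = tag + suffix
    done.set()


# func, args and the optional keyw correspond to the 2..3 items that
# PyArg_UnpackTuple extracts from fargs.
ident = _thread.start_new_thread(worker, ("hello",), {"suffix": "!"})
finished = done.wait(timeout=5)

# Passing fewer than two arguments is rejected by PyArg_UnpackTuple.
try:
    _thread.start_new_thread(worker)
except TypeError:
    too_few_rejected = True
else:
    too_few_rejected = False
```

The integer returned to the caller is the ident built by PyLong_FromUnsignedLong at the end of the C function; on mainstream CPython builds it matches what the child sees from _thread.get_ident().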
- After setting up the multithreading environment with PyEval_InitThreads, the main thread calls PyThread_start_new_thread to create the child thread:
Python\thread_nt.h
unsigned long
PyThread_start_new_thread(void (*func)(void *), void *arg)
{
    HANDLE hThread;
    unsigned threadID;
    callobj *obj;
    dprintf(("%lu: PyThread_start_new_thread called\n",
             PyThread_get_thread_ident()));
    if (!initialized)
        PyThread_init_thread();
    obj = (callobj*)HeapAlloc(GetProcessHeap(), 0, sizeof(*obj));
    if (!obj)
        return PYTHREAD_INVALID_THREAD_ID;
    obj->func = func;
    obj->arg = arg;
    PyThreadState *tstate = PyThreadState_GET();
    size_t stacksize = tstate ? tstate->interp->pythread_stacksize : 0;
    hThread = (HANDLE)_beginthreadex(0,
                      Py_SAFE_DOWNCAST(stacksize, Py_ssize_t, unsigned int),
                      bootstrap, obj,
                      0, &threadID);
    if (hThread == 0) {
        /* I've seen errno == EAGAIN here, which means "there are
         * too many threads".
         */
        int e = errno;
        dprintf(("%lu: PyThread_start_new_thread failed, errno %d\n",
                 PyThread_get_thread_ident(), e));
        threadID = (unsigned)-1;
        HeapFree(GetProcessHeap(), 0, obj);
    }
    else {
        dprintf(("%lu: PyThread_start_new_thread succeeded: %p\n",
                 PyThread_get_thread_ident(), (void*)hThread));
        CloseHandle(hThread);
    }
    return threadID;
}
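- The stacksize handed to _beginthreadex is read from interp->pythread_stacksize. From Python code that value can be inspected and changed with threading.stack_size(). The sketch below assumes CPython on a platform that allows changing the thread stack size; the 512 KiB figure is an arbitrary example value, not a recommendation.

```python
import threading

previous = threading.stack_size()   # 0 means "use the platform default"
requested = 512 * 1024              # 512 KiB, arbitrary example value

threading.stack_size(requested)     # applies to threads created afterwards
current = threading.stack_size()

results = []
t = threading.Thread(target=results.append, args=("ran with custom stack",))
t.start()
t.join()

threading.stack_size(previous)      # restore the earlier setting
```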
- Look at the arguments passed to PyThread_start_new_thread:
func: the function t_bootstrap.
arg: boot, the bootstate structure that holds the thread's information.
- PyThread_start_new_thread then packs func and arg into a structure of type callobj:
Python\thread_nt.h
/*
* Thread support.
*/
typedef struct {
    void (*func)(void*);
    void *arg;
} callobj;
- One thing worth noting: _beginthreadex is the function used to create a thread on Windows. It is declared by the Microsoft C runtime, in process.h:
Include\10.0.18362.0\ucrt\process.h
_Success_(return != 0)
_ACRTIMP uintptr_t __cdecl _beginthreadex(
    _In_opt_  void*                    _Security,
    _In_      unsigned                 _StackSize,
    _In_      _beginthreadex_proc_type _StartAddress,
    _In_opt_  void*                    _ArgList,
    _In_      unsigned                 _InitFlag,
    _Out_opt_ unsigned*                _ThrdAddr
    );
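- This is a key turning point. Up to now we have been following the main thread's execution path; with the call to _beginthreadex a child thread now exists, and the callobj structure obj packed earlier is handed to it as its argument.
- To summarize Python's state at this point:
- The Python process now consists of two native Win32 threads: the main thread, created by the operating system when the Python program was launched, and the child thread, created through _beginthreadex.
- The main thread acquired the GIL while executing PyEval_InitThreads and still holds it. In the code shown here it does not suspend itself to wait for the child: _beginthreadex stores the new thread's id in threadID before it returns, so PyThread_start_new_thread can return the id right away.
- The child thread's thread procedure is bootstrap. Before it can touch the Python interpreter, it must first acquire the GIL.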
Python\thread_nt.h
/* thunker to call adapt between the function type used by the system's
   thread start function and the internally used one. */
static unsigned __stdcall
bootstrap(void *call)
{
    callobj *obj = (callobj*)call;
    void (*func)(void*) = obj->func;
    void *arg = obj->arg;
    HeapFree(GetProcessHeap(), 0, obj);
    func(arg);
    return 0;
}
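- The thunk pattern used by the C bootstrap above (one fixed-signature entry point that unpacks a function and its argument, then calls it) can be modeled in a few lines of Python. This is an illustration only, not CPython code: CallObj, bootstrap, and t_bootstrap here are hypothetical Python stand-ins for the C names, and threading.Thread plays the role of _beginthreadex.

```python
import threading
from typing import Any, Callable, NamedTuple


class CallObj(NamedTuple):
    """Stand-in for the C struct callobj: a function plus its argument."""
    func: Callable[[Any], None]
    arg: Any


def bootstrap(call: CallObj) -> int:
    """Fixed-signature entry point, like the C bootstrap()."""
    func, arg = call.func, call.arg   # copy the fields out first
    del call                          # the C code frees obj at this point
    func(arg)                         # hand over to the real thread body
    return 0


log = []


def t_bootstrap(boot: dict) -> None:
    # Hypothetical stand-in for the C t_bootstrap(): call func with args.
    log.append(boot["func"](*boot["args"]))


boot = {"func": pow, "args": (2, 10)}
child = threading.Thread(target=bootstrap, args=(CallObj(t_bootstrap, boot),))
child.start()
child.join()
```

Copying func and arg into locals before releasing the container mirrors the order in the C code, where obj must not be touched after HeapFree.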
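- In the version of bootstrap shown above, the child thread performs three actions:
1. it copies func and arg out of the callobj;
2. it frees the callobj with HeapFree;
3. it calls func(arg), that is, t_bootstrap(boot).
- Older implementations needed a handshake at this point: PyThread_start_new_thread has to return the id of the child thread it created, so the main thread waited until the child had obtained its id and woken the main thread up. In the code shown here that wait is no longer needed, because the id comes back through the last argument of _beginthreadex.
- From here the main thread and the child thread go their separate ways. The main thread returns the child's id and, still holding the GIL, carries on with the following bytecode instructions; the child thread enters t_bootstrap and ends up waiting for the GIL.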
Modules\_threadmodule.c
static void
t_bootstrap(void *boot_raw)
{
    struct bootstate *boot = (struct bootstate *) boot_raw;
    PyThreadState *tstate;
    PyObject *res;
    tstate = boot->tstate;
    tstate->thread_id = PyThread_get_thread_ident();
    _PyThreadState_Init(tstate);
    PyEval_AcquireThread(tstate);
    tstate->interp->num_threads++;
    res = PyObject_Call(boot->func, boot->args, boot->keyw);
    if (res == NULL) {
        if (PyErr_ExceptionMatches(PyExc_SystemExit))
            PyErr_Clear();
        else {
            PyObject *file;
            PyObject *exc, *value, *tb;
            PySys_WriteStderr(
                "Unhandled exception in thread started by ");
            PyErr_Fetch(&exc, &value, &tb);
            file = _PySys_GetObjectId(&PyId_stderr);
            if (file != NULL && file != Py_None)
                PyFile_WriteObject(boot->func, file, 0);
            else
                PyObject_Print(boot->func, stderr, 0);
            PySys_WriteStderr("\n");
            PyErr_Restore(exc, value, tb);
            PyErr_PrintEx(0);
        }
    }
    else
        Py_DECREF(res);
    Py_DECREF(boot->func);
    Py_DECREF(boot->args);
    Py_XDECREF(boot->keyw);
    PyMem_DEL(boot_raw);
    tstate->interp->num_threads--;
    PyThreadState_Clear(tstate);
    PyThreadState_DeleteCurrent();
    PyThread_exit_thread();
}
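- Two details of t_bootstrap can be observed from Python: the thread counter is incremented before the callable runs, and a SystemExit raised in the child is cleared instead of ending the process. The sketch below assumes CPython; _thread._count() is a private helper that reports the interpreter's thread counter, and worker, observed, and finished are illustrative names.

```python
import _thread
import threading

finished = threading.Event()
observed = {}


def worker():
    # The counter was already incremented before this function was called.
    observed["count"] = _thread._count()
    try:
        raise SystemExit(3)   # cleared in the child thread, see PyErr_Clear()
    finally:
        finished.set()


_thread.start_new_thread(worker, ())
child_finished = finished.wait(timeout=5)

# Reached only because the SystemExit stayed inside the child thread.
main_still_running = True
```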
- This is where the child thread begins competing with the main thread for the GIL:
- First, the child thread requests the GIL through PyEval_AcquireThread:
Python\ceval.c
void
PyEval_AcquireThread(PyThreadState *tstate)
{
    if (tstate == NULL)
        Py_FatalError("PyEval_AcquireThread: NULL new thread state");
    /* Check someone has called PyEval_InitThreads() to create the lock */
    assert(gil_created());
    take_gil(tstate);
    if (PyThreadState_Swap(tstate) != NULL)
        Py_FatalError(
            "PyEval_AcquireThread: non-NULL old thread state");
}
- Next, the child thread enters the bytecode execution engine through PyObject_Call:
Objects\call.c
PyObject *
PyObject_Call(PyObject *callable, PyObject *args, PyObject *kwargs)
{
    ternaryfunc call;
    PyObject *result;
    /* PyObject_Call() must not be called with an exception set,
       because it can clear it (directly or indirectly) and so the
       caller loses its exception */
    assert(!PyErr_Occurred());
    assert(PyTuple_Check(args));
    assert(kwargs == NULL || PyDict_Check(kwargs));
    if (PyFunction_Check(callable)) {
        return _PyFunction_FastCallDict(callable,
                                        &PyTuple_GET_ITEM(args, 0),
                                        PyTuple_GET_SIZE(args),
                                        kwargs);
    }
    else if (PyCFunction_Check(callable)) {
        return PyCFunction_Call(callable, args, kwargs);
    }
    else {
        call = callable->ob_type->tp_call;
        if (call == NULL) {
            PyErr_Format(PyExc_TypeError, "'%.200s' object is not callable",
                         callable->ob_type->tp_name);
            return NULL;
        }
        if (Py_EnterRecursiveCall(" while calling a Python object"))
            return NULL;
        result = (*call)(callable, args, kwargs);
        Py_LeaveRecursiveCall();
        return _Py_CheckFunctionResult(callable, result, NULL);
    }
}
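- PyObject_Call distinguishes three kinds of callable, and each of them can serve as a thread body. The sketch below starts one thread per kind, assuming CPython; py_function, CallableObject, and the lock names are made up for illustration. Which C branch each call takes is our reading of the listing above, not something the script itself can observe.

```python
import _thread

results = []

py_done = _thread.allocate_lock()
obj_done = _thread.allocate_lock()
builtin_done = _thread.allocate_lock()
for lock in (py_done, obj_done, builtin_done):
    lock.acquire()                     # every lock starts out held


def py_function(tag):                  # Python function: PyFunction branch
    results.append(tag)
    py_done.release()


class CallableObject:                  # instance: generic tp_call branch
    def __call__(self, tag):
        results.append(tag)
        obj_done.release()


_thread.start_new_thread(py_function, ("function",))
_thread.start_new_thread(CallableObject(), ("instance",))
# lock.release is a built-in method (PyCFunction branch); a _thread lock
# may be released by a thread other than the one that acquired it.
_thread.start_new_thread(builtin_done.release, ())

# Each acquire succeeds only after the matching child thread has run.
all_ran = all(lock.acquire(timeout=5)
              for lock in (py_done, obj_done, builtin_done))
```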
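- The boot->func passed to PyObject_Call is the callable the thread is supposed to run. For a function defined in Python it is a PyFunctionObject, which takes the _PyFunction_FastCallDict branch.
- When PyObject_Call returns, the child thread releases the GIL and finishes all the cleanup needed to destroy the thread.
- Judging from the code of t_bootstrap alone, the child thread would seem to hold the GIL until it has run to completion, and only release it in PyThreadState_DeleteCurrent.
- In practice, as described in the earlier chapters, Python periodically triggers its thread scheduling mechanism and keeps switching between the child thread and the main thread, which is what makes multithreading actually work.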
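- The periodic switching can be made visible with two threads that run pure-Python busy loops and never block voluntarily. If a thread kept the GIL until it finished, the second loop could not begin before the first one ended; the recorded time spans overlap instead. This is a sketch assuming CPython, with spin and spans as illustrative names and 0.3 seconds as an arbitrary duration chosen to be much longer than the switch interval.

```python
import sys
import threading
import time

interval = sys.getswitchinterval()   # seconds between forced switch requests
spans = {}                           # name -> (start, end, iterations)


def spin(name, seconds=0.3):
    # Busy loop with no sleep and no I/O: the thread never yields on its own.
    start = time.monotonic()
    n = 0
    while time.monotonic() - start < seconds:
        n += 1
    spans[name] = (start, time.monotonic(), n)


threads = [threading.Thread(target=spin, args=(name,)) for name in ("a", "b")]
for t in threads:
    t.start()
for t in threads:
    t.join()

latest_start = max(span[0] for span in spans.values())
earliest_end = min(span[1] for span in spans.values())
overlapped = latest_start < earliest_end   # both loops were in progress at once
```

sys.setswitchinterval() changes how often a running thread is asked to give up the GIL; the default reported by sys.getswitchinterval() is a few milliseconds.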