注:本文很多素材來源于網(wǎng)絡(luò)上前人總結(jié)和《流暢的python》一書,本人僅僅以個(gè)人視角重新整合,便于自己理解困曙,再此聲明
并發(fā)是指一次處理多件事。
并行是指一次做多件事谦去。
二者不同慷丽,但是有聯(lián)系。
一個(gè)關(guān)于結(jié)構(gòu)鳄哭,一個(gè)關(guān)于執(zhí)行盈魁。
并發(fā)用于制定方案,用來解決可能(但未必)并行的問題窃诉。
by Rob Pike
1杨耙、簡介
看了太多的人寫的多進(jìn)程,多線程飘痛,協(xié)程相關(guān)的文章珊膜,依然是云里霧里的,即便是知道了一二宣脉,可依然寫不出了相關(guān)邏輯的代碼车柠,直到有一天,我看到了這篇文章,才略有些思路竹祷,所以我想從這里開始谈跛,從新的視角下學(xué)習(xí)一下多進(jìn)程,多線程之類的東西塑陵。
附上文檔地址http://learn-gevent-socketio.readthedocs.io/en/latest/general_concepts.html
2. 并發(fā)和并行
當(dāng)我們討論實(shí)現(xiàn)線程(不管是綠線還是pthreads)和進(jìn)程時(shí)感憾,我們真正想實(shí)現(xiàn)的是并發(fā)性和/或并行性。那并發(fā)性和并行性兩者有什么區(qū)別呢令花?并發(fā)性和并行性是截然不同的概念,但也是有聯(lián)系的阻桅。特別的,Ruby的并發(fā)性是指兩個(gè)任務(wù)可以在重疊的時(shí)間段內(nèi)開始執(zhí)行兼都,運(yùn)行嫂沉,結(jié)束。但這并不一定意味著扮碧,他們會(huì)同時(shí)在同一時(shí)刻運(yùn)行(例如趟章,多線程在單核心機(jī)器上運(yùn)行)。相反慎王,并行性是兩個(gè)任務(wù)同時(shí)運(yùn)行的時(shí)候尤揣。真正的并行需要多個(gè)核心。(多進(jìn)程在多核心機(jī)器上運(yùn)行)柬祠。這里的關(guān)鍵點(diǎn)是北戏,并發(fā)線程和/或進(jìn)程并不一定是并行運(yùn)行的。
并發(fā)性涉及從不同線程管理對共享狀態(tài)的訪問漫蛔,而并行性則涉及利用多個(gè)處理器/內(nèi)核來提高計(jì)算的性能嗜愈。
3.線程和進(jìn)程
3.1 進(jìn)程
進(jìn)程是一個(gè)正在運(yùn)行的程序的一個(gè)實(shí)例。包括程序代碼和當(dāng)前的活動(dòng)莽龟,根據(jù)操作系統(tǒng)的不同蠕嫁,一個(gè)進(jìn)程可能由多個(gè)執(zhí)行指令的執(zhí)行線程組成。大多數(shù)現(xiàn)代操作系統(tǒng)都阻止了獨(dú)立進(jìn)程之間的直接通信毯盈,提供了嚴(yán)格的中介和控制的進(jìn)程間通信 (IPC).
一個(gè)進(jìn)程典型的包含以下資源:
1.an image of the executable machine code associated with the program
2.memory, which includes:2.1.executable code
2.2.process-specific data (input and output)
2.3.call stack that keeps track of active subroutines and/or other events
2.4.heap which holds intermediate computation data during run time
3.operating system descriptors of resources that are allocated to the process such as file descriptors (unix/linux) and handles (windows), dat sources and sinks
4.security attributes (process owner and set of permissions, e.g. allowable operations)
5.processor state (context) such as registers and physical memory addressing
操作系統(tǒng)將關(guān)于進(jìn)程的很多信息都放在了程序控制塊中(PCB)剃毒。操作系統(tǒng)將其進(jìn)程分隔開,并分配所需的資源搂赋。這樣它們就不太可能相互干擾從而導(dǎo)致系統(tǒng)故障赘阀。操作系統(tǒng)可以為進(jìn)程間通信提供機(jī)制,使進(jìn)程以安全且可預(yù)測的方式進(jìn)行脑奠。
3.2 線程
線程是CPU可使用的最基本單元基公。有時(shí)候也成為輕量級進(jìn)程。線程是進(jìn)程中的指令序列宋欺,其行為類似于進(jìn)程中的進(jìn)程轰豆。 它不同于進(jìn)程是因?yàn)樗鼪]有自己的程序控制塊胰伍。通常,在進(jìn)程中創(chuàng)建多線程酸休。線程在進(jìn)程內(nèi)執(zhí)行骂租,進(jìn)程在操作系統(tǒng)內(nèi)核中執(zhí)行。
一個(gè)線程的組成:
- thread ID
- program counter
- register set
- stack
注意:1. Python線程是在我所知道的所有實(shí)現(xiàn)中使用OS線程實(shí)現(xiàn)的(C Python斑司、PyPy和Jython)渗饮。對于每個(gè)Python線程,都有一個(gè)底層OS線程陡厘。
2.一些操作系統(tǒng)(Linux是其中之一)在所有正在運(yùn)行的進(jìn)程的列表中提供由同一個(gè)可執(zhí)行程序啟動(dòng)的所有不同的線程抽米。這是操作系統(tǒng)的實(shí)現(xiàn)細(xì)節(jié)特占,而不是Python糙置。在其他一些操作系統(tǒng)上,在列出所有進(jìn)程時(shí)是目,可能看不到這些線程谤饭。
3.對于操作系統(tǒng),一個(gè)進(jìn)程由多個(gè)線程組成懊纳,每個(gè)線程都是相等的.操作系統(tǒng)不知道那個(gè)是守護(hù)進(jìn)程線程揉抵。這純粹是一個(gè)Python概念。當(dāng)最后一個(gè)非守護(hù)線程完成時(shí)嗤疯,進(jìn)程將終止冤今。在這一點(diǎn)上,所有守護(hù)進(jìn)程線程都將被終止茂缚。所以戏罢,這些線程是你的進(jìn)程的一部分,但并沒有阻止它的終止脚囊。當(dāng)系統(tǒng)調(diào)用_exit函數(shù)時(shí)龟糕,進(jìn)程中斷。主線程也會(huì)中斷悔耘。python解釋器會(huì)檢測是否還有非守護(hù)線程運(yùn)行讲岁,如果沒有,就調(diào)用_exit.否則會(huì)等到非守護(hù)進(jìn)程結(jié)束后再調(diào)用_exit.
4.這個(gè)守護(hù)線程標(biāo)志是由threading模塊在純Python中實(shí)現(xiàn)的衬以。當(dāng)模塊加載后缓艳,一個(gè)Thread類將會(huì)被創(chuàng)建用來代替main線程。它的 _exitfunc 方法被注冊成一個(gè) atexit 鉤子.
import sys
import time
import threading
class WorkerThread(threading.Thread):
def run(self):
while True:
print 'Working hard'
time.sleep(0.5)
def main(args):
use_daemon = False
for arg in args:
if arg == '--use_daemon':
use_daemon = True
worker = WorkerThread()
worker.setDaemon(use_daemon)
worker.start()
time.sleep(1)
sys.exit(0)
if __name__ == '__main__':
main(sys.argv[1:])
上面的例子可以說明常規(guī)線程和守護(hù)線程的區(qū)別看峻,當(dāng)腳本以帶有--use daemon的選項(xiàng)運(yùn)行時(shí)郎任,線程被設(shè)置為守護(hù)線程,在打印出一條后备籽,整個(gè)程序就結(jié)束了舶治。如果不帶選項(xiàng)分井,程序?qū)⒊掷m(xù)打印,即便是主線程結(jié)束也不會(huì)停止霉猛,直到被kill尺锚。
線程和進(jìn)程的區(qū)別:
在單個(gè)處理器上,多線程通常是由時(shí)分多路復(fù)用(也稱為多任務(wù)處理)發(fā)生的惜浅,也就是說瘫辩,在不同線程之間的單處理器切換。這種上下文切換的速度非程诚ぃ快伐厌,這樣我們就可以在同一時(shí)間看到線程在運(yùn)行。在多處理器和多核系統(tǒng)中裸影,線程可以是真正的并發(fā)挣轨,每個(gè)處理器或CPU內(nèi)核同時(shí)執(zhí)行一個(gè)單獨(dú)的線程。
4轩猩、協(xié)程
4.1 概述
協(xié)程是一種 允許在特定位置暫途戆纾或恢復(fù)的子程序。但和 生成器 不同的是均践,協(xié)程 可以控制子程序暫停之后代碼的走向晤锹,而 生成器 僅能被動(dòng)地將控制權(quán)交還給調(diào)用者。
??Coroutines適用于實(shí)現(xiàn)合作任務(wù)彤委、迭代器鞭铆、無限列表和管道。協(xié)程看上去像是線程焦影,使用的接口類似線程接口车遂,但是實(shí)際使用非線程的方式,對應(yīng)的線程開銷也不存的偷办。協(xié)程 可以只利用一個(gè)線程更加輕便地實(shí)現(xiàn) 多任務(wù)艰额,將任務(wù)切換的開銷降至最低。和 回調(diào) 等其他異步技術(shù)相比椒涯,協(xié)程 維持了正常的代碼流程柄沮,在保證代碼可讀性的同時(shí)最大化地利用了 阻塞 IO 的空閑時(shí)間。它的高效與簡潔贏得了開發(fā)者們的擁戴废岂。
??常用的協(xié)程實(shí)現(xiàn)方式分為有以gevent祖搓,在底層實(shí)現(xiàn)了協(xié)程調(diào)度,并將大部分的 阻塞 IO 重寫為異步湖苞。使用c語言實(shí)現(xiàn)的拯欧,第二種為Tornado中異步編程方式,直到python3.5中财骨,語法上以async和await關(guān)鍵字實(shí)現(xiàn)的協(xié)程镐作,可讀性更好一些藏姐。
??具體的說,協(xié)程是函數(shù)體中包含yield或者yield from 的函數(shù)该贾。一個(gè)協(xié)程可以通常處于四種狀態(tài)之一('GEN_CREATED',,'GEN_RUNNING','GEN_SUSPENDED','GEN_CLOSED').協(xié)程狀態(tài)可以使用inspect.getgeneratorstate(...)函數(shù)來獲知羔杨。
??第一次激活線程通常有next(my_coro)完成。你也可以使用my_coro.send(None)完成杨蛋,他們的作用是一樣的兜材。
??協(xié)程的執(zhí)行被明確的掛起在yield關(guān)鍵字上。具體例子:a= yield b.位于關(guān)鍵字右側(cè)的值也就是b值逞力,被返回給調(diào)用者曙寡,協(xié)程讓出控制權(quán)。直到調(diào)用者調(diào)用send方法發(fā)送一個(gè)值寇荧,協(xié)程拿回控制權(quán)举庶,調(diào)用者發(fā)送的值被分配給等號左側(cè)的變量,也就是變量a.
4.2 異常處理
調(diào)用者通過throw和close方法砚亭,可以明確的發(fā)送異常給協(xié)程灯变。
4.3 yield from 表達(dá)式
在python 3.3版本后殴玛,一個(gè)協(xié)程可以返回給調(diào)用者值捅膘,但是值是作為StopIteraton異常對象屬性(value屬性)。yield from 自動(dòng)的啟動(dòng)協(xié)程滚粟,并處理StopIteration異常寻仗。
簡單的,yield from 可用于簡化 for 循環(huán)中的 yield 表達(dá)式凡壤。例如
yield from 也可以用來鏈接可迭代對象署尤,比如:
>>> def chain(*iterables):
... for it in iterables:
... yield from it
...
>>> s = 'ABC'
>>> t = tuple(range(3))
>>> list(chain(s, t))
['A', 'B', 'C', 0, 1, 2]
yield from x 表達(dá)式對 x 對象所做的第一件事是,調(diào)用 iter(x)亚侠,從中獲取迭代器曹体。因此,x 可以是任何可迭代的對象硝烂。
yield from 的主要功能是打開雙向通道箕别,把最外層的調(diào)用方與最內(nèi)層的子生成器連接起來,這樣二者可以直接發(fā)送和產(chǎn)出值滞谢,還可以直接傳入異常串稀,而不用在位于中間的協(xié)程中添加大量處理異常的樣板代碼。有了這個(gè)結(jié)構(gòu)狮杨,協(xié)程可以通過以前不可能的方式委托職責(zé)母截。
yield from 結(jié)構(gòu)會(huì)在內(nèi)部自動(dòng)捕獲 StopIteration 異常。這種處理方式與 for 循環(huán)處理 StopIteration 異常的方式一樣:循環(huán)機(jī)制使用用戶易于理解的方式處理異常橄教。對 yield from 結(jié)構(gòu)來說清寇,解釋器不僅會(huì)捕獲 StopIteration 異常喘漏,還會(huì)把value 屬性的值變成 yield from 表達(dá)式的值』蹋可惜陷遮,我們無法在控制臺中使用交互的方式測試這種行為,因?yàn)樵诤瘮?shù)外部使用 yield from(以及 yield)會(huì)導(dǎo)致句法出錯(cuò)垦江。
借用別人的例子來說明一下yield from的使用
from collections import namedtuple
Result = namedtuple('Result', 'count average')
# the subgenerator
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield
if term is None: # The crucial terminating condition. Without it, a yield from calling this coroutine will block forever
break
total += term
count += 1
average = total/count
return Result(count, average) # The returned Result will be the value of the yield from expression in grouper.
# the delegating generator
def grouper(results, key):
while True:
# Whenever grouper is sent a value, it’s piped into the `averager` instance by the `yield from`.
# `grouper` will be suspended here as long as the `averager` instance is consuming values sent by the
# client. When an `averager` instance runs to the end, the value it returns is bound to `results[key]`.
# The `while` loop then proceeds to create another `averager` instance to consume more values.
results[key] = yield from averager()
# the client code, a.k.a. the caller
def main(data):
results = {}
for key, values in data.items():
group = grouper(results, key)
next(group)
for value in values:
# Send each value into the grouper. That value ends up in the `term = yield` line of averager;
# grouper never has a chance to see it.
group.send(value)
# causes the current `averager` instance to terminate, and allows `grouper` to run again, which
# creates another `averager` for the next group of values.
group.send(None) # important!
# print(results) # uncomment to debug
report(results)
def report(results):
for key, result in sorted(results.items()):
group, unit = key.split(';')
print('{:2} {:5} averaging {:.2f}{}'.format(
result.count, group, result.average, unit))
data={ 'girls;kg':
[40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
'girls;m':
[1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
'boys;kg':
[39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
'boys;m':
[1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46],
}
if __name__ == '__main__':
main(data)
把 None 傳入 grouper帽馋,導(dǎo)致當(dāng)前的 averager 實(shí)例終止,也讓 grouper 繼續(xù)運(yùn)行比吭,再創(chuàng)建一個(gè) averager 實(shí)例绽族,處理下一組值。
這個(gè)試驗(yàn)想表明的關(guān)鍵一點(diǎn)是衩藤,如果子生成器不終止吧慢,委派生成器會(huì)在yield from 表達(dá)式處永遠(yuǎn)暫停。如果是這樣赏表,程序不會(huì)向前執(zhí)行检诗,因?yàn)?yield from(與 yield 一樣)把控制權(quán)轉(zhuǎn)交給客戶代碼(即,委派生成器的調(diào)用方)了瓢剿。顯然逢慌,肯定有任務(wù)無法完成。
上面的示例展示了 yield from 結(jié)構(gòu)最簡單的用法间狂,只有一個(gè)委派生成器和一個(gè)子生成器攻泼。因?yàn)槲缮善飨喈?dāng)于管道,所以可以把任意數(shù)量個(gè)委派生成器連接在一起:一個(gè)委派生成器使用 yield from 調(diào)用一個(gè)子生成器鉴象,而那個(gè)子生成器本身也是委派生成器忙菠,使用 yield from 調(diào)用另一個(gè)子生成器,以此類推纺弊。最終牛欢,這個(gè)鏈條要以一個(gè)只使用 yield表達(dá)式的簡單生成器結(jié)束;不過淆游,也能以任何可迭代的對象結(jié)束傍睹。
4.4 asyncio
asyncio 這個(gè)包使用事件循環(huán)驅(qū)動(dòng)的協(xié)程實(shí)現(xiàn)并發(fā)。該包使用的“協(xié)程”是較嚴(yán)格的定義稽犁。適合asyncio API 的協(xié)程在定義體中必須使用 yield from焰望,而不能使用 yield。此外已亥,
適合 asyncio 的協(xié)程要由調(diào)用方驅(qū)動(dòng)熊赖,并由調(diào)用方通過 yield from 調(diào)用;或者把協(xié)程傳給 asyncio 包中的某個(gè)函數(shù)虑椎,例如 asyncio.async(...) 和本章要介紹的其他函數(shù)震鹉,從而驅(qū)動(dòng)協(xié)程俱笛。最后,@asyncio.coroutine 裝飾器應(yīng)該應(yīng)用在協(xié)程上.
import asyncio
import itertools
import sys
@asyncio.coroutine
def spin(msg):
write, flush = sys.stdout.write, sys.stdout.flush
for char in itertools.cycle('|/-\\'):
status = char + ' ' + msg
write(status)
flush()
write('\x08' * len(status))
try:
yield from asyncio.sleep(.1)
except asyncio.CancelledError:
break
write(' ' * len(status) + '\x08' * len(status))
@asyncio.coroutine
def slow_function():
# 假裝等待I/O一段時(shí)間
yield from asyncio.sleep(3)
return 42
@asyncio.coroutine
def supervisor():
spinner = asyncio.async(spin('thinking!'))
print('spinner object:', spinner)
result = yield from slow_function()
spinner.cancel()
return result
def main():
loop = asyncio.get_event_loop()
# 驅(qū)動(dòng) supervisor 協(xié)程传趾,讓它運(yùn)行完畢迎膜;這個(gè)協(xié)程的返回值是這次調(diào)用的返回值。
result = loop.run_until_complete(supervisor())
loop.close()
print('Answer:', result)
if __name__ == '__main__':
main()
4.5 asyncio.Future
asyncio.Future 類與 concurrent.futures.Future 類的接口基本一致浆兰,不過實(shí)現(xiàn)方式不同磕仅,不可以互換.
在 asyncio 包中,BaseEventLoop.create_task(...) 方法接收一個(gè)協(xié)程簸呈,排定它的運(yùn)行時(shí)間榕订,然后返回一個(gè) asyncio.Task 實(shí)例——也是 asyncio.Future 類的實(shí)例,因?yàn)?Task 是Future 的子類蜕便,用于包裝協(xié)程劫恒。這與調(diào)用 Executor.submit(...) 方法創(chuàng)建
concurrent.futures.Future 實(shí)例是一個(gè)道理。
與 concurrent.futures.Future 類似轿腺,asyncio.Future 類也提供了.done()两嘴、.add_done_callback(...) 和 .result() 等方法。不過 .result() 方法差別很大族壳。asyncio.Future 類的 .result() 方法沒有參數(shù)憔辫,因此不能指定超時(shí)時(shí)間。此外决侈,如果調(diào)用 .result() 方法時(shí)期物還沒運(yùn)行完畢螺垢,那么 .result() 方法不會(huì)阻塞去等待結(jié)果喧务,而是拋出 asyncio.InvalidStateError 異常.獲取 asyncio.Future 對象的結(jié)果通常使用 yield from赖歌,從中產(chǎn)出結(jié)果.
總之,因?yàn)?asyncio.Future 類的目的是與 yield from 一起使用功茴,所以通常不需要使用以下方法:
1.無需調(diào)用 my_future.add_done_callback(...)庐冯,因?yàn)榭梢灾苯影严朐谄谖镞\(yùn)行結(jié)束后執(zhí)行的操作放在協(xié)程中 yield from my_future 表達(dá)式的后面。這是協(xié)程的一大優(yōu)勢:協(xié)程是可以暫停和恢復(fù)的函數(shù)坎穿。
2.無需調(diào)用 my_future.result()展父,因?yàn)?yield from 從期物中產(chǎn)出的值就是結(jié)果(例如,result = yield from my_future)玲昧。
在 asyncio 包中栖茉,期物和協(xié)程關(guān)系緊密,因?yàn)榭梢允褂?yield from 從asyncio.Future 對象中產(chǎn)出結(jié)果孵延。這意味著吕漂,如果 foo 是協(xié)程函數(shù)(調(diào)用后返回協(xié)程對象),抑或是返回 Future 或 Task 實(shí)例的普通函數(shù)尘应,那么可以這樣寫:res = yield from foo()惶凝。這是 asyncio 包的 API 中很多地方可以互換協(xié)程與期物的原因之一吼虎。為了執(zhí)行這些操作,必須排定協(xié)程的運(yùn)行時(shí)間苍鲜,然后使用 asyncio.Task 對象包裝協(xié)程思灰。對協(xié)程來說,獲取 Task 對象有兩種主要方式:
1.asyncio.async(coro_or_future, *, loop=None)
2.BaseEventLoop.create_task(coro)
4.6 關(guān)于測試代碼中的time.sleep注意
實(shí)際應(yīng)用中必須使用支持異步操作的非阻塞代碼才能實(shí)現(xiàn)真正的異步混滔。舉個(gè)例子洒疚,比如,我們在爬蟲的時(shí)候坯屿,需要請求的url列表中有和很多需要請求的url.這個(gè)時(shí)候使用requests庫中的get請求方法拳亿,即便是放在異步實(shí)現(xiàn)的代碼中,其實(shí)際操作還是同步請求的愿伴,真正想要實(shí)現(xiàn)異步還得使用非阻塞形式實(shí)現(xiàn)的庫aiohttp.