第十天 numpy模塊介紹和使用
今天計劃學習numpy模塊以及Python的多線程編程州疾,學習項目及練習源碼地址:
GitHub源碼
numpy模塊
NumPy是Python中科學計算的基礎(chǔ)包杭朱。它是一個Python庫桃纯,提供多維數(shù)組對象扇谣,各種派生對象(如掩碼數(shù)組和矩陣)癞志,以及用于數(shù)組快速操作的各種例程充活,包括數(shù)學亿眠,邏輯糠赦,形狀操作会傲,排序,選擇拙泽,I/O離散傅立葉變換淌山,基本線性代數(shù),基本統(tǒng)計運算顾瞻,隨機模擬等等泼疑。
NumPy包的核心是ndarray對象。這封裝了同構(gòu)數(shù)據(jù)類型的n維數(shù)組荷荤,許多操作在編譯代碼中執(zhí)行以提高性能退渗。NumPy數(shù)組和標準Python序列之間有幾個重要的區(qū)別:
- NumPy數(shù)組在創(chuàng)建時具有固定大小,與Python列表(可以動態(tài)增長)不同蕴纳。更改ndarray的大小將創(chuàng)建一個新數(shù)組并刪除原始數(shù)組会油。
- NumPy數(shù)組中的元素都需要具有相同的數(shù)據(jù)類型,因此在內(nèi)存中的大小相同袱蚓。例外:可以有(Python钞啸,包括NumPy)對象的數(shù)組,從而允許不同大小的元素的數(shù)組喇潘。
- NumPy數(shù)組有助于對大量數(shù)據(jù)進行高級數(shù)學和其他類型的操作体斩。通常,與使用Python的內(nèi)置序列相比颖低,這些操作的執(zhí)行效率更高絮吵,代碼更少。
- 越來越多的基于Python的科學和數(shù)學軟件包正在使用NumPy數(shù)組; 雖然這些通常支持Python序列輸入忱屑,但它們在處理之前將這些輸入轉(zhuǎn)換為NumPy數(shù)組蹬敲,并且它們通常輸出NumPy數(shù)組。換句話說莺戒,為了有效地使用當今大量(甚至大多數(shù))基于Python的科學/數(shù)學軟件伴嗡,只知道如何使用Python的內(nèi)置序列類型是不夠的 - 還需要知道如何使用NumPy數(shù)組。
安裝numpy和使用示例
pip3 install numpy
import numpy
a = numpy.arange(10)
print(a) # [0 1 2 3 4 5 6 7 8 9] 看起來是個列表从铲,實際不是
print([1,2,3]) # [1, 2, 3] 這才是列表
在這個例子中只涉及numpy模塊中的一個arange函數(shù)瘪校,該函數(shù)可以傳入一個整數(shù)類型的參數(shù)n,函數(shù)返回值看著像一個列表,其實返回值類型是numpy.ndarray阱扬。這是Numpy中特有的數(shù)組類型泣懊。如果傳入arange函數(shù)的參數(shù)值是n,那么arange函數(shù)會返回0 到n-1的ndarray類型的數(shù)組麻惶。
numpy中的數(shù)組
numpy.array
numpy模塊的array函數(shù)可以生成多維數(shù)組馍刮。例如,如果要生成一個二維數(shù)組窃蹋,需要向array函數(shù)傳遞一個列表類型的參數(shù)卡啰。每一個列表元素是一維的ndarray類型數(shù)組,作為二維數(shù)組的行警没。另外碎乃,通過ndarray類的shape屬性可以獲得數(shù)組每一維的元素個數(shù)(元組形式),也可以通過shape[n]形式獲得每一維的元素個數(shù)惠奸,其中n是維度梅誓,從0開始。
numpy.array(object,dtype=None,copy=None,order=None,subok=Flase,ndmin=0)
參數(shù)名稱 | 描述 |
---|---|
object | 數(shù)組或嵌套的數(shù)列 |
dtype | 數(shù)組元素的數(shù)據(jù)類型佛南,可選 |
copy | 對象是否需要復制梗掰,可選 |
order | 創(chuàng)建數(shù)組的樣式,C為行方向嗅回,F(xiàn)為列方向及穗,A為任意方向(默認) |
subok | 默認返回一個與基類類型一致的數(shù)組 |
ndmin | 指定生成數(shù)組的最小維度 |
-
一維數(shù)組的創(chuàng)建
a = numpy.array([1,2,3,4,5]) print(a) print('數(shù)組的緯度:',a.shape) # (5,) 元祖
-
二維數(shù)組
a = numpy.array([[1,2],[3,4],[5,6]]) print(a) print('數(shù)組的緯度:',a.shape) # (5,) 元祖
-
dtyp參數(shù)的使用
a = numpy.array([1,2,3,4,5],dtype=complex) print(a)
numpy.arange
使用 arange 函數(shù)創(chuàng)建數(shù)值范圍并返回 ndarray 對象,函數(shù)格式如下:
numpy.arange(start,stop,step=1,dtype=None)
注意:不包含stop的值
numpy.random.random創(chuàng)建隨機數(shù)數(shù)組
numpy.random.random(size=None)
默認返回[0.0,1.0)之間的隨機數(shù)绵载,不包含1
numpy.random.randint創(chuàng)建隨機數(shù)數(shù)組
numpy.random.randint(low=0,high=None,size=None)
該方法有三個參數(shù)low埂陆、high、size 三個參數(shù)娃豹。默認 high 是 None,如果只有l(wèi)ow焚虱,那范圍就是[0,low)。如果有high懂版,范圍就是[low,high)鹃栽。
a = numpy.random.randint(4,10,size=(3,4,2))
print(a) # 第一緯度3個元素,第二緯度4個元素,第三緯度2個元素 隨機數(shù)在最里面的緯度
# [
# [[4 5][5 5][5 7][9 6]]
# [[6 9][5 7][7 9][7 9]]
# [[8 7][8 7][7 6][8 6]]
# ]
numpy.random.randn 返回標準正太分布
函數(shù)返回一個或一組樣本躯畴,具有標準正態(tài)分布(期望為 0民鼓,方差為 1)。 dn表格每個維度蓬抄。返回值為指定維度的array丰嘉。
numpy.random.randn(d0,d1,...,dn)
numpy.random.normal指定期望和方差的正太分布
numpy.random.normal(loc=0,scale=1,size=None)
ndarray對象
numpy最重要的一個特點是其N維數(shù)組對象ndarray,它是一系列同類型數(shù)據(jù)的集合嚷缭,以0下標為開始進行集合中元素的索引饮亏。
- ndarray 對象是用于存放同類型元素的多維數(shù)組。
- ndarray 中的每個元素在內(nèi)存中都有相同存儲大小的區(qū)域。
- ndarray 內(nèi)部由以下內(nèi)容組成:
- 一個指向數(shù)據(jù)(內(nèi)存或內(nèi)存映射文件中的一塊數(shù)據(jù))的指針克滴。
- 數(shù)據(jù)類型或dtype,描述在數(shù)組中的固定大小值的格子优床。
- 一個表示數(shù)組形狀(shape)的元組劝赔,表示各維度大小的元組。
重要的ndarray對象屬性有:
屬性 | 說明 |
---|---|
ndarray.ndim | 秩胆敞,即軸的數(shù)量或維度的數(shù)量 |
ndarray.shape | 數(shù)組的維度着帽,對于矩陣,n 行 m 列 |
ndarray.size | 數(shù)組元素的總個數(shù)移层,相當于 .shape 中 n*m 的值 |
ndarray.dtype | ndarray 對象的元素類型 |
ndarray.itemsize | ndarray 對象中每個元素的大小仍翰,以字節(jié)為單位 |
ndarray.flags | ndarray 對象的內(nèi)存信息 |
ndarray.real | ndarray 元素的實部 |
ndarray.imag | ndarray 元素的虛部 |
ndarray.data | 包含實際數(shù)組元素的緩沖區(qū),由于一般通過數(shù)組的索引獲取元素观话,所以通常不需要使用這個屬性予借。 |
numpy.zeros 創(chuàng)建指定大小的數(shù)組,數(shù)組元素以 0 來填充
numpy.zeros(shape, dtype = float, order = 'C')
有"C"和"F"兩個選項,分別代表频蛔,行優(yōu)先和列優(yōu)先灵迫,在計算機內(nèi)存中的存儲元素的順序。
舉例:
x = numpy.zeros(5) # [0. 0. 0. 0. 0.] 默認是float
x = numpy.zeros((5,),dtype=int) # # [0 0 0 0 0]
x = numpy.zeros((2,2),dtype=str) #[['' '']['' '']]
numpy.ones 創(chuàng)建指定形狀的數(shù)組晦溪,數(shù)組元素以1來填充
numpy.ones(shape, dtype = None, order = 'C')
類似于zeros
舉例:
x = numpy.ones((5,),dtype=int) # # [1 1 1 1 1]
x = numpy.ones((2,2),dtype=str) #[['1' '1']['1' '1']]
numpy.empty 方法用來創(chuàng)建一個指定形狀(shape)瀑粥、數(shù)據(jù)類型(dtype)且未初始化的 數(shù)組,里面的元素的值是之前內(nèi)存的值
numpy.empty(shape, dtype = float, order = 'C')
舉例:
x = numpy.empty((5,),dtype=int)
x = numpy.empty((2,2),dtype=str)
切片和索引
ndarray對象的內(nèi)容可以通過索引或切片來訪問和修改三圆,與 Python中l(wèi)ist的切片操作一樣狞换。
a = numpy.arange(10)
b = a[2:7:1]
c = a[3:]
print(a[1])
改變數(shù)組的維度
處理數(shù)組的一項重要工作就是改變數(shù)組的維度,包含提高數(shù)組的維度和降低數(shù)組的維度舟肉,還包括數(shù)組的轉(zhuǎn)置修噪。Numpy提供的大量 API可以很輕松地完成這些數(shù)組的操作。例如路媚,通過reshape方法可以將一維數(shù)組變成二維割按、三維或者多維數(shù)組。通過ravel方法或 flatten方法可以將多維數(shù)組變成一維數(shù)組磷籍。改變數(shù)組的維度還可以直接設(shè)置Numpy數(shù)組的shape屬性(元組類型)适荣,通過 resize法也可以改變數(shù)組的維度。
a = numpy.arange(1,13)
print(a)
b = a.reshape(2,3,2) # 乘積需要等于元素個數(shù)
print(b)
c = b.ravel()
print(c)
d = c.flatten()
print(d)
數(shù)組的拼接
水平拼接
通過hstack函數(shù)可以將兩個或多個數(shù)組水平組合起來形成一個數(shù)組院领。水平組合必須要滿足一個條件弛矛,就是所有參與水平組合的數(shù)組的行數(shù)必須相同,否則進行水平組合會拋出異常比然。
垂直拼接
通過vstack函數(shù)可以將兩個或多個數(shù)組垂直組合起來形成一個數(shù)組丈氓。水平組合必須要滿足一個條件,就是所有參與水平組合的數(shù)組的列數(shù)必須相同,否則進行水平組合會拋出異常万俗。
numpy.concatenate函數(shù)用于沿指定軸連接相同形狀的兩個或多個數(shù)組
格式如下:
numpy.concatenate((a1, a2, ...), axis)
axis = 0 默認 vstack
axis = 1 hstack
axis = 2 dstack
多線程和并發(fā)編程
進程
在學習線程前湾笛,先了解下Python關(guān)于進程的知識。
什么是進程:程序編寫完沒有運行稱之為程序闰歪。正在運行的代碼就是進程嚎研。
在Python3語言中,對多進程支持的是multiprocessing模塊和subprocess模塊库倘。multiprocessing模塊為在子進程中運行任務(wù)临扮、通訊和共享數(shù)據(jù),以及執(zhí)行各種形式的同步提供支持教翩。
進程的創(chuàng)建
Python提供了非常好用的多進程包multiprocessing杆勇,只需要定義一個函數(shù),Python會完成其他所有事情饱亿。借助這個包蚜退,可以輕松完成從單進程到并發(fā)執(zhí)行的轉(zhuǎn)換。multiprocessing支持子進程彪笼、通信和共享數(shù)據(jù)关霸。語法格式如下:
Process([group [, target [, name [, args [, kwargs]]]]])
其中target表示調(diào)用對象,args表示調(diào)用對象的位置參數(shù)元組杰扫。kwargs表示調(diào)用對象的字典队寇。name為別名。group參數(shù)未使用章姓,值始終為None佳遣。
構(gòu)造函數(shù)簡單地構(gòu)造了一個Process進程,Process的實例方法凡伊、Process的實例屬性如下表所示:
方法 | 描述 |
---|---|
is_alive() | 如果p仍然運行零渐,返回True |
join([timeout]) | 等待進程p終止。Timeout是可選的超時期限系忙,進程可以被鏈接無數(shù)次诵盼,但如果連接自身則會出錯 |
run() | 進程啟動時運行的方法。默認情況下银还,會調(diào)用傳遞給Process構(gòu)造函數(shù)的target风宁。定義進程的另一種方法是繼承Process類并重新實現(xiàn)run()函數(shù) |
start() | 啟動進程,這將運行代表進程的子進程蛹疯,并調(diào)用該子進程中的run()函數(shù) |
terminate() | 強制終止進程戒财。如果調(diào)用此函數(shù),進程p將被立即終止捺弦,同時不會進行任何清理動作饮寞。如果進程p創(chuàng)建了它自己的子進程孝扛,這些進程將變?yōu)榻┦M程。使用此方法時需要特別小心幽崩。如果p保存了一個鎖或參與了進程間通信苦始,那么終止它可能會導致死鎖或I/O損壞 |
【示例】創(chuàng)建子進程并執(zhí)行
from multiprocessing import Process,set_start_method
set_start_method('spawn',True) # 在vscode中運行時需要加上這個,否則要報錯
#定義子進程代碼
def run_proc():
print('子進程運行中')
if __name__=='__main__':
print('父進程運行')
p=Process(target=run_proc)
print('子進程將要執(zhí)行')
p.start()
【示例】創(chuàng)建子進程慌申,傳遞參數(shù)
from multiprocessing import Process,set_start_method
import os
from time import sleep
set_start_method('spawn',True)
#創(chuàng)建子進程代碼
def run_proc(name,age,**kwargs):
for i in range(5):
print('子進程運行中陌选,參數(shù)name:%s,age:%d'%(name,age))
print('字典參數(shù)kwargs:',kwargs)
sleep(0.5)
if __name__=='__main__':
print('主進程開始運行')
p=Process(target=run_proc,args=('張山',38),kwargs={'book1':'test1','chuban':'xin'})
print('子進程將要執(zhí)行')
p.start()
【示例】join()方法的使用
from multiprocessing import Process,set_start_method
from time import sleep
set_start_method('spawn',True)
def worker(interval):
print("work start")
sleep(interval)
print("work end")
if __name__ == "__main__":
p = Process(target = worker, args = (10,))
p.start()
print('等待進程p終止')
p.join(3) # 如果子進程未結(jié)束 程序已經(jīng)結(jié)束,子程序還是會繼續(xù)執(zhí)行的
print("主進程結(jié)束!")
'''
等待進程p終止
work start
主進程結(jié)束!
work end
'''
Process實例屬性:
方法 | 描述 |
---|---|
name | 進程的名稱 |
pid | 進程的整數(shù)進程ID |
【示例】創(chuàng)建函數(shù)并將其作為多個進程
from multiprocessing import Process,set_start_method
from time import sleep
set_start_method('spawn',True)
#創(chuàng)建進程調(diào)用函數(shù)
def work1(interval):
print('work1')
sleep(interval)
print('end work1')
def work2(interval):
print('work2')
sleep(interval)
print('end work2')
def work3(interval):
print('work3')
sleep(interval)
print('end work3')
if __name__=='__main__':
#創(chuàng)建進程對象
p1=multiprocessing.Process(target=work1,args=(4,))
p2=multiprocessing.Process(target=work2,args=(3,))
p3=multiprocessing.Process(target=work3,args=(2,))
#啟動進程
p1.start()
p2.start()
p3.start()
p1.join()
print("p1 join")
p2.join()
print("p2 join")
p3.join()
print("p3 join")
print('主進程結(jié)束')
創(chuàng)建自定義Process子類
創(chuàng)建進程的方式還可以使用類的方式太示,可以自定義一個類,繼承Process類香浩,每次實例化這個類的時候类缤,就等同于實例化一個進程對象,示例如下:
from multiprocessing import Process
import time
#定義線程類
class MyProcess(Process):
def __init__(self,interval):
Process.__init__(self)
self.interval=interval
def run(self): # 一定要重新這個方法
print('子進程開始執(zhí)行的時間:{}'.format(time.ctime()))
time.sleep(self.interval)
print('子進程結(jié)束的時間:{}'.format(time.ctime()))
if __name__=='__main__':
#創(chuàng)建進程
p=MyProcess(3)
#啟動進程
p.start()
p.join()
print('主進程結(jié)束')
進程池
在利用Python進行系統(tǒng)管理的時候邻吭,特別是同時操作多個文件目錄餐弱,或者遠程控制多臺主機,并行操作可以節(jié)約大量的時間囱晴。當被操作對象數(shù)目不大時膏蚓,可以直接利用multiprocessing中的Process動態(tài)成生多個進程,十幾個還好畸写,但如果是上百個驮瞧,上千個目標,手動的去限制進程數(shù)量卻又太過繁瑣酣栈,此時可以發(fā)揮進程池的功效蹲嚣。
Pool可以提供指定數(shù)量的進程汁雷,供用戶調(diào)用,當有新的請求提交到pool中時狂魔,如果池還沒有滿,那么就會創(chuàng)建一個新的進程用來執(zhí)行該請求淫痰;但如果池中的進程數(shù)已經(jīng)達到規(guī)定最大值最楷,那么該請求就會等待,直到池中有進程結(jié)束待错,才會創(chuàng)建新的進程籽孙。Pool的語法格式如下:
Pool([numprocess [, initializer [, initargs]]])
其中numprocess是要創(chuàng)建的進程數(shù)。如果省略此參數(shù)火俄,將使用cpu_count()的值蚯撩。Initializer是每個工作進程啟動時要執(zhí)行的可調(diào)用對象。Initargs是要傳遞給initializer的參數(shù)元祖烛占。Initializer默認為None胎挎。
Pool類的實例方法如下表所示:
方法 | 描述 |
---|---|
apply(func [,args [,kwargs]]) | 在一個池工作進程中執(zhí)行函數(shù)(args沟启,*kwargs),然后返回結(jié)果犹菇。 |
apply_async | (func [, args [,kwargs [,callback ] ] ]) 在一個池工作進程中異步地執(zhí)行函數(shù)(args德迹,*kwargs),然后返回結(jié)果揭芍。此方法的結(jié)果是AsyncResult類的實例胳搞,稍后可用于獲得最終結(jié)果。Callback是可調(diào)用對象称杨,接受輸入?yún)?shù)肌毅。當func的結(jié)果變?yōu)榭捎脮r,將立即傳遞給callback姑原。Callback禁止執(zhí)行任何阻塞操作悬而,否則將阻塞接收其他異步操作中的結(jié)果 |
close() | 關(guān)閉進程池,防止進行進一步操作锭汛。如果還有掛起的操作笨奠,它們將在工作進程終止之前完成 |
join() | 等待所有工作進程退出。此方法只能在close()或者terminate()方法之后調(diào)用 |
imap( func唤殴,iterable [ ,chunksize] ) | map()函數(shù)的版本之一般婆,返回迭代器而非結(jié)果列表 |
imap_unordered( func,iterable [朵逝,chunksize] ) | 同imap()函數(shù)一樣蔚袍,只是結(jié)果的順序根據(jù)從工作進程接收到的時間任意確定 |
map( func,iterable [配名,chunksize] ) | 將可調(diào)用對象func應用給iterable中的所有項页响,然后以列表的形式返回結(jié)果。通過將iterable劃分為多塊并將工作分派給工作進程段誊,可以并行地執(zhí)行這項操作闰蚕。chunksize指定每塊中的項數(shù)。如果數(shù)量較大连舍,可以增大chunksize的值來提升性能 |
map_async( func没陡,iterable [,chunksize [索赏,callback]] ) | 同map()函數(shù)盼玄,但結(jié)果的返回是異步的。返回值是AsyncResult類的實例潜腻,稍后可用與獲取結(jié)果埃儿。Callback是指接受一個參數(shù)的可調(diào)對象。如果提供callable融涣,當結(jié)果變?yōu)榭捎脮r童番,將使用結(jié)果調(diào)用callable |
terminate() | 立即終止所有工作進程精钮,同時不執(zhí)行任何清理或結(jié)束任何掛起工作。如果p被垃圾回收剃斧,將自動調(diào)用此函數(shù) |
get( [ timeout] ) | 返回結(jié)果轨香,如果有必要則等待結(jié)果到達。Timeout是可選的超時幼东。如果結(jié)果在指定時間內(nèi)沒有到達臂容,將引發(fā)multiprocessing.TimeoutError異常。如果遠程操作中引發(fā)了異常根蟹,它將在調(diào)用此方法時再次被引發(fā) |
ready() | 如果調(diào)用完成脓杉,則返回True |
sucessful() | 如果調(diào)用完成且沒有引發(fā)異常,返回True简逮。如果在結(jié)果就緒之前調(diào)用此方法球散,將引發(fā)AssertionError異常 |
wait( [timeout] ) | 等待結(jié)果變?yōu)榭捎谩imeout是可選的超時 |
非阻塞(apply_async)進程池示例:
from multiprocessing import Process,set_start_method,Pool
import time,os
set_start_method('spawn',True)
def func(msg):
print("start:", msg)
time.sleep(3)
print("end:",msg)
if __name__ == "__main__":
pool = multiprocessing.Pool(processes = 3)
for i in range(5):
msg = "hello %d" %(i)
#維持執(zhí)行的進程總數(shù)為processes买决,當一個進程執(zhí)行完畢后會添加新的進程進去
pool.apply_async(func, (msg, ))
pool.close() #進程池關(guān)閉之后不再接收新的請求
# 調(diào)用join之前沛婴,先調(diào)用close函數(shù)吼畏,否則會出錯督赤。
# 執(zhí)行完close后不會有新的進程加入到pool,join函數(shù)等待所有子進程結(jié)束
pool.join() # 如果不等待,主程序會提前結(jié)束泻蚊,未完的子程序會報錯
print('主進程結(jié)束')
【示例】使用進程池(阻塞)apply:
from multiprocessing import Process,set_start_method,Pool
import time,os
set_start_method('spawn',True)
def func(msg):
print("start:", msg)
time.sleep(3)
print("end",msg)
if __name__ == "__main__":
pool = multiprocessing.Pool(processes = 3)
for i in range(5):
msg = "hello %d" %(i)
#維持執(zhí)行的進程總數(shù)為processes躲舌,當一個進程執(zhí)行完畢后會添加新的進程進去
pool.apply(func, (msg, ))
pool.close()
#調(diào)用join之前,先調(diào)用close函數(shù)性雄,否則會出錯没卸。
# 執(zhí)行完close后不會有新的進程加入到pool,join函數(shù)等待所有子進程結(jié)束
pool.join()
很明顯,在這里進程是一個添加等待執(zhí)行完畢之后才會添加下一個進程秒旋。
進程間通信
- 全局變量在多個進程中不共享约计,進程之間的數(shù)據(jù)是獨立的,默認情況下互不影響迁筛。
【示例】多個進程中數(shù)據(jù)不共享
from multiprocessing import Process
num=1
def work1():
global num
num+=5
print('子進程1運行煤蚌,num:',num)
def work2():
global num
num += 10
print('子進程2運行,num:',num)
if __name__=='__main__':
print('父進程開始運行')
p1=Process(target=work1)
p2=Process(target=work2)
p1.start()
p2.start()
p1.join()
p2.join()
'''
子進程1運行细卧,num: 6
子進程2運行尉桩,num: 11
'''
- Queue實現(xiàn)進程間的通信
Queue是多進程安全的隊列,可以使用Queue實現(xiàn)多進程之間的數(shù)據(jù)傳遞贪庙。
put方法用以插入數(shù)據(jù)到隊列中蜘犁,put方法還有兩個可選參數(shù):blocked和timeout。如果blocked為True(默認值)止邮,并且timeout為正值这橙,該方法會阻塞timeout指定的時間奏窑,直到該隊列有剩余的空間。如果超時析恋,會拋出Queue.full異常良哲。如果blocked為False,但該Queue已滿助隧,會立即拋出Queue.full異常筑凫。
get方法可以從隊列讀取并且刪除一個元素。同樣并村,get方法有兩個可選參數(shù):blocked和timeout巍实。如果blocked為True(默認值),并且timeout為正值哩牍,那么在等待時間內(nèi)沒有取到任何元素棚潦,會拋出Queue.Empty異常。如果blocked為False膝昆,有兩種情況存在丸边,如果Queue有一個值可用,則立即返回該值荚孵,否則妹窖,如果隊列為空,則立即拋出Queue.Empty異常收叶。
Queue實例方法表:
方法 | 描述 |
---|---|
cancle_join_thread() | 不會在進程退出時自動連接后臺線程骄呼。這可以防止join_thread()方法阻塞 |
close() | 關(guān)閉隊列,防止隊列中加入更多數(shù)據(jù)判没。調(diào)用此方法時蜓萄,后臺線程將繼續(xù)寫入那些已入隊列尚未寫入數(shù)據(jù),但將在此方法完成時馬上關(guān)閉 |
empty() | 如果調(diào)用此方法時q為空澄峰,返回True |
full() | 如果q已滿嫉沽,返回True |
get([block [,timeout]) | 返回q中的一個項。如果q為空俏竞,此方法將阻塞绸硕,直到隊列中有項可用為止。Block用于控制阻塞行為胞此,默認為True臣咖。如果設(shè)置為False,將引發(fā)Queue.Empty異常(定義在Queue模塊中)漱牵。Timeout是可選超時時間夺蛇,用在阻塞模式中。如果在指定的時間間隔內(nèi)沒有項變?yōu)榭捎煤ㄕ停瑢⒁l(fā)Queue.Empty異常 |
join_thread() | 連接隊列的后臺線程刁赦。此方法用于在調(diào)用q.close()方法之后娶聘,等待所有隊列項被消耗。默認情況下此方法由不是q的原始創(chuàng)建者的所有進程調(diào)用甚脉。調(diào)用q.cancle_join_thread()方法可以禁止這種行為 |
put(item [ , block [, timeout]]) | 將item放入隊列丸升。如果隊列已滿,此方法將阻塞至有空間可用為止牺氨。Block控制阻塞行為狡耻,默認為True。如果設(shè)置為False猴凹,將引發(fā)Queue.Empty異常(定義在Queue模塊中)夷狰。Timeout指定在阻塞模式中等待可用時空間的時間長短。超時后將引發(fā)Queue.Full異常郊霎。 |
qsize() | 返回目前隊列中項的正確數(shù)量沼头。 |
joinableQueue([maxsize]) | 創(chuàng)建可連接的共享進程隊列。這就像是一個Queue對象书劝,但隊列允許項的消費者通知生產(chǎn)者項已經(jīng)被成功處理进倍。通知進程是使用共享的信號和條件變量來實現(xiàn)的 |
task_done() | 消費者使用此方法發(fā)出信號,表示q.get()返回的項已經(jīng)被處理购对。如果調(diào)用此方法的次數(shù)大于從隊列中刪除的項的數(shù)量猾昆,將引發(fā)ValueError異常 |
join() | 生產(chǎn)者使用此方法進行阻塞,知道隊列中的所有項均被處理洞斯。阻塞將持續(xù)到位隊列中的每個項均調(diào)用q.task_done()方法為止 |
【示例】Queue的方法使用
from multiprocessing import Queue
q=Queue(3)
q.put('消息1')
q.put('消息2')
print('消息隊列是否已滿:',q.full())
q.put('消息3')
print('消息隊列是否已滿:',q.full())
# q.put('消息4')因為消息隊列已滿毡庆,需要直接寫入需要等待坑赡,如果超時會拋出異常烙如,
# 所以寫入時候需判斷,消息隊列是否已滿
if not q.full():
q.put('消息4')
#同理讀取消息時毅否,先判斷消息隊列是否為空亚铁,再讀取
# qsize在MacOS上無法運行
# Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
#return self._maxsize – # self._sem._semlock._get_value()
# 所以 不能這樣用
#for i in range(q.qsize()):
# print(q.get())
while(not q.empty()):
print(q.get())
【示例】Queue隊列實現(xiàn)進程間通信
from multiprocessing import *
import time
def write(q):
#將列表中的元素寫入隊列中
for i in ["a","b","c"]:
print('開始寫入值%s' % i)
q.put(i)
time.sleep(1)
#讀取
def read(q):
print('開始讀取')
while True:
if not q.empty():
print('讀取到:',q.get())
time.sleep(1)
else:
break
if __name__=='__main__':
#創(chuàng)建隊列
q=Queue()
#創(chuàng)建寫入進程
pw=Process(target=write,args=(q,))
pr=Process(target=read,args=(q,))
#啟動進程
pw.start()
pw.join()
pr.start() # 注意:測試一下將這個放到pw.join()之前
pr.join()
生產(chǎn)者消費者模型
在并發(fā)編程中使用生產(chǎn)者和消費者模式能夠解決絕大多數(shù)并發(fā)問題。該模式通過平衡生產(chǎn)和消費的工作能力來提高程序的整體處理數(shù)據(jù)的速度螟加。
- 為什么要使用生產(chǎn)者和消費者模式
在并發(fā)世界里徘溢,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的進(線)程,消費者就是消費數(shù)據(jù)的進(線)程捆探。在多進(線)程開發(fā)當中然爆,如果生產(chǎn)者處理速度很快,而消費者處理速度很慢黍图,那么生產(chǎn)者就必須等待消費者處理完曾雕,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理助被,如果消費者的處理能力大于生產(chǎn)者剖张,那么消費者就必須等待生產(chǎn)者切诀。為了解決這個問題于是引入了生產(chǎn)者和消費者模式。
- 什么是生產(chǎn)者消費者模式
生產(chǎn)者消費者模式是通過一個容器來解決生產(chǎn)者和消費者的強耦合問題搔弄。生產(chǎn)者和消費者彼此之間不直接通訊幅虑,而通過阻塞隊列來進行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費者處理顾犹,直接扔給阻塞隊列倒庵,消費者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊列里取炫刷,阻塞隊列就相當于一個緩沖區(qū)哄芜,平衡了生產(chǎn)者和消費者的處理能力。
Pool池里面的進程間通信
如果使用Pool創(chuàng)建進程柬唯,就需要使multiprocessing.Manager()中的Queue()來完成進程間的通信认臊,而不是multiprocessing.Queue(),否則會拋出如下異常锄奢。
【示例】進程池創(chuàng)建進程完成進程之間的通信
from multiprocessing import Manager,Pool
import time
def write(q):
#將列表中的元素寫入隊列中
for i in ["a","b","c"]:
print('開始寫入值%s' % i)
q.put(i)
time.sleep(1)
#讀取
def read(q):
print('開始讀取')
while True:
if not q.empty():
print('讀取到:',q.get())
time.sleep(1)
else:
break
if __name__=='__main__':
#創(chuàng)建隊列
q=Manager().Queue()
#創(chuàng)建進程池
p=Pool(3)
#使用阻塞模式創(chuàng)建進程
p.apply(write,(q,))
p.apply(read,(q,)) # 如果使用異步失晴,需要修改read保證讀取
p.close() # 不要忘了關(guān)閉
p.join()
小結(jié)
明天開始學習線程