線程安全的實現(xiàn)
線程之間共享數(shù)據(jù)要注意數(shù)據(jù)是否是線程安全的,使用鎖或者Queue腾务。內(nèi)置數(shù)據(jù)類型包括list毕骡、dict等等是線程安全的(對其的操作是原子性的),但是在訪問時由于可能別的線程對其進行了操作導(dǎo)致得不到預(yù)期的結(jié)果岩瘦。
線程1:
size=1
print(size)
線程2:
size=2
print(size)
最后打印的size結(jié)果不一定是該線程內(nèi)所設(shè)置的挺峡,有可能線程2會打印出1。
即子操作是沒有問題的担钮,但是混合操作就不行了。
通過繼承Queue來實現(xiàn)線程安全
使用Queue來解決線程之間的數(shù)據(jù)共享問題尤仍。
可以MyQueue(Queue)箫津,重構(gòu)_init()、_get()宰啦、_put()等方法苏遥,這些方法是線程安全的,即里面可以混合寫和讀等多個操作作為一個原子操作赡模,源碼中所標(biāo)注的那樣:These will only be called with appropriate locks held田炭。
由于_put和_get是保護屬性,按約定應(yīng)該訪問get和put方法漓柑,但Queue源碼中的get和put方法本身包含了對empty和full的阻塞操作教硫,即empty就阻塞等待put中的notify叨吮,full也阻塞等待get中的notify,如果實際操作中不需要這些瞬矩,那么也需要重寫get和put方法來確保代碼不阻塞等待茶鉴。
在Myqueue中可以定義其他數(shù)據(jù)結(jié)構(gòu)和類型,其本意就是在操作時候加鎖景用。但是要注意可能真正線程安全的是_put和_get方法而不是put和get方法涵叮。
自定義dict實現(xiàn)線程安全類型
通過研究Queue源碼發(fā)現(xiàn)可以使用鎖,使其他數(shù)據(jù)結(jié)構(gòu)和操作變成線程安全的伞插。
class DictWithLock():
def __init__(self):
self.mutex = threading.Lock()
self.__dic = {}
def setdata(self, name, value):
with self.mutex:
self.__dic[name] = value
return self.getdata(name)
def getdata(self, name):
with self.mutex:
if name in self.__dic and self.__dic[name] != None:
return self.__dic[name]
else:
return None
def setdefault(self, key, value):
with self.mutex:
return self.__dic.setdefault(key, value)
def append(self, key, value):
with self.mutex:
return self.__dic.setdefault(key, []).append(value)
def remove(self, key, value):
with self.mutex:
if value in self.__dic[key]:
self.__dic[key].remove(value)
return self.__dic
def __getitem__(self, name):
return self.getdata(name)
def __setitem__(self, name, value):
return self.setdata(name, value)
def __str__(self):
return str(self.__dic)
在使用時割粮,每個實例都擁有一個鎖threading.Lock(),在多線程對一個實例(注意不是一個類的多個實例)操作時會去獲取鎖來進行操作媚污。with的作用是上下文管理器舀瓢,相當(dāng)于獲取資源-》執(zhí)行-》釋放。如果想要實現(xiàn)其他操作杠步,自定義函數(shù)加入 with self.mutex:即可氢伟,在這個語句下的所有操作都是原子性的,其實如果是dict的單操作不加鎖也可以幽歼,因為他本身就是原子性的朵锣,如pop。
死鎖問題
使用鎖的時候甸私,如果一個協(xié)程沒有釋放鎖而切換到其他協(xié)程诚些,后個協(xié)程阻塞等待請求鎖就會造成死鎖問題。如下:
class A:
def __init__(self):
self.a = 1
self.mutex = threading.Lock()
def run(self, index):
while True:
with self.mutex:
self.a += 1
gevent.sleep(0.1)
self.a -= 1
print("{}:{}".format(index, self.a))
a = A()
gevent.spawn(a.run, 0)
gevent.spawn(a.run, 1)
gevent.spawn(a.run, 2)
gevent.spawn(a.run, 3)
這種情況下使用monkey.patch_all()可以避免后一個協(xié)程一直在阻塞請求鎖皇型,但是也只能切換到可以運行的協(xié)程0诬烹,相當(dāng)于單個協(xié)程在運行。如輸出:
0:1
0:1
0:1
...
在while中加入手動切換協(xié)程的gevent.sleep(0.1)弃鸦,則可以切換到其他協(xié)程:
while True:
with self.mutex:
self.a += 1
gevent.sleep(0.1)
self.a -= 1
print("{}:{}".format(index, self.a))
gevent.sleep(0.1)
但輸出是這樣的绞吁,協(xié)程2和3沒有輸出,猜測是因為協(xié)程0在執(zhí)行之后休息0.1s釋放鎖唬格,切換到協(xié)程1家破,協(xié)程1在鎖內(nèi)用了0.1s然后切換時候又切換到了0而不是123,這個應(yīng)該跟系統(tǒng)切換順序有關(guān)购岗。
0:1
1:1
0:1
1:1
0:1
...
將鎖內(nèi)外的時間調(diào)整不一致試試:
def run(self, index):
while True:
with self.mutex:
self.a += 1
gevent.sleep(0.1)
self.a -= 1
# gevent.sleep(0.1)
print("{}:{}".format(index, self.a))
gevent.sleep(1)
這樣程序就可以按照想要的形式輸出了汰聋。
0:1
1:1
2:1
3:1
0:1
...
最后我們來看一下不加鎖的效果:
while True:
# with self.mutex:
self.a += 1
gevent.sleep(0.1)
self.a -= 1
print("{}:{}".format(index, self.a))
gevent.sleep(1)
輸出:
3:4
2:3
1:2
0:1
2:4
0:3
1:2
3:1
0:4
有意思的是如果將兩個sleep都換成0.1,那么可以看到切換就是有順序的了喊积。
3:4
2:3
1:2
0:1
3:4
2:3
1:2
0:1
3:4
系統(tǒng)具體的切換順序可以后續(xù)研究一下烹困,但是實際編碼中代碼的精確執(zhí)行時間并不像sleep直接設(shè)定這樣簡單,所以不建議依賴sleep來確定順序乾吻,更不應(yīng)該在鎖內(nèi)執(zhí)行一些可能切換協(xié)程的操作髓梅。最好的做法就是不在協(xié)程中使用鎖拟蜻,盡量不把協(xié)程和線程混合用,畢竟這兩種都不算真正意義上的并行女淑,有時候協(xié)程的效率會高于線程瞭郑。
只是目前尚不知道如果有IO操作當(dāng)前線程被系統(tǒng)切換再回來之后還是不是當(dāng)前協(xié)程了,就假設(shè)是吧鸭你,畢竟不是當(dāng)前協(xié)程的話屈张,協(xié)程也必須加鎖了。