BFS(廣度優(yōu)先搜索)是圖論中的一個(gè)基礎(chǔ)搜索算法坡贺,對(duì)于下圖,BFS將按照節(jié)點(diǎn)的數(shù)字大小逐一遍歷。
單線程中的實(shí)現(xiàn)
借用隊(duì)列的先進(jìn)先出特性實(shí)現(xiàn)器贩,偽代碼實(shí)現(xiàn)如下,代碼清晰且易于理解朋截。
1 Breadth-First-Search(Graph, root):
2
3 for each node n in Graph:
4 n.distance = INFINITY
5 n.parent = NIL
6
7 create empty queue Q
8
9 root.distance = 0
10 Q.enqueue(root)
11
12 while Q is not empty:
13
14 current = Q.dequeue()
15
16 for each node n that is adjacent to current:
17 if n.distance == INFINITY:
18 n.distance = current.distance + 1
19 n.parent = current
20 Q.enqueue(n)
21
多線程中的實(shí)現(xiàn)
很多搜索場(chǎng)景下蛹稍,比如網(wǎng)絡(luò)爬蟲算法,單線程的BFS無法充分利用現(xiàn)有的多核多線程的優(yōu)勢(shì)部服。
算法導(dǎo)論第三版新增了一章多線程算法的內(nèi)容唆姐,主要描述了在多線程環(huán)境下如何實(shí)現(xiàn)某種算法。主要思想可以通過遞歸計(jì)算斐波那契數(shù)列(Fibonacci)來描述廓八,眾所周知的fib遞歸算法如下:
def fib(n):
if n < 2:
return n
a = fib(n - 1)
b = fib(n - 2)
return a + b
通過增加sync和spawn原語奉芦,可以將其轉(zhuǎn)為多線程算法。spawn的含義代表在新線程中運(yùn)行剧蹂,sync的含義代表所有線程的匯聚声功,即所有線程都運(yùn)行至此才繼續(xù)執(zhí)行下一行代碼。
將上述fib轉(zhuǎn)為多線程算法是這樣的:
def fib(n):
if n < 2:
return n
a = spawn fib(n - 1)
b = fib(n - 2)
sync
return a + b
基于這個(gè)理論宠叼,需要首先找到哪些可并發(fā)先巴,哪些需要匯聚,考慮下面這種情況:
B和C,D伸蚯、E和E是分別滿足并發(fā)條件的醋闭,但B和C并發(fā)后的節(jié)點(diǎn)E同時(shí)屬于二者的后繼,如果多個(gè)線程同時(shí)訪問時(shí)朝卒,會(huì)將E兩次推送至造成錯(cuò)誤证逻,并有可能使得多個(gè)線程同時(shí)取得對(duì)E的訪問。
在既保證遍歷是按照廣度優(yōu)先抗斤,而又不至于發(fā)生多次訪問的錯(cuò)誤的情況下囚企,可以采用一種基于有限約束的并發(fā)算法,即只允許當(dāng)前節(jié)點(diǎn)的所有子節(jié)點(diǎn)并發(fā)瑞眼。這種約束反饋在代碼實(shí)現(xiàn)上:
1 Breadth-First-Search(Graph, root):
2-11 ... ...
12 while Q is not empty:
13
14 current = Q.dequeue()
15
16 for spaw each node n that is adjacent to current:
17 if n.distance == INFINITY:
18 n.distance = current.distance + 1
19 n.parent = current
20 Q.enqueue(n)
21 sync
注意上面for循環(huán)中的spaw龙宏,這里指的是對(duì)每個(gè)節(jié)點(diǎn)的訪問都將在單獨(dú)的線程中進(jìn)行,僅當(dāng)所有節(jié)點(diǎn)完成遍歷后進(jìn)行sync伤疙。這樣银酗,所有的并發(fā)既不會(huì)亂序,也不會(huì)帶來訪問沖突徒像。
Actor模型實(shí)現(xiàn)spawn和sync
什么是Actor
七周七并發(fā)一書中詳細(xì)介紹了Actor模型黍特。首先它是一個(gè)通用并發(fā)編程模型,也被Erlang借鑒锯蛀。其次灭衷,它封裝了狀態(tài)并通過消息與其他actor通信,可以適應(yīng)共享內(nèi)存架構(gòu)和分布式內(nèi)存架構(gòu)旁涤,而且有很好的容錯(cuò)性翔曲。簡(jiǎn)單來講,Actor就是一個(gè)可以接收和發(fā)送消息的異步執(zhí)行體劈愚。
使用python來模擬Actor
鑒于Actor是一個(gè)可收發(fā)消息的執(zhí)行體瞳遍,我們需要2個(gè)python的概念與其對(duì)應(yīng):thread 和 queue。 thread是一個(gè)執(zhí)行體菌羽,而Queue恰好是一個(gè)消息存儲(chǔ)體掠械。
class Actor(Thread):
def __init__(s):
s.queue = Queue()
Thread.__init__(s)
def send(s, obj):
s.queue.put(obj)
def recv(s):
return s.queue.get()
def work_done(s):
s.queue.task_done()
def sync(s):
s.queue.join()
def has_more_work(s):
return not s.queue.empty()
def work():
pass
def run(s):
s.work()
Python內(nèi)置類Queue的join方法說明:
Queue.join()
Blocks until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
配合task_done(),恰好可以模擬前文提及的sync方法算凿,比如這樣同步所有工作線程:
def worker():
while True:
item = q.get()
do_work(item)
q.task_done()
q = Queue()
for i in range(num_worker_threads):
t = Thread(target=worker)
t.daemon = True
t.start()
for item in source():
q.put(item)
q.join() # block until all tasks are done
多線程BFS實(shí)現(xiàn)
有了可以sync的Actor份蝴,那么多線程的BFS就可以如下方式實(shí)現(xiàn)了。
首先將遍歷節(jié)點(diǎn)的動(dòng)作封裝在Actor中氓轰,比如稱之為Vistor婚夫,它是一個(gè)Actor:
class Visitor(Actor):
def __init__(s, monitor, vid = 0):
s.monitor = monitor
s.sid = sid
Actor.__init__(s)
def work(s):
while True:
v = s.recv()
nexts = visit(v)
s.send(nexts)
s.work_done()
對(duì)于發(fā)起B(yǎng)FS的線程,我們稱之為monitor線程署鸡,當(dāng)然案糙,它也是一個(gè)Actor:
class Monitor(Actor):
def __init__(s, n_visitors, start_vertex = [], depth = 8):
Actor.__init__(s)
s.n_visitors = n_visitors
s.depth = depth
s.choice = 0
s.visit_history = set()
for v in start_vertex:
s.send(v)
s.visitors = [Visitor(s, x) for x in range(n_visitors)]
for sp in s.n_visitors:
sp.start()
def wait_all(s):
for sp in s.n_visitors:
sp.sync()
def dispatch(s,v):
if v not in s.visit_history:
s.n_visitors[s.choice % s.n_visitors].send(v)
s.choice += 1
s.visit_history.add(v)
def work(s):
while s.has_more_work():
for v in chain.from_iterable(s.recv_all()):
if has_visit_depth == s.depth:
break
s.dispatch(v)
s.wait_all()
s.done_work()
至此限嫌,我們就實(shí)現(xiàn)了這種基于有限約束的BFS了。