1. Stackless Python簡介?
??
Stackless Python是CPython的增強(qiáng)版本,它實(shí)現(xiàn)了一個(gè)合作式多任務(wù)系統(tǒng)。和搶占式多任?
務(wù)系統(tǒng)不同,Stackless中的每個(gè)任務(wù)(tasklet)必須主動(dòng)放棄對處理器的控制,才能實(shí)現(xiàn)?
多任務(wù)。?
??
嚴(yán)格地說灌具,合作式多任務(wù)系統(tǒng)都可以通過簡單的循環(huán)實(shí)現(xiàn)。但是使用Stackless可以讓代碼?
更清楚譬巫,同時(shí)也讓程序員不必考慮復(fù)雜的同步控制咖楣,因?yàn)閷?shí)際上實(shí)現(xiàn)合作式多任務(wù)系統(tǒng)的循?
環(huán)并不簡單。另一方面芦昔,這種合作式多任務(wù)系統(tǒng)的資源消耗比傳統(tǒng)線程小得多诱贿,加上?
Python有偉大的GIL,所以Stackless的效率還是很不錯(cuò)的咕缎。?
??
Stackless Python的主頁是
http://www.stackless.com
珠十,上面可以下載源代碼和預(yù)編譯的版?
本。同時(shí)也有不少文檔——多是介紹性質(zhì)的凭豪,缺少手冊焙蹭。幸好,使用Stackless Python不需?
要厚厚的手冊嫂伞,看了主頁上的入門加上help()就足夠了孔厉。Python的self-documenting就是好?
!下面有關(guān)Stackless的介紹可能尚不敷用帖努,所以如果對Stackless Python感興趣最好還是?
啃啃主頁上的英文撰豺。?
??
1.1. tasklet & run?
??
Stackless中一個(gè)任務(wù)被稱作一個(gè)tasklet,可以這樣創(chuàng)建一個(gè)tasklet:?
stackless.tasklet(a_callable)(args)拼余。之后處理這個(gè)任務(wù)就是運(yùn)行a_callable(args)?
??
創(chuàng)建了一個(gè)或多個(gè)tasklet之后污桦,可以用stackless.run()來運(yùn)行這些任務(wù)。實(shí)際上可以認(rèn)為?
run()是進(jìn)入了多任務(wù)系統(tǒng)匙监,如果還有可運(yùn)行的任務(wù)(沒有運(yùn)行完或者阻塞在某個(gè)channel上?
)凡橱,那么run()就會(huì)持續(xù)運(yùn)行小作。?
??
1.2. schedule?
??
當(dāng)在一個(gè)tasklet中運(yùn)行stackless.schedule(),當(dāng)前的任務(wù)就被掛起梭纹,并放在運(yùn)行循環(huán)的?
最后躲惰。下面一個(gè)程序是一個(gè)死循環(huán):?
??
import stackless?
def idle():?
???? while True:?
???????? print 'idle'?
???????? stackless.schedule()?
stackless.tasklet(idle)()?
stackless.run()?
??
1.3. channel: send & receive?
??
channel是tasklet之間的同步方式致份,使用stackless.channel()就能建立一個(gè)通信用的?
channel变抽。channel可以傳送任何Python對象。下面假設(shè)ch = stackless.channel()氮块。?
??
ch.receive()返回從ch中收到的對象绍载,如果ch中沒有對象,則該tasklet被阻塞滔蝉。?
??
ch.send(obj)把obj送入ch击儡,如果已經(jīng)有其他tasklet執(zhí)行了ch.receive(),則當(dāng)前tasklet?
停止運(yùn)行并成為最后一個(gè)tasklet蝠引,而阻塞在ch.receive()的tasklet繼續(xù)運(yùn)行阳谍。如果沒有,?
則當(dāng)前tasklet阻塞在ch.send(obj)螃概。?
??
這又是一個(gè)死循環(huán):?
??
import stackless?
ch = stackless.channel()?
def ping():?
???? while True:?
???????? print ch.receive()?
???????? ch.send('ping')?
def pong():?
???? while True:?
???????? print ch.receive()?
???????? ch.send('pong')?
stackless.tasklet(ping)()?
stackless.tasklet(pong)()?
ch.send('start')?
stackless.run()?
??
2. 基于Stackless的多任務(wù):三種行為模式?
??
概念基本上來自Introduction to Concurrent Programming with Stackless Python矫夯,分別?
是:?
??
Daemon——每個(gè)周期都要運(yùn)行。?
class Daemon(object):?
???? def __init__(self):?
???????? stackless.tasklet(self)()?
??
???? def __call__(self):?
???????? while True:?
???????????? self.run()?
???????????? stackless.schedule()?
??
Task——只運(yùn)行一次吊洼。?
class Task(object):?
???? def __init__(self):?
???????? stackless.tasklet(self)()?
??
???? def __call__(self):?
???????? self.run()?
??
Handler——通過channel處理請求训貌,沒有請求就阻塞。如果收到一個(gè)nil請求冒窍,則結(jié)束運(yùn)行递沪。?
class Handler(object):?
???? def __init__(self):?
???????? self.channel = stackless.channel()?
???????? stackless.tasklet(self)()?
??
???? def __call__(self):?
???????? is_alive = True?
???????? while is_alive:?
???????????? message = self.channel.receive()?
???????????? if message:?
???????????????? is_alive = self.run(message)?
???????????? else:?
???????????????? is_alive = False?
??
在游戲中,Daemon實(shí)現(xiàn)一些管理功能综液,每個(gè)Object可能都要有一個(gè)Handler與之對應(yīng)(反正?
大多數(shù)Handler都是阻塞的)款慨,但因?yàn)镠andler模型需要分析收到的message,所以有的時(shí)候?
也可以用Task谬莹。?
??
3. 網(wǎng)絡(luò)連接?
??
網(wǎng)絡(luò)連接當(dāng)然要使用異步socket樱调,我從Python In A Nutshell抄了一個(gè)asyncore+asynchat?
的例子,放在子線程里運(yùn)行asyncore.loop()届良。我沒有用www.stackless.com提供的一個(gè)?
stacklesssocket笆凌,因?yàn)榇蟾趴戳艘幌聅tacklesssocket.py,覺得好像有些問題士葫。?
??
4. PyMud01?
??
代碼:實(shí)現(xiàn)了一個(gè)echo server乞而,利用了Daemon-Handler-Task三級模型。?
network_thread通過conn_manager.queue和主線程通信慢显,?
EchoDaemon給每個(gè)不同的連接建立一個(gè)EchoHandler爪模,?
收到信息時(shí)發(fā)給相應(yīng)的EchoHandler欠啤,EchoHandler建立一個(gè)EchoTask。?
最后由EchoTask完成回顯屋灌。?
??
import stackless?
import asyncore, asynchat, socket?
import threading, Queue?
??
class Monitor(asyncore.dispatcher):?
???? def __init__(self, conn_manager, addr='', port=9000):?
???????? asyncore.dispatcher.__init__(self)?
???????? self.create_socket(socket.AF_INET, socket.SOCK_STREAM)?
???????? self.bind((addr, port))?
???????? self.listen(5)?
???????? self.conn_manager = conn_manager?
??????????
???? def handle_accept(self):?
???????? conn, addr_port = self.accept()?
???????? self.conn_manager.Create(conn, addr_port)?
??
class Connection(asynchat.async_chat):?
???? def __init__(self, conn, addr_port, conn_manager):?
???????? asynchat.async_chat.__init__(self, conn)?
???????? self.set_terminator('\r\n')?
???????? self.addr_port = addr_port?
???????? self.data_pieces = []?
???????? self.conn_manager = conn_manager?
??????????
???? def collect_incoming_data(self, data):?
???????? self.data_pieces.append(data)?
??
???? def found_terminator(self):?
???????? self.conn_manager.Receive(self.addr_port, ''.join(self.data_pieces))?
???????? self.data_pieces = []?
??
???? def handle_close(self):?
???????? self.conn_manager.Close(self.addr_port)?
???????? self.close()?
??????????
class ConnectionManager(object):?
???? def __init__(self):?
???????? self.queue = Queue.Queue()?
???????? self.conn_table = {}?
??????????
???? def Create(self, conn, addr_port):?
???????? self.conn_table[addr_port] = Connection(conn, addr_port, self)?
??
???? def Receive(self, addr_port, data):?
???????? self.queue.put_nowait((addr_port, data))?
??
???? def Send(self, addr_port, data):?
???????? self.conn_table[addr_port].push(data)?
??
???? def Close(self, addr_port):?
???????? self.conn_table[addr_port] = None?
??
???? def IsConnect(self, addr_port):?
???????? if self.conn_table.has_key(addr_port):?
???????????? return bool(self.conn_table[addr_port])?
???????? else:?
???????????? return False?
??????????
conn_manager = ConnectionManager()?
??
class NetworkThread(threading.Thread):?
???? def run(self):?
???????? Monitor(conn_manager)?
???????? asyncore.loop()?
??????
network_thread = NetworkThread()?
network_thread.start()?
??
class Daemon(object):?
???? def __init__(self):?
???????? stackless.tasklet(self)()?
??
???? def __call__(self):?
???????? while True:?
???????????? self.run()?
???????????? stackless.schedule()?
??
class Task(object):?
???? def __init__(self):?
???????? stackless.tasklet(self)()?
??
???? def __call__(self):?
???????? self.run()?
??
class Handler(object):?
???? def __init__(self):?
???????? self.channel = stackless.channel()?
???????? stackless.tasklet(self)()?
??
???? def __call__(self):?
???????? is_alive = True?
???????? while is_alive:?
???????????? message = self.channel.receive()?
???????????? if message:?
???????????????? is_alive = self.run(message)?
???????????? else:?
???????????????? is_alive = False?
??
class EchoTask(Task):?
???? def __init__(self, addr_port, data):?
???????? Task.__init__(self)?
???????? self.addr_port = addr_port?
???????? self.data = data?
??
???? def run(self):?
???????? print 'task:', stackless.getruncount(), self.addr_port, self.data?
???????? conn_manager.Send(self.addr_port, self.data+'\r\n')?
??
class EchoHandler(Handler):?
???? def run(self, message):?
???????? print 'handler:', stackless.getruncount(), message?
???????? EchoTask(message[0], message[1])?
???????? return True?
??????
class EchoDaemon(Daemon):?
???? def __init__(self):?
???????? Daemon.__init__(self)?
???????? self.handler_table = {}?
??????????
???? def run(self):?
???????? for key in self.handler_table.keys():?
???????????? if not conn_manager.IsConnect(key):?
???????????????? self.handler_table[key].channel.send(None)?
???????????????? del self.handler_table[key]?
???????? try:?
???????????? message = conn_manager.queue.get_nowait()?
???????? except Queue.Empty:?
???????????? return?
???????? print 'daemon:', message?
??????????
???????? if not self.handler_table.has_key(message[0]):?
???????????? self.handler_table[message[0]] = EchoHandler()?
???????????? print 'daemon:', len(self.handler_table)?
??
???????? self.handler_table[message[0]].channel.send(message)?
??
EchoDaemon()?
??
stackless.run()?