Gevent高并發(fā)網(wǎng)絡(luò)庫(kù)精解

進(jìn)程 線程 協(xié)程 異步

并發(fā)編程(不是并行)目前有四種方式:多進(jìn)程扔役、多線程摇天、協(xié)程和異步酝锅。

  • 多進(jìn)程編程在python中有類似C的os.fork,更高層封裝的有multiprocessing標(biāo)準(zhǔn)庫(kù)
  • 多線程編程python中有Thread和threading
  • 異步編程在linux下主+要有三種實(shí)現(xiàn)select巧骚,poll吭产,epoll
  • 協(xié)程在python中通常會(huì)說(shuō)到y(tǒng)ield个从,關(guān)于協(xié)程的庫(kù)主要有g(shù)reenlet,stackless,gevent,eventlet等實(shí)現(xiàn)脉幢。

進(jìn)程

  • 不共享任何狀態(tài)
  • 調(diào)度由操作系統(tǒng)完成
  • 有獨(dú)立的內(nèi)存空間(上下文切換的時(shí)候需要保存棧、cpu寄存器嗦锐、虛擬內(nèi)存嫌松、以及打開(kāi)的相關(guān)句柄等信息,開(kāi)銷大)
  • 通訊主要通過(guò)信號(hào)傳遞的方式來(lái)實(shí)現(xiàn)(實(shí)現(xiàn)方式有多種奕污,信號(hào)量萎羔、管道、事件等碳默,通訊都需要過(guò)內(nèi)核贾陷,效率低)

線程

  • 共享變量(解決了通訊麻煩的問(wèn)題缘眶,但是對(duì)于變量的訪問(wèn)需要加鎖)
  • 調(diào)度由操作系統(tǒng)完成(由于共享內(nèi)存,上下文切換變得高效)
  • 一個(gè)進(jìn)程可以有多個(gè)線程髓废,每個(gè)線程會(huì)共享父進(jìn)程的資源(創(chuàng)建線程開(kāi)銷占用比進(jìn)程小很多巷懈,可創(chuàng)建的數(shù)量也會(huì)很多)
  • 通訊除了可使用進(jìn)程間通訊的方式,還可以通過(guò)共享內(nèi)存的方式進(jìn)行通信(通過(guò)共享內(nèi)存通信比通過(guò)內(nèi)核要快很多)

協(xié)程

  • 調(diào)度完全由用戶控制
  • 一個(gè)線程(進(jìn)程)可以有多個(gè)協(xié)程
  • 每個(gè)線程(進(jìn)程)循環(huán)按照指定的任務(wù)清單順序完成不同的任務(wù)(當(dāng)任務(wù)被堵塞時(shí)瓦哎,執(zhí)行下一個(gè)任務(wù)砸喻;當(dāng)恢復(fù)時(shí),再回來(lái)執(zhí)行這個(gè)任務(wù)蒋譬;任務(wù)間切換只需要保存任務(wù)的上下文割岛,沒(méi)有內(nèi)核的開(kāi)銷,可以不加鎖的訪問(wèn)全局變量)
  • 協(xié)程需要保證是非堵塞的且沒(méi)有相互依賴
  • 協(xié)程基本上不能同步通訊犯助,多采用異步的消息通訊癣漆,效率比較高

總結(jié)

  • 進(jìn)程擁有自己獨(dú)立的堆和棧,既不共享堆剂买,亦不共享?xiàng);菟M(jìn)程由操作系統(tǒng)調(diào)度
  • 線程擁有自己獨(dú)立的棧和共享的堆,共享堆瞬哼,不共享?xiàng);樗粒€程亦由操作系統(tǒng)調(diào)度(標(biāo)準(zhǔn)線程是的)
  • 協(xié)程和線程一樣共享堆,不共享?xiàng)W浚瑓f(xié)程由程序員在協(xié)程的代碼里顯示調(diào)度

聊聊協(xié)程

協(xié)程较性,又稱微線程,纖程结胀。
Python的線程并不是標(biāo)準(zhǔn)線程赞咙,是系統(tǒng)級(jí)進(jìn)程,線程間上下文切換有開(kāi)銷糟港,而且Python在執(zhí)行多線程時(shí)默認(rèn)加了一個(gè)全局解釋器鎖(GIL)攀操,因此Python的多線程其實(shí)是串行的,所以并不能利用多核的優(yōu)勢(shì)秸抚,也就是說(shuō)一個(gè)進(jìn)程內(nèi)的多個(gè)線程只能使用一個(gè)CPU速和。

def coroutine(func):
    def ret():
        f = func()
        f.next()
        return f
    return ret

@coroutine
def consumer():
    print "Wait to getting a task"
    while True:
        n = (yield)
        print "Got %s",n

import time
def producer():
    c = consumer()
    task_id = 0
    while True:
        time.sleep(1)
        print "Send a task to consumer" % task_id
        c.send("task %s" % task_id)

if __name__ == "__main__":
    producer()

結(jié)果:

Wait to getting a task
Send a task 0 to consumer
Got task 0
Send a task 1 to consumer
Got task 1
Send a task 2 to consumer
Got task 2
...

傳統(tǒng)的生產(chǎn)者-消費(fèi)者模型是一個(gè)線程寫消息,一個(gè)線程取消息耸别,通過(guò)鎖機(jī)制控制隊(duì)列和等待健芭,但容易死鎖。
如果改用協(xié)程秀姐,生產(chǎn)者生產(chǎn)消息后,直接通過(guò)yield跳轉(zhuǎn)到消費(fèi)者開(kāi)始執(zhí)行若贮,待消費(fèi)者執(zhí)行完畢后省有,切換回生產(chǎn)者繼續(xù)生產(chǎn)痒留,效率極高。

Gevent

介紹

gevent是基于協(xié)程的Python網(wǎng)絡(luò)庫(kù)蠢沿。特點(diǎn):

  • 基于libev的快速事件循環(huán)(Linux上epoll伸头,F(xiàn)reeBSD上kqueue)。
  • 基于greenlet的輕量級(jí)執(zhí)行單元舷蟀。
  • API的概念和Python標(biāo)準(zhǔn)庫(kù)一致(如事件恤磷,隊(duì)列)。
  • 可以配合socket野宜,ssl模塊使用扫步。
  • 能夠使用標(biāo)準(zhǔn)庫(kù)和第三方模塊創(chuàng)建標(biāo)準(zhǔn)的阻塞套接字(gevent.monkey)。
  • 默認(rèn)通過(guò)線程池進(jìn)行DNS查詢,也可通過(guò)c-are(通過(guò)GEVENT_RESOLVER=ares環(huán)境變量開(kāi)啟)匈子。
  • TCP/UDP/HTTP服務(wù)器
  • 子進(jìn)程支持(通過(guò)gevent.subprocess)
  • 線程池

安裝和依賴

依賴于greenlet library
支持python 2.6+ 河胎、3.3+

核心部分

  • Greenlets
  • 同步和異步執(zhí)行
  • 確定性
  • 創(chuàng)建Greenlets
  • Greenlet狀態(tài)
  • 程序停止
  • 超時(shí)
  • 猴子補(bǔ)丁

Greenlets

gevent中的主要模式, 它是以C擴(kuò)展模塊形式接入Python的輕量級(jí)協(xié)程。 全部運(yùn)行在主程序操作系統(tǒng)進(jìn)程的內(nèi)部虎敦,但它們被程序員協(xié)作式地調(diào)度游岳。

在任何時(shí)刻,只有一個(gè)協(xié)程在運(yùn)行其徙。

區(qū)別于multiprocessing胚迫、threading等提供真正并行構(gòu)造的庫(kù), 這些庫(kù)輪轉(zhuǎn)使用操作系統(tǒng)調(diào)度的進(jìn)程和線程唾那,是真正的并行访锻。

同步和異步執(zhí)行

并發(fā)的核心思想在于,大的任務(wù)可以分解成一系列的子任務(wù)通贞,后者可以被調(diào)度成 同時(shí)執(zhí)行或異步執(zhí)行朗若,而不是一次一個(gè)地或者同步地執(zhí)行。兩個(gè)子任務(wù)之間的 切換也就是上下文切換昌罩。

在gevent里面哭懈,上下文切換是通過(guò)yielding來(lái)完成的.

import gevent

def foo():
    print('Running in foo')
    gevent.sleep(0)
    print('Explicit context switch to foo again')

def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')

gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])

執(zhí)行結(jié)果:

Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar

代碼執(zhí)行過(guò)程:

[圖片上傳失敗...(image-fd7606-1522313744163)]

網(wǎng)絡(luò)延遲或IO阻塞隱式交出greenlet上下文的執(zhí)行權(quán)。

import time
import gevent
from gevent import select

start = time.time()
tic = lambda: 'at %1.1f seconds' % (time.time() - start)

def gr1():
    print('Started Polling: %s' % tic())
    select.select([], [], [], 1)
    print('Ended Polling: %s' % tic())

def gr2():
    print('Started Polling: %s' % tic())
    select.select([], [], [], 2)
    print('Ended Polling: %s' % tic())

def gr3():
    print("Hey lets do some stuff while the greenlets poll, %s" % tic())
    gevent.sleep(1)

gevent.joinall([
    gevent.spawn(gr1),
    gevent.spawn(gr2),
    gevent.spawn(gr3),
])

執(zhí)行結(jié)果:

Started Polling: at 0.0 seconds
Started Polling: at 0.0 seconds
Hey lets do some stuff while the greenlets poll, at 0.0 seconds
Ended Polling: at 1.0 seconds
Ended Polling: at 2.0 seconds

同步vs異步

import gevent
import random

def task(pid):
    gevent.sleep(random.randint(0,2)*0.001)
    print('Task %s done' % pid)

def synchronous():
    for i in xrange(5):
        task(i)

def asynchronous():
    threads = [gevent.spawn(task, i) for i in xrange(5)]
    gevent.joinall(threads)

    print('Synchronous:')
    synchronous()

    print('Asynchronous:')
    asynchronous()

執(zhí)行結(jié)果:

Synchronous:
Task 0 done
Task 1 done
Task 2 done
Task 3 done
Task 4 done
Asynchronous:
Task 2 done
Task 0 done
Task 1 done
Task 3 done
Task 4 done

確定性

greenlet具有確定性茎用。在相同配置相同輸入的情況下遣总,它們總是會(huì)產(chǎn)生相同的輸出。

import time

def echo(i):
    time.sleep(0.001)
    return i

# Non Deterministic Process Pool
from multiprocessing.pool import Pool

p = Pool(10)
run1 = [a for a in p.imap_unordered(echo, xrange(10))]
run2 = [a for a in p.imap_unordered(echo, xrange(10))]
run3 = [a for a in p.imap_unordered(echo, xrange(10))]
run4 = [a for a in p.imap_unordered(echo, xrange(10))]

print(run1 == run2 == run3 == run4)

# Deterministic Gevent Pool
from gevent.pool import Pool

p = Pool(10)
run1 = [a for a in p.imap_unordered(echo, xrange(10))]
run2 = [a for a in p.imap_unordered(echo, xrange(10))]
run3 = [a for a in p.imap_unordered(echo, xrange(10))]
run4 = [a for a in p.imap_unordered(echo, xrange(10))]

print(run1 == run2 == run3 == run4)

執(zhí)行結(jié)果:

False
True

即使gevent通常帶有確定性轨功,當(dāng)開(kāi)始與如socket或文件等外部服務(wù)交互時(shí)旭斥, 不確定性也可能溜進(jìn)你的程序中。因此盡管gevent線程是一種“確定的并發(fā)”形式古涧, 使用它仍然可能會(huì)遇到像使用POSIX線程或進(jìn)程時(shí)遇到的那些問(wèn)題垂券。

涉及并發(fā)長(zhǎng)期存在的問(wèn)題就是競(jìng)爭(zhēng)條件(race condition)(當(dāng)兩個(gè)并發(fā)線程/進(jìn)程都依賴于某個(gè)共享資源同時(shí)都嘗試去修改它的時(shí)候, 就會(huì)出現(xiàn)競(jìng)爭(zhēng)條件),這會(huì)導(dǎo)致資源修改的結(jié)果狀態(tài)依賴于時(shí)間和執(zhí)行順序羡滑。 這個(gè)問(wèn)題菇爪,會(huì)導(dǎo)致整個(gè)程序行為變得不確定算芯。

解決辦法: 始終避免所有全局的狀態(tài).

創(chuàng)建Greenlets

gevent對(duì)Greenlet初始化提供了一些封裝.

import gevent
from gevent import Greenlet

def foo(message, n):
    gevent.sleep(n)
    print(message)

    thread1 = Greenlet.spawn(foo, "Hello", 1)
    thread2 = gevent.spawn(foo, "I live!", 2)
    thread3 = gevent.spawn(lambda x: (x+1), 2)
    threads = [thread1, thread2, thread3]
    gevent.joinall(threads)

執(zhí)行結(jié)果:

Hello
I live!

除使用基本的Greenlet類之外,你也可以子類化Greenlet類凳宙,重載它的_run方法熙揍。

import gevent
from gevent import Greenlet

class MyGreenlet(Greenlet):

    def __init__(self, message, n):
        Greenlet.__init__(self)
        self.message = message
        self.n = n

    def _run(self):
        print(self.message)
        gevent.sleep(self.n)

g = MyGreenlet("Hi there!", 3)
g.start()
g.join()

執(zhí)行結(jié)果:

Hi there!

Greenlet狀態(tài)

greenlet的狀態(tài)通常是一個(gè)依賴于時(shí)間的參數(shù):

  • started -- Boolean, 指示此Greenlet是否已經(jīng)啟動(dòng)
  • ready() -- Boolean, 指示此Greenlet是否已經(jīng)停止
  • successful() -- Boolean, 指示此Greenlet是否已經(jīng)停止而且沒(méi)拋異常
  • value -- 任意值, 此Greenlet代碼返回的值
  • exception -- 異常, 此Greenlet內(nèi)拋出的未捕獲異常

程序停止

程序
當(dāng)主程序(main program)收到一個(gè)SIGQUIT信號(hào)時(shí),不能成功做yield操作的 Greenlet可能會(huì)令意外地掛起程序的執(zhí)行氏涩。這導(dǎo)致了所謂的僵尸進(jìn)程届囚, 它需要在Python解釋器之外被kill掉。

通用的處理模式就是在主程序中監(jiān)聽(tīng)SIGQUIT信號(hào)是尖,調(diào)用gevent.shutdown退出程序意系。

import gevent
import signal

def run_forever():
    gevent.sleep(1000)

    if __name__ == '__main__':
        gevent.signal(signal.SIGQUIT, gevent.shutdown)
        thread = gevent.spawn(run_forever)
        thread.join()

超時(shí)

通過(guò)超時(shí)可以對(duì)代碼塊兒或一個(gè)Greenlet的運(yùn)行時(shí)間進(jìn)行約束。

import gevent
from gevent import Timeout

seconds = 10

timeout = Timeout(seconds)
timeout.start()

def wait():
    gevent.sleep(10)

    try:
        gevent.spawn(wait).join()
    except Timeout:
        print('Could not complete')

超時(shí)類

import gevent
from gevent import Timeout

time_to_wait = 5 # seconds

    class TooLong(Exception):
        pass

    with Timeout(time_to_wait, TooLong):
        gevent.sleep(10)

另外析砸,對(duì)各種Greenlet和數(shù)據(jù)結(jié)構(gòu)相關(guān)的調(diào)用昔字,gevent也提供了超時(shí)參數(shù)。

import gevent
from gevent import Timeout

def wait():
    gevent.sleep(2)

timer = Timeout(1).start()
thread1 = gevent.spawn(wait)

try:
    thread1.join(timeout=timer)
except Timeout:
    print('Thread 1 timed out')

# --

timer = Timeout.start_new(1)
thread2 = gevent.spawn(wait)

try:
    thread2.get(timeout=timer)
except Timeout:
    print('Thread 2 timed out')

# --

try:
    gevent.with_timeout(1, wait)
except Timeout:
    print('Thread 3 timed out')

執(zhí)行結(jié)果:

Thread 1 timed out
Thread 2 timed out
Thread 3 timed out

猴子補(bǔ)丁(Monkey patching)

gevent的死角.

import socket
print(socket.socket)

print("After monkey patch")
from gevent import monkey
monkey.patch_socket()
print(socket.socket)

import select
print(select.select)
monkey.patch_select()
print("After monkey patch")
print(select.select)

執(zhí)行結(jié)果:

class 'socket.socket'
After monkey patch
class 'gevent.socket.socket'

built-in function select
After monkey patch
function select at 0x1924de8

Python的運(yùn)行環(huán)境允許我們?cè)谶\(yùn)行時(shí)修改大部分的對(duì)象首繁,包括模塊作郭,類甚至函數(shù)。 這是個(gè)一般說(shuō)來(lái)令人驚奇的壞主意弦疮,因?yàn)樗鼊?chuàng)造了“隱式的副作用”夹攒,如果出現(xiàn)問(wèn)題 它很多時(shí)候是極難調(diào)試的。雖然如此胁塞,在極端情況下當(dāng)一個(gè)庫(kù)需要修改Python本身 的基礎(chǔ)行為的時(shí)候咏尝,猴子補(bǔ)丁就派上用場(chǎng)了。在這種情況下啸罢,gevent能夠修改標(biāo)準(zhǔn)庫(kù)里面大部分的阻塞式系統(tǒng)調(diào)用编检,包括socket、ssl扰才、threading和 select等模塊允懂,而變?yōu)閰f(xié)作式運(yùn)行。

例如衩匣,Redis的python綁定一般使用常規(guī)的tcp socket來(lái)與redis-server實(shí)例通信蕾总。 通過(guò)簡(jiǎn)單地調(diào)用gevent.monkey.patch_all(),可以使得redis的綁定協(xié)作式的調(diào)度 請(qǐng)求琅捏,與gevent棧的其它部分一起工作生百。

這讓我們可以將一般不能與gevent共同工作的庫(kù)結(jié)合起來(lái),而不用寫哪怕一行代碼柄延。 雖然猴子補(bǔ)丁仍然是邪惡的(evil)蚀浆,但在這種情況下它是“有用的邪惡(useful evil)”。

數(shù)據(jù)結(jié)構(gòu)

  • 事件
  • 隊(duì)列
  • 組和池
  • 鎖和信號(hào)量
  • 線程局部變量
  • 子進(jìn)程
  • Actors

事件

事件(event)是一個(gè)在Greenlet之間異步通信的形式。

import gevent
from gevent.event import Event

evt = Event()

def setter():
    print('A: Hey wait for me, I have to do something')
    gevent.sleep(3)
    print("Ok, I'm done")
    evt.set()

def waiter():
    print("I'll wait for you")
    evt.wait()  # blocking
    print("It's about time")

def main():
    gevent.joinall([
        gevent.spawn(setter),
        gevent.spawn(waiter),
        gevent.spawn(waiter),
        gevent.spawn(waiter)
    ])

if __name__ == '__main__': 
    main()

執(zhí)行結(jié)果:

A: Hey wait for me, I have to do something
I'll wait for you
I'll wait for you
I'll wait for you
Ok, I'm done
It's about time
It's about time
It's about time

事件對(duì)象的一個(gè)擴(kuò)展是AsyncResult蜡坊,它允許你在喚醒調(diào)用上附加一個(gè)值杠输。 它有時(shí)也被稱作是future或defered赎败,因?yàn)樗钟幸粋€(gè)指向?qū)?lái)任意時(shí)間可設(shè)置為任何值的引用秕衙。

import gevent
from gevent.event import AsyncResult
a = AsyncResult()

def setter():
    gevent.sleep(3)
    a.set('Hello!')

def waiter():
    print(a.get())

gevent.joinall([
    gevent.spawn(setter),
    gevent.spawn(waiter),
])

隊(duì)列

隊(duì)列是一個(gè)排序的數(shù)據(jù)集合,它有常見(jiàn)的put / get操作僵刮, 但是它是以在Greenlet之間可以安全操作的方式來(lái)實(shí)現(xiàn)的据忘。

import gevent
from gevent.queue import Queue

tasks = Queue()

def worker(n):
    while not tasks.empty():
        task = tasks.get()
        print('Worker %s got task %s' % (n, task))
        gevent.sleep(0)
    print('Quitting time!')

def boss():
    for i in xrange(1,10):
        tasks.put_nowait(i)

gevent.spawn(boss).join()

gevent.joinall([
    gevent.spawn(worker, 'steve'),
    gevent.spawn(worker, 'john'),
    gevent.spawn(worker, 'nancy'),
])

執(zhí)行結(jié)果:

Worker steve got task 1
Worker john got task 2
Worker nancy got task 3
Worker steve got task 4
Worker john got task 5
Worker nancy got task 6
Worker steve got task 7
Worker john got task 8
Worker nancy got task 9
Quitting time!
Quitting time!
Quitting time!

put和get操作都是阻塞的,put_nowait和get_nowait不會(huì)阻塞搞糕, 然而在操作不能完成時(shí)拋出gevent.queue.Empty或gevent.queue.Full異常勇吊。

組和池

組(group)是一個(gè)運(yùn)行中g(shù)reenlet集合,集合中的greenlet像一個(gè)組一樣會(huì)被共同管理和調(diào)度窍仰。 它也兼飾了像Python的multiprocessing庫(kù)那樣的平行調(diào)度器的角色汉规,主要用在在管理異步任務(wù)的時(shí)候進(jìn)行分組。

import gevent
from gevent.pool import Group

def talk(msg):
    for i in xrange(2):
        print(msg)

g1 = gevent.spawn(talk, 'bar')
g2 = gevent.spawn(talk, 'foo')
g3 = gevent.spawn(talk, 'fizz')

group = Group()
group.add(g1)
group.add(g2)
group.join()

group.add(g3)
group.join()

執(zhí)行結(jié)果:

bar
bar
foo
foo
fizz
fizz

池(pool)是一個(gè)為處理數(shù)量變化并且需要限制并發(fā)的greenlet而設(shè)計(jì)的結(jié)構(gòu)驹吮。

import gevent
from gevent.pool import Pool

pool = Pool(2)

def hello_from(n):
    print('Size of pool %s' % len(pool))

    pool.map(hello_from, xrange(3))

執(zhí)行結(jié)果:

Size of pool 2
Size of pool 2
Size of pool 1

構(gòu)造一個(gè)socket池的類针史,在各個(gè)socket上輪詢。

from gevent.pool import Pool

class SocketPool(object):

    def __init__(self):
        self.pool = Pool(10)
        self.pool.start()

    def listen(self, socket):
        while True:
            socket.recv()

    def add_handler(self, socket):
        if self.pool.full():
            raise Exception("At maximum pool size")
        else:
            self.pool.spawn(self.listen, socket)

    def shutdown(self):
        self.pool.kill()

鎖和信號(hào)量

信號(hào)量是一個(gè)允許greenlet相互合作碟狞,限制并發(fā)訪問(wèn)或運(yùn)行的低層次的同步原語(yǔ)啄枕。 信號(hào)量有兩個(gè)方法,acquire和release族沃。在信號(hào)量是否已經(jīng)被 acquire或release频祝,和擁有資源的數(shù)量之間不同,被稱為此信號(hào)量的范圍 (the bound of the semaphore)脆淹。如果一個(gè)信號(hào)量的范圍已經(jīng)降低到0常空,它會(huì) 阻塞acquire操作直到另一個(gè)已經(jīng)獲得信號(hào)量的greenlet作出釋放。

from gevent import sleep
from gevent.pool import Pool
from gevent.coros import BoundedSemaphore

sem = BoundedSemaphore(2)

def worker1(n):
    sem.acquire()
    print('Worker %i acquired semaphore' % n)
    sleep(0)
    sem.release()
    print('Worker %i released semaphore' % n)

def worker2(n):
    with sem:
        print('Worker %i acquired semaphore' % n)
        sleep(0)
    print('Worker %i released semaphore' % n)

pool = Pool()
pool.map(worker1, xrange(0,2))

執(zhí)行結(jié)果:

Worker 0 acquired semaphore
Worker 1 acquired semaphore
Worker 0 released semaphore
Worker 1 released semaphore

鎖(lock)是范圍為1的信號(hào)量盖溺。它向單個(gè)greenlet提供了互斥訪問(wèn)漓糙。 信號(hào)量和鎖常被用來(lái)保證資源只在程序上下文被單次使用。

線程局部變量

Gevent允許程序員指定局部于greenlet上下文的數(shù)據(jù)咐柜。 在內(nèi)部兼蜈,它被實(shí)現(xiàn)為以greenlet的getcurrent()為鍵, 在一個(gè)私有命名空間尋址的全局查找拙友。

import gevent
from gevent.local import local

stash = local()

def f1():
    stash.x = 1
    print(stash.x)

def f2():
    stash.y = 2
    print(stash.y)

    try:
        stash.x
    except AttributeError:
        print("x is not local to f2")

g1 = gevent.spawn(f1)
g2 = gevent.spawn(f2)

gevent.joinall([g1, g2])

執(zhí)行結(jié)果:

1
2
x is not local to f2

很多集成了gevent的web框架將HTTP會(huì)話對(duì)象以線程局部變量的方式存儲(chǔ)在gevent內(nèi)为狸。 例如使用Werkzeug實(shí)用庫(kù)和它的proxy對(duì)象,我們可以創(chuàng)建Flask風(fēng)格的請(qǐng)求對(duì)象遗契。

from gevent.local import local
from werkzeug.local import LocalProxy
from werkzeug.wrappers import Request
from contextlib import contextmanager

from gevent.wsgi import WSGIServer

_requests = local()
request = LocalProxy(lambda: _requests.request)

@contextmanager
def sessionmanager(environ):
    _requests.request = Request(environ)
    yield
    _requests.request = None

def logic():
    return "Hello " + request.remote_addr

def application(environ, start_response):
    status = '200 OK'

    with sessionmanager(environ):
        body = logic()

    headers = [
        ('Content-Type', 'text/html')
    ]

    start_response(status, headers)
    return [body]
    WSGIServer(('', 8000), application).serve_forever()

子進(jìn)程

從gevent 1.0起辐棒,支持gevent.subprocess,支持協(xié)作式的等待子進(jìn)程。

import gevent
from gevent.subprocess import Popen, PIPE

def cron():
    while True:
        print("cron")
        gevent.sleep(0.2)

g = gevent.spawn(cron)
sub = Popen(['sleep 1; uname'], stdout=PIPE, shell=True)
out, err = sub.communicate()
g.kill()
print(out.rstrip())

執(zhí)行結(jié)果:

   cron
   cron
   cron
   cron
   cron
   Linux

很多人也想將gevent和multiprocessing一起使用漾根。最明顯的挑戰(zhàn)之一 就是multiprocessing提供的進(jìn)程間通信默認(rèn)不是協(xié)作式的泰涂。由于基于 multiprocessing.Connection的對(duì)象(例如Pipe)暴露了它們下面的 文件描述符(file descriptor),gevent.socket.wait_read和wait_write 可以用來(lái)在直接讀寫之前協(xié)作式的等待ready-to-read/ready-to-write事件辐怕。

import gevent
from multiprocessing import Process, Pipe
from gevent.socket import wait_read, wait_write

# To Process
a, b = Pipe()

# From Process
c, d = Pipe()

def relay():
    for i in xrange(5):
        msg = b.recv()
        c.send(msg + " in " + str(i))

def put_msg():
    for i in xrange(5):
        wait_write(a.fileno())
        a.send('hi')

def get_msg():
    for i in xrange(5):
        wait_read(d.fileno())
        print(d.recv())

if __name__ == '__main__':
    proc = Process(target=relay)
    proc.start()

    g1 = gevent.spawn(get_msg)
    g2 = gevent.spawn(put_msg)
    gevent.joinall([g1, g2], timeout=1)

執(zhí)行結(jié)果:

hi in 0
hi in 1
hi in 2
hi in 3
hi in 4

然而要注意逼蒙,組合multiprocessing和gevent必定帶來(lái) 依賴于操作系統(tǒng)(os-dependent)的缺陷,其中有:

在兼容POSIX的系統(tǒng)創(chuàng)建子進(jìn)程(forking)之后寄疏, 在子進(jìn)程的gevent的狀態(tài)是不適定的(ill-posed)是牢。一個(gè)副作用就是, multiprocessing.Process創(chuàng)建之前的greenlet創(chuàng)建動(dòng)作陕截,會(huì)在父進(jìn)程和子進(jìn)程兩方都運(yùn)行驳棱。

上例的put_msg()中的a.send()可能依然非協(xié)作式地阻塞調(diào)用的線程:一個(gè) ready-to-write事件只保證寫了一個(gè)byte。在嘗試寫完成之前底下的buffer可能是滿的农曲。

上面表示的基于wait_write()/wait_read()的方法在Windows上不工作 (IOError: 3 is not a socket (files are not supported))社搅,因?yàn)閃indows不能監(jiān)視 pipe事件。

Python包gipc以大體上透明的方式在 兼容POSIX系統(tǒng)和Windows上克服了這些挑戰(zhàn)乳规。它提供了gevent感知的基于 multiprocessing.Process的子進(jìn)程和gevent基于pipe的協(xié)作式進(jìn)程間通信形葬。

Actors

actor模型是一個(gè)由于Erlang變得普及的更高層的并發(fā)模型。 簡(jiǎn)單的說(shuō)它的主要思想就是許多個(gè)獨(dú)立的Actor驯妄,每個(gè)Actor有一個(gè)可以從 其它Actor接收消息的收件箱荷并。Actor內(nèi)部的主循環(huán)遍歷它收到的消息,并根據(jù)它期望的行為來(lái)采取行動(dòng)青扔。

Gevent沒(méi)有原生的Actor類型源织,但在一個(gè)子類化的Greenlet內(nèi)使用隊(duì)列, 我們可以定義一個(gè)非常簡(jiǎn)單的微猖。

import gevent
from gevent.queue import Queue

class Actor(gevent.Greenlet):

    def __init__(self):
        self.inbox = Queue()
        Greenlet.__init__(self)

    def receive(self, message):
        """
        Define in your subclass.
        """
        raise NotImplemented()

    def _run(self):
        self.running = True

        while self.running:
            message = self.inbox.get()
            self.receive(message)

下面是一個(gè)使用的例子:

import gevent
from gevent.queue import Queue
from gevent import Greenlet

class Pinger(Actor):
    def receive(self, message):
        print(message)
        pong.inbox.put('ping')
        gevent.sleep(0)

class Ponger(Actor):
    def receive(self, message):
        print(message)
        ping.inbox.put('pong')
        gevent.sleep(0)

ping = Pinger()
pong = Ponger()

ping.start()
pong.start()

ping.inbox.put('start')
gevent.joinall([ping, pong])

實(shí)際應(yīng)用

  • Gevent ZeroMQ
  • 簡(jiǎn)單server
  • WSGI Servers
  • 流式server
  • Long Polling
  • Websockets

簡(jiǎn)單server

# On Unix: Access with ``$ nc 127.0.0.1 5000``
# On Window: Access with ``$ telnet 127.0.0.1 5000``

from gevent.server import StreamServer

def handle(socket, address):
    socket.send("Hello from a telnet!\n")
    for i in range(5):
        socket.send(str(i) + '\n')
    socket.close()

server = StreamServer(('127.0.0.1', 5000), handle)
server.serve_forever()

WSGI Servers And Websockets

Gevent為HTTP內(nèi)容服務(wù)提供了兩種WSGI server谈息。從今以后就稱為 wsgi和pywsgi:

  • gevent.wsgi.WSGIServer
  • gevent.pywsgi.WSGIServer

glb中使用

import click
from flask import Flask
from gevent.pywsgi import WSGIServer
from geventwebsocket.handler import WebSocketHandler

import v1
from .settings import Config
from .sockethandler import handle_websocket

def create_app(config=None):
    app = Flask(__name__, static_folder='static')
    if config:
        app.config.update(config)
    else:
        app.config.from_object(Config)

    app.register_blueprint(
        v1.bp,
        url_prefix='/v1')
    return app

def wsgi_app(environ, start_response):
    path = environ['PATH_INFO']
    if path == '/websocket':
        handle_websocket(environ['wsgi.websocket'])
    else:
        return create_app()(environ, start_response)

@click.command()
@click.option('-h', '--host_port', type=(unicode, int),
    default=('0.0.0.0', 5000), help='Host and port of server.')
@click.option('-r', '--redis', type=(unicode, int, int),
    default=('127.0.0.1', 6379, 0),
    help='Redis url of server.')
@click.option('-p', '--port_range', type=(int, int),
    default=(50000, 61000),
    help='Port range to be assigned.')
def manage(host_port, redis=None, port_range=None):
    Config.REDIS_URL = 'redis://%s:%s/%s' % redis
    Config.PORT_RANGE = port_range
    http_server = WSGIServer(host_port,
                             wsgi_app, handler_class=WebSocketHandler)
    print '----GLB Server run at %s:%s-----' % host_port
    print '----Redis Server run at %s:%s:%s-----' % redis
    http_server.serve_forever()

缺陷

和其他異步I/O框架一樣,gevent也有一些缺陷:

  • 阻塞(真正的阻塞,在內(nèi)核級(jí)別)在程序中的某個(gè)地方停止了所有的東西.這很像C代碼中monkey patch沒(méi)有生效
  • 保持CPU處于繁忙狀態(tài).greenlet不是搶占式的,這可能導(dǎo)致其他greenlet不會(huì)被調(diào)度.
  • 在greenlet之間存在死鎖的可能.

一個(gè)gevent回避的缺陷是,你幾乎不會(huì)碰到一個(gè)和異步無(wú)關(guān)的Python庫(kù)--它將阻塞你的應(yīng)用程序,因?yàn)榧働ython庫(kù)使用的是monkey patch的stdlib.

轉(zhuǎn)自:
http://www.reibang.com/p/c6053a4c3dd5

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市凛剥,隨后出現(xiàn)的幾起案子侠仇,更是在濱河造成了極大的恐慌,老刑警劉巖犁珠,帶你破解...
    沈念sama閱讀 211,290評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件逻炊,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡犁享,警方通過(guò)查閱死者的電腦和手機(jī)余素,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,107評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)炊昆,“玉大人桨吊,你說(shuō)我怎么就攤上這事威根。” “怎么了视乐?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,872評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵洛搀,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我佑淀,道長(zhǎng)留美,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,415評(píng)論 1 283
  • 正文 為了忘掉前任渣聚,我火速辦了婚禮独榴,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘奕枝。我一直安慰自己,他們只是感情好瓶堕,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,453評(píng)論 6 385
  • 文/花漫 我一把揭開(kāi)白布隘道。 她就那樣靜靜地躺著,像睡著了一般郎笆。 火紅的嫁衣襯著肌膚如雪谭梗。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 49,784評(píng)論 1 290
  • 那天宛蚓,我揣著相機(jī)與錄音激捏,去河邊找鬼。 笑死凄吏,一個(gè)胖子當(dāng)著我的面吹牛远舅,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播痕钢,決...
    沈念sama閱讀 38,927評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼图柏,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了任连?” 一聲冷哼從身側(cè)響起蚤吹,我...
    開(kāi)封第一講書(shū)人閱讀 37,691評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎随抠,沒(méi)想到半個(gè)月后裁着,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體歉嗓,經(jīng)...
    沈念sama閱讀 44,137評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡玛歌,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,472評(píng)論 2 326
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了鹉究。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片椭懊。...
    茶點(diǎn)故事閱讀 38,622評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡诸蚕,死狀恐怖步势,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情背犯,我是刑警寧澤坏瘩,帶...
    沈念sama閱讀 34,289評(píng)論 4 329
  • 正文 年R本政府宣布,位于F島的核電站漠魏,受9級(jí)特大地震影響倔矾,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜柱锹,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,887評(píng)論 3 312
  • 文/蒙蒙 一哪自、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧禁熏,春花似錦壤巷、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,741評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至宙彪,卻和暖如春矩动,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背释漆。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,977評(píng)論 1 265
  • 我被黑心中介騙來(lái)泰國(guó)打工悲没, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人男图。 一個(gè)月前我還...
    沈念sama閱讀 46,316評(píng)論 2 360
  • 正文 我出身青樓示姿,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親享言。 傳聞我的和親對(duì)象是個(gè)殘疾皇子峻凫,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,490評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容