第四模塊: (1)并發(fā)編程

必備的理論基礎(chǔ)

1.操作系統(tǒng)作用:

  • 隱藏丑陋復(fù)雜的硬件接口寺枉,提供良好的抽象接口。
  • 管理調(diào)度進程瘾蛋,并將多個進程對硬件的競爭變得有序
    2.多道技術(shù):
  • 針對單核哺哼,實現(xiàn)并發(fā)
    • 現(xiàn)在的計算機是一般是多核取董,但是每個核都會用到多道技術(shù)甲葬,有4個cpu就廊勃,運行于cpu1的進程遇到io阻塞懈贺,就會等到io結(jié)束后重行調(diào)度,會被調(diào)度到4個cpu中的任何一個坡垫,由操作系統(tǒng)調(diào)度算法決定梭灿。
  • 空間上的復(fù)用:在內(nèi)存中同時運行多道程序。
  • 時間上復(fù)用:復(fù)用一個cpu時間片
    • 強調(diào):遇到io切冰悠,占用cpu時間過長也切堡妒,核心在于切之前將進程的狀態(tài)保存下來,這樣才能保證下次切回來時溉卓,能基于上次切走的位置繼續(xù)運行桑寨。

并發(fā)編程之多線程

進程理論

進程
正在進行的一個過程或者說是一個任務(wù)。而負(fù)責(zé)任務(wù)則是CPU辨图。

進程與程序的區(qū)別
程序僅僅是一堆代碼吆豹,而進程指的是程序的運行過程熏挎。
同一個程序執(zhí)行兩次养匈,那也是兩個進程。

并發(fā)與并行
無論是并行還是并發(fā)帝璧,在用戶看來都是‘同時’運行的诈闺,不管是進程還是線程,都只是一個任務(wù)而已耸弄,真正干活的是cpu震叮,cpu來做這些任務(wù),而一個cpu同一時刻只能執(zhí)行一個任務(wù)

  1. 并發(fā): 是偽并行哲嘲,即看起來是同時運行竣稽。單個cpu+多道技術(shù)就可以實現(xiàn)并發(fā)。
  2. 并行:同時運行,只有具備多個cpu才能實現(xiàn)并行

進程的創(chuàng)建
對于通用系統(tǒng)挽霉,需要有系統(tǒng)運行過程中創(chuàng)建或撤銷進程的能力疫衩,主要分為4中形式創(chuàng)建新的進程:

  1. 系統(tǒng)初始化
  2. 一個進程在運行過程中開啟了子進程
  3. 用戶的交互式請求鲤拿,而創(chuàng)建以個新進程
  4. 一個批處理作業(yè)的初始化
    無論是哪一種,新進程的創(chuàng)建都是由一個已經(jīng)存在的進程執(zhí)行了一個用于創(chuàng)建進程的系統(tǒng)調(diào)用而創(chuàng)建:
  5. 在UNIX中該系統(tǒng)調(diào)用是:fork,fork會創(chuàng)建一個與父進程一模一樣的副本,二者有相同的存儲映像蓉媳、同樣的環(huán)境字符串和同樣的打開文件;
  6. 在windows中該系統(tǒng)調(diào)用是: CreateProcess,CreateProcess即處理進程的創(chuàng)建,也負(fù)責(zé)把正確的程序裝入新進程屉凯。

關(guān)于創(chuàng)建的子進程灌旧,UNIX和windows:

  1. 相同的是:進程創(chuàng)建后,父進程和子進程有各自不同的地址空間(多道技術(shù)要切物理層面實現(xiàn)進程之間的內(nèi)存隔離), 任何一個進程在其地址空間中的修改都不會影響到另外一個進程。
  2. 不同的是:在unix中,子進程的初始地址空間是父進程的一個副本七咧,子進程和父進程是可以有只讀的共享內(nèi)存區(qū)的裹粤;windows系統(tǒng)來說,從一開始父進程與子進程地址空間就是不同的霉翔。

進程的終止

  1. 正常退出
  2. 出錯退出
  3. 嚴(yán)重錯誤
  4. 被其他進程殺死

進程的層次結(jié)構(gòu)
無論是UNIX還是windows序芦,進程只有一個父進程,不同的是:

  1. 在unix中所有的進程,都是以init進程為根某筐,組成樹形結(jié)構(gòu)蔽莱。父子進程共同組成一個進程組仪糖,這樣,當(dāng)從鍵盤發(fā)出一個信號時,該信號被送給當(dāng)前與鍵盤相關(guān)的進程 組中的所有成員劲室。
  2. 在windows中隧枫,沒有進程層次的概念,所有的進程都是地位相同的,唯一類似于進程層次的韓式,是在創(chuàng)建進程時铛楣,父進程得到一個特別的令牌(稱之為句柄)岸浑,該句柄可以用來控制進程,但是父進程有權(quán)把該句柄傳給其他子進程,這樣就沒有層次了。

進程的狀態(tài)
一個進程有如下三種狀態(tài):

進程的三種狀狀態(tài).png

進程并發(fā)的實現(xiàn)
進程并發(fā)的實現(xiàn)在于,硬件中斷一個正在運行的進程熄赡,把此時進程運行的所有狀態(tài)表寸下來乌助,為此,操作系統(tǒng)維護一張表格,即進程表(process table),每個進程占用一個進程表項(這些表項也稱為進程控制塊)

進程表.png

該表存放了進程狀態(tài)的重要信息:程序計數(shù)器、堆棧指針、內(nèi)存分配狀況、所有打開文件的狀態(tài)雇卷、帳號和調(diào)度信息,以及其他在進程有運行狀態(tài)轉(zhuǎn)為就緒狀態(tài)或阻塞狀態(tài)時春畔,必須保存的信息择份,從而保證該進程在再次啟動時,就像從未被中斷過一樣。

開啟進程的兩種方式

multiprocessing模塊介紹
python中的多線程無法利用多核優(yōu)勢,如果想要充分的使用多核CPU的資源(os.cpu_count()查看),在python中大部分情況需要時用多進程君珠。
python中提供了multiprocessing舰攒。multiprocessing模塊用來開啟子進程芬骄,并在子進程中執(zhí)行我們定制的任務(wù),該模塊與多線程模塊threading的編程接口類似规丽。multiprocessing模塊的功能眾多: 支持子進程艘狭、通信和共享數(shù)據(jù)蕴轨、執(zhí)行不同形式的同步葫督, 提供了Process顷链、Queue在讶、Pipe、Lock等組件。
需要強調(diào)的一點是:與線程不同, 進程沒有任何共享狀態(tài),進程修改的數(shù)據(jù)边败,改動僅限于該進程內(nèi)登疗。

Process類介紹

Process([group [, target [, name [, args [, kwargs]]]]])智政,由該類實例化得到的對象牙瓢,可用來開啟一個子進程

強調(diào):
1. 需要使用關(guān)鍵字的方式來指定參數(shù)
2. args指定的為傳給target函數(shù)的位置參數(shù)憔足,是一個元組形式,必須有逗號

方法介紹:

p.start():啟動進程,并調(diào)用該子進程中的p.run() 
p.run():進程啟動時運行的方法,正是它去調(diào)用target指定的函數(shù),我們自定義類的類中一定要實現(xiàn)該方法  

p.terminate():強制終止進程p抬闯,不會進行任何清理操作蒸播,如果p創(chuàng)建了子進程,該子進程就成了僵尸進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放,進而導(dǎo)致死鎖
p.is_alive():如果p仍然運行,返回True

p.join([timeout]):主線程等待p終止(強調(diào):是主線程處于等的狀態(tài),而p是處于運行的狀態(tài))。timeout是可選的超時時間。

屬性介紹:

p.daemon:默認(rèn)值為False,如果設(shè)為True,代表p為后臺運行的守護進程歧杏,當(dāng)p的父進程終止時懂更,p也隨之終止卓嫂,并且設(shè)定為True后奸腺,p不能創(chuàng)建自己的新進程,必須在p.start()之前設(shè)置

p.name:進程的名稱

p.pid:進程的pid

Process類的使用
注意:在windows中Process()必須放到# if __name__ == '__main__':
創(chuàng)建并開啟子進程的方式一:

#!/usr/bin/env python3
# Author    : fbo
# @Time : 18-5-19 上午7:27
# @File : 01 開啟進程的兩種方式.py
import time
import random
from multiprocessing import Process

def piao(name):
    print('%s piaoing' % name)
    time.sleep(random.randint(1, 5))
    print('%s piao end' % name)

if __name__ == '__main__':
    p1 = Process(target=piao,args=('alex',))    # 必須加逗號
    p2 = Process(target=piao,args=('egon',))
    p3 = Process(target=piao,args=('wupeiqi',))
    p4 = Process(target=piao,args=('yuanhao',))

    # 調(diào)用對象下的方法,啟動四個進程
    p1.start()
    p2.start()
    p3.start()
    p4.start()

    print("主進程")

創(chuàng)建并開啟子進程的方式二:

import time, random
from multiprocessing import Process

class Piao(Process):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print('%s is piaoing' % self.name)
        time.sleep(random.randint(1, 5))
        print('%s piao end' % self.name)

if __name__ == '__main__':
    p1 = Piao('egon')
    p2 = Piao('Alex')
    p3 = Piao('wupeiqi')
    p4 = Piao('yuanhao')

    p1.start()
    p2.start()
    p3.start()
    p4.start()

    print('主進程')

練習(xí)題

  1. 思考開啟進程的方式一和方式二各開啟了幾個進程座慰?
    都有5個進程,一個主進程,四個子進程
  2. 進程之間的內(nèi)存空間是共享的還是隔離的?下述代碼的執(zhí)行結(jié)果是什么熙卡?
from multiprocessing import Process

n=100 #在windows系統(tǒng)中應(yīng)該把全局變量定義在if __name__ == '__main__'之上就可以了

def work():
    global n
    n=0
    print('子進程內(nèi): ',n)


if __name__ == '__main__':
    p=Process(target=work)
    p.start()
    print('主進程內(nèi): ',n)

主進程內(nèi): 100
子進程內(nèi): 0

  1. 基于多進程實現(xiàn)并發(fā)的套接字通信颓鲜?
#!/usr/bin/env python3
# Author    : fbo
# @Time : 18-5-19 下午8:30
# @File : 服務(wù)端.py

import socket
import os
from multiprocessing import Process

def handle_data(conn, addr):
    print('Pid >>> ', os.getpid())
    print('connect by ', addr)
    while True:
        data = conn.recv(1024)
        if not data: break
        conn.send(data.upper())
    conn.close()

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('127.0.0.1', 8080))
sock.listen(10)
print('Server starting...')

while True:
    conn, addr = sock.accept()
    p = Process(target=handle_data, args=(conn,addr,))
    p.start()

#!/usr/bin/env python3
# Author    : fbo
# @Time : 18-5-19 下午8:30
# @File : 客戶端.py

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

sock.connect(('127.0.0.1', 8080))

while True:
    data = input('>>: ').strip()
    if not data: continue
    sock.send(data.encode('utf-8'))
    rt = sock.recv(1024).decode('utf-8')
    print(rt)
  1. 思考每來一個客戶端衣摩,服務(wù)端就開啟一個新的進程來服務(wù)它占婉,這種實現(xiàn)方式有無問題?

這種實現(xiàn)方式會造成占用過多的系統(tǒng)資源。

Process對象的join方法

在主進程運行過程中如果想并發(fā)地執(zhí)行其他的任務(wù),我們可以開啟子進程,此時主進程的任務(wù)與子進程的任務(wù)分兩種情況

情況一:在主進程的任務(wù)與子進程的任務(wù)彼此獨立的情況下谅海,主進程的任務(wù)先執(zhí)行完畢后蝌诡,主進程還需要等待子進程執(zhí)行完畢九杂,然后統(tǒng)一回收資源镀层。

情況二:如果主進程的任務(wù)在執(zhí)行到某一個階段時惶我,需要等待子進程執(zhí)行完畢后才能繼續(xù)執(zhí)行听怕,就需要有一種機制能夠讓主進程檢測子進程是否運行完畢翅睛,在子進程執(zhí)行完畢后才繼續(xù)執(zhí)行檐涝,否則一直在原地阻塞惰爬,這就是join方法的作用

#!/usr/bin/env python3
# Author    : fbo
# @Time : 18-5-19 下午9:53
# @File : 02 Process join.py

from multiprocessing import Process
import time
import os
import random

def task():
    print('%s is piaoing...' % os.getpid())
    time.sleep(random.randint(1,5))
    print('%s piao end' % os.getpid())

if __name__ == '__main__':
    p1 = Process(target=task, args=())

    p1.start()
    p1.join() # 等p1完成后才執(zhí)行后續(xù)代碼
    print('main process...')

有了join巩掺,程序不就是串行了嗎独令?招狸?累榜?

from multiprocessing import Process
import time
import os
import random

def task():
    print('%s is piaoing...' % os.getpid())
    time.sleep(random.randint(1,5))
    print('%s piao end' % os.getpid())

if __name__ == '__main__':
    p1 = Process(target=task, args=())
    p2 = Process(target=task, args=())
    p3 = Process(target=task, args=())
    p4 = Process(target=task, args=())

    p1.start()
    p2.start()
    p3.start()
    p4.start()
    # 使用join方法后并不意味著變成了串行了木人,只是主進程需要等子進程運行完成信柿,
    # 各個子進程還是并行運行
    p1.join()
    p2.join()
    p3.join()
    p4.join()
    print('main process...')

Process對象的其他屬性或方法

from multiprocessing import Process
import time
import random

def task(name):
    print('%s is piaoing' %name)
    time.sleep(random.randrange(1,5))
    print('%s is piao end' %name)

if __name__ == '__main__':
    p1=Process(target=task,args=('egon',))
    p1.start()

    p1.terminate()#關(guān)閉進程,不會立即關(guān)閉,所以is_alive立刻查看的結(jié)果可能還是存活
    print(p1.is_alive()) #結(jié)果為True

    print('主')
    print(p1.is_alive()) #結(jié)果為False

from multiprocessing import Process
import time
import random

def task(name):
    print('%s is piaoing' %name)
    time.sleep(random.randrange(1,5))
    print('%s is piao end' %name)

if __name__ == '__main__':
    # p1=Process(target=task,args=('egon',),name='子進程1') #可以用關(guān)鍵參數(shù)來指定進程名
    p1=Process(target=task,args=('egon',)) # name='子進程1') #可以用關(guān)鍵參數(shù)來指定進程名
    p1.start()

    print(p1.name,p1.pid,)

守護進程

主進程創(chuàng)建子進程,然后將該進程設(shè)置成守護自己的進程.

  • 守護進程會在主進程代碼執(zhí)行結(jié)束后就終止
  • 守護進程內(nèi)無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children

如果我們有兩個任務(wù)需要并發(fā)執(zhí)行形病,那么開一個主進程和一個子進程分別去執(zhí)行就ok了耍共,如果子進程的任務(wù)在主進程任務(wù)結(jié)束后就沒有存在的必要了银亲,那么該子進程應(yīng)該在開啟前就被設(shè)置成守護進程气嫁。主進程代碼運行結(jié)束简识,守護進程隨即終止

from multiprocessing import Process
import time
import random

def task(name):
    print('%s is piaoing' %name)
    time.sleep(random.randrange(1,3))
    print('%s is piao end' %name)


if __name__ == '__main__':
    p=Process(target=task,args=('egon',))
    p.daemon=True #一定要在p.start()前設(shè)置,設(shè)置p為守護進程,禁止p創(chuàng)建子進程,并且父進程代碼執(zhí)行結(jié)束,p即終止運行
    p.start()
    print('主') #只要終端打印出這一行內(nèi)容轧钓,那么守護進程p也就跟著結(jié)束掉了

練習(xí)題
思考下列代碼的執(zhí)行結(jié)果有可能有哪些情況?為什么?

from multiprocessing import Process

import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")

if __name__ == '__main__':
    p1=Process(target=foo)
    p2=Process(target=bar)

    p1.daemon=True
    p1.start()
    p2.start()
    print("main-------")

main------
456
end456

互斥鎖

進程之間數(shù)據(jù)不共享,但是共享同一套文件系統(tǒng),所以訪問同一個文件,或同一個打印終端,是沒有問題的,而共享帶來的是競爭,競爭帶來的結(jié)果就是錯亂,如下:

#!/usr/bin/env python3
# Author    : fbo
# @Time : 18-5-20 下午8:03
# @File : 03互斥鎖.py

from multiprocessing import Process,Lock
import os, time

'''
由串行變成并行,避免了資源爭搶桂对,但是犧牲了運行效率
'''
def task(lock):
    lock.acquire()
    print(os.getpid(), 'is running...')
    time.sleep(2)
    print(os.getpid(), ' ending...')
    lock.release()


if __name__ == '__main__':
    lock = Lock()
    for i in range(3):
        p = Process(target=task, args=(lock,))
        p.start()

模擬搶票練習(xí)
多個進程共享同一文件璧亮,我們可以把文件當(dāng)數(shù)據(jù)庫暑劝,用多個進程模擬多個人執(zhí)行搶票任務(wù)

#!/usr/bin/env python3
# Author    : fbo
# @Time : 18-5-20 下午8:24
# @File : 03 模擬搶票軟件.py

import json, time
from multiprocessing import Process

def search(name):
    dic = json.load(open('db.json'))
    time.sleep(1)
    print('\033[43m %s 查到剩余票數(shù)%s\033[0m' %(name, dic['count']))


def get(name):
    dic = json.load(open('db.json'))
    time.sleep(1)
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(1)
        json.dump(dic, open('db.json', 'w'))
        print('\033[46m%s購票成功\033[0m' % name)

def task(name):
    search(name)
    get(name)


if __name__ == '__main__':
    for i in range(10):
        name = '<路人%s>' % i
        p = Process(target=task, args=(name,))
        p.start()
 '''
 運行結(jié)果:       
 <路人0> 查到剩余票數(shù)1
 <路人1> 查到剩余票數(shù)1
 <路人3> 查到剩余票數(shù)1
 <路人4> 查到剩余票數(shù)1
 <路人2> 查到剩余票數(shù)1
 <路人5> 查到剩余票數(shù)1
 <路人7> 查到剩余票數(shù)1
 <路人6> 查到剩余票數(shù)1
 <路人8> 查到剩余票數(shù)1
 <路人9> 查到剩余票數(shù)1
<路人0>購票成功
<路人1>購票成功
<路人3>購票成功
<路人4>購票成功
<路人2>購票成功
<路人5>購票成功
<路人7>購票成功
<路人6>購票成功
<路人8>購票成功
<路人9>購票成功
'''

并發(fā)運行猜嘱,效率高,但競爭寫同一文件,數(shù)據(jù)寫入錯亂,只有一張票,賣成功給了10個人

加鎖處理:購票行為由并發(fā)變成了串行听想,犧牲了運行效率,但保證了數(shù)據(jù)安全

import json, time
from multiprocessing import Process,Lock

def search(name):
    dic = json.load(open('db.json'))
    time.sleep(1)
    print('\033[43m %s 查到剩余票數(shù)%s\033[0m' %(name, dic['count']))


def get(name):
    dic = json.load(open('db.json'))
    time.sleep(1)
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(1)
        json.dump(dic, open('db.json', 'w'))
        print('\033[46m%s購票成功\033[0m' % name)

def task(name, lcok):
    search(name)
    with lock: #相當(dāng)于lock.acquire(),執(zhí)行完自代碼塊自動執(zhí)行l(wèi)ock.release()
        get(name)


if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        name = '<路人%s>' % i
        p = Process(target=task, args=(name,lock))
        p.start()
        
'''
 <路人1> 查到剩余票數(shù)1
 <路人0> 查到剩余票數(shù)1
 <路人2> 查到剩余票數(shù)1
 <路人3> 查到剩余票數(shù)1
 <路人5> 查到剩余票數(shù)1
 <路人4> 查到剩余票數(shù)1
 <路人6> 查到剩余票數(shù)1
 <路人7> 查到剩余票數(shù)1
 <路人9> 查到剩余票數(shù)1
 <路人8> 查到剩余票數(shù)1
<路人1>購票成功
'''

互斥鎖與join
使用join可以將并發(fā)變成串行丈积,互斥鎖的原理也是將并發(fā)變成穿行,那我們直接使用join就可以了啊溺忧,為何還要互斥鎖,說到這里我趕緊試了一下

import json, time
from multiprocessing import Process,Lock

def search(name):
    dic = json.load(open('db.json'))
    time.sleep(1)
    print('\033[43m %s 查到剩余票數(shù)%s\033[0m' %(name, dic['count']))


def get(name):
    dic = json.load(open('db.json'))
    time.sleep(1)
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(1)
        json.dump(dic, open('db.json', 'w'))
        print('\033[46m%s購票成功\033[0m' % name)

def task(name):
    search(name)
    get(name)


if __name__ == '__main__':
    for i in range(10):
        name = '<路人%s>' % i
        p = Process(target=task, args=(name,))
        p.start()
        p.join()
        
'''
 <路人0> 查到剩余票數(shù)1
<路人0>購票成功
 <路人1> 查到剩余票數(shù)0
 <路人2> 查到剩余票數(shù)0
 <路人3> 查到剩余票數(shù)0
 <路人4> 查到剩余票數(shù)0
 <路人5> 查到剩余票數(shù)0
 <路人6> 查到剩余票數(shù)0
 <路人7> 查到剩余票數(shù)0
 <路人8> 查到剩余票數(shù)0
 <路人9> 查到剩余票數(shù)0
 '''

發(fā)現(xiàn)使用join將并發(fā)改成穿行锻煌,確實能保證數(shù)據(jù)安全告希,但問題是連查票操作也變成只能一個一個人去查了桥言,很明顯大家查票時應(yīng)該是并發(fā)地去查詢而無需考慮數(shù)據(jù)準(zhǔn)確與否,此時join與互斥鎖的區(qū)別就顯而易見了筷狼,join是將一個任務(wù)整體串行,而互斥鎖的好處則是可以將一個任務(wù)中的某一段代碼串行,比如只讓task函數(shù)中的get任務(wù)串行

def task(name,):
    search(name) # 并發(fā)執(zhí)行

    lock.acquire()
    get(name) #串行執(zhí)行
    lock.release()

總結(jié)
加鎖可以保證多個進程修改同一塊數(shù)據(jù)時,同一時間只能有一個任務(wù)可以進行修改沫屡,即串行地修改,沒錯身隐,速度是慢了垫释,但犧牲了速度卻保證了數(shù)據(jù)安全烤惊。
雖然可以用文件共享數(shù)據(jù)實現(xiàn)進程間通信,但問題是:

  1. 效率低(共享數(shù)據(jù)基于文件沧踏,而文件是硬盤上的數(shù)據(jù))
  2. 需要自己加鎖處理
    這就是mutiprocessing模塊為我們提供的基于消息的IPC通信機制:隊列和管道魂角。

隊列和管道都是將數(shù)據(jù)存放于內(nèi)存中捧韵,而隊列又是基于(管道+鎖)實現(xiàn)的,可以讓我們從復(fù)雜的鎖問題中解脫出來汉操,因而隊列才是進程間通信的最佳選擇再来。

我們應(yīng)該盡量避免使用共享數(shù)據(jù),盡可能使用消息傳遞和隊列磷瘤,避免處理復(fù)雜的同步和鎖問題芒篷,而且在進程數(shù)目增多時,往往可以獲得更好的可獲展性采缚。

隊列

隊列介紹
進程彼此之間相互隔離针炉,要實現(xiàn)進程間通信(IPC),multiprocessing模塊支持兩種形式:隊列和管道扳抽,這兩種方式都是使用消息傳遞的篡帕。
創(chuàng)建隊列的類(底層就是以管道和鎖定的方式實現(xiàn)):
Queue([maxsize]): 創(chuàng)建共享的進程隊列,Queue是多進程安全的隊列贸呢,可以使用Queue實現(xiàn)多進程之間的數(shù)據(jù)傳遞赂苗。
參數(shù)介紹:

maxsize是隊列中允許最大項數(shù),省略則無大小限制贮尉。
但是要明確:
1. 隊列存放的是消息而非大數(shù)據(jù)拌滋。
2. 隊列占用的是內(nèi)存空間,因而maxsize即便是無大小限制也受限于內(nèi)存大小猜谚。

主要方法介紹:

q.put方法用以插入數(shù)據(jù)隊列中败砂。
q.get方法可以從隊列讀取并且刪除一個元素赌渣。

隊列的使用

#!/usr/bin/env python3
# Author    : fbo
# @Time : 18-5-20 下午9:26
# @File : 04 隊列.py

from multiprocessing import Process, Queue

q = Queue(3)

# put, get , put_nowait, get_nowait, full, empty
q.put(1)
q.put(2)
q.put(3)
print(q.full()) # 滿了
# q.put(4) # 再放就阻塞了

print(q.get())
print(q.get())
print(q.get())
print(q.empty()) # 空了

# print(q.get()) 2蹋 再取就阻塞了

生產(chǎn)者與消費者模型

生產(chǎn)者消費者模型介紹

為什么要使用生產(chǎn)者消費者模型
生產(chǎn)者指的是生產(chǎn)數(shù)據(jù)的任務(wù)坚芜,消費者指的是處理數(shù)據(jù)的任務(wù),在并發(fā)編程中斜姥,如果生產(chǎn)者處理速度很快鸿竖,而消費者處理速度很慢,那么生產(chǎn)者就必須等待消費者處理完铸敏,才能繼續(xù)生產(chǎn)數(shù)據(jù)缚忧。同樣的道理,如果消費者的處理能力大于生產(chǎn)者杈笔,那么消費者就必須等待生產(chǎn)者闪水。為了解決這個問題于是引入了生產(chǎn)者和消費者模式。
什么是生產(chǎn)者和消費者模式
生產(chǎn)者消費者模式是通過一個容器來解決生產(chǎn)者和消費者的強耦合問題蒙具。生產(chǎn)者和消費者彼此之間不直接通訊球榆,而通過阻塞隊列來進行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費者處理禁筏,直接扔給阻塞隊列持钉,消費者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊列里取篱昔,阻塞隊列就相當(dāng)于一個緩沖區(qū)每强,平衡了生產(chǎn)者和消費者的處理能力。

這個阻塞隊列就是用來給生產(chǎn)者和消費者解耦的

基于上一小節(jié)學(xué)習(xí)的隊列來實習(xí)一個生產(chǎn)者消費者模型:

#!/usr/bin/env python3
# Author    : fbo
# @Time : 18-5-20 下午9:37
# @File : 06 生產(chǎn)者消費者模式.py

from multiprocessing import Process, Queue
import time, random, os

def consumer(q, name):
    while True:
        res = q.get()
        time.sleep(random.randint(1,3))
        print('\033[043m%s 吃 %s\033[0m' % (name,res))

def producer(q, name, food):
    for i in range(3):
        time.sleep(random.randint(1,3))
        res = '%s%s' % (food, i)
        q.put(res)
        print('\033[45m%s 生產(chǎn)了 %s\033[0m' % (name, res))


if __name__ == '__main__':
    q = Queue()

    p1 = Process(target=producer, args=(q,'fbo', 'baozi ',))

    c1 = Process(target=consumer, args=(q,'alex',))

    p1.start()
    c1.start()
    print('main')

此時的問題是主進程永遠不會結(jié)束旱爆,原因是:生產(chǎn)者p在生產(chǎn)完后就結(jié)束了舀射,但是消費者c在取空了q之后窘茁,則一直處于死循環(huán)中且卡在q.get()這一步怀伦。

解決方式無非是讓生產(chǎn)者在生產(chǎn)完畢后,往隊列中再發(fā)一個結(jié)束信號山林,這樣消費者在接收到結(jié)束信號后就可以break出死循環(huán)

from multiprocessing import Process,Queue
import time,random,os
def consumer(q,name):
    while True:
        res=q.get()
        if res is None:break
        time.sleep(random.randint(1,3))
        print('\033[43m%s 吃 %s\033[0m' %(name,res))

def producer(q,name,food):
    for i in range(3):
        time.sleep(random.randint(1,3))
        res='%s%s' %(food,i)
        q.put(res)
        print('\033[45m%s 生產(chǎn)了 %s\033[0m' %(name,res))

if __name__ == '__main__':
    q=Queue()
    #生產(chǎn)者們:即廚師們
    p1=Process(target=producer,args=(q,'egon1','包子'))
    p2=Process(target=producer,args=(q,'egon2','骨頭'))
    p3=Process(target=producer,args=(q,'egon3','泔水'))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,'alex1'))
    c2=Process(target=consumer,args=(q,'alex2'))

    #開始
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()
    q.put(None)
    q.put(None)
    q.put(None)
    print('主')

但上述解決方式房待,在有多個生產(chǎn)者和多個消費者時,我們則需要用一個很low的方式去解決,有幾個消費者就需要發(fā)送幾次結(jié)束信號
有另外一種隊列提供了這種機制,JoinableQueue([maxsize])
參數(shù)介紹
maxsize是隊列中允許最大項數(shù)驼抹,省略則無大小限制桑孩。
方法介紹:

JoinableQueue的實例p除了與Queue對象相同的方法之外還具有:
q.task_done():使用者使用此方法發(fā)出信號,表示q.get()的返回項目已經(jīng)被處理框冀。如果調(diào)用此方法的次數(shù)大于從隊列中刪除項目的數(shù)量流椒,將引發(fā)ValueError異常
q.join():生產(chǎn)者調(diào)用此方法進行阻塞,直到隊列中所有的項目均被處理明也。阻塞將持續(xù)到隊列中的每個項目均調(diào)用q.task_done()方法為止

基于JoinableQueue實現(xiàn)生產(chǎn)者消費者模型

from multiprocessing import Process,JoinableQueue
import time,random,os
def consumer(q,name):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('\033[43m%s 吃 %s\033[0m' %(name,res))
        q.task_done() #發(fā)送信號給q.join()宣虾,說明已經(jīng)從隊列中取走一個數(shù)據(jù)并處理完畢了

def producer(q,name,food):
    for i in range(3):
        time.sleep(random.randint(1,3))
        res='%s%s' %(food,i)
        q.put(res)
        print('\033[45m%s 生產(chǎn)了 %s\033[0m' %(name,res))
    q.join() #等到消費者把自己放入隊列中的所有的數(shù)據(jù)都取走之后惯裕,生產(chǎn)者才結(jié)束

if __name__ == '__main__':
    q=JoinableQueue() #使用JoinableQueue()

    #生產(chǎn)者們:即廚師們
    p1=Process(target=producer,args=(q,'egon1','包子'))
    p2=Process(target=producer,args=(q,'egon2','骨頭'))
    p3=Process(target=producer,args=(q,'egon3','泔水'))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,'alex1'))
    c2=Process(target=consumer,args=(q,'alex2'))
    c1.daemon=True
    c2.daemon=True

    #開始
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()
    #1、主進程等生產(chǎn)者p1绣硝、p2蜻势、p3結(jié)束
    #2、而p1鹉胖、p2握玛、p3是在消費者把所有數(shù)據(jù)都取干凈之后才會結(jié)束
    #3、所以一旦p1甫菠、p2挠铲、p3結(jié)束了,證明消費者也沒必要存在了淑蔚,應(yīng)該隨著主進程一塊死掉市殷,因而需要將生產(chǎn)者們設(shè)置成守護進程
    print('主')

生產(chǎn)者消費者模型總結(jié)

  1. 程序中有兩類角色
  • 一類負(fù)責(zé)生產(chǎn)數(shù)據(jù)(生產(chǎn)者)
  • 一類負(fù)責(zé)處理數(shù)據(jù)(消費者)
  1. 引入生產(chǎn)者消費者模型為了解決的問題是
  • 平衡生產(chǎn)者與消費者之間的速度差
  • 程序解開耦合
  1. 如何實現(xiàn)生產(chǎn)者消費者模型
  • 生產(chǎn)者<--->隊列<--->消費者

并發(fā)編程之多線程

線程理論

什么是線程
在傳統(tǒng)操作系統(tǒng)中,每個進程有一個地址空間刹衫,而且默認(rèn)就有一個控制線程

線程顧名思義醋寝,就是一條流水線工作的過程(流水線的工作需要電源,電源就相當(dāng)于cpu)带迟,而一條流水線必須屬于一個車間音羞,一個車間的工作過程是一個進程,車間負(fù)責(zé)把資源整合到一起仓犬,是一個資源單位嗅绰,而一個車間內(nèi)至少有一條流水線。

所以搀继,進程只是用來把資源集中到一起(進程只是一個資源單位窘面,或者說資源集合),而線程才是cpu上的執(zhí)行單位叽躯。

多線程(即多個控制線程)的概念是财边,在一個進程中存在多個線程,多個線程共享該進程的地址空間点骑,相當(dāng)于一個車間內(nèi)有多條流水線酣难,都共用一個車間的資源。例如黑滴,北京地鐵與上海地鐵是不同的進程憨募,而北京地鐵里的13號線是一個線程,北京地鐵所有的線路共享北京地鐵所有的資源袁辈,比如所有的乘客可以被所有線路拉菜谣。

線程與進程的區(qū)別

  1. Threads share the address space of the process that created it; processes have their own address space.
  2. Threads have direct access to the data segment of its process; processes have their own copy of the data segment of the parent process.
  3. Threads can directly communicate with other threads of its process; processes must use interprocess communication to communicate with sibling processes.
  4. New threads are easily created; new processes require duplication of the parent process.
  5. Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.
  6. Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process does not affect child processes.

總結(jié)上述區(qū)別,無非兩個關(guān)鍵點,這也是我們在特定的場景下需要使用多線程的原因:

  1. 同一個進程內(nèi)的多個線程共享該進程內(nèi)的地址資源
  2. 創(chuàng)建線程的開銷要遠小于創(chuàng)建進程的開銷(創(chuàng)建一個進程尾膊,就是創(chuàng)建一個車間甘磨,涉及到申請空間,而且在該空間內(nèi)建至少一條流水線眯停,但創(chuàng)建線程济舆,就只是在一個車間內(nèi)造一條流水線,無需申請空間莺债,所以創(chuàng)建開銷凶叹酢)

開啟線程的兩種方式

threading模塊介紹
multiprocess模塊的完全模仿了threading模塊的接口,二者在使用層面齐邦,有很大的相似性椎侠,因而不再詳細(xì)介紹
開啟線程的兩種方式

#!/usr/bin/env python3
# Author    : fbo
# @Time : 18-5-21 下午9:18
# @File : 07 開啟線程的兩種方式.py

from threading import Thread
import time

def sayhi(name):
    time.sleep(2)
    print('%s say hello' % name)

if __name__ == '__main__':
    t = Thread(target=sayhi, args=('fbo',))
    t.start()
    print('main thread')

from threading import Thread
import time

class Sayhi(Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        time.sleep(2)
        print('%s say hello' % self.name)

if __name__ == '__main__':
    t = Sayhi('fbo')
    t.start()
    print('main threading')

練習(xí)題

  1. 基于多線程實現(xiàn)并發(fā)的套接字通信
#!/usr/bin/env python3
# Author    : fbo
# @Time : 18-5-19 下午8:30
# @File : 服務(wù)端.py

import socket
import os
from threading import Thread

def handle_data(conn, addr):
    print('Pid >>> ', os.getpid())
    print('connect by ', addr)
    while True:
        data = conn.recv(1024)
        if not data: break
        conn.send(data.upper())
    conn.close()

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('127.0.0.1', 8080))
sock.listen(10)
print('Server starting...')

while True:
    conn, addr = sock.accept()
    p = Thread(target=handle_data, args=(conn,addr,))
    p.start()

#!/usr/bin/env python3
# Author    : fbo
# @Time : 18-5-19 下午8:30
# @File : 客戶端.py

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

sock.connect(('127.0.0.1', 8080))

while True:
    data = input('>>: ').strip()
    if not data: continue
    sock.send(data.encode('utf-8'))
    rt = sock.recv(1024).decode('utf-8')
    print(rt)
  1. 編寫一個簡單的文本處理工具,具備三個任務(wù)措拇,一個接收用戶輸入我纪,一個將用戶輸入的內(nèi)容格式化成大寫,一個將格式化后的結(jié)果存入文件

from threading import Thread

msg_l = []
format_l = []


def talk():
    while True:
        msg = input(">>>").strip()
        if not msg:
            break
        msg_l.append(msg)


def format_msg():
    while True:
        if msg_l:
            res = msg_l.pop()
            format_l.append(res.upper())


def save():
    while True:
        if format_l:
            with open('db.txt', 'a', encoding='utf-8') as f:
                res = format_l.pop()
                print(res)
                f.write('%s\n' % res)


if __name__ == '__main__':
    t1 = Thread(target=talk)
    t2 = Thread(target=format_msg)
    t3 = Thread(target=save)

    t1.start()
    t2.start()
    t3.start()

多線程與多進程的區(qū)別

幾乎是t.start ()的同時就將線程開啟了丐吓,然后先打印出了hello浅悉,證明線程的創(chuàng)建開銷極小

p.start ()將開啟進程的信號發(fā)給操作系統(tǒng)后,操作系統(tǒng)要申請內(nèi)存空間券犁,讓好拷貝父進程地址空間到子進程术健,開銷遠大于線程

在主進程下開啟多個線程,每個線程都跟主進程的pid一樣

開多個進程,每個進程都有不同的pid

進程之間地址空間是隔離的

同一進程內(nèi)開啟的多個線程是共享該進程地址空間的

Thread對象的其他屬性或方法

介紹

Thread實例對象的方法
  # isAlive(): 返回線程是否活動的。
  # getName(): 返回線程名粘衬。
  # setName(): 設(shè)置線程名荞估。

threading模塊提供的一些方法:
  # threading.currentThread(): 返回當(dāng)前的線程變量。
  # threading.enumerate(): 返回一個包含正在運行的線程的list稚新。正在運行指線程啟動后勘伺、結(jié)束前,不包括啟動前和終止后的線程褂删。
  # threading.activeCount(): 返回正在運行的線程數(shù)量飞醉,與len(threading.enumerate())有相同的結(jié)果。

驗證

from threading import Thread
import threading
from multiprocessing import Process
import os

def work():
    import time
    time.sleep(3)
    print(threading.current_thread().getName())


if __name__ == '__main__':
    #在主進程下開啟線程
    t=Thread(target=work)
    t.start()

    print(threading.current_thread().getName())
    print(threading.current_thread()) #主線程
    print(threading.enumerate()) #連同主線程在內(nèi)有兩個運行的線程
    print(threading.active_count())
    print('主線程/主進程')

'''
MainThread
<_MainThread(MainThread, started 140735268892672)>
[<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
主線程/主進程
Thread-1
'''

主線程等待子線程結(jié)束

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.start()
    t.join()
    print('主線程')
    print(t.is_alive())

'''
egon say hello
主線程
False
'''

守護線程

無論是進程還是線程笤妙,都遵循:守護xxx會等待主xxx運行完畢后被銷毀

需要強調(diào)的是:運行完畢并非終止運行

1冒掌、對主進程來說噪裕,運行完畢指的是主進程代碼運行完畢

2蹲盘、對主線程來說,運行完畢指的是主線程所在的進程內(nèi)所有非守護線程統(tǒng)統(tǒng)運行完畢膳音,主線程才算運行完畢

1召衔、主進程在其代碼結(jié)束后就已經(jīng)算運行完畢了(守護進程在此時就被回收),然后主進程會一直等非守護的子進程都運行完畢后回收子進程的資源(否則會產(chǎn)生僵尸進程),才會結(jié)束祭陷,

2苍凛、主線程在其他非守護線程運行完畢后才算運行完畢(守護線程在此時就被回收)趣席。因為主線程的結(jié)束意味著進程的結(jié)束,進程整體的資源都將被回收醇蝴,而進程必須保證非守護線程都運行完畢后才能結(jié)束宣肚。

實例:

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.setDaemon(True) #必須在t.start()之前設(shè)置
    t.start()

    print('主線程')
    print(t.is_alive())

'''
主線程
True
'''

練習(xí)
思考下述代碼的執(zhí)行結(jié)果有可能是哪些情況?為什么悠栓?

from threading import Thread
import time

def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")

if __name__ == '__main__':
    t1=Thread(target=foo)
    t2=Thread(target=bar)

    t1.daemon=True
    t1.start()
    t2.start()
    print("main-------")

GIL全局解析器鎖

In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple
native threads from executing Python bytecodes at once. This lock is necessary mainly
because CPython’s memory management is not thread-safe. (However, since the GIL
exists, other features have grown to depend on the guarantees that it enforces.)
首先需要明確的一點是GIL并不是Python的特性慌申,它是在實現(xiàn)Python解析器(CPython)時所引入的一個概念彤委。就好比C++是一套語言(語法)標(biāo)準(zhǔn),但是可以用不同的編譯器來編譯成可執(zhí)行代碼。

有名的編譯器例如GCC法瑟,INTEL C++,Visual C++等锣笨。Python也一樣登舞,同樣一段代碼可以通過CPython,PyPy凄杯,Psyco等不同的Python執(zhí)行環(huán)境來執(zhí)行错洁。像其中的JPython就沒有GIL。然而因為CPython是大部分環(huán)境下默認(rèn)的Python執(zhí)行環(huán)境戒突。所以在很多人的概念里CPython就是Python墓臭,也就想當(dāng)然的把GIL歸結(jié)為Python語言的缺陷。所以這里要先明確一點:GIL并不是Python的特性妖谴,Python完全可以不依賴于GIL
GIL介紹
GIL本質(zhì)就是一把互斥鎖窿锉,既然是互斥鎖,所有互斥鎖的本質(zhì)都一樣膝舅,都是將并發(fā)運行變成串行嗡载,以此來控制同一時間內(nèi)共享數(shù)據(jù)只能被一個任務(wù)所修改,進而保證數(shù)據(jù)安全仍稀。

可以肯定的一點是:保護不同的數(shù)據(jù)的安全洼滚,就應(yīng)該加不同的鎖。

要想了解GIL技潘,首先確定一點:每次執(zhí)行python程序遥巴,都會產(chǎn)生一個獨立的進程。例如python test.py享幽,python aaa.py铲掐,python bbb.py會產(chǎn)生3個不同的python進程

#test.py內(nèi)容
import os,time
print(os.getpid())
time.sleep(1000)

#打開終端執(zhí)行
python3 test.py

#在windows下查看
tasklist |findstr python

#在linux下下查看
ps aux |grep python

在一個python的進程內(nèi),不僅有test.py的主線程或者由該主線程開啟的其他線程值桩,還有解釋器開啟的垃圾回收等解釋器級別的線程摆霉,總之,所有線程都運行在這一個進程內(nèi),毫無疑問

  1. 所有數(shù)據(jù)都是共享的携栋,這其中搭盾,代碼作為一種數(shù)據(jù)也是被所有線程共享的(test.py的所有代碼以及Cpython解釋器的所有代碼)
    例如:test.py定義一個函數(shù)work(代碼內(nèi)容如下圖),在進程內(nèi)所有線程都能訪問到work的代碼婉支,于是我們可以開啟三個線程然后target都指向該代碼鸯隅,能訪問到意味著就是可以執(zhí)行。
  2. 所有線程的任務(wù)向挖,都需要將任務(wù)的代碼當(dāng)做參數(shù)傳給解釋器的代碼去執(zhí)行滋迈,即所有的線程要想運行自己的任務(wù),首先需要解決的是能夠訪問到解釋器的代碼户誓。

綜上:
如果多個線程的target=work饼灿,那么執(zhí)行流程是:
多個線程先訪問到解釋器的代碼,即拿到執(zhí)行權(quán)限帝美,然后將target的代碼交給解釋器的代碼去執(zhí)行

解釋器的代碼是所有線程共享的碍彭,所以垃圾回收線程也可能訪問到解釋器的代碼而去執(zhí)行,這就導(dǎo)致了一個問題:對于同一個數(shù)據(jù)100悼潭,可能線程1執(zhí)行x=100的同時庇忌,而垃圾回收執(zhí)行的是回收100的操作,解決這種問題沒有什么高明的方法舰褪,就是加鎖處理皆疹,如下圖的GIL,保證python解釋器同一時間只能執(zhí)行一個任務(wù)的代碼

GIL.png

GIL和LOCK
機智的同學(xué)可能會問到這個問題:Python已經(jīng)有一個GIL來保證同一時間只能有一個線程來執(zhí)行了占拍,為什么這里還需要lock?
首先略就,我們需要達成共識:鎖的目的是為了保護共享的數(shù)據(jù),同一時間只能有一個線程來修改共享的數(shù)據(jù)
然后晃酒,我們可以得出結(jié)論:保護不同的數(shù)據(jù)就應(yīng)該加不同的鎖表牢。
最后,問題就很明朗了贝次,GIL 與Lock是兩把鎖崔兴,保護的數(shù)據(jù)不一樣,前者是解釋器級別的(當(dāng)然保護的就是解釋器級別的數(shù)據(jù)蛔翅,比如垃圾回收的數(shù)據(jù))敲茄,后者是保護用戶自己開發(fā)的應(yīng)用程序的數(shù)據(jù),很明顯GIL不負(fù)責(zé)這件事山析,只能用戶自定義加鎖處理堰燎,即Lock,如下圖:

protect-data.png

分析:

  1. 100個線程去搶GIL鎖盖腿,即搶執(zhí)行權(quán)限
  2. 肯定有一個線程先搶到GIL(暫且稱為線程1)爽待,然后開始執(zhí)行,一旦執(zhí)行就會拿到lock.acquire()
  3. 極有可能線程1還未運行完畢翩腐,就有另外一個線程2搶到GIL鸟款,然后開始運行,但線程2發(fā)現(xiàn)互斥鎖lock還未被線程1釋放茂卦,于是阻塞何什,被迫交出執(zhí)行權(quán)限,即釋放GIL
  4. 直到線程1重新?lián)尩紾IL等龙,開始從上次暫停的位置繼續(xù)執(zhí)行处渣,直到正常釋放互斥鎖lock,然后其他的線程再重復(fù)2 3 4的過程
from threading import Thread,Lock
import os,time
def work():
    global n
    lock.acquire()
    temp=n
    time.sleep(0.1)
    n=temp-1
    lock.release()
if __name__ == '__main__':
    lock=Lock()
    n=100
    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) #結(jié)果肯定為0蛛砰,由原來的并發(fā)執(zhí)行變成串行罐栈,犧牲了執(zhí)行效率保證了數(shù)據(jù)安全,不加鎖則結(jié)果可能為99

多線程用于IO密集型泥畅,如socket荠诬,爬蟲,web
多進程用于計算密集型位仁,如金融分析

死鎖和遞歸鎖

所謂死鎖: 是指兩個或兩個以上的進程或線程在執(zhí)行過程中柑贞,因爭奪資源而造成的一種互相等待的現(xiàn)象,若無外力作用聂抢,它們都將無法推進下去钧嘶。此時稱系統(tǒng)處于死鎖狀態(tài)或系統(tǒng)產(chǎn)生了死鎖,這些永遠在互相等待的進程稱為死鎖進程

from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('\033[41m%s 拿到A鎖\033[0m' %self.name)

        mutexB.acquire()
        print('\033[42m%s 拿到B鎖\033[0m' %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('\033[43m%s 拿到B鎖\033[0m' %self.name)
        time.sleep(2)

        mutexA.acquire()
        print('\033[44m%s 拿到A鎖\033[0m' %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()

遞歸鎖
解決方法琳疏,遞歸鎖有决,在Python中為了支持在同一線程中多次請求同一資源,python提供了可重入鎖RLock空盼。

這個RLock內(nèi)部維護著一個Lock和一個counter變量疮薇,counter記錄了acquire的次數(shù),從而使得資源可以被多次require我注。直到一個線程所有的acquire都被release按咒,其他的線程才能獲得資源。上面的例子如果使用RLock代替Lock但骨,則不會發(fā)生死鎖励七,二者的區(qū)別是:遞歸鎖可以連續(xù)acquire多次,而互斥鎖只能acquire一次

from threading import Thread,RLock
import time

mutexA=mutexB=RLock() #一個線程拿到鎖奔缠,counter加1,該線程內(nèi)又碰到加鎖的情況掠抬,則counter繼續(xù)加1,這期間所有其他線程都只能等待校哎,等待該線程釋放所有鎖两波,即counter遞減到0為止

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('\033[41m%s 拿到A鎖\033[0m' %self.name)

        mutexB.acquire()
        print('\033[42m%s 拿到B鎖\033[0m' %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('\033[43m%s 拿到B鎖\033[0m' %self.name)
        time.sleep(2)

        mutexA.acquire()
        print('\033[44m%s 拿到A鎖\033[0m' %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()

信號量瞳步,Event,定時器

信號量
信號量也是一把鎖腰奋,可以指定信號量為5单起,對比互斥鎖同一時間只能有一個任務(wù)搶到鎖去執(zhí)行,信號量同一時間可以有5個任務(wù)拿到鎖去執(zhí)行劣坊,如果說互斥鎖是合租房屋的人去搶一個廁所嘀倒,那么信號量就相當(dāng)于一群路人爭搶公共廁所,公共廁所有多個坑位局冰,這意味著同一時間可以有多個人上公共廁所测蘑,但公共廁所容納的人數(shù)是一定的,這便是信號量的大小

from threading import Thread,Semaphore
import threading
import time

def func():
    sm.acquire()
    print('%s get sm' %threading.current_thread().getName())
    time.sleep(3)
    sm.release()

if __name__ == '__main__':
    sm=Semaphore(5)
    for i in range(23):
        t=Thread(target=func)
        t.start()

'''
Semaphore管理一個內(nèi)置的計數(shù)器康二,
每當(dāng)調(diào)用acquire()時內(nèi)置計數(shù)器-1碳胳;
調(diào)用release() 時內(nèi)置計數(shù)器+1;
計數(shù)器不能小于0沫勿;當(dāng)計數(shù)器為0時固逗,acquire()將阻塞線程直到其他線程調(diào)用release()。
'''

Event
線程的一個關(guān)鍵特性是每個線程都是獨立運行且狀態(tài)不可預(yù)測藕帜。如果程序中的其 他線程需要通過判斷某個線程的狀態(tài)來確定自己下一步的操作,這時線程同步問題就會變得非常棘手烫罩。為了解決這些問題,我們需要使用threading庫中的Event對象。 對象包含一個可由線程設(shè)置的信號標(biāo)志,它允許線程等待某些事件的發(fā)生洽故。在 初始情況下,Event對象中的信號標(biāo)志被設(shè)置為假贝攒。如果有線程等待一個Event對象, 而這個Event對象的標(biāo)志為假,那么這個線程將會被一直阻塞直至該標(biāo)志為真。一個線程如果將一個Event對象的信號標(biāo)志設(shè)置為真,它將喚醒所有等待這個Event對象的線程时甚。如果一個線程等待一個已經(jīng)被設(shè)置為真的Event對象,那么它將忽略這個事件, 繼續(xù)執(zhí)行

from threading import Event

event.isSet():返回event的狀態(tài)值隘弊;

event.wait():如果 event.isSet()==False將阻塞線程;

event.set(): 設(shè)置event的狀態(tài)值為True荒适,所有阻塞池的線程激活進入就緒狀態(tài)梨熙, 等待操作系統(tǒng)調(diào)度;

event.clear():恢復(fù)event的狀態(tài)值為False刀诬。

例如咽扇,有多個工作線程嘗試鏈接MySQL,我們想要在鏈接前確保MySQL服務(wù)正常才讓那些工作線程去連接MySQL服務(wù)器陕壹,如果連接不成功质欲,都會去嘗試重新連接。那么我們就可以采用threading.Event機制來協(xié)調(diào)各個工作線程的連接操作

from threading import Thread,Event
import threading
import time,random
def conn_mysql():
    count=1
    while not event.is_set():
        if count > 3:
            raise TimeoutError('鏈接超時')
        print('<%s>第%s次嘗試鏈接' % (threading.current_thread().getName(), count))
        event.wait(0.5)
        count+=1
    print('<%s>鏈接成功' %threading.current_thread().getName())


def check_mysql():
    print('\033[45m[%s]正在檢查mysql\033[0m' % threading.current_thread().getName())
    time.sleep(random.randint(2,4))
    event.set()
if __name__ == '__main__':
    event=Event()
    conn1=Thread(target=conn_mysql)
    conn2=Thread(target=conn_mysql)
    check=Thread(target=check_mysql)

    conn1.start()
    conn2.start()
    check.start()

定時器

from threading import Timer

def hello():
    print("hello, world")

t = Timer(1, hello)
t.start()  # after 1 seconds, "hello, world" will be printed

線程隊列

線程queue
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
有三種不同的用法

  • class queue.Queue(maxsize=0) 先進先出
import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())



'''
結(jié)果(先進先出):
first
second
third
'''
  • class queue.LifoQueue(maxsize=0) last in first out
import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())



'''
結(jié)果(后進先出):
third
second
first
'''
  • class queue.PriorityQueue(maxsize=0) 優(yōu)先隊列:存儲時可以設(shè)置優(yōu)先級隊列
import queue

q=queue.PriorityQueue()
#put進入一個元組,元組的第一個元素是優(yōu)先級(通常是數(shù)字,也可以是非數(shù)字之間的比較),數(shù)字越小優(yōu)先級越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())



'''
結(jié)果(數(shù)字越小優(yōu)先級越高,優(yōu)先級高的優(yōu)先出隊):
(10, 'b')
(20, 'a')
(30, 'c')
'''

進程池與線城池

在剛開始學(xué)多進程或多線程時糠馆,我們迫不及待地基于多進程或多線程實現(xiàn)并發(fā)的套接字通信嘶伟,然而這種實現(xiàn)方式的致命缺陷是:服務(wù)的開啟的進程數(shù)或線程數(shù)都會隨著并發(fā)的客戶端數(shù)目地增多而增多,這會對服務(wù)端主機帶來巨大的壓力又碌,甚至于不堪重負(fù)而癱瘓九昧,于是我們必須對服務(wù)端開啟的進程數(shù)或線程數(shù)加以控制绊袋,讓機器在一個自己可以承受的范圍內(nèi)運行,這就是進程池或線程池的用途铸鹰,例如進程池癌别,就是用來存放進程的池子,本質(zhì)還是基于多進程掉奄,只不過是對開啟進程的數(shù)目加上了限制

介紹:
官網(wǎng):https://docs.python.org/dev/library/concurrent.futures.html
concurrent.futures模塊提供了高度封裝的異步調(diào)用接口
ThreadPoolExecutor:線程池规个,提供異步調(diào)用
ProcessPoolExecutor: 進程池凤薛,提供異步調(diào)用
Both implement the same interface, which is defined by the abstract Executor class.

基本方法:

1姓建、submit(fn, *args, **kwargs)
異步提交任務(wù)

2、map(func, *iterables, timeout=None, chunksize=1) 
取代for循環(huán)submit的操作

3缤苫、shutdown(wait=True) 
相當(dāng)于進程池的pool.close()+pool.join()操作
wait=True速兔,等待池內(nèi)所有任務(wù)執(zhí)行完畢回收完資源后才繼續(xù)
wait=False,立即返回活玲,并不會等待池內(nèi)的任務(wù)執(zhí)行完畢
但不管wait參數(shù)為何值涣狗,整個程序都會等到所有任務(wù)執(zhí)行完畢
submit和map必須在shutdown之前

4、result(timeout=None)
取得結(jié)果

5舒憾、add_done_callback(fn)
回調(diào)函數(shù)

進程池
介紹:
The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None)
An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised.

用法:

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ProcessPoolExecutor(max_workers=3)

    futures=[]
    for i in range(11):
        future=executor.submit(task,i)
        futures.append(future)
    executor.shutdown(True)
    print('+++>')
    for future in futures:
        print(future.result())

線程池
ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.

Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.

New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.

用法:
把ProcessPoolExecutor換成ThreadPoolExecutor镀钓,其余用法全部相同

map方法

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ThreadPoolExecutor(max_workers=3)

    # for i in range(11):
    #     future=executor.submit(task,i)

    executor.map(task,range(1,12)) #map取代了for+submit

回調(diào)函數(shù)
可以為進程池或線程池內(nèi)的每個進程或線程綁定一個函數(shù),該函數(shù)在進程或線程的任務(wù)執(zhí)行完畢后自動觸發(fā)镀迂,并接收任務(wù)的返回值當(dāng)作參數(shù)丁溅,該函數(shù)稱為回調(diào)函數(shù)

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<進程%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def parse_page(res):
    res=res.result()
    print('<進程%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    p=ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個future對象obj,需要用obj.result()拿到結(jié)果

并發(fā)編程之協(xié)程

協(xié)程介紹

本節(jié)的主題是基于單線程來實現(xiàn)并發(fā)探遵,即只用一個主線程(很明顯可利用的cpu只有一個)情況下實現(xiàn)并發(fā)窟赏,為此我們需要先回顧下并發(fā)的本質(zhì):切換+保存狀態(tài)

cpu正在運行一個任務(wù),會在兩種情況下切走去執(zhí)行其他的任務(wù)(切換由操作系統(tǒng)強制控制)箱季,一種情況是該任務(wù)發(fā)生了阻塞涯穷,另外一種情況是該任務(wù)計算的時間過長或有一個優(yōu)先級更高的程序替代了它

三種執(zhí)行狀態(tài).png

ps:在介紹進程理論時,提及進程的三種執(zhí)行狀態(tài)藏雏,而線程才是執(zhí)行單位拷况,所以也可以將上圖理解為線程的三種狀態(tài)

單純地切換反而會降低運行效率:

#!/usr/bin/env python3
# Author    : fbo
# @Time : 18-5-23 上午5:59
# @File : 10 協(xié)程.py

# 串行執(zhí)行
import time
def consumer(res):
    '''
    任務(wù)1:接收數(shù)據(jù),處理數(shù)據(jù)
    :param res:
    :return:
    '''
    pass

def producer():
    '''任務(wù)2:生產(chǎn)數(shù)據(jù)'''
    res = []
    for i in range(10000000):
        res.append(i)
    return res

start = time.time()
# 串行執(zhí)行
res = producer()
consumer(res) # 寫成consumer(producer())會降低執(zhí)行那個效率
stop = time.time()
print(stop - start)

# 基于yield并發(fā)執(zhí)行
import time

def consumer():
    while True:
        x = yield
        print(x)

def producer():
    g = consumer()
    next(g)
    for i in range(100000):
        g.send(i)

start = time.time()
# 基于yield保存狀態(tài),實現(xiàn)兩個任務(wù)直接來回切換,即并發(fā)的效果
# PS:如果每個任務(wù)中都加上打印,那么明顯地看到兩個任務(wù)的打印
# 是你一次我一次,即并發(fā)執(zhí)行的.
producer()
stop = time.time()
print(stop-start)

第一種情況的切換。在任務(wù)一遇到io情況下掘殴,切到任務(wù)二去執(zhí)行蝠嘉,這樣就可以利用任務(wù)一阻塞的時間完成任務(wù)二的計算,效率的提升就在于此杯巨。

yield并不能實現(xiàn)遇到io切換

import time
def consumer():
    '''任務(wù)1:接收數(shù)據(jù),處理數(shù)據(jù)'''
    while True:
        x=yield

def producer():
    '''任務(wù)2:生產(chǎn)數(shù)據(jù)'''
    g=consumer()
    next(g)
    for i in range(10000000):
        g.send(i)
        time.sleep(2)

start=time.time()
producer() #并發(fā)執(zhí)行,但是任務(wù)producer遇到io就會阻塞住,并不會切到該線程內(nèi)的其他任務(wù)去執(zhí)行

stop=time.time()
print(stop-start)

對于單線程下蚤告,我們不可避免程序中出現(xiàn)io操作,但如果我們能在自己的程序中(即用戶程序級別服爷,而非操作系統(tǒng)級別)控制單線程下的多個任務(wù)能在一個任務(wù)遇到io阻塞時就切換到另外一個任務(wù)去計算杜恰,這樣就保證了該線程能夠最大限度地處于就緒態(tài)获诈,即隨時都可以被cpu執(zhí)行的狀態(tài),相當(dāng)于我們在用戶程序級別將自己的io操作最大限度地隱藏起來心褐,從而可以迷惑操作系統(tǒng)舔涎,讓其看到:該線程好像是一直在計算,io比較少逗爹,從而更多的將cpu的執(zhí)行權(quán)限分配給我們的線程亡嫌。

協(xié)程的本質(zhì)就是在單線程下,由用戶自己控制一個任務(wù)遇到io阻塞了就切換另外一個任務(wù)去執(zhí)行掘而,以此來提升效率挟冠。為了實現(xiàn)它,我們需要找尋一種可以同時滿足以下條件的解決方案:

  1. 可以控制多個任務(wù)之間的切換袍睡,切換之前將任務(wù)的狀態(tài)保存下來知染,以便重新運行時,可以基于暫停的位置繼續(xù)執(zhí)行斑胜。
  2. 作為1的補充:可以檢測io操作控淡,在遇到io操作的情況下才發(fā)生切換

協(xié)程介紹
協(xié)程:是單線程下的并發(fā),又稱微線程止潘,纖程掺炭。英文名Coroutine。一句話說明什么是線程:協(xié)程是一種用戶態(tài)的輕量級線程凭戴,即協(xié)程是由用戶程序自己控制調(diào)度的涧狮。

需要強調(diào)的是:

  1. python的線程屬于內(nèi)核級別的,即由操作系統(tǒng)控制調(diào)度(如單線程遇到io或執(zhí)行時間過長就會被迫交出cpu執(zhí)行權(quán)限簇宽,切換其他線程運行)
  2. 單線程內(nèi)開啟協(xié)程勋篓,一旦遇到io,就會從應(yīng)用程序級別(而非操作系統(tǒng))控制切換魏割,以此來提升效率(F┫!钞它!非io操作的切換與效率無關(guān))

對比操作系統(tǒng)控制線程的切換拜银,用戶在單線程內(nèi)控制協(xié)程的切換,優(yōu)點如下:

  1. 協(xié)程的切換開銷更小遭垛,屬于程序級別的切換尼桶,操作系統(tǒng)完全感知不到,因而更加輕量級
  2. 單線程內(nèi)就可以實現(xiàn)并發(fā)的效果锯仪,最大限度地利用cpu

缺點如下:

  1. 協(xié)程的本質(zhì)是單線程下泵督,無法利用多核,可以是一個程序開啟多個進程庶喜,每個進程內(nèi)開啟多個線程小腊,每個線程內(nèi)開啟協(xié)程
  2. 協(xié)程指的是單個線程救鲤,因而一旦協(xié)程出現(xiàn)阻塞,將會阻塞整個線程

總結(jié)協(xié)程的特點:

  1. 必須只有一個單線程里實現(xiàn)并發(fā)
  2. 修改共享數(shù)據(jù)不需要加鎖
  3. 用戶程序里自己保存多個控制流的上下文棧
  4. 一個協(xié)程里遇到IO操作自動切換到其他協(xié)程(如何實現(xiàn)檢測IO秩冈, yield本缠、greenlet都無法實現(xiàn), 就用到了gevent模塊(select機制))

greenlet模塊

如果我們在單個線程內(nèi)有20個任務(wù)入问,要想實現(xiàn)在多個任務(wù)之間切換丹锹,使用yield生成器的方式過于麻煩(需要先得到初始化一次的生成器,然后再調(diào)用send芬失。楣黍。。非常麻煩)麸折,而使用greenlet模塊可以非常簡單地實現(xiàn)這20個任務(wù)直接的切換

#安裝:pip3 install greenlet
from greenlet import greenlet

def eat(name):
    print('%s eat 1' %name)
    g2.switch('egon')
    print('%s eat 2' %name)
    g2.switch()
def play(name):
    print('%s play 1' %name)
    g1.switch()
    print('%s play 2' %name)

g1=greenlet(eat)
g2=greenlet(play)

g1.switch('egon')#可以在第一次switch時傳入?yún)?shù)锡凝,以后都不需要

單純的切換(在沒有io的情況下或者沒有重復(fù)開辟內(nèi)存空間的操作)粘昨,反而會降低程序的執(zhí)行速度

#順序執(zhí)行
import time
def f1():
    res=1
    for i in range(100000000):
        res+=i

def f2():
    res=1
    for i in range(100000000):
        res*=i

start=time.time()
f1()
f2()
stop=time.time()
print('run time is %s' %(stop-start)) #10.985628366470337

#切換
from greenlet import greenlet
import time
def f1():
    res=1
    for i in range(100000000):
        res+=i
        g2.switch()

def f2():
    res=1
    for i in range(100000000):
        res*=i
        g1.switch()

start=time.time()
g1=greenlet(f1)
g2=greenlet(f2)
g1.switch()
stop=time.time()
print('run time is %s' %(stop-start)) # 52.763017892837524

greenlet只是提供了一種比generator更加便捷的切換方式垢啼,當(dāng)切到一個任務(wù)執(zhí)行時如果遇到io,那就原地阻塞张肾,仍然是沒有解決遇到IO自動切換來提升效率的問題芭析。

單線程里的這20個任務(wù)的代碼通常會既有計算操作又有阻塞操作,我們完全可以在執(zhí)行任務(wù)1時遇到阻塞吞瞪,就利用阻塞的時間去執(zhí)行任務(wù)2馁启。。芍秆。惯疙。如此,才能提高效率妖啥,這就用到了Gevent模塊霉颠。

gevent模塊

Gevent 是一個第三方庫,可以輕松通過gevent實現(xiàn)并發(fā)同步或異步編程荆虱,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協(xié)程蒿偎。 Greenlet全部運行在主程序操作系統(tǒng)進程的內(nèi)部,但它們被協(xié)作式地調(diào)度怀读。

用法:

g1=gevent.spawn(func,1,,2,3,x=4,y=5)創(chuàng)建一個協(xié)程對象g1诉位,spawn括號內(nèi)第一個參數(shù)是函數(shù)名,如eat菜枷,后面可以有多個參數(shù)苍糠,可以是位置實參或關(guān)鍵字實參,都是傳給函數(shù)eat的

g2=gevent.spawn(func2)

g1.join() #等待g1結(jié)束

g2.join() #等待g2結(jié)束
#或者上述兩步合作一步:gevent.joinall([g1,g2])

g1.value#拿到func1的返回值

遇到IO阻塞時會自動切換任務(wù)

import gevent
def eat(name):
    print('%s eat 1' %name)
    gevent.sleep(2)
    print('%s eat 2' %name)

def play(name):
    print('%s play 1' %name)
    gevent.sleep(1)
    print('%s play 2' %name)


g1=gevent.spawn(eat,'egon')
g2=gevent.spawn(play,name='egon')
g1.join()
g2.join()
#或者gevent.joinall([g1,g2])
print('主')

上例gevent.sleep(2)模擬的是gevent可以識別的io阻塞,

time.sleep(2)或其他的阻塞,gevent是不能直接識別的需要用下面一行代碼,打補丁,就可以識別了

from gevent import monkey;monkey.patch_all()必須放到被打補丁者的前面啤誊,如time岳瞭,socket模塊之前

或者我們干脆記憶成:要用gevent歹袁,需要將from gevent import monkey;monkey.patch_all()放到文件的開頭

練習(xí)
服務(wù)端:

from gevent import monkey;monkey.patch_all()
from socket import *
import gevent

#如果不想用money.patch_all()打補丁,可以用gevent自帶的socket
# from gevent import socket
# s=socket.socket()

def server(server_ip,port):
    s=socket(AF_INET,SOCK_STREAM)
    s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    s.bind((server_ip,port))
    s.listen(5)
    while True:
        conn,addr=s.accept()
        gevent.spawn(talk,conn,addr)

def talk(conn,addr):
    try:
        while True:
            res=conn.recv(1024)
            print('client %s:%s msg: %s' %(addr[0],addr[1],res))
            conn.send(res.upper())
    except Exception as e:
        print(e)
    finally:
        conn.close()

if __name__ == '__main__':
    server('127.0.0.1',8080)

多線程并發(fā)多個客戶端

from threading import Thread
from socket import *
import threading

def client(server_ip,port):
    c=socket(AF_INET,SOCK_STREAM) #套接字對象一定要加到函數(shù)內(nèi)劫恒,即局部名稱空間內(nèi)俗孝,放在函數(shù)外則被所有線程共享,則大家公用一個套接字對象怨咪,那么客戶端端口永遠一樣了
    c.connect((server_ip,port))

    count=0
    while True:
        c.send(('%s say hello %s' %(threading.current_thread().getName(),count)).encode('utf-8'))
        msg=c.recv(1024)
        print(msg.decode('utf-8'))
        count+=1
if __name__ == '__main__':
    for i in range(500):
        t=Thread(target=client,args=('127.0.0.1',8080))
        t.start()

IO模型

http://www.cnblogs.com/linhaifeng/articles/7430066.html#_label4

同步(synchronous) IO和異步(asynchronous) IO乏矾,阻塞(blocking) IO和非阻塞(non-blocking)IO分別是什么孟抗,到底有什么區(qū)別?這個問題其實不同的人給出的答案都可能不同钻心,比如wiki凄硼,就認(rèn)為asynchronous IO和non-blocking IO是一個東西。這其實是因為不同的人的知識背景不同捷沸,并且在討論這個問題的時候上下文(context)也不相同摊沉。所以,為了更好的回答這個問題痒给,我先限定一下本文的上下文说墨。

本文討論的背景是Linux環(huán)境下的network IO。本文最重要的參考文獻是Richard Stevens的“UNIX? Network Programming Volume 1, Third Edition: The Sockets Networking ”苍柏,6.2節(jié)“I/O Models ”尼斧,Stevens在這節(jié)中詳細(xì)說明了各種IO的特點和區(qū)別,如果英文夠好的話试吁,推薦直接閱讀棺棵。Stevens的文風(fēng)是有名的深入淺出,所以不用擔(dān)心看不懂熄捍。本文中的流程圖也是截取自參考文獻烛恤。

Stevens在文章中一共比較了五種IO Model:

  • blocking IO
  • nonblocking IO
  • IO multiplexing
  • signal driven IO
  • asynchronous IO
    由signal driven IO(信號驅(qū)動IO)在實際中并不常用,所以主要介紹其余四種IO Model余耽。

再說一下IO發(fā)生時涉及的對象和步驟缚柏。對于一個network IO (這里我們以read舉例),它會涉及到兩個系統(tǒng)對象宾添,一個是調(diào)用這個IO的process (or thread)船惨,另一個就是系統(tǒng)內(nèi)核(kernel)。當(dāng)一個read操作發(fā)生時缕陕,該操作會經(jīng)歷兩個階段:

  1. 等待數(shù)據(jù)準(zhǔn)備 (Waiting for the data to be ready)
  2. 將數(shù)據(jù)從內(nèi)核拷貝到進程中(Copying the data from the kernel to the process)

記住這兩點很重要粱锐,因為這些IO模型的區(qū)別就是在兩個階段上各有不同的情況。

阻塞IO (blocking IO)

在linux中扛邑,默認(rèn)情況下所有的socket都是blocking怜浅,一個典型的讀操作流程大概是這樣的:


blocking_io.png

當(dāng)用戶進程調(diào)用了recvfrom這個系統(tǒng)調(diào)用,kernel就開始了IO的第一個階段:準(zhǔn)備數(shù)據(jù)。對于network io來說恶座,很多時候數(shù)據(jù)在一開始還未到達搀暑,這個時候kernel就要等待足夠的數(shù)據(jù)到來;
而在用戶進程這邊跨琳,整個進程會被阻塞自点。當(dāng)kernel一直等到數(shù)據(jù)準(zhǔn)備好,就會將數(shù)據(jù)從kernel中拷貝到用戶內(nèi)存脉让,然后kernel返回結(jié)果桂敛,用戶進程才接錯block的狀態(tài),重新運行起來溅潜。
所以术唬,blocking IO的特點就是在IO執(zhí)行的兩個階段(等待數(shù)據(jù)和拷貝數(shù)據(jù)的兩個階段)都被block了

幾乎所有的程序員第一次接觸到的網(wǎng)絡(luò)編程都是從listen()、send()滚澜、recv() 等接口開始的粗仓,
使用這些接口可以很方便的構(gòu)建服務(wù)器/客戶機的模型。然而大部分的socket接口都是阻塞型的设捐。如下圖


image.png

ps:
所謂阻塞型接口是指系統(tǒng)調(diào)用(一般是IO接口)不返回調(diào)用結(jié)果并讓當(dāng)前線程一直阻塞
只有當(dāng)該系統(tǒng)調(diào)用獲得結(jié)果或者超時出錯時才返回借浊。

實際上,除非特別指定挡育,幾乎所有的IO接口 ( 包括socket接口 ) 都是阻塞型的巴碗。這給網(wǎng)絡(luò)編程帶來了一個很大的問題朴爬,如在調(diào)用recv(1024)的同時即寒,線程將被阻塞,在此期間召噩,線程將無法執(zhí)行任何運算或響應(yīng)任何的網(wǎng)絡(luò)請求母赵。

一個簡單的解決方案:
在服務(wù)器端使用多線程(或多進程)。多線程(或多進程)的目的是讓每個連接都擁有獨立的線程(或進程)具滴,
這樣任何一個連接的阻塞都不會影響其他的連接凹嘲。

該方案的問題是:
開啟多進程或都線程的方式,在遇到要同時響應(yīng)成百上千路的連接請求构韵,則無論多線程還是多進程都會嚴(yán)重占據(jù)系統(tǒng)資源周蹭,
降低系統(tǒng)對外界響應(yīng)效率,而且線程與進程本身也更容易進入假死狀態(tài)疲恢。

改進方案:
很多程序員可能會考慮使用“線程池”或“連接池”凶朗。“線程池”旨在減少創(chuàng)建和銷毀線程的頻率显拳,
其維持一定合理數(shù)量的線程棚愤,并讓空閑的線程重新承擔(dān)新的執(zhí)行任務(wù)。“連接池”維持連接的緩存池宛畦,盡量重用已有的連接瘸洛、
減少創(chuàng)建和關(guān)閉連接的頻率。這兩種技術(shù)都可以很好的降低系統(tǒng)開銷次和,都被廣泛應(yīng)用很多大型系統(tǒng)反肋,如websphere、tomcat和各種數(shù)據(jù)庫等踏施。

改進后方案其實也存在著問題:
“線程池”和“連接池”技術(shù)也只是在一定程度上緩解了頻繁調(diào)用IO接口帶來的資源占用囚玫。而且,所謂“池”始終有其上限读规,
當(dāng)請求大大超過上限時抓督,“池”構(gòu)成的系統(tǒng)對外界的響應(yīng)并不比沒有池的時候效果好多少。所以使用“池”必須考慮其面臨的響應(yīng)規(guī)模束亏,
并根據(jù)響應(yīng)規(guī)模調(diào)整“池”的大小铃在。

對應(yīng)上例中的所面臨的可能同時出現(xiàn)的上千甚至上萬次的客戶端請求,“線程池”或“連接池”或許可以緩解部分壓力碍遍,但是不能解決所有問題定铜。總之怕敬,多線程模型可以方便高效的解決小規(guī)模的服務(wù)請求揣炕,但面對大規(guī)模的服務(wù)請求,多線程模型也會遇到瓶頸东跪,可以用非阻塞接口來嘗試解決這個問題畸陡。

非阻塞IO (non-blocking IO)

Linux下,可以通過設(shè)置socket使其變?yōu)閚on-blocking虽填。當(dāng)對一個non-blocking socket執(zhí)行讀操作時丁恭,流程是這個樣子:

none-blocking.png

從圖中可以看出,當(dāng)用戶進程發(fā)出read操作時斋日,如果kernel中的數(shù)據(jù)還沒有準(zhǔn)備好牲览,那么它并不會block用戶進程,而是立刻返回一個error恶守。從用戶進程角度講 第献,它發(fā)起一個read操作后,并不需要等待兔港,而是馬上就得到了一個結(jié)果庸毫。用戶進程判斷結(jié)果是一個error時,它就知道數(shù)據(jù)還沒有準(zhǔn)備好押框,于是用戶就可以在本次到下次再發(fā)起read詢問的時間間隔內(nèi)做其他事情岔绸,或者直接再次發(fā)送read操作。一旦kernel中的數(shù)據(jù)準(zhǔn)備好了,并且又再次收到了用戶進程的system call盒揉,那么它馬上就將數(shù)據(jù)拷貝到了用戶內(nèi)存(這一階段仍然是阻塞的)晋被,然后返回。
所以刚盈,在非阻塞式IO中羡洛,用戶進程其實是需要不斷的主動詢問kernel數(shù)據(jù)準(zhǔn)備好了沒有。

示例:

#服務(wù)端
from socket import *

server = socket(AF_INET, SOCK_STREAM)
server.bind(('127.0.0.1',8099))
server.listen(5)
server.setblocking(False)


rlist=[]
wlist=[]
while True:
    try:
        conn, addr = server.accept()
        rlist.append(conn)
        print(rlist)

    except BlockingIOError:
        del_rlist=[]
        for sock in rlist:
            try:
                data=sock.recv(1024)
                if not data:
                    del_rlist.append(sock)
                wlist.append((sock,data.upper()))
            except BlockingIOError:
                continue
            except Exception:
                sock.close()
                del_rlist.append(sock)

        del_wlist=[]
        for item in wlist:
            try:
                sock = item[0]
                data = item[1]
                sock.send(data)
                del_wlist.append(item)
            except BlockingIOError:
                pass

        for item in del_wlist:
            wlist.remove(item)


        for sock in del_rlist:
            rlist.remove(sock)

server.close()


#客戶端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8080))

while True:
    msg=input('>>: ')
    if not msg:continue
    c.send(msg.encode('utf-8'))
    data=c.recv(1024)
    print(data.decode('utf-8'))

但是非阻塞IO模型絕不被推薦藕漱。

我們不能否則其優(yōu)點:能夠在等待任務(wù)完成的時間里干其他活了(包括提交其他任務(wù)欲侮,也就是 “后臺” 可以有多個任務(wù)在“”同時“”執(zhí)行)。
但是也難掩其缺點:

  1. 循環(huán)調(diào)用recv()將大幅度推高CPU占用率肋联;這也是我們在代碼中留一句time.sleep(2)的原因,否則在低配主機下極容易出現(xiàn)卡機情況
  2. 任務(wù)完成的響應(yīng)延遲增大了威蕉,因為每過一段時間才去輪詢一次read操作,而任務(wù)可能在兩次輪詢之間的任意時間完成橄仍。
    這會導(dǎo)致整體數(shù)據(jù)吞吐量的降低韧涨。

此外,在這個方案中recv()更多的是起到檢測“操作是否完成”的作用侮繁,實際操作系統(tǒng)提供了更為高效的檢測“操作是否完成“作用的接口虑粥,例如select()多路復(fù)用模式,可以一次檢測多個連接是否活躍宪哩。

多路復(fù)用IO(IO multiplexing)

IO multiplexing這個詞可能有點陌生娩贷,但是如果我說select/epoll,大概就都能明白了锁孟。有些地方也稱這種IO方式為事件驅(qū)動IO(event driven IO)彬祖。我們都知道,select/epoll的好處就在于單個process就可以同時處理多個網(wǎng)絡(luò)連接的IO罗岖。它的基本原理就是select/epoll這個function會不斷的輪詢所負(fù)責(zé)的所有socket涧至,當(dāng)某個socket有數(shù)據(jù)到達了,就通知用戶進程桑包。它的流程如圖:

image.png

當(dāng)用戶進程調(diào)用了select,那么整個進程會被block纺非,而同時哑了,kernel會“監(jiān)視”所有select負(fù)責(zé)的socket,
當(dāng)任何一個socket中的數(shù)據(jù)準(zhǔn)備好了烧颖,select就會返回弱左。這個時候用戶進程再調(diào)用read操作,將數(shù)據(jù)從kernel拷貝到用戶進程炕淮。
這個圖和blocking IO的圖其實并沒有太大的不同拆火,事實上還更差一些。因為這里需要使用兩個系統(tǒng)調(diào)用(select和recvfrom),
而blocking IO只調(diào)用了一個系統(tǒng)調(diào)用(recvfrom)们镜。但是币叹,用select的優(yōu)勢在于它可以同時處理多個connection。

強調(diào):

  1. 如果處理的連接數(shù)不是很高的話模狭,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好颈抚,可能延遲還更大。select/epoll的優(yōu)勢并不是對于單個連接能處理得更快嚼鹉,而是在于能處理更多的連接贩汉。
  2. 在多路復(fù)用模型中,對于每一個socket锚赤,一般都設(shè)置成為non-blocking匹舞,但是,如上圖所示线脚,整個用戶的process其實是一直被block的策菜。只不過process是被select這個函數(shù)block,而不是被socket IO給block酒贬。

結(jié)論: select的優(yōu)勢在于可以處理多個連接又憨,不適用于單個連接

示例:

#服務(wù)端
from socket import *
import select
server = socket(AF_INET, SOCK_STREAM)
server.bind(('127.0.0.1',8093))
server.listen(5)
server.setblocking(False)
print('starting...')

rlist=[server,]
wlist=[]
wdata={}

while True:
    rl,wl,xl=select.select(rlist,wlist,[],0.5)
    print(wl)
    for sock in rl:
        if sock == server:
            conn,addr=sock.accept()
            rlist.append(conn)
        else:
            try:
                data=sock.recv(1024)
                if not data:
                    sock.close()
                    rlist.remove(sock)
                    continue
                wlist.append(sock)
                wdata[sock]=data.upper()
            except Exception:
                sock.close()
                rlist.remove(sock)

    for sock in wl:
        sock.send(wdata[sock])
        wlist.remove(sock)
        wdata.pop(sock)


#客戶端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8081))

while True:
    msg=input('>>: ')
    if not msg:continue
    c.send(msg.encode('utf-8'))
    data=c.recv(1024)
    print(data.decode('utf-8'))

select監(jiān)聽fd變化的過程分析:
用戶進程創(chuàng)建socket對象,拷貝監(jiān)聽的fd到內(nèi)核空間锭吨,每一個fd會對應(yīng)一張系統(tǒng)文件表蠢莺,內(nèi)核空間的fd響應(yīng)到數(shù)據(jù)后,就會發(fā)送信號給用戶進程數(shù)據(jù)已到零如;
用戶進程再發(fā)送系統(tǒng)調(diào)用躏将,比如(accept)將內(nèi)核空間的數(shù)據(jù)copy到用戶空間,同時作為接受數(shù)據(jù)端內(nèi)核空間的數(shù)據(jù)清除考蕾,這樣重新監(jiān)聽時fd再有新的數(shù)據(jù)又可以響應(yīng)到了(發(fā)送端因為基于TCP協(xié)議所以需要收到應(yīng)答后才會清除)祸憋。

該模型的優(yōu)點:
相比其他模型,使用select() 的事件驅(qū)動模型只用單線程(進程)執(zhí)行肖卧,占用資源少蚯窥,不消耗太多 CPU,同時能夠為多客戶端提供服務(wù)塞帐。
如果試圖建立一個簡單的事件驅(qū)動的服務(wù)器程序拦赠,這個模型有一定的參考價值。

該模型的缺點:
首先select()接口并不是實現(xiàn)“事件驅(qū)動”的最好選擇葵姥。因為當(dāng)需要探測的句柄值較大時荷鼠,select()接口本身需要消耗大量時間去輪詢各個句柄。
很多操作系統(tǒng)提供了更為高效的接口榔幸,如linux提供了epoll允乐,BSD提供了kqueue矮嫉,Solaris提供了/dev/poll,…牍疏。
如果需要實現(xiàn)更高效的服務(wù)器程序蠢笋,類似epoll這樣的接口更被推薦。遺憾的是不同的操作系統(tǒng)特供的epoll接口有很大差異麸澜,
所以使用類似于epoll的接口實現(xiàn)具有較好跨平臺能力的服務(wù)器會比較困難挺尿。
其次,該模型將事件探測和事件響應(yīng)夾雜在一起炊邦,一旦事件響應(yīng)的執(zhí)行體龐大编矾,則對整個模型是災(zāi)難性的。

異步IO(Asynchronous I/O)

Linux下的asynchronous IO其實用得不多馁害,從內(nèi)核2.6版本才開始引入窄俏。先看一下它的流程:


image.png

用戶進程發(fā)起read操作之后,立刻就可以開始去做其它的事碘菜。而另一方面凹蜈,從kernel的角度,當(dāng)它受到一個asynchronous read之后忍啸,首先它會立刻返回仰坦,所以不會對用戶進程產(chǎn)生任何block。然后计雌,kernel會等待數(shù)據(jù)準(zhǔn)備完成悄晃,然后將數(shù)據(jù)拷貝到用戶內(nèi)存,當(dāng)這一切都完成之后凿滤,kernel會給用戶進程發(fā)送一個signal妈橄,告訴它read操作完成了。

IO模型分析比較

到目前為止翁脆,已經(jīng)將四個IO Model都介紹完了【祢荆現(xiàn)在回過頭來回答最初的那幾個問題:blocking和non-blocking的區(qū)別在哪,synchronous IO和asynchronous IO的區(qū)別在哪反番。
先回答最簡單的這個:blocking vs non-blocking沙热。前面的介紹中其實已經(jīng)很明確的說明了這兩者的區(qū)別。調(diào)用blocking IO會一直block住對應(yīng)的進程直到操作完成恬口,而non-blocking IO在kernel還準(zhǔn)備數(shù)據(jù)的情況下會立刻返回校读。

再說明synchronous IO和asynchronous IO的區(qū)別之前,需要先給出兩者的定義祖能。Stevens給出的定義(其實是POSIX的定義)是這樣子的:
A synchronous I/O operation causes the requesting process to be blocked until that I/O operationcompletes;
An asynchronous I/O operation does not cause the requesting process to be blocked;
兩者的區(qū)別就在于synchronous IO做”IO operation”的時候會將process阻塞。按照這個定義蛾洛,四個IO模型可以分為兩大類养铸,
之前所述的blocking IO雁芙,non-blocking IO,IO multiplexing都屬于synchronous IO這一類钞螟,而 asynchronous I/O后一類 兔甘。

有人可能會說,non-blocking IO并沒有被block啊鳞滨。這里有個非扯幢海“狡猾”的地方,定義中所指的”IO operation”是指真實的IO操作拯啦,
就是例子中的recvfrom這個system call澡匪。non-blocking IO在執(zhí)行recvfrom這個system call的時候,如果kernel的數(shù)據(jù)沒有準(zhǔn)備好褒链,
這時候不會block進程唁情。但是,當(dāng)kernel中數(shù)據(jù)準(zhǔn)備好的時候甫匹,recvfrom會將數(shù)據(jù)從kernel拷貝到用戶內(nèi)存中甸鸟,這個時候進程是被block了,
在這段時間內(nèi)兵迅,進程是被block的抢韭。而asynchronous IO則不一樣,當(dāng)進程發(fā)起IO 操作之后恍箭,就直接返回再也不理睬了刻恭,直到kernel發(fā)送一個信號,
告訴進程說IO完成季惯。在這整個過程中吠各,進程完全沒有被block。

各個IO Model的比較如圖所示:


comparison.png

經(jīng)過上面的介紹勉抓,會發(fā)現(xiàn)non-blocking IO和asynchronous IO的區(qū)別還是很明顯的贾漏。在non-blocking IO中,雖然進程大部分時間都不會被block藕筋,但是它仍然要求進程去主動的check纵散,并且當(dāng)數(shù)據(jù)準(zhǔn)備完成以后,也需要進程主動的再次調(diào)用recvfrom來將數(shù)據(jù)拷貝到用戶內(nèi)存隐圾。而asynchronous IO則完全不同伍掀。它就像是用戶進程將整個IO操作交給了他人(kernel)完成,然后他人做完后發(fā)信號通知暇藏。在此期間蜜笤,用戶進程不需要去檢查IO操作的狀態(tài),也不需要主動的去拷貝數(shù)據(jù)盐碱。

selectors模型

了解select把兔,poll沪伙,epoll

IO復(fù)用:為了解釋這個名詞,首先來理解下復(fù)用這個概念县好,復(fù)用也就是共用的意思围橡,這樣理解還是有些抽象,為此缕贡,咱們來理解下復(fù)用在通信領(lǐng)域的使用翁授,在通信領(lǐng)域中為了充分利用網(wǎng)絡(luò)連接的物理介質(zhì),往往在同一條網(wǎng)絡(luò)鏈路上采用時分復(fù)用或頻分復(fù)用的技術(shù)使其在同一鏈路上傳輸多路信號晾咪,到這里我們就基本上理解了復(fù)用的含義收擦,即公用某個“介質(zhì)”來盡可能多的做同一類(性質(zhì))的事,那IO復(fù)用的“介質(zhì)”是什么呢禀酱?為此我們首先來看看服務(wù)器編程的模型炬守,客戶端發(fā)來的請求服務(wù)端會產(chǎn)生一個進程來對其進行服務(wù),每當(dāng)來一個客戶請求就產(chǎn)生一個進程來服務(wù)剂跟,然而進程不可能無限制的產(chǎn)生减途,因此為了解決大量客戶端訪問的問題,引入了IO復(fù)用技術(shù)曹洽,即:一個進程可以同時對多個客戶請求進行服務(wù)鳍置。

也就是說IO復(fù)用的“介質(zhì)”是進程(準(zhǔn)確的說復(fù)用的是select和poll,因為進程也是靠調(diào)用select和poll來實現(xiàn)的)送淆,

復(fù)用一個進程(select和poll)來對多個IO進行服務(wù)税产,雖然客戶端發(fā)來的IO是并發(fā)的但是IO所需的讀寫數(shù)據(jù)多數(shù)情況下是沒有準(zhǔn)備好的,
因此就可以利用一個函數(shù)(select和poll)來監(jiān)聽IO所需的這些數(shù)據(jù)的狀態(tài)偷崩,一旦IO有數(shù)據(jù)可以進行讀寫了辟拷,進程就來對這樣的IO進行服務(wù)。

理解完IO復(fù)用后阐斜,我們在來看下實現(xiàn)IO復(fù)用中的三個API(select衫冻、poll和epoll)的區(qū)別和聯(lián)系

select,poll谒出,epoll都是IO多路復(fù)用的機制隅俘,I/O多路復(fù)用就是通過一種機制,可以監(jiān)視多個描述符笤喳,一旦某個描述符就緒(一般是讀就緒或者寫就緒)为居,能夠通知應(yīng)用程序進行相應(yīng)的讀寫操作。但select杀狡,poll蒙畴,epoll本質(zhì)上都是同步I/O逗堵,因為他們都需要在讀寫事件就緒后自己負(fù)責(zé)進行讀寫驾窟,也就是說這個讀寫過程是阻塞的腐晾,而異步I/O則無需自己負(fù)責(zé)進行讀寫芥颈,異步I/O的實現(xiàn)會負(fù)責(zé)把數(shù)據(jù)從內(nèi)核拷貝到用戶空間拇惋。三者的原型如下所示:

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
  1. select的第一個參數(shù)nfds為fdset集合中最大描述符值加1等舔,fdset是一個位數(shù)組饼暑,其大小限制為__FD_SETSIZE(1024)辅甥,位數(shù)組的每一位代表其對應(yīng)的描述符是否需要被檢查子姜。第二三四參數(shù)表示需要關(guān)注讀祟绊、寫、錯誤事件的文件描述符位數(shù)組哥捕,這些參數(shù)既是輸入?yún)?shù)也是輸出參數(shù)牧抽,可能會被內(nèi)核修改用于標(biāo)示哪些描述符上發(fā)生了關(guān)注的事件,所以每次調(diào)用select前都需要重新初始化fdset遥赚。timeout參數(shù)為超時時間扬舒,該結(jié)構(gòu)會被內(nèi)核修改,其值為超時剩余的時間凫佛。
    select的調(diào)用步驟如下:
    (1)使用copy_from_user從用戶空間拷貝fdset到內(nèi)核空間
    (2)注冊回調(diào)函數(shù)__pollwait
    (3)遍歷所有fd讲坎,調(diào)用其對應(yīng)的poll方法(對于socket,這個poll方法是sock_poll愧薛,sock_poll根據(jù)情況會調(diào)用到tcp_poll,udp_poll
    或者datagram_poll)
    (4)以tcp_poll為例晨炕,其核心實現(xiàn)就是__pollwait,也就是上面注冊的回調(diào)函數(shù)毫炉。
    (5)__pollwait的主要工作就是把current(當(dāng)前進程)掛到設(shè)備的等待隊列中瓮栗,不同的設(shè)備有不同的等待隊列,對于tcp_poll 來說瞄勾,其等待隊列是sk->sk_sleep(注意把進程掛到等待隊列中并不代表進程已經(jīng)睡眠了)费奸。在設(shè)備收到一條消息(網(wǎng)絡(luò)設(shè)備)或填寫完文件數(shù)據(jù)(磁盤設(shè)備)后,會喚醒設(shè)備等待隊列上睡眠的進程进陡,這時current便被喚醒了愿阐。
    (6)poll方法返回時會返回一個描述讀寫操作是否就緒的mask掩碼,根據(jù)這個mask掩碼給fd_set賦值四濒。
    (7)如果遍歷完所有的fd换况,還沒有返回一個可讀寫的mask掩碼,則會調(diào)用schedule_timeout是調(diào)用select的進程(也就是 current)進入睡眠盗蟆。當(dāng)設(shè)備驅(qū)動發(fā)生自身資源可讀寫后戈二,會喚醒其等待隊列上睡眠的進程。如果超過一定的超時時間(schedule_timeout 指定)喳资,還是沒人喚醒觉吭,則調(diào)用select的進程會重新被喚醒獲得CPU,進而重新遍歷fd仆邓,判斷有沒有就緒的fd鲜滩。
    (8)把fd_set從內(nèi)核空間拷貝到用戶空間伴鳖。

總結(jié)下select的幾大缺點:
(1)每次調(diào)用select,都需要把fd集合從用戶態(tài)拷貝到內(nèi)核態(tài)徙硅,這個開銷在fd很多時會很大
(2)同時每次調(diào)用select都需要在內(nèi)核遍歷傳遞進來的所有fd榜聂,這個開銷在fd很多時也很大
(3)select支持的文件描述符數(shù)量太小了,默認(rèn)是1024

2. poll與select不同嗓蘑,通過一個pollfd數(shù)組向內(nèi)核傳遞需要關(guān)注的事件须肆,故沒有描述符個數(shù)的限制,pollfd中的events字段和revents分別
用于標(biāo)示關(guān)注的事件和發(fā)生的事件桩皿,故pollfd數(shù)組只需要被初始化一次豌汇。

poll的實現(xiàn)機制與select類似,其對應(yīng)內(nèi)核中的sys_poll泄隔,只不過poll向內(nèi)核傳遞pollfd數(shù)組拒贱,然后對pollfd中的每個描述符進行poll,相比處理fdset來說佛嬉,poll效率更高逻澳。poll返回后,需要對pollfd中的每個元素檢查其revents值巷燥,來得指事件是否發(fā)生赡盘。

3.直到Linux2.6才出現(xiàn)了由內(nèi)核直接支持的實現(xiàn)方法,那就是epoll缰揪,被公認(rèn)為Linux2.6下性能最好的多路I/O就緒通知方法陨享。
epoll可以同時支持水平觸發(fā)和邊緣觸發(fā)(Edge Triggered,只告訴進程哪些文件描述符剛剛變?yōu)榫途w狀態(tài)钝腺,它只說一遍抛姑,如果我們沒有采取行動,那么它將不會再次告知艳狐,這種方式稱為邊緣觸發(fā))定硝,理論上邊緣觸發(fā)的性能要更高一些,但是代碼實現(xiàn)相當(dāng)復(fù)雜毫目。
epoll同樣只告知那些就緒的文件描述符蔬啡,而且當(dāng)我們調(diào)用epoll_wait()獲得就緒文件描述符時,返回的不是實際的描述符镀虐,而是一個代表就緒描述符數(shù)量的值箱蟆,你只需要去epoll指定的一個數(shù)組中依次取得相應(yīng)數(shù)量的文件描述符即可,這里也使用了內(nèi)存映射(mmap)技術(shù)刮便,這樣便徹底省掉了這些文件描述符在系統(tǒng)調(diào)用時復(fù)制的開銷空猜。另一個本質(zhì)的改進在于epoll采用基于事件的就緒通知方式。在select/poll中,進程只有在調(diào)用一定的方法后辈毯,內(nèi)核才對所有監(jiān)視的文件描述符進行掃描坝疼,而epoll事先通過epoll_ctl()來注冊一個文件描述符,一旦基于某個文件描述符就緒時谆沃,內(nèi)核會采用類似callback的回調(diào)機制钝凶,迅速激活這個文件描述符,當(dāng)進程調(diào)用epoll_wait()時便得到通知管毙。

epoll既然是對select和poll的改進腿椎,就應(yīng)該能避免上述的三個缺點。那epoll都是怎么解決的呢夭咬?在此之前,我們先看一下epoll 和select和
poll的調(diào)用接口上的不同铆隘,select和poll都只提供了一個函數(shù)——select或者poll函數(shù)卓舵。而epoll提供了三個函 數(shù),epoll_create,epoll_ctl和epoll_wait膀钠,epoll_create是創(chuàng)建一個epoll句柄掏湾;epoll_ctl是注 冊要監(jiān)聽的事件類型;
epoll_wait則是等待事件的產(chǎn)生肿嘲。

對于第一個缺點融击,epoll的解決方案在epoll_ctl函數(shù)中。每次注冊新的事件到epoll句柄中時(在epoll_ctl中指定 EPOLL_CTL_ADD)雳窟,會把所有的fd拷貝進內(nèi)核尊浪,而不是在epoll_wait的時候重復(fù)拷貝。epoll保證了每個fd在整個過程中只會拷貝 一次封救。

對于第二個缺點拇涤,epoll的解決方案不像select或poll一樣每次都把current輪流加入fd對應(yīng)的設(shè)備等待隊列中,而只在 epoll_ctl時把current掛一遍(這一遍必不可少)并為每個fd指定一個回調(diào)函數(shù)誉结,當(dāng)設(shè)備就緒鹅士,喚醒等待隊列上的等待者時,就會調(diào)用這個回調(diào) 函數(shù)惩坑,而這個回調(diào)函數(shù)會把就緒的fd加入一個就緒鏈表)掉盅。epoll_wait的工作實際上就是在這個就緒鏈表中查看有沒有就緒的fd(利用schedule_timeout()實現(xiàn)睡一會,判斷一會的效果以舒,和select實現(xiàn)中的第7步是類似的)趾痘。

對于第三個缺點,epoll沒有這個限制稀轨,它所支持的FD上限是最大可以打開文件的數(shù)目扼脐,這個數(shù)字一般遠大于2048,舉個例子, 在1GB內(nèi)存的機器上大約是10萬左右,具體數(shù)目可以cat /proc/sys/fs/file-max察看,一般來說這個數(shù)目和系統(tǒng)內(nèi)存關(guān)系很大。

總結(jié):

  1. select瓦侮,poll實現(xiàn)需要自己不斷輪詢所有fd集合艰赞,直到設(shè)備就緒,期間可能要睡眠和喚醒多次交替肚吏。而epoll其實也需要調(diào)用 epoll_wait
    不斷輪詢就緒鏈表方妖,期間也可能多次睡眠和喚醒交替,但是它是設(shè)備就緒時罚攀,調(diào)用回調(diào)函數(shù)党觅,把就緒fd放入就緒鏈表中,并喚醒在 epoll_wait中進入睡眠的進程斋泄。雖然都要睡眠和交替杯瞻,但是select和poll在“醒著”的時候要遍歷整個fd集合,而epoll在“醒著”的 時候只要判斷一下就緒鏈表是否為空就行了炫掐,這節(jié)省了大量的CPU時間魁莉,這就是回調(diào)機制帶來的性能提升。
  2. select募胃,poll每次調(diào)用都要把fd集合從用戶態(tài)往內(nèi)核態(tài)拷貝一次旗唁,并且要把current往設(shè)備等待隊列中掛一次,而epoll只要 一次拷貝痹束,
    而且把current往等待隊列上掛也只掛一次(在epoll_wait的開始检疫,注意這里的等待隊列并不是設(shè)備等待隊列,只是一個epoll內(nèi)
    部定義的等待隊列)祷嘶,這也能節(jié)省不少的開銷屎媳。

selectors模塊
這三種IO多路復(fù)用模型在不同的平臺有著不同的支持,而epoll在windows下就不支持抹蚀,好在我們有selectors模塊剿牺,幫我們默認(rèn)選擇當(dāng)前平臺下最合適的

#服務(wù)端
from socket import *
import selectors

sel=selectors.DefaultSelector()
def accept(server_fileobj,mask):
    conn,addr=server_fileobj.accept()
    sel.register(conn,selectors.EVENT_READ,read)

def read(conn,mask):
    try:
        data=conn.recv(1024)
        if not data:
            print('closing',conn)
            sel.unregister(conn)
            conn.close()
            return
        conn.send(data.upper()+b'_SB')
    except Exception:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()



server_fileobj=socket(AF_INET,SOCK_STREAM)
server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server_fileobj.bind(('127.0.0.1',8088))
server_fileobj.listen(5)
server_fileobj.setblocking(False) #設(shè)置socket的接口為非阻塞
sel.register(server_fileobj,selectors.EVENT_READ,accept) #相當(dāng)于網(wǎng)select的讀列表里append了一個文件句柄
                                                         #server_fileobj,并且綁定了一個回調(diào)函數(shù)accept

while True:
    events=sel.select() #檢測所有的fileobj,是否有完成wait data的
    for sel_obj,mask in events:
        callback=sel_obj.data #callback=accpet
        callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1)

#客戶端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8088))

while True:
    msg=input('>>: ')
    if not msg:continue
    c.send(msg.encode('utf-8'))
    data=c.recv(1024)
    print(data.decode('utf-8'))

基于selectors模塊實現(xiàn)并發(fā)的FTP

參考:鏈接: https://pan.baidu.com/s/1qYPrHCg 密碼: 9is4

練習(xí)題

1环壤、簡述計算機操作系統(tǒng)中的“中斷”的作用晒来?

中斷:計算機執(zhí)行期間,系統(tǒng)內(nèi)發(fā)生任何非尋常的或非預(yù)期的急需處理事件,使得cpu暫時中斷當(dāng)前正在執(zhí)行的程序而轉(zhuǎn)去執(zhí)行相應(yīng)的事件處理程序.
待處理完畢后又返回原來被中斷處繼續(xù)執(zhí)行或調(diào)度新的進程執(zhí)行的過程.
它使計算機可以更好更快利用有限的系統(tǒng)資源解決系統(tǒng)響應(yīng)速度和運行效率的一種控制技術(shù).
實時響應(yīng) + 系統(tǒng)調(diào)用

2、簡述計算機內(nèi)存中的“內(nèi)核態(tài)”和“用戶態(tài)”郑现;

操作系統(tǒng)由操作系統(tǒng)的內(nèi)核(運行于內(nèi)核態(tài)湃崩,管理硬件資源)以及系統(tǒng)調(diào)用(運行于用戶態(tài),為應(yīng)用程序員寫的應(yīng)用程序提供系統(tǒng)調(diào)用接口)兩部分組成接箫;

內(nèi)核態(tài):cpu可以訪問內(nèi)存的所有數(shù)據(jù)攒读,包括外圍設(shè)備,例如硬盤辛友,網(wǎng)卡薄扁,cpu也可以將自己從一個程序切換到另一個程序剪返。

用戶態(tài):只能受限的訪問內(nèi)存,且不允許訪問外圍設(shè)備邓梅,占用cpu的能力被剝奪脱盲,cpu資源可以被其他程序獲取。

用戶態(tài)的應(yīng)用程序可以通過三種方式來訪問內(nèi)核態(tài)的資源:
1)系統(tǒng)調(diào)用
2)庫函數(shù)
3)Shell腳本
用戶態(tài)到內(nèi)核態(tài)的切換:
1.系統(tǒng)調(diào)用 用戶程序主動發(fā)起的 軟中斷 os.fork() process
2.異常 被動的 當(dāng)CPU正在執(zhí)行運行在用戶態(tài)的程序時日缨,突然發(fā)生某些預(yù)先不可知的異常事件钱反,這個時候就會觸發(fā)從當(dāng)前用戶態(tài)執(zhí)行的進程
轉(zhuǎn)向內(nèi)核態(tài)執(zhí)行相關(guān)的異常事件,典型的如缺頁異常匣距。
3.外圍設(shè)備的硬中斷 被動的 外圍設(shè)備完成用戶的請求操作后面哥,會像CPU發(fā)出中斷信號,此時毅待,CPU就會暫停執(zhí)行下一條即將要執(zhí)行的指令尚卫,
轉(zhuǎn)而去執(zhí)行中斷信號對應(yīng)的處理程序,如果先前執(zhí)行的指令是在用戶態(tài)下恩静,則自然就發(fā)生從用戶態(tài)到內(nèi)核態(tài)的轉(zhuǎn)換焕毫。

image

詳見:用戶態(tài)和內(nèi)核態(tài)的區(qū)別 https://blog.csdn.net/youngyoungla/article/details/53106671

https://blog.csdn.net/qq_34228570/article/details/72995997

3、進程間通信方式有哪些驶乾?

1、管道
2循签、有名管道(FIFO)
3级乐、消息隊列
4、信號量
5县匠、共享內(nèi)存
6风科、套接字(socket)

4、簡述你對管道乞旦、隊列的理解贼穆;

管道通常指無名管道
    1、它是半雙工的(即數(shù)據(jù)只能在一個方向上流動)兰粉,具有固定的讀端和寫端
    2故痊、它只能用于具有親緣關(guān)系的進程中通信(也就是父與子進程或者兄弟進程之間)
    3、數(shù)據(jù)不可反復(fù)讀取了玖姑,即讀了之后歡喜紅區(qū)中就沒有了
  消息隊列
    1愕秫、消息隊列是面向記錄的,其中的消息具有特定的格式以及特定的優(yōu)先級
    2焰络、消息隊列獨立于發(fā)送與接收進程戴甩。進程終止時,消息隊列及其內(nèi)容不會被刪除闪彼。
    3甜孤、消息隊列可以實現(xiàn)消息隨機查詢。

mutiprocessing模塊為我們提供的基于消息的IPC通信機制:隊列和管道。
  隊列和管道都是將數(shù)據(jù)存放于內(nèi)存中缴川,而隊列又是基于(管道+鎖)實現(xiàn)的茉稠,
可以讓我們從復(fù)雜的鎖問題中解脫出來,因而隊列才是進程間通信的最佳選擇二跋。
  我們應(yīng)該盡量避免使用共享數(shù)據(jù)战惊,盡可能使用消息傳遞和隊列,避免處理復(fù)雜的同步和鎖問題扎即,
而且在進程數(shù)目增多時吞获,往往可以獲得更好的可展性。
隊列 = 管道 + 鎖

from multiprocessing import Queue,Process
queue = Queue()
queue.put(url)
url = queue.get()
from multiprocessing import Pipe,Process
pipe = Pipe()
pipe.send(url)
pipe.recv()

5谚鄙、請列舉你知道的進程間通信方式各拷;

進程間通信(IPC)

1)管道

管道分為有名管道和無名管道

無名管道是一種半雙工的通信方式,數(shù)據(jù)只能單向流動,而且只能在具有親緣關(guān)系的進程間使用.進程的親緣關(guān)系一般指的是父子關(guān)系。無明管道一般用于兩個不同進程之間的通信闷营。當(dāng)一個進程創(chuàng)建了一個管道,并調(diào)用fork創(chuàng)建自己的一個子進程后,父進程關(guān)閉讀管道端,子進程關(guān)閉寫管道端,這樣提供了兩個進程之間數(shù)據(jù)流動的一種方式烤黍。

有名管道也是一種半雙工的通信方式,但是它允許無親緣關(guān)系進程間的通信。

2)信號量

信號量是一個計數(shù)器,可以用來控制多個線程對共享資源的訪問.,它不是用于交換大批數(shù)據(jù),而用于多線程之間的同步.它常作為一種鎖機制,防止某進程在訪問資源時其它進程也訪問該資源.因此,主要作為進程間以及同一個進程內(nèi)不同線程之間的同步手段.

3)信號

信號是一種比較復(fù)雜的通信方式,用于通知接收進程某個事件已經(jīng)發(fā)生.

4)消息隊列

消息隊列是消息的鏈表,存放在內(nèi)核中并由消息隊列標(biāo)識符標(biāo)識.消息隊列克服了信號傳遞信息少,管道只能承載無格式字節(jié)流以及緩沖區(qū)大小受限等特點.消息隊列是UNIX下不同進程之間可實現(xiàn)共享資源的一種機制,UNIX允許不同進程將格式化的數(shù)據(jù)流以消息隊列形式發(fā)送給任意進程.對消息隊列具有操作權(quán)限的進程都可以使用msget完成對消息隊列的操作控制.通過使用消息類型,進程可以按任何順序讀信息,或為消息安排優(yōu)先級順序.

5)共享內(nèi)存

共享內(nèi)存就是映射一段能被其他進程所訪問的內(nèi)存,這段共享內(nèi)存由一個進程創(chuàng)建,但多個進程都可以訪問.共享內(nèi)存是最快的IPC(進程間通信)方式,它是針對其它進程間通信方式運行效率低而專門設(shè)計的.它往往與其他通信機制,如信號量,配合使用,來實現(xiàn)進程間的同步與通信.

6)套接字:可用于不同及其間的進程通信

6傻盟、什么是同步I/O速蕊,什么是異步I/O?

同步是指:發(fā)送方發(fā)出數(shù)據(jù)后娘赴,等接收方發(fā)回響應(yīng)以后才發(fā)下一個數(shù)據(jù)包的通訊方式规哲。
異步是指:發(fā)送方發(fā)出數(shù)據(jù)后,不等接收方發(fā)回響應(yīng)诽表,接著發(fā)送下個數(shù)據(jù)包的通訊方式唉锌。

7、請問multiprocessing模塊中的Value竿奏、Array類的作用是什么袄简?舉例說明它們的使用場景

Value( typecode, arg1, … argN, lock )
在共享內(nèi)容中常見ctypes對象。typecode要么是包含array模塊使用的相同類型代碼(如’i’,’d’等)的字符串泛啸,要么是來自ctypes模塊的類型對象(如ctypes.c_int绿语、ctypes.c_double等)。
所有額外的位置參數(shù)arg1, arg2 ….. argN將傳遞給指定類型的構(gòu)造函數(shù)平痰。lock是只能使用關(guān)鍵字調(diào)用的參數(shù)汞舱,如果把它置為True(默認(rèn)值),將創(chuàng)建一個新的鎖定來包含對值的訪問宗雇。
如果傳入一個現(xiàn)有鎖定昂芜,比如Lock或RLock實例,該鎖定將用于進行同步赔蒲。如果v是Value創(chuàng)建的共享值的實例泌神,便可使用v.value訪問底層的值良漱。例如,讀取v.value將獲取值欢际,而賦值v.value將修改值母市。

RawValue( typecode, arg1, … ,argN)
同Value對象,但不存在鎖定损趋。

Array( typecode, initializer, lock )
在共享內(nèi)存中創(chuàng)建ctypes數(shù)組患久。typecode描述了數(shù)組的內(nèi)容,意義與Value()函數(shù)中的相同浑槽。initializer要么是設(shè)置數(shù)組初始大小的整數(shù)蒋失,要么是項目序列,其值和大小用于初始化數(shù)組桐玻。lock是只能使用關(guān)鍵字調(diào)用的參數(shù)篙挽,意義與Value()函數(shù)中相同。
如果a是Array創(chuàng)建的共享數(shù)組的實例镊靴,便可使用標(biāo)準(zhǔn)的python索引铣卡、切片和迭代操作訪問它的內(nèi)容,其中每種操作均由鎖定進行同步偏竟。對于字節(jié)字符串煮落,a還具有a.value屬性,可以吧整個數(shù)組當(dāng)做一個字符串進行訪問踊谋。

RawArray(typecode, initializer )
同Array對象州邢,但不存在鎖定。當(dāng)所編寫的程序必須一次性操作大量的數(shù)組項時褪子,如果同時使用這種數(shù)據(jù)類型和用于同步的單獨鎖定(如果需要的話),性能將得到極大的提升骗村。
詳見:https://blog.csdn.net/winterto1990/article/details/48106505

http://xiaorui.cc/2016/05/10/%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90multiprocessing%E7%9A%84value-array%E5%85%B1%E4%BA%AB%E5%86%85%E5%AD%98%E5%8E%9F%E7%90%86/

https://blog.csdn.net/qdx411324962/article/details/46810421

8嫌褪、請問multiprocessing模塊中的Manager類的作用是什么?與Value和Array類相比胚股,Manager的優(yōu)缺點是什么笼痛?

可以通過使用Value或者Array把數(shù)據(jù)存儲在一個共享的內(nèi)存表中;Manager()返回一個manager類型琅拌,控制一個server process缨伊,可以允許其它進程通過代理復(fù)制一些python objects 支持list,dict,Namespace,Lock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value,Array ;

Manager類的作用共享資源进宝,manger的的優(yōu)點是可以在poor進程池中使用刻坊,缺點是windows下環(huán)境下性能比較差,因為windows平臺需要把Manager.list放在if name='main'下党晋,而在實例化子進程時谭胚,必須把Manager對象傳遞給子進程徐块,否則lists無法被共享,而這個過程會消耗巨大資源灾而,因此性能很差胡控。

參考:http://www.kaka-ace.com/python-multiprocessing-module-2-managers-first-profile/

https://blog.csdn.net/alvine008/article/details/24310939

9、寫一個程序旁趟,包含十個線程昼激,子線程必須等待主線程sleep 10秒鐘之后才執(zhí)行,并打印當(dāng)前時間锡搜;

from threading import Thread
import time

def task(name):
    print(name, time.strftime('%Y-%m-%d %H:%M:%S',time.localtime()))

if __name__ == '__main__':
    time.sleep(10)
    for i in range(10):
        t = Thread(target=task, args=('線程 %s'%i, ))
        t.start()

10橙困、寫一個程序,包含十個線程余爆,同時只能有五個子線程并行執(zhí)行纷宇;

from threading import Thread,currentThread
from concurrent.futures import ThreadPoolExecutor
import time
import random

def task():
    print(currentThread().getName())
    time.sleep(random.randint(1,3))

if __name__ == "__main__":
    pool = ThreadPoolExecutor(5)
    for i in range(10):
        pool.submit(task)

11、寫一個程序蛾方,要求用戶輸入用戶名和密碼像捶,要求密碼長度不少于6個字符,且必須以字母開頭桩砰,如果密碼合法拓春,則將該密碼使用md5算法加密后的十六進制概要值存入名為password.txt的文件,超過三次不合法則退出程序亚隅;

import hashlib
import json
import re

def func():
    count = 0
    while count < 3:
        username = input('username>>>:').strip()
        password = input('password>>>:').strip()
        if len(password) < 6 or not re.search('\A([a-z]|[A-Z])',password):
            count += 1
        else:
            obj = {'usename':username,'password':hashlib.md5(password.encode('utf-8')).hexdigest()}
            json.dump(obj,open('password.txt','a',encoding='utf-8'))
            break

if __name__ == "__main__":
    func()

12硼莽、寫一個程序,使用socketserver模塊煮纵,實現(xiàn)一個支持同時處理多個客戶端請求的服務(wù)器懂鸵,要求每次啟動一個新線程處理客戶端請求;

# server
import socketserver

class Handler(socketserver.BaseRequestHandler):   # 必須繼承BaseRequestHandler
    def handle(self):    # 必須有handle方法
        print('new connection:',self.client_address)
        while True:
            try:
                data = self.request.recv(1024)
                if not data:break
                print('client data:',data.decode())
                self.request.send(data.upper())
            except Exception as e:
                print(e)
                break

if __name__ == "__main__":
    server = socketserver.ThreadingTCPServer(('127.0.0.1',8080),Handler)  # 實例化對象行疏,實現(xiàn)多線程的socket
    server.serve_forever()  # 事件監(jiān)聽匆光,并調(diào)用handler方法

# client
import socket
client = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
client.connect(('127.0.0.1',8080))
while True:
    msg = input(">>>:").strip()
    if not msg:continue
    client.send(msg.encode('utf-8'))
    data = client.recv(1024)
    print(data.decode('utf-8'))
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市酿联,隨后出現(xiàn)的幾起案子终息,更是在濱河造成了極大的恐慌,老刑警劉巖贞让,帶你破解...
    沈念sama閱讀 218,682評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件周崭,死亡現(xiàn)場離奇詭異,居然都是意外死亡喳张,警方通過查閱死者的電腦和手機续镇,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,277評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來蹲姐,“玉大人磨取,你說我怎么就攤上這事人柿。” “怎么了忙厌?”我有些...
    開封第一講書人閱讀 165,083評論 0 355
  • 文/不壞的土叔 我叫張陵凫岖,是天一觀的道長。 經(jīng)常有香客問我逢净,道長哥放,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,763評論 1 295
  • 正文 為了忘掉前任爹土,我火速辦了婚禮甥雕,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘胀茵。我一直安慰自己社露,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,785評論 6 392
  • 文/花漫 我一把揭開白布琼娘。 她就那樣靜靜地躺著峭弟,像睡著了一般。 火紅的嫁衣襯著肌膚如雪脱拼。 梳的紋絲不亂的頭發(fā)上瞒瘸,一...
    開封第一講書人閱讀 51,624評論 1 305
  • 那天,我揣著相機與錄音熄浓,去河邊找鬼情臭。 笑死,一個胖子當(dāng)著我的面吹牛赌蔑,可吹牛的內(nèi)容都是我干的俯在。 我是一名探鬼主播,決...
    沈念sama閱讀 40,358評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼娃惯,長吁一口氣:“原來是場噩夢啊……” “哼朝巫!你這毒婦竟也來了子寓?” 一聲冷哼從身側(cè)響起洪灯,我...
    開封第一講書人閱讀 39,261評論 0 276
  • 序言:老撾萬榮一對情侶失蹤贞滨,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后潮孽,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,722評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡筷黔,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年往史,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片佛舱。...
    茶點故事閱讀 40,030評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡椎例,死狀恐怖挨决,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情订歪,我是刑警寧澤脖祈,帶...
    沈念sama閱讀 35,737評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站刷晋,受9級特大地震影響盖高,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜眼虱,卻給世界環(huán)境...
    茶點故事閱讀 41,360評論 3 330
  • 文/蒙蒙 一喻奥、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧捏悬,春花似錦撞蚕、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,941評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至抒和,卻和暖如春矫渔,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背摧莽。 一陣腳步聲響...
    開封第一講書人閱讀 33,057評論 1 270
  • 我被黑心中介騙來泰國打工庙洼, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人镊辕。 一個月前我還...
    沈念sama閱讀 48,237評論 3 371
  • 正文 我出身青樓油够,卻偏偏與公主長得像,于是被迫代替她去往敵國和親征懈。 傳聞我的和親對象是個殘疾皇子石咬,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,976評論 2 355

推薦閱讀更多精彩內(nèi)容

  • @(python)[筆記] 目錄 一鬼悠、什么是進程 1.1 進程的概念 進程的概念起源于操作系統(tǒng),是操作系統(tǒng)最核心的...
    CaiGuangyin閱讀 1,261評論 0 9
  • 一亏娜、背景知識 顧名思義焕窝,進程即正在執(zhí)行的一個過程。進程是對正在運行程序的一個抽象维贺。 進程的概念起源于操作系統(tǒng)它掂,是操...
    馬小跳_閱讀 634評論 0 0
  • 操作系統(tǒng)概論 操作系統(tǒng)的概念 操作系統(tǒng)是指控制和管理計算機的軟硬件資源,并合理的組織調(diào)度計算機的工作和資源的分配溯泣,...
    野狗子嗷嗷嗷閱讀 11,931評論 3 34
  • 1.虛擬機內(nèi)存結(jié)構(gòu) 線程私有:虛擬機棧虐秋,本地方法棧榕茧,程序計數(shù)器 線程共享:堆,方法區(qū)(包括運行時常量池) 1....
    什么都不會的碼農(nóng)丶閱讀 331評論 0 0
  • 要下雨了客给,身體已報告了用押。從下午陰天開始,腰就不太舒服起愈,各個關(guān)節(jié)也有些隱隱作痛只恨。 以前,經(jīng)常聽別人說...
    張家小鬧閱讀 214評論 0 0