Day09的課程要點記錄
詳細教程地址:Day9 - 進程赡麦、線程虹统、協(xié)程篇
Python之路【第八篇】:堡壘機實例以及數(shù)據(jù)庫操作
一、堡壘機
1.1 前戲:paramiko模塊
1.1.1 SSHClient:用于連接遠程服務(wù)器并執(zhí)行基本命令
基于用戶名密碼連接
import paramiko
# 創(chuàng)建SSH對象
ssh = paramiko.SSHClient()
# 允許連接不在know_hosts文件中的主機
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 連接服務(wù)器
ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', password='123')
# 執(zhí)行命令
stdin, stdout, stderr = ssh.exec_command('df')
# 獲取命令結(jié)果
result = stdout.read()
# 關(guān)閉連接
ssh.close()
基于公鑰密鑰連接
使用
ssh-keygen
來生成密鑰對
公鑰給別人隧甚,私鑰自己保存
import paramiko
private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')
# 創(chuàng)建SSH對象
ssh = paramiko.SSHClient()
# 允許連接不在know_hosts文件中的主機
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 連接服務(wù)器
ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', key=private_key)
# 執(zhí)行命令
stdin, stdout, stderr = ssh.exec_command('df')
# 獲取命令結(jié)果
result = stdout.read()
# 關(guān)閉連接
ssh.close()
1.1.2 SFTPClient:用于連接遠程服務(wù)器并執(zhí)行上傳下載
基于用戶名密碼上傳下載
import paramiko
transport = paramiko.Transport(('hostname',22))
transport.connect(username='wupeiqi',password='123')
sftp = paramiko.SFTPClient.from_transport(transport)
# 將location.py 上傳至服務(wù)器 /tmp/test.py
sftp.put('/tmp/location.py', '/tmp/test.py')
# 將remove_path 下載到本地 local_path
sftp.get('remove_path', 'local_path')
transport.close()
基于公鑰密鑰上傳下載
import paramiko
private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')
transport = paramiko.Transport(('hostname', 22))
transport.connect(username='wupeiqi', pkey=private_key )
sftp = paramiko.SFTPClient.from_transport(transport)
# 將location.py 上傳至服務(wù)器 /tmp/test.py
sftp.put('/tmp/location.py', '/tmp/test.py')
# 將remove_path 下載到本地 local_path
sftp.get('remove_path', 'local_path')
transport.close()
二车荔、進程與線程(process & thread)
2.1 什么是進程 (process)
進程:程序并不能單獨運行,只有將程序裝載到內(nèi)存中戚扳,系統(tǒng)為它分配資源才能運行忧便,而這種執(zhí)行的程序就稱之為進程。
進程不能執(zhí)行帽借,只能通過線程執(zhí)行
進程最少有一個線程
2.2 什么是線程 (thread)
線程:線程是操作系統(tǒng)能夠進行運算調(diào)度的最小單位珠增。
它被包含在進程之中,是進程中的實際運作單位砍艾。
一條線程指的是進程中一個單一順序的控制流蒂教,一個進程中可以并發(fā)多個線程,每條線程并行執(zhí)行不同的任務(wù)
同一個進程中的所有線程脆荷,共享同一塊內(nèi)存空間
2.3 進程和線程的區(qū)別
線程啟動速度快凝垛,進程啟動速度慢。但運行速度沒有可比性蜓谋。
- 線程共享同一進程的內(nèi)存空間梦皮,進程的內(nèi)存空間是獨立的
- 線程可以訪問同一進程的數(shù)據(jù)片段,進程之間的數(shù)據(jù)是獨立的
- 同一進程中的線程之間可以直接通信桃焕,進程之間通信必須使用中間代理
- 新線程易于創(chuàng)建剑肯,創(chuàng)建新進程需要克隆其父進程
- 一個線程可以控制與操作同一進程中的其他線程,進程只能操作其子進程
- 對主線程的修改可能會影響到同一進程中的其他線程观堂,對于父進程的修改不會影響其子進程
2.4 并發(fā)的多線程效果演示(threading
模塊)
線程有2種調(diào)用方式
2.4.1 直接調(diào)用
import threading
import time
def run(n): # 定義每個線程要運行的函數(shù)
print("task ", n)
time.sleep(2)
t1 = threading.Thread(target=run, args=("t1",)) # 生成一個線程實例
t2 = threading.Thread(target=run, args=("t2",)) # 生成另一個線程實例
t1.start() # 啟動線程
t2.start() # 啟動另一個線程
print(t1.getName()) # 獲取線程名
print(t2.getName())
# 非多線程對比
# run("t1")
# run("t2")
2.4.2 繼承式調(diào)用
import threading
import time
class MyThread(threading.Thread):
def __init__(self, n):
super(MyThread, self).__init__()
self.n = n
def run(self): # 定義每個線程要運行的函數(shù)
print("running task ", self.n)
time.sleep(2)
t1 = MyThread("t1")
t2 = MyThread("t2")
t1.start()
t2.start()
2.5 Join & Daemon
2.5.1 等待線程 Join
for
循環(huán)啟動線程
import threading
import time
def run(n):
print("task ", n)
time.sleep(2)
print("task done", n)
start_time = time.time()
t_objs = []
for i in range(50):
t = threading.Thread(target=run, args=("t-%s" % i, ))
t.start()
t_objs.append(t)
for i in t_objs:
t.join()
print("------ all threads have finished ------")
print("cost:", time.time() - start_time)
2.5.1 守護進程 Deamon
主線程結(jié)束時让网,不等待守護線程呀忧,直接結(jié)束
import threading
import time
def run(n):
print("task ", n)
time.sleep(2)
print("task done", n)
start_time = time.time()
for i in range(50):
t = threading.Thread(target=run, args=("t-%s" % i, ))
t.setDaemon(True) # 把當前線程設(shè)置為守護線程
t.start()
print("------ all threads have finished ------")
print(threading.current_thread(), threading.active_count())
print("cost:", time.time() - start_time)
三、多線程
3.1 全局解釋器鎖 Python GIL(Global Interpreter Lock)
無論你啟動多少個線程溃睹,你有多少個CPU而账,Python在執(zhí)行的時候只會在同一時刻只允許一個線程運行。當然丸凭,不同的線程可能是在不同的CPU上運行的。
3.2 線程鎖 threading Lock (互斥鎖Mutex)
一個進程下可以啟動多個線程腕铸,多個線程共享父進程的內(nèi)存空間惜犀,也就意味著每個線程可以訪問同一份數(shù)據(jù)
此時,如果2個線程同時要修改同一份數(shù)據(jù)狠裹,會出現(xiàn)什么狀況虽界?
def run():
global num
num += 1
num = 0
for i in range(500):
t = threading.Thread(target=run)
t.start()
print("------ all threads have finished ------")
print("num:", num)
正常的num
結(jié)果應(yīng)該是500, 但在python 2.7上多運行幾次涛菠,會發(fā)現(xiàn)最后打印出來的num
結(jié)果不總是500莉御。
原因是:假設(shè)有A,B兩個線程,此時都要對num
進行減1操作俗冻,由于2個線程是并發(fā)同時運行的礁叔,所以2個線程很有可能同時拿走了 num=0 這個初始變量交給cpu去運算,當A線程去處完的結(jié)果是1迄薄,但此時B線程運算完的結(jié)果也是1琅关,兩個線程同時CPU運算的結(jié)果再賦值給num變量后,結(jié)果就都是99讥蔽。
解決辦法就是:在每個線程在要修改公共數(shù)據(jù)時涣易,為了避免自己在還沒改完的時候別人也來修改此數(shù)據(jù),可以給這個數(shù)據(jù)加一把鎖冶伞, 這樣其它線程想修改此數(shù)據(jù)時就必須等待你修改完畢并把鎖釋放掉后才能再訪問此數(shù)據(jù)新症。
def run():
lock.acquire() # 修改數(shù)據(jù)前加鎖
global num
num += 1
lock.release() # 修改數(shù)據(jù)后釋放鎖
num = 0
lock = threading.Lock() # 生成全局鎖
for i in range(500):
t = threading.Thread(target=run)
t.start()
print("------ all threads have finished ------")
print("num:", num)
3.3 Python GIL vs threading Lock
那么Python已經(jīng)有一個GIL來保證同一時間只能有一個線程來執(zhí)行了,為什么這里還需要lock?
這里的lock是用戶級的lock响禽,和GIL沒關(guān)系徒爹,具體通過下圖和課程講解就明白了。
3.4 遞歸鎖(RLock)
簡單說明:一個大鎖中包含子鎖
import threading
import time
def run1():
print("Grab the first part data.")
lock.acquire()
global num
num += 1
lock.release()
return num
def run2():
print("Grab the second part data.")
lock.acquire()
global num2
num2 += 1
lock.release()
return num2
def run3():
lock.acquire()
res = run1()
print("------ between run1 and run2 ------")
res2 = run2()
lock.release()
print(res, res2)
if __name__ == '__main__':
num, num2 = 0, 0
lock = threading.RLock()
for i in range(10):
t = threading.Thread(target=run3)
t.start()
while threading.active_count() != 1:
print(threading.active_count())
else:
print("------ All threads have finished ------")
print(num, num2)
3.5 信號量(Semaphore)
互斥鎖芋类,同時只允許一個線程更改數(shù)據(jù)瀑焦。
而Semaphore是同時允許一定數(shù)量的線程更改數(shù)據(jù) ,比如廁所有3個坑梗肝,那最多只允許3個人上廁所榛瓮,后面的人只能等里面有人出來了才能再進去。
import time
import threading
def run(n):
semaphore.acquire()
time.sleep(1)
print("run the thread: %s\n" % n)
semaphore.release()
if __name__ == '__main__':
semaphore = threading.BoundedSemaphore(5) # 最多允許5個線程同時運行
for i in range(20):
t = threading.Thread(target=run, args=(i,))
t.start()
while threading.active_count() != 1:
pass
else:
print("------ All threads done ------")
3.6 事件(Events)
通過Event來實現(xiàn)兩個或多個線程間的交互巫击。
一個紅綠燈的例子禀晓,即起動一個線程做交通指揮燈精续,生成幾個線程做車輛,車輛行駛按紅燈停粹懒,綠燈行的規(guī)則重付。
import time
import threading
event = threading.Event()
def light():
count = 0
event.set() # 先設(shè)置為綠燈
while True:
if count > 5 and count < 10: # 變?yōu)榧t燈
event.clear()
print("\033[1;41mRed light is turned on.\033[0m")
elif count > 10:
event.set() # 變?yōu)榫G燈
count = 0
else:
print("\033[1;42mGreen light is turned on.\033[0m")
time.sleep(1)
count += 1
def car(name):
while True:
if event.isSet():
print("[%s] is running." % name)
time.sleep(1)
else:
print("[%s] sees red light, it's waiting" % name)
event.wait()
print("\033[1;34m[%s] sees green light is on, it's keep going." % name)
lights = threading.Thread(target=light)
lights.start()
car1 = threading.Thread(target=car, args=("Jeep",))
car1.start()
3.7 隊列(queue)
作用:解耦,提高效率
3.7.1 queue
實例化方法
class queue.Queue(maxsize=0) # 先入先出
class queue.LifoQueue(maxsize=0) # last in first out
class queue.PriorityQueue(maxsize=0) # 存儲數(shù)據(jù)時可設(shè)置優(yōu)先級的隊列
3.7.2 queue
方法
Queue.qsize()
Queue.empty() #return True if empty
Queue.full() # return True if full
Queue.put(item, block=True, timeout=None)
3.8 生產(chǎn)者消費者模型
在并發(fā)編程中使用生產(chǎn)者和消費者模式能夠解決絕大多數(shù)并發(fā)問題凫乖。
該模式通過平衡生產(chǎn)線程和消費線程的工作能力來提高程序的整體處理數(shù)據(jù)的速度确垫。
3.8.1 為什么要使用生產(chǎn)者和消費者模式
在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程帽芽,消費者就是消費數(shù)據(jù)的線程删掀。
在多線程開發(fā)當中,如果生產(chǎn)者處理速度很快导街,而消費者處理速度很慢披泪,那么生產(chǎn)者就必須等待消費者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)搬瑰。
同樣的道理款票,如果消費者的處理能力大于生產(chǎn)者,那么消費者就必須等待生產(chǎn)者泽论。
為了解決這個問題于是引入了生產(chǎn)者和消費者模式艾少。
3.8.2 什么是生產(chǎn)者消費者模式
生產(chǎn)者消費者模式是通過一個容器來解決生產(chǎn)者和消費者的強耦合問題。
生產(chǎn)者和消費者彼此之間不直接通訊翼悴,而通過阻塞隊列來進行通訊姆钉,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費者處理,直接扔給阻塞隊列抄瓦。
消費者不找生產(chǎn)者要數(shù)據(jù)潮瓶,而是直接從阻塞隊列里取,阻塞隊列就相當于一個緩沖區(qū)钙姊,平衡了生產(chǎn)者和消費者的處理能力毯辅。
3.8.3 示例
最基本的生產(chǎn)者消費者模型
import queue
import threading
def producer(name):
for i in range(10):
q.put("bread%s" % i)
print("%s produced bread%s" % (name, i))
def consumer(name):
while q.qsize() > 0:
print("%s have eaten %s" % (name, q.get()))
q = queue.Queue()
p = threading.Thread(target=producer, args=("will",))
c = threading.Thread(target=consumer, args=("grubby",))
p.start()
c.start()
持續(xù)循環(huán)工作的生產(chǎn)者消費者模型
import time
import queue
import threading
def producer(name):
count = 1
while True:
q.put("bread%s" % count)
print("%s produced bread%s" % (name, count))
count += 1
time.sleep(0.5)
def consumer(name):
while True:
print("%s have eaten %s" % (name, q.get()))
time.sleep(1)
q = queue.Queue(maxsize=10)
p1 = threading.Thread(target=producer, args=("wang",))
p2 = threading.Thread(target=producer, args=("wei",))
c1 = threading.Thread(target=consumer, args=("will",))
c2 = threading.Thread(target=consumer, args=("lee",))
p1.start()
p2.start()
c1.start()
c2.start()
3.9 多線程的使用場景
I/O操作不占用CPU,計算占用CPU
Python的多線程煞额,不適合CPU密集操作型的任務(wù)思恐,適合I/O操作密集型的任務(wù)。
四膊毁、多進程(multiprocessing)
4.1 多進程的基本語法
4.1.1 啟動一個進程
import multiprocessing
import time
def run(name):
time.sleep(2)
print("Hello", name)
if __name__ == '__main__':
p = multiprocessing.Process(target=run, args=('bob',))
p.start()
4.1.2 啟動多進程
import time
import multiprocessing
def run(name):
time.sleep(2)
print("Hello", name)
if __name__ == '__main__':
for i in range(10):
p = multiprocessing.Process(target=run, args=("bob %s" % i,))
p.start()
4.1.3 啟動多進程胀莹,進程中啟動一個子線程
import time
import threading
import multiprocessing
def run(name):
time.sleep(2)
print("Hello", name)
t = threading.Thread(target=thread_run, args=(name,))
t.start()
def thread_run(name):
print("%s's id" % name, threading.get_ident())
if __name__ == '__main__':
for i in range(10):
p = multiprocessing.Process(target=run, args=("bob %s" % i,))
p.start()
4.1.4. 獲取進程ID
每一個進程都有一個父進程
例如Linux中init進程號為1,它啟動其他所有進程
import os
import multiprocessing
def info(title):
print(title)
print("Module name:", __name__)
print("Parent process id:", os.getppid())
print("Current process id:", os.getpid())
print("\n")
def f(name):
info("child process function f")
print("Hello", name)
if __name__ == '__main__':
info("Main process line")
p = multiprocessing.Process(target=f, args=("bob",))
p.start()
4.2 進程間數(shù)據(jù)交互
不同進程之間內(nèi)存是不共享的婚温,要想實現(xiàn)兩個進程間的數(shù)據(jù)交換描焰,有以下方法:
進程隊列(Queue)
import multiprocessing
def f(q):
q.put([42, None, "hello"])
if __name__ == '__main__':
q = multiprocessing.Queue()
p = multiprocessing.Process(target=f, args=(q,))
p.start()
print(q.get())
p.join()
管道(Pipe)
import multiprocessing
def f(conn):
conn.send([42, None, "hello from child"])
conn.send([42, None, "Are you Ok?"])
print(conn.recv())
if __name__ == '__main__':
parent_conn, child_conn = multiprocessing.Pipe()
p = multiprocessing.Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv())
print(parent_conn.recv())
parent_conn.send("Good for you.")
p.join()
Manager
import os
import multiprocessing
def f(d, l):
d["%s" % os.getpid()] = os.getpid()
l.append(os.getpid())
print("For now is", os.getpid())
print(d)
print(l)
if __name__ == '__main__':
manager = multiprocessing.Manager()
d = manager.dict() # 生成一個可在多個進程間共享和傳遞的字典
l = manager.list(range(5)) # 生成一個可在多個進程間共享和傳遞的列表
p_list = []
for i in range(10):
p = multiprocessing.Process(target=f, args=(d, l))
p.start()
p_list.append(p)
for res in p_list:
p.join()
進程同步
進程鎖的意義:進程共享同一塊屏幕,在顯示信息時保證不亂。
import multiprocessing
def f(l, i):
l.acquire()
print("Hello world", i)
l.release()
if __name__ == '__main__':
lock = multiprocessing.Lock()
for num in range(100):
multiprocessing.Process(target=f, args=(lock, num)).start()
4.3 進程池
進程池內(nèi)部維護一個進程序列荆秦,當使用時篱竭,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進程步绸,那么程序就會等待掺逼,直到進程池中有可用進程為止。
進程池的方法:
- apply
- apply_async
import multiprocessing
import os
import time
def foo(i):
time.sleep(2)
print("In process:", os.getpid())
return i + 100
def bar(arg):
print("---> exec done:", arg, os.getpid())
if __name__ == '__main__':
pool = multiprocessing.Pool(5) # 允許進程池同時放入5個進程
print("Main process:", os.getpid())
for i in range(10):
# pool.apply(func=foo, args=(i,)) # apply 串行
# pool.apply_async(func=foo, args=(i,)) # apply 并行
pool.apply_async(func=foo, args=(i,), callback=bar) # callback 回調(diào)瓤介,func 執(zhí)行完畢后吕喘,執(zhí)行callback
pool.close() # 一定要先關(guān)閉進程池再join
pool.join() # 進程池中的進程執(zhí)行完畢后再關(guān)閉,如果注釋此行刑桑,那么程序直接關(guān)閉
第九周作業(yè)——類 Fabric 主機管理程序開發(fā)
類 Fabric 主機管理程序開發(fā):
- 運行程序列出主機組或者主機列表
- 選擇指定主機或主機組
- 選擇讓主機或者主機組執(zhí)行命令或者向其傳輸文件(上傳/下載)
- 充分使用多線程或多進程
- 不同主機的用戶名密碼氯质、端口可以不同