1. 內(nèi)存管理機制
1.1 介紹
- 概要
賦值語句內(nèi)存分析
垃圾回收機制
內(nèi)存管理機制 - 目標
掌握賦值語句內(nèi)存分析方法
掌握id()
和is
的使用
了解Python
的垃圾回收機制
了解Python
的內(nèi)存管理機制 - 內(nèi)存與硬盤
內(nèi)存是電腦的數(shù)據(jù)存儲設(shè)備之一亩冬,其特點為容量較小雹熬,但數(shù)據(jù)傳送速度較快燃箭,用以彌補硬盤雖然容量大但傳送速度慢的缺點。
1.2 內(nèi)存管理機制
-
is
、id()
和==
a is b
等價于id(a) == id(b)
,比較的是id
(內(nèi)存標識)是否相同。
a == b
比較的是value
(值)是否相同范抓。
a1 = 5
a2 = 5
print(id(a1)) # 4313714912
print(id(a1) == id(a2), a1 is a2) # True True
b1 = 'abc'
b2 = 'abc'
print(b1 == b2, b1 is b2) # True True
c1 = [1]
c2 = [1]
print(c1 == c2, c1 is c2) # True False
d1 = {'a': 1}
d2 = {'a': 1}
print(d1 == d2, d1 is d2) # True False
- 內(nèi)存管理示例
def extend_list(value, list=[]):
list.append(value)
return list
l1 = extend_list(10)
l2 = extend_list(123, [])
l3 = extend_list('a')
print(l1) # [10, 'a']
print(l2) # [123]
print(l3) # [10, 'a']
- 垃圾回收機制
以引用計數(shù)為主,分代收集為輔食铐。
class Cat(object):
def __init__(self):
print('{} init'.format(id(self)))
def __del__(self):
print('{} del'.format(id(self)))
while True:
""" 自動回收內(nèi)存 """
c1 = Cat()
l = []
while True:
""" 一直被引用匕垫,內(nèi)存不會釋放 """
c1 = Cat()
l.append(c1)
注意:在引用計數(shù)垃圾回收機制中,循環(huán)引用會導致內(nèi)存泄漏虐呻。
- 引用計數(shù)
每個對象都存在指向該對象的引用計數(shù)年缎。
查看某個對象的引用計數(shù):sys.getrefcount()
刪除某個引用:del
import sys
# 基本數(shù)據(jù)類型
i = 1
print(sys.getrefcount(i)) # 181
i1 = 1
i2 = i1
print(sys.getrefcount(i)) # 183
del i1
print(sys.getrefcount(i)) # 182
# 引用數(shù)據(jù)類型
l = []
print(sys.getrefcount(l)) # 2
l1 = l
l2 = l1
print(sys.getrefcount(l)) # 4
del l2
print(sys.getrefcount(l)) # 3
- 分代收集
Python
將所有的對象分為0
,1
铃慷,2
三代单芜。
所有新建對象都是0
代對象。
當某一代對象經(jīng)歷過垃圾回收犁柜,依然存活洲鸠,那么它就被歸入下一代對象。 - 垃圾回收時機
當Python
運行時馋缅,會記錄其中分配對象(object allocation
)和取消分配對象(object deallocation
)的次數(shù)扒腕。當兩者的差值高于某一個閾值時,垃圾回收才會啟動萤悴。
查看閾值:gc.get_threshold()
import gc
print(gc.get_threshold()) # (700, 10, 10) 依次表示第0瘾腰、1、2代垃圾回收閾值
- 手動回收
gc.collect()
手動回收覆履。
objgraph
模塊中的count()
可以記錄當前類產(chǎn)生的未回收實例對象個數(shù)蹋盆。
class Person(object):
pass
class Cat(object):
pass
p = Person()
c = Cat()
p.pet = c
c.master = p
print(sys.getrefcount(p)) # 3
print(sys.getrefcount(c)) # 3
print(objgraph.count('Person')) # 1
print(objgraph.count('Cat')) # 1
del p
del c
gc.collect() # p與c循環(huán)引用费薄。如果不使用del,則無法回收栖雾。
print(objgraph.count('Person')) # 0
print(objgraph.count('Cat')) # 0
- 內(nèi)存池(
memory pool
)機制
當創(chuàng)建大量小內(nèi)存對象時楞抡,頻繁調(diào)用new / malloc
會導致大量內(nèi)存碎片,效率降低析藕。內(nèi)存池就是預先在內(nèi)存中申請一定數(shù)量的召廷、大小相等的內(nèi)存塊留作備用,當有新的內(nèi)存需求時账胧,就先從內(nèi)存池中分配內(nèi)存給這個需求竞慢,不夠了再申請新的內(nèi)存。這樣做可以減少內(nèi)存碎片治泥,提高效率分别。
Python3
中的內(nèi)存管理機制——Pymalloc
針對小對象(<=512bytes
)碰辅,pymalloc
會在內(nèi)存池中申請內(nèi)存空間爹谭。
當內(nèi)存>512bytes
時奏寨,則使用PyMen_RawMalloc()
和PyMem_RawRealloc()
來申請新的內(nèi)存空間仑鸥。
注釋:1 Byte = 8 Bits
(即1B = 8b
)
1.3 總結(jié)
- 賦值語句內(nèi)存分析
使用id()
方法訪問內(nèi)存地址
使用is
比較內(nèi)存地址是否相同 - 垃圾回收機制
引用計數(shù)為主吮播,分代收集為輔
引用計數(shù)的缺陷是循環(huán)引用問題 - 內(nèi)存管理機制
內(nèi)存池(memory pool
)機制
2. 線程、進程眼俊、協(xié)程
2.1 介紹
- 章節(jié)概要
進程意狠、線程與并發(fā)
對多核的利用
實現(xiàn)一個線程
線程之間的通信
線程的調(diào)度和優(yōu)化 - 使用場景
快速高效的爬蟲程序
多用戶同時訪問Web
服務
電商秒殺、搶購活動
物聯(lián)網(wǎng)傳感器監(jiān)控服務
2.2 線程
- 線程介紹
在同一個進程下執(zhí)行疮胖,并共享相同的上下文环戈。
一個進程中的各個線程與主線程共享同一片數(shù)據(jù)空間。
線程包括開始澎灸、執(zhí)行順序和結(jié)束三部分院塞。
線程可以被強占(中斷)和臨時掛起(睡眠)——讓步 - 多核的利用
單核CPU
系統(tǒng)中,不存在真正的并發(fā)性昭。
GIL
全局解釋器鎖 - 強制在任何時候只有一個線程可以執(zhí)行Python
代碼拦止。
I/O
密集型應用與CPU
密集型應用。 -
GIL
執(zhí)行順序
(1) 設(shè)置GIL
(2) 切換進一個線程去執(zhí)行
(3) 執(zhí)行下面操作之一
指定數(shù)量的字節(jié)碼指令糜颠;
線程主動讓出控制權(quán)(可以調(diào)用time.sleep(0)
來完成)汹族。
(4) 把線程設(shè)置回睡眠狀態(tài)(切換出線程)
(5) 解鎖GIL
(6) 重復上述步驟 -
threading
模塊對象
threading模塊對象 -
Thread
對象方法
Thread對象方法 - 線程實現(xiàn)(面向過程)
import threading
import time
def loop():
""" 新的線程執(zhí)行的代碼 """
loop_thread = threading.current_thread()
print('loop_thread: {}'.format(loop_thread.name)) # loop_thread: loop_thread
n = 0
while n < 5:
print(n) # 0 1 2 3 4
n += 1
def use_thread():
""" 新的線程執(zhí)行函數(shù) """
now_thread = threading.current_thread()
print('now_thread: {}'.format(now_thread.name)) # now_thread: MainThread
# 創(chuàng)建線程
t = threading.Thread(target=loop, name='loop_thread')
# 啟動線程
t.start()
# 掛起線程
t.join()
if __name__ == '__main__':
use_thread()
- 線程實現(xiàn)(面向?qū)ο?
import threading
class LoopThread(threading.Thread):
""" 自定義線程 """
n = 0
def run(self):
loop_thread = threading.current_thread()
print('loop_thread: {}'.format(loop_thread.name)) # loop_thread: loop_thread
while self.n < 5:
print(self.n) # 0 1 2 3 4
self.n += 1
if __name__ == '__main__':
now_thread = threading.current_thread()
print('now_thread: {}'.format(now_thread.name)) # now_thread: MainThread
# 創(chuàng)建線程
t = LoopThread(name='loop_thread')
# 啟動線程
t.start()
# 掛起線程
t.join()
- 多線程及存在的問題
import threading
# 我的銀行賬戶
balance = 0
def change(n):
global balance # 局部作用域如果要修改全局變量,則需要global關(guān)鍵字聲明其兴。
balance = balance + n
balance = balance - n
if balance != 0:
print(balance) # balance有可能是-8 -5 0 5 8
class ChangeBalanceThread(threading.Thread):
def __init__(self, num, *args, **kwargs):
super().__init__(*args, **kwargs)
self.num = num
def run(self):
for i in range(100000):
change(self.num)
if __name__ == '__main__':
# 創(chuàng)建線程
t1 = ChangeBalanceThread(5)
t2 = ChangeBalanceThread(8)
# 啟動線程
t1.start()
t2.start()
# 掛起線程
t1.join()
t2.join()
注意:局部作用域如果要修改全局變量顶瞒,則需要global
關(guān)鍵字聲明。
- 鎖
import threading
# 鎖
my_lock = threading.Lock()
# 我的銀行賬戶
balance = 0
def change(n):
global balance # 局部作用域如果要修改全局變量元旬,則需要global關(guān)鍵字聲明榴徐。
# 添加鎖
my_lock.acquire()
try:
balance = balance + n
balance = balance - n
if balance != 0:
print(balance)
finally:
# 釋放鎖
my_lock.release()
class ChangeBalanceThread(threading.Thread):
def __init__(self, num, *args, **kwargs):
super().__init__(*args, **kwargs)
self.num = num
def run(self):
for i in range(100000):
change(self.num)
if __name__ == '__main__':
# 創(chuàng)建線程
t1 = ChangeBalanceThread(5)
t2 = ChangeBalanceThread(8)
# 啟動線程
t1.start()
t2.start()
# 掛起線程
t1.join()
t2.join()
- 鎖的簡單寫法
# 鎖
my_lock = threading.Lock()
# 我的銀行賬戶
balance = 0
def change(n):
global balance
# 添加鎖
with my_lock:
balance = balance + n
balance = balance - n
if balance != 0:
print(balance)
- 死鎖
# 鎖
my_lock = threading.Lock()
# 我的銀行賬戶
balance = 0
def change(n):
global balance
# 添加鎖
my_lock.acquire()
my_lock.acquire()
try:
balance = balance + n
balance = balance - n
print(balance)
finally:
# 釋放鎖
my_lock.release()
my_lock.release()
-
threading.RLock
我們使用threading.Lock()
來進行加鎖守问。threading
中還提供了另外一個threading.RLock()
鎖。在同一線程內(nèi)箕速,對RLock
進行多次acquire()
操作酪碘,程序不會阻塞。
# 鎖
my_lock = threading.RLock()
# 我的銀行賬戶
balance = 0
def change(n):
global balance # 局部作用域如果要修改全局變量盐茎,則需要global關(guān)鍵字聲明兴垦。
# 添加鎖
my_lock.acquire()
my_lock.acquire()
try:
balance = balance + n
balance = balance - n
print(balance)
finally:
# 釋放鎖
my_lock.release()
my_lock.release()
- 使用線程池
import time
import threading
from concurrent.futures.thread import ThreadPoolExecutor
from multiprocessing.dummy import Pool
def run(n):
""" 線程要做的事情 """
time.sleep(1)
print('thread: {}; n: {}'.format(threading.current_thread().name, n))
def main():
""" 主線程做任務 """
t1 = time.time()
for n in range(100):
run(n)
print(time.time() - t1) # 100.30s
def main_use_thread():
""" 多線程做任務。10個線程字柠。"""
t2 = time.time()
ls = []
for i in range(100):
t = threading.Thread(target=run, args=(i,))
ls.append(t)
t.start()
for l in ls:
l.join()
print(time.time() - t2) # 1.01s
def main_use_pool():
""" 線程池 """
t3 = time.time()
pool = Pool(10)
n_list = range(100)
pool.map(run, n_list)
pool.close()
pool.join()
print(time.time() - t3) # 12.08s
def main_use_executor():
""" 使用ThreadPoolExecutor來優(yōu)化 """
t4 = time.time()
n_list = range(100)
with ThreadPoolExecutor(max_workers=10) as executor:
executor.map(run, n_list)
print(time.time() - t4) # 10.03s
if __name__ == '__main__':
main()
main_use_thread()
main_use_pool()
main_use_executor()
注意:線程傳參的方式threading.Thread(target=run, args=(i,))
探越。其中,(i,)
表示元組窑业。
2.3 進程
- 進程介紹
是一個執(zhí)行中的程序钦幔。
每個進程都擁有自己的地址空間、內(nèi)存常柄、數(shù)據(jù)棧以及其他用于跟蹤執(zhí)行的輔助數(shù)據(jù)鲤氢。
操作系統(tǒng)管理所有進程的執(zhí)行,并為這些進程合理地分配時間西潘。
進程也可通過派生(fork
或spawn
)新的進程來執(zhí)行其他任務卷玉。 - 進程實現(xiàn)(面向過程)
import os
import time
from multiprocessing import Process
def do_sth(name):
print("進程名稱: {}, pid: {}".format(name, os.getpid())) # 進程名稱: my_process, pid: 47634
time.sleep(3)
print("進程要做的事情結(jié)束") # 進程要做的事情結(jié)束
if __name__ == '__main__':
print('當前進程pid: {}'.format(os.getpid())) # 當前進程名稱: 47651
p = Process(target=do_sth, args=('my_process',))
# 啟動進程
p.start()
# 掛起進程
p.join()
- 進程實現(xiàn)(面向?qū)ο?
import os
import time
from multiprocessing import Process
class MyProcess(Process):
def run(self):
print("進程名稱: {}, pid: {}".format(self.name, os.getpid())) # 進程名稱: my_process, pid: 47758
time.sleep(3)
print("進程要做的事情結(jié)束") # 進程要做的事情結(jié)束
if __name__ == '__main__':
print('當前進程pid: {}'.format(os.getpid())) # 當前進程名稱: 47757
p = MyProcess(name='my_process')
# 啟動進程
p.start()
# 掛起進程
p.join()
- 進程之間通信
通過Queue
、Pipes
等實現(xiàn)進程之間的通信喷市。
import time
import random
from multiprocessing import Process, Queue, current_process
class WriteProcess(Process):
""" 寫入的進程 """
def __init__(self, queue, *args, **kwargs):
self.queue = queue
super().__init__(*args, **kwargs)
def run(self):
ls = [
'第1行內(nèi)容',
'第2行內(nèi)容',
'第3行內(nèi)容',
'第4行內(nèi)容',
'第5行內(nèi)容',
]
for line in ls:
print('進程名稱: {}, 寫入內(nèi)容: {}'.format(current_process().name, line))
self.queue.put(line)
# 沒寫入一次相种,休息1-3s
time.sleep(random.randint(1, 3))
class ReadProcess(Process):
""" 讀取的進程 """
def __init__(self, queue, *args, **kwargs):
self.queue = queue
super().__init__(*args, **kwargs)
def run(self):
while True:
content = self.queue.get()
print('進程名稱: {}, 讀取內(nèi)容: {}'.format(self.name, content))
if __name__ == '__main__':
# 通過Queue共享數(shù)據(jù)
q = Queue()
# 寫入內(nèi)容的進程
t_write = WriteProcess(q)
t_write.start()
# 讀取內(nèi)容的進程
t_read = ReadProcess(q)
t_read.start()
t_write.join()
# t_read.join()
t_read.terminate()
注意:進程類中的self.name
等價于current_process().name
。
- 多進程中的鎖
import random
import time
from multiprocessing import Process, Lock
class WriteProcess(Process):
""" 寫入文件 """
lock = Lock()
def __init__(self, file_name, index, *args, **kwargs):
self.file_name = file_name
self.index = index
super().__init__(*args, **kwargs)
def run(self):
with self.lock:
for i in range(5):
with open(self.file_name, 'a+', encoding='utf-8') as f:
content = '現(xiàn)在是: {}, pid為: {}, 進程序號為: {}\n'.format(
self.name,
self.pid,
self.index
)
f.write(content)
time.sleep(random.randint(1, 5))
print(content)
if __name__ == '__main__':
file_name = 'test.txt'
for i in range(5):
p = WriteProcess(file_name, i)
p.start()
-
multiprocessing.RLock
我們使用multiprocessing.Lock()
來進行加鎖品姓。multiprocessing
中還提供了另外一個multiprocessing.RLock()
鎖寝并。在同一進程內(nèi),對RLock
進行多次acquire()
操作腹备,程序不會阻塞衬潦。 - 進程池
import os
import random
import time
from multiprocessing import current_process, Pool
def run(file_name, index):
""" 進程任務 """
with open(file_name, 'a+', encoding='utf-8') as f:
now_process = current_process()
content = '{} - {} - {} \n'.format(
now_process.name,
now_process.pid,
index
)
f.write(content)
time.sleep(random.randint(1, 3))
print(content)
return 'OK'
if __name__ == '__main__':
# print(os.cpu_count()) # 4核CPU
file_name = 'test_pool.txt'
pool = Pool(2) # 創(chuàng)建有兩個進程的進程池
for i in range(20):
# 同步添加任務
# rest = pool.apply(run, args=(file_name, i))
# 異步添加任務
rest = pool.apply_async(run, args=(file_name, i))
print('{} {}'.format(i, rest))
pool.close() # 關(guān)閉進程池
pool.join()
注釋:異步添加任務,不能保證任務的執(zhí)行順序植酥。同步添加任務镀岛,可以保證任務的執(zhí)行順序。
2.4 協(xié)程
- 協(xié)程介紹
協(xié)程就是協(xié)同多任務惧互。
協(xié)程在一個進程或者是一個線程中執(zhí)行哎媚。
協(xié)程不需要鎖機制。 -
yield
生成器
def count_down(n):
""" 倒計時效果 """
while n > 0:
yield n
n -= 1
if __name__ == '__main__':
rest = count_down(5)
print(next(rest)) # 5
print(next(rest)) # 4
print(next(rest)) # 3
print(next(rest)) # 2
print(next(rest)) # 1
-
Python3.5
以前協(xié)程實現(xiàn)
使用生成器(yield
)實現(xiàn)
def yield_test():
""" 實現(xiàn)協(xié)程函數(shù) """
while True:
n = (yield )
print(n)
if __name__ == '__main__':
rest = yield_test()
next(rest)
rest.send('1') # 1
rest.send('2') # 2
-
Python3.5
以后協(xié)程實現(xiàn)
使用async
和await
關(guān)鍵字實現(xiàn)
import asyncio
async def do_sth(x):
print("等待中: {}".format(x))
await asyncio.sleep(x)
# 判斷是否是協(xié)程函數(shù)
print(asyncio.iscoroutinefunction(do_sth)) # True
# 創(chuàng)建任務
coroutine = do_sth(5)
# 創(chuàng)建事件循環(huán)隊列
loop = asyncio.get_event_loop()
# 注冊任務
task = loop.create_task(coroutine)
print(task) # <Task pending coro=<do_sth() running at /Users/nimengwei/Code/mycode/python/Python零基礎(chǔ)入門/Python基礎(chǔ)知識/chapter09/async_test.py:4>>
# 等待協(xié)程任務執(zhí)行結(jié)束
loop.run_until_complete(task)
print(task) # <Task finished coro=<do_sth() done, defined at /Users/nimengwei/Code/mycode/python/Python零基礎(chǔ)入門/Python基礎(chǔ)知識/chapter09/async_test.py:4> result=None>
注意:必須使用asyncio.sleep()
喊儡,不能使用time.sleep()
拨与。只有前者能返回一個協(xié)程對象。
- 協(xié)程通信之嵌套調(diào)用
import asyncio
async def compute(x, y):
await asyncio.sleep(1)
return x + y
async def get_sum(x, y):
rest = await compute(x, y)
print("計算{} + {} = {}".format(x, y, rest))
loop = asyncio.get_event_loop()
# loop.run_until_complete(loop.create_task(get_sum(1, 2)))
loop.run_until_complete(get_sum(1, 2))
loop.close()
- 協(xié)程通信之隊列
import asyncio
import random
async def add(store):
""" 寫入數(shù)據(jù)到隊列 """
for i in range(5):
await asyncio.sleep(random.randint(1, 3))
await store.put(i)
print('add {}, queue size: {}'.format(i, store.qsize()))
async def reduce(store):
""" 從隊列中刪除數(shù)據(jù) """
for i in range(10):
rest = await store.get()
print('reduce {}, queue size: {}'.format(rest, store.qsize()))
if __name__ == '__main__':
# 準備一個隊列
store = asyncio.Queue(maxsize=5)
a1 = add(store)
a2 = add(store)
r1 = reduce(store)
# 添加到事件隊列
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(a1, a2, r1))
loop.close()