背景
場景內(nèi)有7臺攝像頭浓利,需要對攝像頭拍攝范圍內(nèi)人員進(jìn)行識別然后上報型将。
單進(jìn)程單攝像頭的處理方式由于圖片預(yù)測邏輯比較耗時彻坛,會導(dǎo)致延遲會比較高顷啼,所以需要異步預(yù)測。
并且最好還要對多攝像頭的管理有較好的機(jī)制昌屉。隨添隨用
思路
1 使用 multiprocessing 起動多個子進(jìn)程钙蒙,每個子進(jìn)程分配唯一的 rtsp 地址
2 使用 Queue 避免對視頻幀處理耗時導(dǎo)致獲取到的幀延遲較大的情況
看代碼
用到的核心庫如下
import cv2
import multiprocessing
from multiprocessing import Process,Queue,Pool
邏輯部分偽代碼
cameraList = [
{'name':'xxx1', 'rtsp':'rtsp://username:pwd@192.168.5.1/main/Channels/2'},
{'name':'xxx2', 'rtsp':'rtsp://username:pwd@192.168.5.2/main/Channels/2'},
{'name':'xxx3', 'rtsp':'rtsp://username:pwd@192.168.5.3/main/Channels/2'},
....
]
step = 4 #設(shè)置每4s抓一次幀
#消費(fèi)進(jìn)程
def consumer(q, tmp) :
while True:
try:
#發(fā)現(xiàn)堆積嚴(yán)重時釋放隊列內(nèi)數(shù)據(jù)
if q.qsize() > 100:
while q.empty() == False:
if q.qsize() > 10:
q.get(True, 1)
value = q.get()
handler(value['index'], value['frame'], value['t'])#將攝像頭索引和對應(yīng)的幀進(jìn)行處理
except Exception,e:
tlog('handler fail ' + e.message)
def readRTSP(q, index) :
#打散
time.sleep(index + random.randint(1,len(cameraList)))
video_info = cameraList[index]
rtspUrl = video_info['rtsp']
cap = cv2.VideoCapture(rtspUrl)
fps = cap.get(cv2.CAP_PROP_FPS)
i = 0
while cap.isOpened():
try:
success,frame = cap.read()
i += 1
if success == True:
if i % (fps * step) != 0:
cv2.waitKey(1)
continue
i = 0
q.put_nowait({'index':index, 't':int(time.time()), 'frame':frame})#將該幀推入隊列
cv2.waitKey(1)
continue
except:
tlog('read fail')
cap.release()
if __name__=='__main__':
manager = multiprocessing.Manager()
# 父進(jìn)程創(chuàng)建Queue,并傳給各個子進(jìn)程:
q = manager.Queue()
p = Pool()
#設(shè)置消費(fèi)者數(shù)量
for i in range(4):
pr = p.apply_async(consumer,args=(q, 'consumer'))
for i in range(len(cameraList)):
pr = p.apply_async(readRTSP,args=(q, i))
p.close()
p.join()