流是什么?
一種處理實(shí)時(shí)數(shù)據(jù)的方法返劲。還有一種使用場(chǎng)景是當(dāng)前要處理的數(shù)據(jù)很大玲昧,無(wú)法一次放入內(nèi)存,使用流能夠使用很小的內(nèi)存完成處理篮绿。python中主要是靠
生成器來(lái)解決孵延。即在調(diào)用時(shí)才處理。而不是預(yù)先要加載全部數(shù)據(jù)亲配。
def gen():
yield 1
yield 2
yield 3
只有使用next調(diào)用才執(zhí)行尘应。
在flask中的response是支持流的。
實(shí)現(xiàn)視頻流需要的格式是
multipart/x-mixed-replace
這是每一幀需要包含的信息吼虎。
b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n'
程序的基本結(jié)構(gòu)如下犬钢。
index綁定到 / 路由上。返回一個(gè)界面思灰。
gen方法使用生成器產(chǎn)生實(shí)時(shí)數(shù)據(jù)
video_feed 進(jìn)行響應(yīng)
#!/usr/bin/env python
from flask import Flask, render_template, Response
from camera import Camera
app = Flask(__name__)
@app.route('/')
def index():
return render_template('index.html')
def gen(camera):
while True:
frame = camera.get_frame()
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
@app.route('/video_feed')
def video_feed():
return Response(gen(Camera()),
mimetype='multipart/x-mixed-replace; boundary=frame')
if __name__ == '__main__':
app.run(host='0.0.0.0', debug=True玷犹,threaded=True)
這是index.html模板,可以看到src指向上面的viedofeed函數(shù)洒疚,返回流響應(yīng)歹颓。
<html>
<head>
<title>Video Streaming Demonstration</title>
</head>
<body>
<h1>flask Video Streaming</h1>
![]({{ url_for('video_feed') }})
</body></html>
下一步就是獲取幀了。首先使用三個(gè)圖片來(lái)替代拳亿。
from time import time
class Camera(object):
def __init__(self):
self.frames = [open(f + '.jpg', 'rb').read() for f in ['1', '2', '3']]
def get_frame(self):
return self.frames[int(time()) % 3]
這樣實(shí)現(xiàn)的效果是三幅圖片持續(xù)循環(huán)晴股。
下一步是獲取真正的直播流。
使用opencv的python模塊就可以肺魁。
def frames():
camera = cv2.VideoCapture(0)
if not camera.isOpened():
raise RuntimeError('Could not start camera.')
while True:
# read current frame
_, img = camera.read()
# encode as a jpeg image and return it
yield cv2.imencode('.jpg', img)[1].tobytes()
一直捕獲數(shù)據(jù)电湘,然后通過(guò)生成器產(chǎn)生。將這個(gè)函數(shù)接入到上面的gen函數(shù)是可以直接執(zhí)行的鹅经。
性能提升
這種不做任何處理的方式會(huì)消耗大量的cpu資源寂呛,其中一個(gè)原因是因?yàn)楹笈_(tái)捕獲的線程和向
客戶端發(fā)送的線程不同步。這就導(dǎo)致后臺(tái)一直在大量獲取數(shù)據(jù)瘾晃,但并不是所有的數(shù)據(jù)都被傳送了出去贷痪。
為了提高性能將兩者進(jìn)行同步。只把后臺(tái)獲取的原始幀進(jìn)行發(fā)送蹦误。
使用線程的threading.event能夠?qū)崿F(xiàn)劫拢。
class CameraEvent(object):
# ...
class BaseCamera(object):
# ...
event = CameraEvent()
# ...
def get_frame(self):
"""Return the current camera frame."""
BaseCamera.last_access = time.time()
# 阻塞 等待相機(jī)線程來(lái)喚醒
BaseCamera.event.wait()
BaseCamera.event.clear()# 將標(biāo)志位設(shè)置為阻塞
return BaseCamera.frame
@classmethod
def _thread(cls):
# ...
for frame in frames_iterator:
BaseCamera.frame = frame
BaseCamera.event.set() # 喚醒上面阻塞的地方
#判斷最后查看時(shí)間肉津,來(lái)決定是否關(guān)閉。
if time.time() - BaseCamera.last_access > 10:
frames_iterator.close()
上面的wait方法會(huì)阻塞直到標(biāo)志位變?yōu)閠rue舱沧,clear方法將標(biāo)志位重置為false
set方法重置為true 通過(guò)這個(gè)機(jī)制實(shí)現(xiàn)同步妹沙。首先是等待喚醒,然后再將標(biāo)志位
變?yōu)閒lase熟吏,然后繼續(xù)進(jìn)行距糖。
在電腦上開(kāi)啟服務(wù),然后在手機(jī)上查看如下: