之前幾篇文章我們了解了多媒體數(shù)據(jù)流的采集和編碼稠鼻,這一篇開始我們來了解數(shù)據(jù)傳輸?shù)牧鞒獭?/p>
在數(shù)據(jù)流編碼完成以后闸昨,程序會通過打包器將編碼完成的數(shù)據(jù)打包再傳輸出去,代碼如下:
// The packetizer encapsulates the bit stream in an RTP stream and send it over the network
mPacketizer.setDestination(mDestination, mRtpPort, mRtcpPort);
mPacketizer.setInputStream(new MediaCodecInputStream(mMediaCodec));
mPacketizer.start();
簡單來說蓖救,就是設(shè)置傳輸目的地地址瑞躺、Rtp端口和Rtcp端口,設(shè)置數(shù)據(jù)源的InputStream(MediaCodecInputStream繼承InputStream筋岛,從MediaCodec對象中獲取完成編碼的數(shù)據(jù)流)娶视,啟動start()方法執(zhí)行數(shù)據(jù)打包操作。下面我們來看一下打包器的源碼:
AbstractPacketizer類
AbstractPacketizer就是打包器的基類睁宰,封裝了對數(shù)據(jù)流打包操作的基本操作和一些公共參數(shù)變量肪获。
public AbstractPacketizer() {
int ssrc = new Random().nextInt();
ts = new Random().nextInt();
socket = new RtpSocket();
socket.setSSRC(ssrc);
}
...
/** Starts the packetizer. */
public abstract void start();
/** Stops the packetizer. */
public abstract void stop();
/** Updates data for RTCP SR and sends the packet. */
protected void send(int length) throws IOException {
socket.commitBuffer(length);
}
上面截取AbstractPacketizer類的部分代碼。AbstractPacketizer的構(gòu)造函數(shù)中直接創(chuàng)建了一個RtpSocket對象柒傻,就是將打包好的數(shù)據(jù)用Rtp協(xié)議傳輸出去的執(zhí)行者孝赫。ssrc:用于標識同步信源。ts:時間戳红符。AbstractPacketizer還提供了兩個抽象方法start()和stop()青柄,用于控制流的打包操作。在子類(根據(jù)數(shù)據(jù)格式生成不同的子類)實現(xiàn)的start()方法中预侯,會創(chuàng)建一個線程來執(zhí)行數(shù)據(jù)打包操作致开,數(shù)據(jù)打包的內(nèi)部原理涉及到音視頻的相關(guān)格式和相關(guān)傳輸協(xié)議,這里不再深入萎馅。send(int length)方法就是使用RtpSocket對象更新和發(fā)送數(shù)據(jù)包双戳。
RTP協(xié)議和RTCP協(xié)議
- RTP全名是Real-time Transport Protocol(實時傳輸協(xié)議),RTCP(Real-time Transport Control Protocol糜芳,即實時傳輸控制協(xié)議)飒货。
- RTP用來為IP網(wǎng)上的語音、圖像峭竣、傳真等多種需要實時傳輸?shù)亩嗝襟w數(shù)據(jù)提供端到端的實時傳輸服務(wù)塘辅。
- RTP為Internet上端到端的實時傳輸提供時間信息和流同步,但并不保證服務(wù)質(zhì)量皆撩,服務(wù)質(zhì)量由RTCP來提供扣墩。
- RTCP的主要功能是:服務(wù)質(zhì)量的監(jiān)視與反饋、媒體間的同步,以及多播組中成員的標識沮榜。在RTP會話期 間盘榨,各參與者周期性地傳送RTCP包。RTCP包中含有已發(fā)送的數(shù)據(jù)包的數(shù)量蟆融、丟失的數(shù)據(jù)包的數(shù)量等統(tǒng)計資料,因此守呜,各參與者可以利用這些信息動態(tài)地改變傳輸速率型酥,甚至改變有效載荷類型。RTP和RTCP配合使用查乒,它們能以有效的反饋和最小的開銷使傳輸效率最佳化弥喉,因而特別適合傳送網(wǎng)上的實時數(shù)據(jù)。
- Rtp和Rtcp分別使用兩個端口執(zhí)行通信玛迄。RTP數(shù)據(jù)發(fā)向偶數(shù)的UDP端口由境,而對應(yīng)的控制信號RTCP數(shù)據(jù)發(fā)向相鄰的奇數(shù)UDP端口(偶數(shù)的UDP端口+1),這樣就構(gòu)成一個UDP端口對蓖议。
- 參考資料:RTP協(xié)議分析 應(yīng)該稍微了解一下RTP的封裝和RTCP的封裝虏杰。
RtpSocket類
RtpSocket類是使用RTP協(xié)議的Socket的封裝實現(xiàn)。
/**
* This RTP socket implements a buffering mechanism relying on a FIFO of buffers and a Thread.
* @throws IOException
*/
public RtpSocket() {
mCacheSize = 00;
mBufferCount = 300; // TODO: reajust that when the FIFO is full
mBuffers = new byte[mBufferCount][];
mPackets = new DatagramPacket[mBufferCount];
mReport = new SenderReport();
mAverageBitrate = new AverageBitrate();
...
try {
mSocket = new MulticastSocket();
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
}
在構(gòu)造函數(shù)中初始化各個參數(shù)變量和發(fā)送RTCP報文的實例(SenderReport)勒虾,mSocket對象(這里使用MulticastSocket纺阔,實現(xiàn)將數(shù)據(jù)報以廣播的方式發(fā)送到多個client)。
/** Sends the RTP packet over the network. */
public void commitBuffer(int length) throws IOException {
updateSequence();
mPackets[mBufferIn].setLength(length);
mAverageBitrate.push(length);
if (++mBufferIn>=mBufferCount) mBufferIn = 0;
mBufferCommitted.release();
if (mThread == null) {
mThread = new Thread(this);
mThread.start();
}
}
發(fā)送RTP協(xié)議包數(shù)據(jù)修然。這里啟動了一個線程來執(zhí)行發(fā)送動作笛钝,以一定的速率依次發(fā)送數(shù)據(jù)包。
/** The Thread sends the packets in the FIFO one by one at a constant rate. */
@Override
public void run() {
Statistics stats = new Statistics(50,3000);
try {
// Caches mCacheSize milliseconds of the stream in the FIFO.
Thread.sleep(mCacheSize);
long delta = 0;
while (mBufferCommitted.tryAcquire(4,TimeUnit.SECONDS)) {
if (mOldTimestamp != 0) {
// We use our knowledge of the clock rate of the stream and the difference between two timestamps to
// compute the time lapse that the packet represents.
if ((mTimestamps[mBufferOut]-mOldTimestamp)>0) {
stats.push(mTimestamps[mBufferOut]-mOldTimestamp);
long d = stats.average()/1000000;
//Log.d(TAG,"delay: "+d+" d: "+(mTimestamps[mBufferOut]-mOldTimestamp)/1000000);
// We ensure that packets are sent at a constant and suitable rate no matter how the RtpSocket is used.
if (mCacheSize>0) Thread.sleep(d);
} else if ((mTimestamps[mBufferOut]-mOldTimestamp)<0) {
Log.e(TAG, "TS: "+mTimestamps[mBufferOut]+" OLD: "+mOldTimestamp);
}
delta += mTimestamps[mBufferOut]-mOldTimestamp;
if (delta>500000000 || delta<0) {
//Log.d(TAG,"permits: "+mBufferCommitted.availablePermits());
delta = 0;
}
}
mReport.update(mPackets[mBufferOut].getLength(), System.nanoTime(),(mTimestamps[mBufferOut]/100L)*(mClock/1000L)/10000L);
mOldTimestamp = mTimestamps[mBufferOut];
if (mCount++>30) mSocket.send(mPackets[mBufferOut]);
if (++mBufferOut>=mBufferCount) mBufferOut = 0;
mBufferRequested.release();
}
} catch (Exception e) {
e.printStackTrace();
}
mThread = null;
resetFifo();
}
簡單流程就是:配置傳輸速率愕宋,使用while來循環(huán)執(zhí)行玻靡,如果一個執(zhí)行過程不超過4秒,則一直循環(huán)下去中贝。在循環(huán)執(zhí)行的動作包括:計算速率的平均值(按照這個平均值的速率來執(zhí)行傳輸囤捻,確保速率恒定),更新Rtcp報文雄妥,Socket執(zhí)行發(fā)送動作(Send)來給客戶端(接收端)傳輸數(shù)據(jù)包最蕾。
SenderReport類
SenderReport類是執(zhí)行Rtcp協(xié)議的實現(xiàn)類。
public SenderReport() {
...
try {
usock = new MulticastSocket();
} catch (IOException e) {
throw new RuntimeException(e.getMessage());
}
upack = new DatagramPacket(buffer, 1);
// By default we sent one report every 5 secconde
interval = 3000;
}
正如上面介紹時所說老厌,Rtp和Rtcp分別使用兩個端口執(zhí)行通信瘟则,所以SenderReport類的構(gòu)造函數(shù)也需要初始化一個Socket對象用于發(fā)送Rtcp報文。
/**
* Updates the number of packets sent, and the total amount of data sent.
* @param length The length of the packet
* @throws IOException
**/
public void update(int length, long ntpts, long rtpts) throws IOException {
packetCount += 1;
octetCount += length;
setLong(packetCount, 20, 24);
setLong(octetCount, 24, 28);
now = SystemClock.elapsedRealtime();
delta += oldnow != 0 ? now-oldnow : 0;
oldnow = now;
if (interval>0) {
if (delta>=interval) {
// We send a Sender Report
send(ntpts,rtpts);
delta = 0;
}
}
}
在Rtp的Socket不斷發(fā)送數(shù)據(jù)包的同時枝秤,SenderReport也不斷在更新和發(fā)送Rtcp的報文醋拧,
update()方法就是在不斷更新發(fā)送的數(shù)據(jù)包數(shù),以及發(fā)送的數(shù)據(jù)總量。
/** Sends the RTCP packet over the network. */
private void send(long ntpts, long rtpts) throws IOException {
long hb = ntpts/1000000000;
long lb = ( ( ntpts - hb*1000000000 ) * 4294967296L )/1000000000;
setLong(hb, 8, 12);
setLong(lb, 12, 16);
setLong(rtpts, 16, 20);
upack.setLength(28);
usock.send(upack);
}
send()方法就是用于在update()更新Rtcp報文后丹壕,將新的Rtcp報文通過Socket傳給接收端庆械。
到這里我們了解了打包器Packetizer的打包流程和Rtp&Rtcp協(xié)議的傳輸流程,下一篇將了解Rtsp協(xié)議和它在項目中的運用流程菌赖。