librtmp是我們平常工作中進(jìn)行推拉流開(kāi)發(fā)的重要工具沙廉,官方提供的版本是基于C/C++技術(shù)棧的,但是有不少的其它高級(jí)語(yǔ)言技術(shù)棧也都提供了相應(yīng)的包裝或移植版本。
RTMP協(xié)議非常復(fù)雜辜御,網(wǎng)上又鮮有較為完整的文檔,再加上它的一些設(shè)計(jì)理念比較奇特(奇特的事件會(huì)剌激我們的反向思維屈张,讓我們對(duì)現(xiàn)有認(rèn)知產(chǎn)生否定)擒权,以至于我們完全理解它需要耗費(fèi)非常多的時(shí)間和腦力袱巨。
從RTMP的實(shí)現(xiàn)版本也側(cè)面驗(yàn)證了這點(diǎn),現(xiàn)有的主流實(shí)現(xiàn)仍然遵循TLV范式(不了解的話請(qǐng)百度)碳抄,協(xié)議層面基本拋棄了消息頭的建議愉老,鏈路層復(fù)用只使用塊頭,塊頭之上就是負(fù)載剖效,對(duì)指令來(lái)說(shuō)負(fù)載就是AMF嫉入,對(duì)媒體來(lái)說(shuō)負(fù)載就是編碼幀,清爽干凈璧尸!
本文不講解RTMP協(xié)議規(guī)范的相關(guān)知識(shí)咒林,讀者朋友們請(qǐng)自行百度。本文的主要目的是配合大家一起閱讀librtmp的核心源碼爷光,但是在開(kāi)始之前建議大家有一些RTMP協(xié)議的基本認(rèn)識(shí)垫竞,我希望在完成本文的閱讀之后,大家對(duì)librtmp的使用能上一個(gè)更高的層次蛀序。
下載源碼:
在閱讀源碼前欢瞪,請(qǐng)大家到官方網(wǎng)址下載:http://rtmpdump.mplayerhq.hu/download
最新的是2.3版本,作者已經(jīng)很多年沒(méi)有更新了徐裸,說(shuō)明還是比較穩(wěn)定的遣鼓。
下源源碼后,解壓重贺,文件樹(shù)結(jié)構(gòu)如下譬正,部分文件我做了說(shuō)明:
.
├── ChangeLog
├── COPYING
├── librtmp // rtmp協(xié)議的一個(gè)客戶端庫(kù)實(shí)現(xiàn)
│?? ├── amf.c // AMF序列化實(shí)現(xiàn)
│?? ├── amf.h // AMF序列化頭文件
│?? ├── bytes.h
│?? ├── COPYING
│?? ├── dhgroups.h
│?? ├── dh.h // DH非對(duì)稱加密算法調(diào)用封裝
│?? ├── handshake.h // 加密版本的握手實(shí)現(xiàn)
│?? ├── hashswf.c // SWF哈希校驗(yàn)相關(guān)實(shí)現(xiàn)
│?? ├── http.h // HTTP請(qǐng)求頭文件定義
│?? ├── librtmp.3
│?? ├── librtmp.3.html
│?? ├── librtmp.pc.in
│?? ├── log.c // 日志輸出實(shí)現(xiàn)
│?? ├── log.h // 日志輸出定義
│?? ├── Makefile
│?? ├── parseurl.c // RTMP網(wǎng)址解析實(shí)現(xiàn)
│?? ├── rtmp.c // RTMP主邏輯實(shí)現(xiàn)
│?? ├── rtmp.h // RTMP主邏輯頭文件
│?? └── rtmp_sys.h
├── Makefile
├── README
├── rtmpdump.1
├── rtmpdump.1.html
├── rtmpdump.c // 一個(gè)很強(qiáng)大的FLV文件解析、拉流的例子
├── rtmpgw.8
├── rtmpgw.8.html
├── rtmpgw.c // 一個(gè)HTTP服務(wù)代理的實(shí)現(xiàn)
├── rtmpsrv.c // 一個(gè)簡(jiǎn)單的RTMP服務(wù)器的實(shí)現(xiàn)的基本框架
├── rtmpsuck.c
├── thread.c // 線程封裝
└── thread.h // 線程封裝頭文件
編譯的話一般執(zhí)行make就可以了檬姥,需要openssl依賴曾我,生成的庫(kù)在librtmp子目錄下。
主要結(jié)構(gòu)定義:
在rtmp.h中(以下除非特殊說(shuō)明健民,均指rtmp.h或rtmp.c)抒巢,定義了4種塊頭格式:
#define RTMP_PACKET_SIZE_LARGE 0 // 對(duì)應(yīng)于基本塊頭的0格式
#define RTMP_PACKET_SIZE_MEDIUM 1 // 對(duì)應(yīng)于基本塊頭的1格式
#define RTMP_PACKET_SIZE_SMALL 2 // 對(duì)應(yīng)于基本塊頭的2格式
#define RTMP_PACKET_SIZE_MINIMUM 3 // 對(duì)應(yīng)于基本塊頭的3格式
4種格式的塊頭大小:
static const int packetSize[] = { 12, 8, 4, 1 };
整個(gè)塊頭的大小是不固定的秉犹,最小為1字節(jié)(基本塊頭為格式3蛉谜,且塊流ID大于1小于64時(shí)),最大為18字節(jié)(基本塊頭為格式0崇堵,且塊流ID大于319型诚,且時(shí)間戳大于0x00ffffff時(shí))。這里packetSize定義的是4種塊頭格式的消息塊頭大小+1字節(jié)的基本塊頭鸳劳。
塊頭結(jié)構(gòu)定義:
typedef struct RTMPPacket
{
uint8_t m_headerType; // 塊頭格式
uint8_t m_packetType; // 命令類型
uint8_t m_hasAbsTimestamp; // 是否絕對(duì)時(shí)間
int m_nChannel; // 塊流ID
uint32_t m_nTimeStamp; // 時(shí)間戳
int32_t m_nInfoField2; // 特殊字段狰贯,通常用于保存0x14號(hào)遠(yuǎn)程調(diào)用時(shí)的流ID
uint32_t m_nBodySize; // 負(fù)載大小
uint32_t m_nBytesRead; // 當(dāng)前讀取的負(fù)載大小,合包處理時(shí)使用
RTMPChunk *m_chunk; // 保存原始的chunk數(shù)據(jù)流
char *m_body; // 負(fù)載數(shù)據(jù)指針
} RTMPPacket;
RTMPPacket非常強(qiáng)大,它負(fù)責(zé)處理發(fā)送和接收過(guò)程中的協(xié)議解析涵紊、分包傍妒、合包等復(fù)雜邏輯。
RTMP套接字上下文:
typedef struct RTMPSockBuf
{
int sb_socket; // 套接字
int sb_size; // 緩沖區(qū)可讀大小
char *sb_start; // 緩沖區(qū)讀取位置
char sb_buf[RTMP_BUFFER_CACHE_SIZE]; // 套接字讀取緩沖區(qū)
int sb_timedout; // 超時(shí)標(biāo)志
void *sb_ssl; // TLS上下文
} RTMPSockBuf;
RTMP在與底層套接口通訊時(shí)摸柄,使用了這個(gè)與邏輯無(wú)關(guān)的讀緩沖颤练。
RTMP協(xié)議層連接上下文:
typedef struct RTMP_LNK
{
AVal hostname; // 目標(biāo)主機(jī)地址
AVal sockshost; // socks代理地址
// 連接和推拉流涉及的一些參數(shù)信息
AVal playpath0; /* parsed from URL */
AVal playpath; /* passed in explicitly */
AVal tcUrl;
AVal swfUrl;
AVal pageUrl;
AVal app;
AVal auth;
AVal flashVer;
AVal subscribepath;
AVal token;
AMFObject extras;
int edepth;
int seekTime; // 播放流的開(kāi)始時(shí)間
int stopTime; // 播放流的停止時(shí)間
#define RTMP_LF_AUTH 0x0001 /* using auth param */
#define RTMP_LF_LIVE 0x0002 /* stream is live */
#define RTMP_LF_SWFV 0x0004 /* do SWF verification */
#define RTMP_LF_PLST 0x0008 /* send playlist before play */
#define RTMP_LF_BUFX 0x0010 /* toggle stream on BufferEmpty msg */
#define RTMP_LF_FTCU 0x0020 /* free tcUrl on close */
int lFlags;
int swfAge;
int protocol; // 連接使用的協(xié)議
int timeout; // 連接超時(shí)時(shí)間
unsigned short socksport; // socks代理端口
unsigned short port; // 目標(biāo)主機(jī)端口
#ifdef CRYPTO
#define RTMP_SWF_HASHLEN 32
void *dh; /* for encryption */
void *rc4keyIn;
void *rc4keyOut;
uint32_t SWFSize;
uint8_t SWFHash[RTMP_SWF_HASHLEN];
char SWFVerificationResponse[RTMP_SWF_HASHLEN+10];
#endif
} RTMP_LNK;
RTMP_LNK包含了連接的服務(wù)器地址,以及推拉流所需的各種參數(shù)信息驱负,一些需要在連接前進(jìn)行設(shè)置嗦玖。
RTMP_Read()操作的附加上下文:
typedef struct RTMP_READ
{
char *buf; // 讀取緩沖區(qū)
char *bufpos; // 緩沖區(qū)讀取位置
unsigned int buflen; // 當(dāng)前緩沖區(qū)數(shù)據(jù)的長(zhǎng)度
uint32_t timestamp; // 讀取的最新時(shí)間戳
uint8_t dataType; // 讀取到的元數(shù)據(jù)媒體類型
uint8_t flags; // 讀取標(biāo)志集合
#define RTMP_READ_HEADER 0x01
#define RTMP_READ_RESUME 0x02
#define RTMP_READ_NO_IGNORE 0x04
#define RTMP_READ_GOTKF 0x08
#define RTMP_READ_GOTFLVK 0x10
#define RTMP_READ_SEEKING 0x20
int8_t status; // 當(dāng)前讀取的狀態(tài)
#define RTMP_READ_COMPLETE -3
#define RTMP_READ_ERROR -2
#define RTMP_READ_EOF -1
#define RTMP_READ_IGNORE 0
/* if bResume == TRUE */
uint8_t initialFrameType;
uint32_t nResumeTS;
char *metaHeader;
char *initialFrame;
uint32_t nMetaHeaderSize;
uint32_t nInitialFrameSize;
uint32_t nIgnoredFrameCounter;
uint32_t nIgnoredFlvFrameCounter;
} RTMP_READ;
RTMP_READ結(jié)構(gòu)定義了RTMP_Read()函數(shù)工作時(shí)需要的附加上下文和緩沖區(qū)。
RTMP_Read()與RTMP_ReadPacket()的主要區(qū)別是跃脊,RTMP_Read()返回的是FLV格式的流踏揣,它需要做兩層操作,首先是解RTMP協(xié)議匾乓,其次是編碼為FLV格式捞稿,而RTMP_ReadPacket()只需要執(zhí)行一步。
0x14命令遠(yuǎn)程過(guò)程調(diào)用隊(duì)列子項(xiàng):
typedef struct RTMP_METHOD
{
AVal name; // 當(dāng)前的調(diào)用過(guò)程名稱
int num; // 操作流水號(hào)拼缝,初使為1娱局,自增
} RTMP_METHOD;
處理所有RTMP操作的連接上下文:
typedef struct RTMP
{
int m_inChunkSize; // 最大接收塊大小
int m_outChunkSize; // 最大發(fā)送塊大小
int m_nBWCheckCounter; // 帶寬檢測(cè)計(jì)數(shù)器
int m_nBytesIn; // 接收數(shù)據(jù)計(jì)數(shù)器
int m_nBytesInSent; // 當(dāng)前數(shù)據(jù)已回應(yīng)計(jì)數(shù)器
int m_nBufferMS; // 當(dāng)前緩沖的時(shí)間長(zhǎng)度,以MS為單位
int m_stream_id; // 當(dāng)前連接的流ID
int m_mediaChannel; // 當(dāng)前連接媒體使用的塊流ID
uint32_t m_mediaStamp; // 當(dāng)前連接媒體最新的時(shí)間戳
uint32_t m_pauseStamp; // 當(dāng)前連接媒體暫停時(shí)的時(shí)間戳
int m_pausing; // 是否暫停狀態(tài)
int m_nServerBW; // 服務(wù)器帶寬
int m_nClientBW; // 客戶端帶寬
uint8_t m_nClientBW2; // 客戶端帶寬調(diào)節(jié)方式
uint8_t m_bPlaying; // 當(dāng)前是否推流或連接中
uint8_t m_bSendEncoding; // 連接服務(wù)器時(shí)發(fā)送編碼
uint8_t m_bSendCounter; // 設(shè)置是否向服務(wù)器發(fā)送接收字節(jié)應(yīng)答
int m_numInvokes; // 0x14命令遠(yuǎn)程過(guò)程調(diào)用計(jì)數(shù)
int m_numCalls; // 0x14命令遠(yuǎn)程過(guò)程請(qǐng)求隊(duì)列數(shù)量
RTMP_METHOD *m_methodCalls; // 遠(yuǎn)程過(guò)程調(diào)用請(qǐng)求隊(duì)列
RTMPPacket *m_vecChannelsIn[RTMP_CHANNELS]; // 對(duì)應(yīng)塊流ID上一次接收的報(bào)文
RTMPPacket *m_vecChannelsOut[RTMP_CHANNELS]; // 對(duì)應(yīng)塊流ID上一次發(fā)送的報(bào)文
int m_channelTimestamp[RTMP_CHANNELS]; // 對(duì)應(yīng)塊流ID媒體的最新時(shí)間戳
double m_fAudioCodecs; // 音頻編碼器代碼
double m_fVideoCodecs; // 視頻編碼器代碼
double m_fEncoding; /* AMF0 or AMF3 */
double m_fDuration; // 當(dāng)前媒體的時(shí)長(zhǎng)
int m_msgCounter; // 使用HTTP協(xié)議發(fā)送請(qǐng)求的計(jì)數(shù)器
int m_polling; // 使用HTTP協(xié)議接收消息主體時(shí)的位置
int m_resplen; // 使用HTTP協(xié)議接收消息主體時(shí)的未讀消息計(jì)數(shù)
int m_unackd; // 使用HTTP協(xié)議處理時(shí)無(wú)響應(yīng)的計(jì)數(shù)
AVal m_clientID; // 使用HTTP協(xié)議處理時(shí)的身份ID
RTMP_READ m_read; // RTMP_Read()操作的上下文
RTMPPacket m_write; // RTMP_Write()操作使用的可復(fù)用報(bào)文對(duì)象
RTMPSockBuf m_sb; // RTMP_ReadPacket()讀包操作的上下文
RTMP_LNK Link; // RTMP連接上下文
} RTMP;
RTMP做為整個(gè)推拉流操作的上下文咧七,從握手開(kāi)始到關(guān)閉連接衰齐,它慣穿了整個(gè)會(huì)話的生存期。
主要函數(shù)實(shí)現(xiàn):
報(bào)文操作:
RTMPPacket是librtmp收發(fā)報(bào)文的關(guān)鍵結(jié)構(gòu)继阻,基中m_body緩沖區(qū)是塊頭和負(fù)載公用的耻涛,這里有個(gè)技巧,請(qǐng)看代碼:
// 為報(bào)文結(jié)構(gòu)分配指定負(fù)載大小的內(nèi)存
int
RTMPPacket_Alloc(RTMPPacket *p, int nSize)
{
// 這里多分配了18個(gè)字節(jié)的內(nèi)存
char *ptr = calloc(1, nSize + RTMP_MAX_HEADER_SIZE);
if (!ptr)
return FALSE;
// 讓負(fù)載指向內(nèi)存的第19字節(jié)
p->m_body = ptr + RTMP_MAX_HEADER_SIZE;
p->m_nBytesRead = 0;
return TRUE;
}
// 釋放負(fù)載內(nèi)存
void
RTMPPacket_Free(RTMPPacket *p)
{
if (p->m_body)
{
free(p->m_body - RTMP_MAX_HEADER_SIZE);
p->m_body = NULL;
}
}
// 重置除負(fù)載內(nèi)存以外的其它字段
void
RTMPPacket_Reset(RTMPPacket *p)
{
p->m_headerType = 0;
p->m_packetType = 0;
p->m_nChannel = 0;
p->m_nTimeStamp = 0;
p->m_nInfoField2 = 0;
p->m_hasAbsTimestamp = FALSE;
p->m_nBodySize = 0;
p->m_nBytesRead = 0;
}
RTMPPacket_Alloc()多分配18個(gè)字節(jié)的內(nèi)存瘟檩,其好處在于抹缕,發(fā)送報(bào)文時(shí),頭部和負(fù)載可以序列化在一段連續(xù)的緩沖區(qū)墨辛,理想情況下只需要執(zhí)行一個(gè)Write調(diào)用卓研。
RTMP上下文的初使化操作:
librtmp提供RTMP上下文內(nèi)存分配、初使化睹簇、釋放的函數(shù)奏赘,雖然也不復(fù)雜,但使用標(biāo)準(zhǔn)接口是個(gè)好習(xí)慣太惠。
// 分配RTMP內(nèi)存
RTMP *
RTMP_Alloc()
{
return calloc(1, sizeof(RTMP));
}
// 釋放RTMP內(nèi)存
void
RTMP_Free(RTMP *r)
{
free(r);
}
// 初使化RTMP內(nèi)存
void
RTMP_Init(RTMP *r)
{
#ifdef CRYPTO
if (!RTMP_TLS_ctx)
RTMP_TLS_Init();
#endif
memset(r, 0, sizeof(RTMP)); // 這里將所有的內(nèi)存置0
r->m_sb.sb_socket = -1;
r->m_inChunkSize = RTMP_DEFAULT_CHUNKSIZE; // 默認(rèn)最大接收塊限制128字節(jié)
r->m_outChunkSize = RTMP_DEFAULT_CHUNKSIZE; // 默認(rèn)最大發(fā)送塊限制128字節(jié)
r->m_nBufferMS = 30000; // 默認(rèn)最大時(shí)長(zhǎng)緩沖設(shè)置磨淌,需通知服務(wù)器
r->m_nClientBW = 2500000; // 默認(rèn)最大客戶端帶寬
r->m_nClientBW2 = 2; // 默認(rèn)最大客戶端帶寬的調(diào)整方式
r->m_nServerBW = 2500000; // 默認(rèn)最大服務(wù)器帶寬
r->m_fAudioCodecs = 3191.0;
r->m_fVideoCodecs = 252.0;
r->Link.timeout = 30; // 默認(rèn)連接超時(shí)
r->Link.swfAge = 30;
}
參數(shù)設(shè)置:
大多數(shù)參數(shù)設(shè)置函數(shù)必須在連接服務(wù)器即RTMP_Connect()之前調(diào)用,否則可能不會(huì)生效凿渊。
// 設(shè)置推流操作選項(xiàng)梁只,這樣在RTMP_ConnectStream()操作內(nèi)部缚柳,將使用推流請(qǐng)求代替拉流請(qǐng)求。
void
RTMP_EnableWrite(RTMP *r)
{
r->Link.protocol |= RTMP_FEATURE_WRITE;
}
// 設(shè)置服務(wù)器緩存的流時(shí)間長(zhǎng)度
void
RTMP_SetBufferMS(RTMP *r, int size)
{
r->m_nBufferMS = size;
}
// 設(shè)置RTMP推拉流的完整地址
int RTMP_SetupURL(RTMP *r, char *url)
{
......
// 解析流地址
ret = RTMP_ParseURL(url, &r->Link.protocol, &r->Link.hostname,
&port, &r->Link.playpath0, &r->Link.app);
if (!ret)
return ret;
r->Link.port = port;
r->Link.playpath = r->Link.playpath0;
......
// 解析其他KV參數(shù)
while (ptr) {
*ptr++ = '\0';
p1 = ptr;
p2 = strchr(p1, '=');
if (!p2)
break;
opt.av_val = p1;
opt.av_len = p2 - p1;
*p2++ = '\0';
arg.av_val = p2;
ptr = strchr(p2, ' ');
if (ptr) {
*ptr = '\0';
arg.av_len = ptr - p2;
/* skip repeated spaces */
while(ptr[1] == ' ')
*ptr++ = '\0';
} else {
arg.av_len = strlen(p2);
}
arg.av_len = p2 - arg.av_val;
ret = RTMP_SetOpt(r, &opt, &arg);
if (!ret)
return ret;
}
......
}
連接和握手操作:
在完成選項(xiàng)設(shè)置后敛纲,接下來(lái)調(diào)用RTMP_Connect()進(jìn)行RTMP的握手操作。握手完成后剂癌,客戶端還需要主動(dòng)發(fā)送connect遠(yuǎn)程過(guò)程調(diào)用淤翔,這些操作都封裝在RTMP_Connect(),請(qǐng)看代碼:
// 對(duì)用戶開(kāi)放的RTMP連接接口
int
RTMP_Connect(RTMP *r, RTMPPacket *cp)
{
struct sockaddr_in service;
if (!r->Link.hostname.av_len)
return FALSE;
memset(&service, 0, sizeof(struct sockaddr_in));
service.sin_family = AF_INET;
// 設(shè)置直接連接的服務(wù)器地址
if (r->Link.socksport)
{
/* Connect via SOCKS */
if (!add_addr_info(&service, &r->Link.sockshost, r->Link.socksport))
return FALSE;
}
else
{
/* Connect directly */
if (!add_addr_info(&service, &r->Link.hostname, r->Link.port))
return FALSE;
}
// 發(fā)起網(wǎng)絡(luò)連接
if (!RTMP_Connect0(r, (struct sockaddr *)&service))
return FALSE;
r->m_bSendCounter = TRUE;
// 發(fā)起握手協(xié)商
return RTMP_Connect1(r, cp);
}
// 執(zhí)行基礎(chǔ)網(wǎng)絡(luò)連接
int
RTMP_Connect0(RTMP *r, struct sockaddr * service)
{
int on = 1;
r->m_sb.sb_timedout = FALSE;
r->m_pausing = 0;
r->m_fDuration = 0.0;
// 創(chuàng)建套接字
r->m_sb.sb_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (r->m_sb.sb_socket != -1)
{
// 連接對(duì)端
if (connect(r->m_sb.sb_socket, service, sizeof(struct sockaddr)) < 0)
{
int err = GetSockError();
RTMP_Log(RTMP_LOGERROR, "%s, failed to connect socket. %d (%s)",
__FUNCTION__, err, strerror(err));
RTMP_Close(r);
return FALSE;
}
// 執(zhí)行Socks協(xié)商
if (r->Link.socksport)
{
RTMP_Log(RTMP_LOGDEBUG, "%s ... SOCKS negotiation", __FUNCTION__);
if (!SocksNegotiate(r))
{
RTMP_Log(RTMP_LOGERROR, "%s, SOCKS negotiation failed.", __FUNCTION__);
RTMP_Close(r);
return FALSE;
}
}
}
else
{
RTMP_Log(RTMP_LOGERROR, "%s, failed to create socket. Error: %d", __FUNCTION__,
GetSockError());
return FALSE;
}
// 設(shè)置接收網(wǎng)絡(luò)超時(shí)
{
SET_RCVTIMEO(tv, r->Link.timeout);
if (setsockopt
(r->m_sb.sb_socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(tv)))
{
RTMP_Log(RTMP_LOGERROR, "%s, Setting socket timeout to %ds failed!",
__FUNCTION__, r->Link.timeout);
}
}
setsockopt(r->m_sb.sb_socket, IPPROTO_TCP, TCP_NODELAY, (char *) &on, sizeof(on));
return TRUE;
}
// 繼續(xù)執(zhí)行SSL或HTTP協(xié)商佩谷,以及RTMP握手
int
RTMP_Connect1(RTMP *r, RTMPPacket *cp)
{
// SSL握手處理
if (r->Link.protocol & RTMP_FEATURE_SSL)
{
#if defined(CRYPTO) && !defined(NO_SSL)
TLS_client(RTMP_TLS_ctx, r->m_sb.sb_ssl);
TLS_setfd(r->m_sb.sb_ssl, r->m_sb.sb_socket);
if (TLS_connect(r->m_sb.sb_ssl) < 0)
{
RTMP_Log(RTMP_LOGERROR, "%s, TLS_Connect failed", __FUNCTION__);
RTMP_Close(r);
return FALSE;
}
#else
RTMP_Log(RTMP_LOGERROR, "%s, no SSL/TLS support", __FUNCTION__);
RTMP_Close(r);
return FALSE;
#endif
}
// HTTP代理協(xié)商
if (r->Link.protocol & RTMP_FEATURE_HTTP)
{
r->m_msgCounter = 1;
r->m_clientID.av_val = NULL;
r->m_clientID.av_len = 0;
HTTP_Post(r, RTMPT_OPEN, "", 1);
HTTP_read(r, 1);
r->m_msgCounter = 0;
}
RTMP_Log(RTMP_LOGDEBUG, "%s, ... connected, handshaking", __FUNCTION__);
// RTMP握手
if (!HandShake(r, TRUE))
{
RTMP_Log(RTMP_LOGERROR, "%s, handshake failed.", __FUNCTION__);
RTMP_Close(r);
return FALSE;
}
RTMP_Log(RTMP_LOGDEBUG, "%s, handshaked", __FUNCTION__);
// 發(fā)送第一個(gè)連接報(bào)文
if (!SendConnectPacket(r, cp))
{
RTMP_Log(RTMP_LOGERROR, "%s, RTMP connect failed.", __FUNCTION__);
RTMP_Close(r);
return FALSE;
}
return TRUE;
}
// SOCKS協(xié)商處理
static int
SocksNegotiate(RTMP *r)
{
unsigned long addr;
struct sockaddr_in service;
memset(&service, 0, sizeof(struct sockaddr_in));
add_addr_info(&service, &r->Link.hostname, r->Link.port);
addr = htonl(service.sin_addr.s_addr);
{
char packet[] = {
4, 1, /* SOCKS 4, connect */
(r->Link.port >> 8) & 0xFF,
(r->Link.port) & 0xFF,
(char)(addr >> 24) & 0xFF, (char)(addr >> 16) & 0xFF,
(char)(addr >> 8) & 0xFF, (char)addr & 0xFF,
0
}; /* NULL terminate */
WriteN(r, packet, sizeof packet);
if (ReadN(r, packet, 8) != 8)
return FALSE;
if (packet[0] == 0 && packet[1] == 90)
{
return TRUE;
}
else
{
RTMP_Log(RTMP_LOGERROR, "%s, SOCKS returned error code %d", packet[1]);
return FALSE;
}
}
}
connect遠(yuǎn)程調(diào)用:
在前面RTMP_Connect1()的最后一步旁壮,調(diào)用了SendConnectPacket()這個(gè)函數(shù),實(shí)際上的用途就是發(fā)起0x14命令connect遠(yuǎn)程調(diào)用谐檀,代碼摘錄并簡(jiǎn)化如下:
static int
SendConnectPacket(RTMP *r, RTMPPacket *cp)
{
RTMPPacket packet;
char pbuf[4096], *pend = pbuf + sizeof(pbuf);
char *enc;
// 填寫(xiě)塊頭字段
packet.m_nChannel = 0x03; /* control channel (invoke) */
packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
packet.m_packetType = 0x14; /* INVOKE */
packet.m_nTimeStamp = 0;
packet.m_nInfoField2 = 0;
packet.m_hasAbsTimestamp = 0;
packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE;
enc = packet.m_body;
// 壓入connect命令和操作流水號(hào)
enc = AMF_EncodeString(enc, pend, &av_connect);
enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes);
*enc++ = AMF_OBJECT;
// 壓入對(duì)象的各個(gè)屬性
enc = AMF_EncodeNamedString(enc, pend, &av_app, &r->Link.app);
if (!enc)
return FALSE;
if (r->Link.protocol & RTMP_FEATURE_WRITE)
{
enc = AMF_EncodeNamedString(enc, pend, &av_type, &av_nonprivate);
if (!enc)
return FALSE;
}
if (r->Link.flashVer.av_len)
{
enc = AMF_EncodeNamedString(enc, pend, &av_flashVer, &r->Link.flashVer);
if (!enc)
return FALSE;
}
......
// 壓入屬性結(jié)束標(biāo)記
*enc++ = 0;
*enc++ = 0; /* end of object - 0x00 0x00 0x09 */
*enc++ = AMF_OBJECT_END;
packet.m_nBodySize = enc - packet.m_body;
// 發(fā)送報(bào)文抡谐,并記入應(yīng)答隊(duì)列
return RTMP_SendPacket(r, &packet, TRUE);
}
然后,我們?cè)倏纯碦TMP_SendPacket()的實(shí)現(xiàn)桐猬,代碼摘錄并簡(jiǎn)化如下(相關(guān)邏輯的解釋添加在源碼中):
int
RTMP_SendPacket(RTMP *r, RTMPPacket *packet, int queue)
{
// 取出對(duì)應(yīng)塊流ID上一次發(fā)送的報(bào)文
const RTMPPacket *prevPacket = r->m_vecChannelsOut[packet->m_nChannel];
uint32_t last = 0;
int nSize;
int hSize, cSize;
char *header, *hptr, *hend, hbuf[RTMP_MAX_HEADER_SIZE], c;
......
// 嘗試對(duì)非LARGE報(bào)文進(jìn)行字段壓縮
if (prevPacket && packet->m_headerType != RTMP_PACKET_SIZE_LARGE)
{
// MEDIUM報(bào)文可以嘗試壓縮為SMALL報(bào)文
if (prevPacket->m_nBodySize == packet->m_nBodySize
&& prevPacket->m_packetType == packet->m_packetType
&& packet->m_headerType == RTMP_PACKET_SIZE_MEDIUM)
packet->m_headerType = RTMP_PACKET_SIZE_SMALL;
// MALL報(bào)文可以嘗試壓縮為MINIMUM報(bào)文
if (prevPacket->m_nTimeStamp == packet->m_nTimeStamp
&& packet->m_headerType == RTMP_PACKET_SIZE_SMALL)
packet->m_headerType = RTMP_PACKET_SIZE_MINIMUM;
last = prevPacket->m_nTimeStamp;
}
......
// 根據(jù)壓縮后的報(bào)文類型預(yù)設(shè)報(bào)頭大小
nSize = packetSize[packet->m_headerType];
hSize = nSize; cSize = 0;
t = packet->m_nTimeStamp - last;
// 預(yù)設(shè)報(bào)頭的緩沖區(qū)
if (packet->m_body)
{
header = packet->m_body - nSize;
hend = packet->m_body;
}
else
{
header = hbuf + 6;
hend = hbuf + sizeof(hbuf);
}
// 計(jì)算基本頭的擴(kuò)充大小
if (packet->m_nChannel > 319)
cSize = 2;
else if (packet->m_nChannel > 63)
cSize = 1;
if (cSize)
{
header -= cSize;
hSize += cSize;
}
// 根據(jù)時(shí)間戳計(jì)算是否需要擴(kuò)充頭大小
if (nSize > 1 && t >= 0xffffff)
{
header -= 4;
hSize += 4;
}
// 向緩沖區(qū)壓入基本頭
hptr = header;
c = packet->m_headerType << 6;
switch (cSize)
{
case 0:
c |= packet->m_nChannel;
break;
case 1:
break;
case 2:
c |= 1;
break;
}
*hptr++ = c;
if (cSize)
{
int tmp = packet->m_nChannel - 64;
*hptr++ = tmp & 0xff;
if (cSize == 2)
*hptr++ = tmp >> 8;
}
// 向緩沖區(qū)壓入時(shí)間戳
if (nSize > 1)
{
hptr = AMF_EncodeInt24(hptr, hend, t > 0xffffff ? 0xffffff : t);
}
// 向緩沖區(qū)壓入負(fù)載大小和報(bào)文類型
if (nSize > 4)
{
hptr = AMF_EncodeInt24(hptr, hend, packet->m_nBodySize);
*hptr++ = packet->m_packetType;
}
// 向緩沖區(qū)壓入流ID
if (nSize > 8)
hptr += EncodeInt32LE(hptr, packet->m_nInfoField2);
// 向緩沖區(qū)壓入擴(kuò)展時(shí)間戳
if (nSize > 1 && t >= 0xffffff)
hptr = AMF_EncodeInt32(hptr, hend, t);
nSize = packet->m_nBodySize;
buffer = packet->m_body;
nChunkSize = r->m_outChunkSize;
// 當(dāng)數(shù)據(jù)未發(fā)送完成時(shí)
while (nSize + hSize)
{
int wrote;
// 一次發(fā)送的最大負(fù)載限制為塊大小
if (nSize < nChunkSize)
nChunkSize = nSize;
// 發(fā)送一個(gè)塊
wrote = WriteN(r, header, nChunkSize + hSize);
if (!wrote)
return FALSE;
// 可能有分塊麦撵,只有部分負(fù)載發(fā)送成功
nSize -= nChunkSize;
buffer += nChunkSize;
hSize = 0;
// 若只有部分負(fù)載發(fā)送成功,則需繼續(xù)構(gòu)造塊再次發(fā)送
if (nSize > 0)
{
// 只需要構(gòu)造3號(hào)類型的塊頭
header = buffer - 1;
hSize = 1;
if (cSize)
{
header -= cSize;
hSize += cSize;
}
*header = (0xc0 | c);
if (cSize)
{
int tmp = packet->m_nChannel - 64;
header[1] = tmp & 0xff;
if (cSize == 2)
header[2] = tmp >> 8;
}
}
}
// 如果是0x14遠(yuǎn)程調(diào)用溃肪,則需要解出調(diào)用名稱免胃,加入等待響應(yīng)的隊(duì)列中
if (packet->m_packetType == 0x14)
{
AVal method;
char *ptr;
ptr = packet->m_body + 1;
AMF_DecodeString(ptr, &method);
if (queue) {
int txn;
ptr += 3 + method.av_len;
txn = (int)AMF_DecodeNumber(ptr);
AV_queue(&r->m_methodCalls, &r->m_numCalls, &method, txn);
}
}
// 記錄這個(gè)塊流ID剛剛發(fā)送的報(bào)文,但是應(yīng)忽略負(fù)載
if (!r->m_vecChannelsOut[packet->m_nChannel])
r->m_vecChannelsOut[packet->m_nChannel] = malloc(sizeof(RTMPPacket));
memcpy(r->m_vecChannelsOut[packet->m_nChannel], packet, sizeof(RTMPPacket));
return TRUE;
}
connect遠(yuǎn)程調(diào)用響應(yīng)_result:
connect報(bào)文發(fā)出后惫撰,這時(shí)客戶端會(huì)陷入等待狀態(tài)羔沙,必須接收到服務(wù)器的_result響應(yīng)才能執(zhí)行后繼的流程。librtmp庫(kù)將等待響應(yīng)的過(guò)程封裝了厨钻,對(duì)使用者展現(xiàn)一個(gè)RTMP_ConnectStream()函數(shù)調(diào)用扼雏,代碼如下:
int
RTMP_ConnectStream(RTMP *r, int seekTime)
{
RTMPPacket packet = { 0 };
// 設(shè)置起始時(shí)間定位
if (seekTime > 0)
r->Link.seekTime = seekTime;
r->m_mediaChannel = 0;
// 循環(huán)讀取報(bào)文并等待完成推流或拉流的交互準(zhǔn)備工作
while (!r->m_bPlaying && RTMP_IsConnected(r) && RTMP_ReadPacket(r, &packet))
{
// 報(bào)文可讀
if (RTMPPacket_IsReady(&packet))
{
if (!packet.m_nBodySize)
continue;
// 在所有的交互操作準(zhǔn)備好之前,過(guò)濾非法的音視頻報(bào)文
if ((packet.m_packetType == RTMP_PACKET_TYPE_AUDIO) ||
(packet.m_packetType == RTMP_PACKET_TYPE_VIDEO) ||
(packet.m_packetType == RTMP_PACKET_TYPE_INFO))
{
RTMP_Log(RTMP_LOGWARNING, "Received FLV packet before play()! Ignoring.");
RTMPPacket_Free(&packet);
continue;
}
// 進(jìn)行準(zhǔn)備工作期間的報(bào)文的分派處理
RTMP_ClientPacket(r, &packet);
RTMPPacket_Free(&packet);
}
}
// 返回是否準(zhǔn)備好推拉流
return r->m_bPlaying;
}
在RTMP_ConnectStream()處理交互準(zhǔn)備的過(guò)程中夯膀,有兩個(gè)重要函數(shù):RTMP_ReadPacket()負(fù)責(zé)接收?qǐng)?bào)文诗充,RTMP_ClientPacket()負(fù)責(zé)邏輯的分派處理。先看RTMP_ReadPacket()代碼:
int
RTMP_ReadPacket(RTMP *r, RTMPPacket *packet)
{
uint8_t hbuf[RTMP_MAX_HEADER_SIZE] = { 0 };
char *header = (char *)hbuf;
int nSize, hSize, nToRead, nChunk;
int didAlloc = FALSE;
// 讀取基本塊頭的首個(gè)字節(jié)
if (ReadN(r, (char *)hbuf, 1) == 0)
{
RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header", __FUNCTION__);
return FALSE;
}
// 解析基本塊頭的首個(gè)字節(jié)诱建,取得報(bào)頭類型和塊流ID
packet->m_headerType = (hbuf[0] & 0xc0) >> 6;
packet->m_nChannel = (hbuf[0] & 0x3f);
header++;
if (packet->m_nChannel == 0)
{
// 2字節(jié)基本頭其障,繼續(xù)讀取1個(gè)字節(jié)
if (ReadN(r, (char *)&hbuf[1], 1) != 1)
{
RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header 2nd byte",
__FUNCTION__);
return FALSE;
}
packet->m_nChannel = hbuf[1];
packet->m_nChannel += 64;
header++;
}
else if (packet->m_nChannel == 1)
{
// 3字節(jié)基本頭,繼續(xù)讀取2個(gè)字節(jié)
int tmp;
if (ReadN(r, (char *)&hbuf[1], 2) != 2)
{
RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header 3nd byte",
__FUNCTION__);
return FALSE;
}
tmp = (hbuf[2] << 8) + hbuf[1];
packet->m_nChannel = tmp + 64;
RTMP_Log(RTMP_LOGDEBUG, "%s, m_nChannel: %0x", __FUNCTION__, packet->m_nChannel);
header += 2;
}
// 根據(jù)報(bào)頭類型取得塊頭長(zhǎng)度
nSize = packetSize[packet->m_headerType];
// 如果是標(biāo)準(zhǔn)大頭涂佃,設(shè)置時(shí)間戳為絕對(duì)的
if (nSize == RTMP_LARGE_HEADER_SIZE)
packet->m_hasAbsTimestamp = TRUE;
// 如果非標(biāo)準(zhǔn)大頭励翼,首次嘗試拷貝上一次的報(bào)頭
else if (nSize < RTMP_LARGE_HEADER_SIZE)
{
// 這里的拷貝操作有可能取得上次的分塊報(bào)文,然后繼續(xù)后續(xù)塊的接收合并工作
if (r->m_vecChannelsIn[packet->m_nChannel])
memcpy(packet, r->m_vecChannelsIn[packet->m_nChannel],
sizeof(RTMPPacket));
}
// 計(jì)算消息塊頭(主體塊頭)大小
nSize--;
// 讀取消息塊頭
if (nSize > 0 && ReadN(r, header, nSize) != nSize)
{
RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header. type: %x",
__FUNCTION__, (unsigned int)hbuf[0]);
return FALSE;
}
// 計(jì)算基本塊頭+消息塊頭的大小
hSize = nSize + (header - (char *)hbuf);
if (nSize >= 3)
{
// 解析時(shí)間戳
packet->m_nTimeStamp = AMF_DecodeInt24(header);
if (nSize >= 6)
{
// 解析負(fù)載長(zhǎng)度
packet->m_nBodySize = AMF_DecodeInt24(header + 3);
packet->m_nBytesRead = 0;
RTMPPacket_Free(packet);
if (nSize > 6)
{
// 解析包類型
packet->m_packetType = header[6];
// 解析流ID
if (nSize == 11)
packet->m_nInfoField2 = DecodeInt32LE(header + 7);
}
}
// 讀取擴(kuò)展時(shí)間戳并解析
if (packet->m_nTimeStamp == 0xffffff)
{
if (ReadN(r, header + nSize, 4) != 4)
{
RTMP_Log(RTMP_LOGERROR, "%s, failed to read extended timestamp",
__FUNCTION__);
return FALSE;
}
packet->m_nTimeStamp = AMF_DecodeInt32(header + nSize);
hSize += 4;
}
}
// 負(fù)載非0辜荠,需要分配內(nèi)存汽抚,或第一個(gè)分塊的初使化工作
if (packet->m_nBodySize > 0 && packet->m_body == NULL)
{
if (!RTMPPacket_Alloc(packet, packet->m_nBodySize))
{
RTMP_Log(RTMP_LOGDEBUG, "%s, failed to allocate packet", __FUNCTION__);
return FALSE;
}
didAlloc = TRUE;
packet->m_headerType = (hbuf[0] & 0xc0) >> 6;
}
// 準(zhǔn)備讀取的數(shù)據(jù)和塊大小
nToRead = packet->m_nBodySize - packet->m_nBytesRead;
nChunk = r->m_inChunkSize;
if (nToRead < nChunk)
nChunk = nToRead;
// 如果packet->m_chunk非空,拷貝當(dāng)前塊的相關(guān)信息
if (packet->m_chunk)
{
packet->m_chunk->c_headerSize = hSize;
memcpy(packet->m_chunk->c_header, hbuf, hSize);
packet->m_chunk->c_chunk = packet->m_body + packet->m_nBytesRead;
packet->m_chunk->c_chunkSize = nChunk;
}
// 讀取負(fù)載到緩沖區(qū)中
if (ReadN(r, packet->m_body + packet->m_nBytesRead, nChunk) != nChunk)
{
RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet body. len: %lu",
__FUNCTION__, packet->m_nBodySize);
return FALSE;
}
RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)packet->m_body + packet->m_nBytesRead, nChunk);
packet->m_nBytesRead += nChunk;
// 保存當(dāng)前塊流ID最新的報(bào)文伯病,與RTMP_SendPacket()不同的是造烁,負(fù)載部分也被保存了否过,以應(yīng)對(duì)不完整的分塊報(bào)文
if (!r->m_vecChannelsIn[packet->m_nChannel])
r->m_vecChannelsIn[packet->m_nChannel] = malloc(sizeof(RTMPPacket));
memcpy(r->m_vecChannelsIn[packet->m_nChannel], packet, sizeof(RTMPPacket));
// 若報(bào)文負(fù)載接收完整
if (RTMPPacket_IsReady(packet))
{
// 處理增量時(shí)間戳
if (!packet->m_hasAbsTimestamp)
packet->m_nTimeStamp += r->m_channelTimestamp[packet->m_nChannel];
// 保存當(dāng)前塊流ID的時(shí)間戳
r->m_channelTimestamp[packet->m_nChannel] = packet->m_nTimeStamp;
// 清理上下文中當(dāng)前塊流ID最新的報(bào)文的負(fù)載信息
r->m_vecChannelsIn[packet->m_nChannel]->m_body = NULL;
r->m_vecChannelsIn[packet->m_nChannel]->m_nBytesRead = 0;
r->m_vecChannelsIn[packet->m_nChannel]->m_hasAbsTimestamp = FALSE;
}
else
{
// 若報(bào)文不完整,不將分片負(fù)載向上拋給應(yīng)用惭蟋,以免引起使用誤解
packet->m_body = NULL;
}
return TRUE;
}
再看RTMP_ClientPacket()源碼:
int
RTMP_ClientPacket(RTMP *r, RTMPPacket *packet)
{
// 根據(jù)命令類型進(jìn)行分派處理
int bHasMediaPacket = 0;
switch (packet->m_packetType)
{
case 0x01:
// 更新接收處理時(shí)的塊限制
HandleChangeChunkSize(r, packet);
break;
case 0x03:
// 對(duì)端反饋的已讀大小
RTMP_Log(RTMP_LOGDEBUG, "%s, received: bytes read report", __FUNCTION__);
break;
case 0x04:
// 處理對(duì)端發(fā)送的控制報(bào)文
HandleCtrl(r, packet);
break;
case 0x05:
// 處理對(duì)端發(fā)送的應(yīng)答窗口大小苗桂,這里由服務(wù)器發(fā)送,即告之客戶端收到對(duì)應(yīng)大小的數(shù)據(jù)后應(yīng)發(fā)送反饋
HandleServerBW(r, packet);
break;
case 0x06:
// 處理對(duì)端發(fā)送的設(shè)置發(fā)送帶寬大小告组,這里由服務(wù)器發(fā)送煤伟,即設(shè)置客戶端的發(fā)送帶寬
HandleClientBW(r, packet);
break;
case 0x08:
// 處理音頻數(shù)據(jù)
HandleAudio(r, packet);
bHasMediaPacket = 1;
if (!r->m_mediaChannel)
r->m_mediaChannel = packet->m_nChannel;
if (!r->m_pausing)
r->m_mediaStamp = packet->m_nTimeStamp;
break;
case 0x09:
// 處理視頻數(shù)據(jù)
HandleVideo(r, packet);
bHasMediaPacket = 1;
if (!r->m_mediaChannel)
r->m_mediaChannel = packet->m_nChannel;
if (!r->m_pausing)
r->m_mediaStamp = packet->m_nTimeStamp;
break;
case 0x12:
// 處理媒體元數(shù)據(jù)
if (HandleMetadata(r, packet->m_body, packet->m_nBodySize))
bHasMediaPacket = 1;
break;
case 0x14:
// 處理遠(yuǎn)程調(diào)用
if (HandleInvoke(r, packet->m_body, packet->m_nBodySize) == 1)
bHasMediaPacket = 2;
break;
}
// 返回值為1表示推拉流正在正作中,為2表示已經(jīng)停止
return bHasMediaPacket;
}
在RTMP_ConnectStream()中木缝,RTMP_ReadPacket()接收的報(bào)文便锨,交給RTMP_ClientPacket()進(jìn)行分派。connect遠(yuǎn)程調(diào)用發(fā)出后的_result響應(yīng)我碟,也屬于0x14命令放案,我們需要繼續(xù)解析HandleInvoke()這個(gè)函數(shù),代碼如下:
static int
HandleInvoke(RTMP *r, const char *body, unsigned int nBodySize)
{
AMFObject obj;
AVal method;
int txn;
int ret = 0, nRes;
// 確保響應(yīng)報(bào)文是0x14的命令字
if (body[0] != 0x02)
{
RTMP_Log(RTMP_LOGWARNING, "%s, Sanity failed. no string method in invoke packet",
__FUNCTION__);
return 0;
}
// 將各參數(shù)以無(wú)名稱的對(duì)象屬性方式進(jìn)行解析
nRes = AMF_Decode(&obj, body, nBodySize, FALSE);
if (nRes < 0)
{
RTMP_Log(RTMP_LOGERROR, "%s, error decoding invoke packet", __FUNCTION__);
return 0;
}
AMF_Dump(&obj);
// 獲取過(guò)程名稱和流水號(hào)
AMFProp_GetString(AMF_GetProp(&obj, NULL, 0), &method);
txn = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 1));
RTMP_Log(RTMP_LOGDEBUG, "%s, server invoking <%s>", __FUNCTION__, method.av_val);
// 過(guò)程名稱為_(kāi)result
if (AVMATCH(&method, &av__result))
{
AVal methodInvoked = {0};
int i;
// 刪除請(qǐng)求隊(duì)列中的流水項(xiàng)
for (i=0; i<r->m_numCalls; i++) {
if (r->m_methodCalls[i].num == txn) {
methodInvoked = r->m_methodCalls[i].name;
AV_erase(r->m_methodCalls, &r->m_numCalls, i, FALSE);
break;
}
}
if (!methodInvoked.av_val) {
RTMP_Log(RTMP_LOGDEBUG, "%s, received result id %d without matching request",
__FUNCTION__, txn);
goto leave;
}
RTMP_Log(RTMP_LOGDEBUG, "%s, received result for method call <%s>", __FUNCTION__,
methodInvoked.av_val);
// 找到了連接請(qǐng)求矫俺,確認(rèn)是連接響應(yīng)
if (AVMATCH(&methodInvoked, &av_connect))
{
// 客戶端推流
if (r->Link.protocol & RTMP_FEATURE_WRITE)
{
// 通知服務(wù)器釋放流通道和清理推流資源
SendReleaseStream(r);
SendFCPublish(r);
}
// 客戶端拉流
else
{
// 設(shè)置服務(wù)器的應(yīng)答窗口大小
RTMP_SendServerBW(r);
RTMP_SendCtrl(r, 3, 0, 300);
}
// 發(fā)送創(chuàng)建流通道請(qǐng)求
RTMP_SendCreateStream(r);
}
// 找到了創(chuàng)建流請(qǐng)求吱殉,確認(rèn)是創(chuàng)建流的響應(yīng)
else if (AVMATCH(&methodInvoked, &av_createStream))
{
// 從響應(yīng)中取流ID
r->m_stream_id = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 3));
// 客戶端推流
if (r->Link.protocol & RTMP_FEATURE_WRITE)
{
// 發(fā)送推流點(diǎn)
SendPublish(r);
}
// 客戶端拉流
else
{
// 發(fā)送拉流點(diǎn)
SendPlay(r);
// 發(fā)送拉流緩沖時(shí)長(zhǎng)
RTMP_SendCtrl(r, 3, r->m_stream_id, r->m_nBufferMS);
}
}
// 找到了推流和拉流請(qǐng)求,確認(rèn)是它們的響應(yīng)
else if (AVMATCH(&methodInvoked, &av_play) ||
AVMATCH(&methodInvoked, &av_publish))
{
// 標(biāo)識(shí)已經(jīng)進(jìn)入流狀態(tài)
r->m_bPlaying = TRUE;
}
free(methodInvoked.av_val);
}
// 過(guò)程名稱為ping
else if (AVMATCH(&method, &av_ping))
{
// 發(fā)送pong響應(yīng)
SendPong(r, txn);
}
// 過(guò)程名稱為_(kāi)error
else if (AVMATCH(&method, &av__error))
{
RTMP_Log(RTMP_LOGERROR, "rtmp server sent error");
}
// 過(guò)程名稱為close
else if (AVMATCH(&method, &av_close))
{
RTMP_Log(RTMP_LOGERROR, "rtmp server requested close");
RTMP_Close(r);
}
// 過(guò)程名稱為onStatus
else if (AVMATCH(&method, &av_onStatus))
{
// 獲取返回對(duì)象及其主要屬性
AMFObject obj2;
AVal code, level;
AMFProp_GetObject(AMF_GetProp(&obj, NULL, 3), &obj2);
AMFProp_GetString(AMF_GetProp(&obj2, &av_code, -1), &code);
AMFProp_GetString(AMF_GetProp(&obj2, &av_level, -1), &level);
RTMP_Log(RTMP_LOGDEBUG, "%s, onStatus: %s", __FUNCTION__, code.av_val);
// 出錯(cuò)返回
if (AVMATCH(&code, &av_NetStream_Failed)
|| AVMATCH(&code, &av_NetStream_Play_Failed)
|| AVMATCH(&code, &av_NetStream_Play_StreamNotFound)
|| AVMATCH(&code, &av_NetConnection_Connect_InvalidApp))
{
r->m_stream_id = -1;
RTMP_Close(r);
RTMP_Log(RTMP_LOGERROR, "Closing connection: %s", code.av_val);
}
// 啟動(dòng)拉流成功
else if (AVMATCH(&code, &av_NetStream_Play_Start))
{
int i;
r->m_bPlaying = TRUE;
for (i = 0; i < r->m_numCalls; i++)
{
if (AVMATCH(&r->m_methodCalls[i].name, &av_play))
{
AV_erase(r->m_methodCalls, &r->m_numCalls, i, TRUE);
break;
}
}
}
// 啟動(dòng)推流成功
else if (AVMATCH(&code, &av_NetStream_Publish_Start))
{
int i;
r->m_bPlaying = TRUE;
for (i = 0; i < r->m_numCalls; i++)
{
if (AVMATCH(&r->m_methodCalls[i].name, &av_publish))
{
AV_erase(r->m_methodCalls, &r->m_numCalls, i, TRUE);
break;
}
}
}
// 通知流完成或結(jié)束
else if (AVMATCH(&code, &av_NetStream_Play_Complete)
|| AVMATCH(&code, &av_NetStream_Play_Stop)
|| AVMATCH(&code, &av_NetStream_Play_UnpublishNotify))
{
RTMP_Close(r);
ret = 1;
}
// 通知流暫停
else if (AVMATCH(&code, &av_NetStream_Pause_Notify))
{
if (r->m_pausing == 1 || r->m_pausing == 2)
{
RTMP_SendPause(r, FALSE, r->m_pauseStamp);
r->m_pausing = 3;
}
}
}
leave:
AMF_Reset(&obj);
return ret;
}
上面的注釋顯示厘托,在HandleInvoke()函數(shù)的處理分支中考婴,有涉及connect調(diào)用的_result處理。在收到_result響應(yīng)后催烘,又根據(jù)推流或拉流的標(biāo)志沥阱,繼續(xù)后續(xù)的流程。
HandleInvoke()函數(shù)非常強(qiáng)大伊群,除了對(duì)各種_result進(jìn)行處理外考杉,它還支持onStatus操作,后續(xù)的createStream響應(yīng)舰始,publish和play推拉流的狀態(tài)反饋崇棠,都會(huì)集中在這里處理。
connect遠(yuǎn)程調(diào)用的其它交互:
connect和_result交互是必須的丸卷,事實(shí)上枕稀,在_result返回之前,服務(wù)器還可以先返回一些可選的設(shè)置報(bào)文谜嫉,例如:
服務(wù)器設(shè)置客戶端的應(yīng)答窗口大形馈:
int
RTMP_ClientPacket(RTMP *r, RTMPPacket *packet)
{
......
switch (packet->m_packetType)
{
......
case 0x05:
/* server bw */
HandleServerBW(r, packet);
break;
......
}
static void
HandleServerBW(RTMP *r, const RTMPPacket *packet)
{
r->m_nServerBW = AMF_DecodeInt32(packet->m_body);
RTMP_Log(RTMP_LOGDEBUG, "%s: server BW = %d", __FUNCTION__, r->m_nServerBW);
}
服務(wù)器設(shè)置客戶端的發(fā)送帶寬大小:
int
RTMP_ClientPacket(RTMP *r, RTMPPacket *packet)
{
......
switch (packet->m_packetType)
{
......
case 0x06:
/* client bw */
HandleClientBW(r, packet);
break;
......
}
static void
HandleClientBW(RTMP *r, const RTMPPacket *packet)
{
r->m_nClientBW = AMF_DecodeInt32(packet->m_body);
if (packet->m_nBodySize > 4)
r->m_nClientBW2 = packet->m_body[4];
else
r->m_nClientBW2 = -1;
RTMP_Log(RTMP_LOGDEBUG, "%s: client BW = %d %d", __FUNCTION__, r->m_nClientBW,
r->m_nClientBW2);
}
服務(wù)器設(shè)置客戶端的接收塊大秀謇肌:
int
RTMP_ClientPacket(RTMP *r, RTMPPacket *packet)
{
......
switch (packet->m_packetType)
{
case 0x01:
/* chunk size */
HandleChangeChunkSize(r, packet);
break;
......
}
static void
HandleChangeChunkSize(RTMP *r, const RTMPPacket *packet)
{
if (packet->m_nBodySize >= 4)
{
r->m_inChunkSize = AMF_DecodeInt32(packet->m_body);
RTMP_Log(RTMP_LOGDEBUG, "%s, received: chunk size change to %d", __FUNCTION__,
r->m_inChunkSize);
}
}
createStream遠(yuǎn)程調(diào)用:
在成功處理connect的_result響應(yīng)之后哆档,即表示服務(wù)器接收了客戶端的第一步的地址請(qǐng)求,接下來(lái)客戶端需要根據(jù)推流或拉流的場(chǎng)景住闯,發(fā)起后續(xù)的請(qǐng)求了瓜浸,精簡(jiǎn)HandleInvoke()后的代碼如下:
static int
HandleInvoke(RTMP *r, const char *body, unsigned int nBodySize)
{
if (AVMATCH(&method, &av__result))
{
......
if (AVMATCH(&methodInvoked, &av_connect))
{
// 判斷推流標(biāo)志是否設(shè)置
if (r->Link.protocol & RTMP_FEATURE_WRITE)
{
// 推流準(zhǔn)備(必須)
SendReleaseStream(r);
SendFCPublish(r);
}
else
{
// 拉流準(zhǔn)備(可選)
RTMP_SendServerBW(r);
RTMP_SendCtrl(r, 3, 0, 300);
}
// 創(chuàng)建流通道
RTMP_SendCreateStream(r);
}
}
......
}
上面的代碼和注釋顯示澳淑,無(wú)論是推流還是拉流,都需要?jiǎng)?chuàng)建流通道插佛,但是在之前吏饿,有一些不同的額外操作要設(shè)置歼郭。
推流的額外操作:
推流之前含滴,需要先釋放掉當(dāng)前的推流點(diǎn)的資源甥材,并且準(zhǔn)備好新的推流點(diǎn)。
// 發(fā)送釋放推流點(diǎn)請(qǐng)求
static int
SendReleaseStream(RTMP *r)
{
RTMPPacket packet;
char pbuf[1024], *pend = pbuf + sizeof(pbuf);
char *enc;
packet.m_nChannel = 0x03; /* control channel (invoke) */
packet.m_headerType = RTMP_PACKET_SIZE_MEDIUM;
packet.m_packetType = 0x14; /* INVOKE */
packet.m_nTimeStamp = 0;
packet.m_nInfoField2 = 0;
packet.m_hasAbsTimestamp = 0;
packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE;
// 壓入遠(yuǎn)程過(guò)程調(diào)用的參數(shù)谢床,尤其是推流點(diǎn)
enc = packet.m_body;
enc = AMF_EncodeString(enc, pend, &av_releaseStream);
enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes);
*enc++ = AMF_NULL;
enc = AMF_EncodeString(enc, pend, &r->Link.playpath); // 推流點(diǎn)
if (!enc)
return FALSE;
packet.m_nBodySize = enc - packet.m_body;
return RTMP_SendPacket(r, &packet, FALSE);
}
// 發(fā)送準(zhǔn)備推流點(diǎn)請(qǐng)求
static int
SendFCPublish(RTMP *r)
{
RTMPPacket packet;
char pbuf[1024], *pend = pbuf + sizeof(pbuf);
char *enc;
packet.m_nChannel = 0x03; /* control channel (invoke) */
packet.m_headerType = RTMP_PACKET_SIZE_MEDIUM;
packet.m_packetType = 0x14; /* INVOKE */
packet.m_nTimeStamp = 0;
packet.m_nInfoField2 = 0;
packet.m_hasAbsTimestamp = 0;
packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE;
// 壓入遠(yuǎn)程過(guò)程調(diào)用的參數(shù)兄一,尤其是推流點(diǎn)
enc = packet.m_body;
enc = AMF_EncodeString(enc, pend, &av_FCPublish);
enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes);
*enc++ = AMF_NULL;
enc = AMF_EncodeString(enc, pend, &r->Link.playpath); // 推流點(diǎn)
if (!enc)
return FALSE;
packet.m_nBodySize = enc - packet.m_body;
return RTMP_SendPacket(r, &packet, FALSE);
}
拉流的額外操作:
通知服務(wù)器收到指定大小的客戶端數(shù)據(jù)后厘线,需要發(fā)送應(yīng)答识腿。拉流時(shí)客戶端基本不發(fā)送流量,個(gè)人感覺(jué)這步有些多余造壮。
int
RTMP_SendServerBW(RTMP *r)
{
RTMPPacket packet;
char pbuf[256], *pend = pbuf + sizeof(pbuf);
packet.m_nChannel = 0x02; /* control channel (invoke) */
packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
packet.m_packetType = 0x05; /* Server BW */
packet.m_nTimeStamp = 0;
packet.m_nInfoField2 = 0;
packet.m_hasAbsTimestamp = 0;
packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE;
packet.m_nBodySize = 4;
// 壓入4字節(jié)帶寬
AMF_EncodeInt32(packet.m_body, pend, r->m_nServerBW);
return RTMP_SendPacket(r, &packet, FALSE);
}
// 控制報(bào)文的含義比較多渡讼,具體也需要查看手冊(cè),對(duì)應(yīng)拉流來(lái)說(shuō)耳璧,主要是設(shè)置服務(wù)器的緩存時(shí)間成箫。
int
RTMP_SendCtrl(RTMP *r, short nType, unsigned int nObject, unsigned int nTime)
{
RTMPPacket packet;
char pbuf[256], *pend = pbuf + sizeof(pbuf);
int nSize;
char *buf;
packet.m_nChannel = 0x02; /* control channel (ping) */
packet.m_headerType = RTMP_PACKET_SIZE_MEDIUM;
packet.m_packetType = 0x04; /* ctrl */
packet.m_nTimeStamp = 0; /* RTMP_GetTime(); */
packet.m_nInfoField2 = 0;
packet.m_hasAbsTimestamp = 0;
packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE;
switch(nType) {
case 0x03: nSize = 10; break; /* buffer time */
case 0x1A: nSize = 3; break; /* SWF verify request */
case 0x1B: nSize = 44; break; /* SWF verify response */
default: nSize = 6; break;
}
packet.m_nBodySize = nSize;
buf = packet.m_body;
buf = AMF_EncodeInt16(buf, pend, nType);
if (nType == 0x1B)
{
......
}
else if (nType == 0x1A)
{
......
}
else
{
if (nSize > 2)
buf = AMF_EncodeInt32(buf, pend, nObject);
if (nSize > 6)
buf = AMF_EncodeInt32(buf, pend, nTime);
}
return RTMP_SendPacket(r, &packet, FALSE);
}
創(chuàng)建流通道操作:
// 發(fā)送創(chuàng)建流請(qǐng)求
int
RTMP_SendCreateStream(RTMP *r)
{
RTMPPacket packet;
char pbuf[256], *pend = pbuf + sizeof(pbuf);
char *enc;
packet.m_nChannel = 0x03; /* control channel (invoke) */
packet.m_headerType = RTMP_PACKET_SIZE_MEDIUM;
packet.m_packetType = 0x14; /* INVOKE */
packet.m_nTimeStamp = 0;
packet.m_nInfoField2 = 0;
packet.m_hasAbsTimestamp = 0;
packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE;
// 壓入遠(yuǎn)程過(guò)程調(diào)用的參數(shù)
enc = packet.m_body;
enc = AMF_EncodeString(enc, pend, &av_createStream);
enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes);
*enc++ = AMF_NULL; /* NULL */
packet.m_nBodySize = enc - packet.m_body;
return RTMP_SendPacket(r, &packet, TRUE);
}
createStream遠(yuǎn)程調(diào)用響應(yīng):
服務(wù)器收到createStream請(qǐng)求后,若無(wú)錯(cuò)誤旨枯,則返回流ID蹬昌,有了流ID客戶端就可以進(jìn)行后續(xù)的推流或拉流了,邏輯如下:
static int
HandleInvoke(RTMP *r, const char *body, unsigned int nBodySize)
{
if (AVMATCH(&method, &av__result))
{
......
else if (AVMATCH(&methodInvoked, &av_createStream))
{
// 保存流ID
r->m_stream_id = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 3));
if (r->Link.protocol & RTMP_FEATURE_WRITE)
{
// 開(kāi)始推流
SendPublish(r);
}
else
{
// 開(kāi)始拉流
SendPlay(r);
// 控制該流的緩沖時(shí)長(zhǎng)
RTMP_SendCtrl(r, 3, r->m_stream_id, r->m_nBufferMS);
}
}
......
}
}
上面的代碼和注釋顯示攀隔,推流和拉流皂贩,分別進(jìn)行不同的邏輯處理。
推流處理publish調(diào)用:
對(duì)推流來(lái)說(shuō)昆汹,客戶端需要請(qǐng)求服務(wù)器將流ID和推送點(diǎn)進(jìn)行關(guān)聯(lián)明刷。請(qǐng)求代碼如下:
static int
SendPublish(RTMP *r)
{
RTMPPacket packet;
char pbuf[1024], *pend = pbuf + sizeof(pbuf);
char *enc;
packet.m_nChannel = 0x04; /* source channel (invoke) */
packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
packet.m_packetType = 0x14; /* INVOKE */
packet.m_nTimeStamp = 0;
packet.m_nInfoField2 = r->m_stream_id; // 指定流ID
packet.m_hasAbsTimestamp = 0;
packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE;
enc = packet.m_body;
enc = AMF_EncodeString(enc, pend, &av_publish);
enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes);
*enc++ = AMF_NULL;
enc = AMF_EncodeString(enc, pend, &r->Link.playpath); // 指定推送點(diǎn)
if (!enc)
return FALSE;
/* FIXME: should we choose live based on Link.lFlags & RTMP_LF_LIVE? */
enc = AMF_EncodeString(enc, pend, &av_live);
if (!enc)
return FALSE;
packet.m_nBodySize = enc - packet.m_body;
return RTMP_SendPacket(r, &packet, TRUE); // 需要反饋
}
拉流處理play調(diào)用:
對(duì)拉流來(lái)說(shuō),客戶端需要請(qǐng)求服務(wù)器將流ID和播放點(diǎn)進(jìn)行關(guān)聯(lián)满粗。請(qǐng)求代碼如下:
static int
SendPlay(RTMP *r)
{
RTMPPacket packet;
char pbuf[1024], *pend = pbuf + sizeof(pbuf);
char *enc;
packet.m_nChannel = 0x08; /* we make 8 our stream channel */
packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
packet.m_packetType = 0x14; /* INVOKE */
packet.m_nTimeStamp = 0;
packet.m_nInfoField2 = r->m_stream_id; /*0x01000000; */ // 指定流ID
packet.m_hasAbsTimestamp = 0;
packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE;
enc = packet.m_body;
enc = AMF_EncodeString(enc, pend, &av_play);
enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes);
*enc++ = AMF_NULL;
RTMP_Log(RTMP_LOGDEBUG, "%s, seekTime=%d, stopTime=%d, sending play: %s",
__FUNCTION__, r->Link.seekTime, r->Link.stopTime,
r->Link.playpath.av_val);
enc = AMF_EncodeString(enc, pend, &r->Link.playpath); // 指定推送點(diǎn)
if (!enc)
return FALSE;
// 指定開(kāi)始時(shí)間
/* Optional parameters start and len.
*
* start: -2, -1, 0, positive number
* -2: looks for a live stream, then a recorded stream,
* if not found any open a live stream
* -1: plays a live stream
* >=0: plays a recorded streams from 'start' milliseconds
*/
if (r->Link.lFlags & RTMP_LF_LIVE)
enc = AMF_EncodeNumber(enc, pend, -1000.0);
else
{
if (r->Link.seekTime > 0.0)
enc = AMF_EncodeNumber(enc, pend, r->Link.seekTime); /* resume from here */
else
enc = AMF_EncodeNumber(enc, pend, 0.0); /*-2000.0);*/ /* recorded as default, -2000.0 is not reliable since that freezes the
player if the stream is not found */
}
if (!enc)
return FALSE;
// 指點(diǎn)播放時(shí)長(zhǎng)
/* len: -1, 0, positive number
* -1: plays live or recorded stream to the end (default)
* 0: plays a frame 'start' ms away from the beginning
* >0: plays a live or recoded stream for 'len' milliseconds
*/
/*enc += EncodeNumber(enc, -1.0); */ /* len */
if (r->Link.stopTime)
{
enc = AMF_EncodeNumber(enc, pend, r->Link.stopTime - r->Link.seekTime);
if (!enc)
return FALSE;
}
packet.m_nBodySize = enc - packet.m_body;
return RTMP_SendPacket(r, &packet, TRUE); // 需要反饋
}
publish或play狀態(tài)反饋:
在前面完成publish或play過(guò)程調(diào)用后辈末,客戶端需要等待響應(yīng)或結(jié)果反饋。繼續(xù)看HandleInvoke()函數(shù)精簡(jiǎn)后的相關(guān)代碼:
static int
HandleInvoke(RTMP *r, const char *body, unsigned int nBodySize)
{
......
else if (AVMATCH(&method, &av_onStatus))
{
AMFObject obj2;
AVal code, level;
AMFProp_GetObject(AMF_GetProp(&obj, NULL, 3), &obj2);
AMFProp_GetString(AMF_GetProp(&obj2, &av_code, -1), &code);
AMFProp_GetString(AMF_GetProp(&obj2, &av_level, -1), &level);
RTMP_Log(RTMP_LOGDEBUG, "%s, onStatus: %s", __FUNCTION__, code.av_val);
// 判斷是否出錯(cuò)映皆,出錯(cuò)需中止流程
if (AVMATCH(&code, &av_NetStream_Failed)
|| AVMATCH(&code, &av_NetStream_Play_Failed)
|| AVMATCH(&code, &av_NetStream_Play_StreamNotFound)
|| AVMATCH(&code, &av_NetConnection_Connect_InvalidApp))
{
r->m_stream_id = -1;
RTMP_Close(r);
RTMP_Log(RTMP_LOGERROR, "Closing connection: %s", code.av_val);
}
......
// 啟動(dòng)推流成功
else if (AVMATCH(&code, &av_NetStream_Publish_Start))
{
int i;
r->m_bPlaying = TRUE; // 設(shè)置可以推流標(biāo)志
for (i = 0; i < r->m_numCalls; i++)
{
if (AVMATCH(&r->m_methodCalls[i].name, &av_publish))
{
AV_erase(r->m_methodCalls, &r->m_numCalls, i, TRUE);
break;
}
}
}
// 啟動(dòng)拉流成功
else if (AVMATCH(&code, &av_NetStream_Play_Start))
{
int i;
r->m_bPlaying = TRUE; // 設(shè)置可以拉流標(biāo)志
for (i = 0; i < r->m_numCalls; i++)
{
if (AVMATCH(&r->m_methodCalls[i].name, &av_play))
{
AV_erase(r->m_methodCalls, &r->m_numCalls, i, TRUE);
break;
}
}
}
......
}
......
}
讓我們回到RTMP_ConnectStream()函數(shù)挤聘,我們留意其中的m_bPlaying標(biāo)志,由于此時(shí)標(biāo)志為TRUE捅彻,循環(huán)條件不成立檬洞,函數(shù)退出。簡(jiǎn)化代碼如下:
int
RTMP_ConnectStream(RTMP *r, int seekTime)
{
......
while (!r->m_bPlaying && RTMP_IsConnected(r) && RTMP_ReadPacket(r, &packet))
{
......
}
return r->m_bPlaying; // 返回推流或拉流準(zhǔn)許的狀態(tài)
}
當(dāng)RTMP_ConnectStream()返回TRUE的時(shí)候沟饥,librtmp庫(kù)的使用者添怔,就可以在自己的代碼層次湾戳,進(jìn)行后續(xù)的推流或拉流操作了。
推流操作代碼節(jié)選:
// 初使化RTMP報(bào)文
RTMPPacket packet;
RTMPPacket_Reset(&packet);
packet.m_body = NULL;
packet.m_chunk = NULL;
packet.m_nInfoField2 = pRTMP->m_stream_id;
uint32_t starttime = RTMP_GetTime();
while (true)
{
// 讀取TAG頭
uint8_t type = 0;
if (!ReadU8(&type, pFile))
break;
uint32_t datalen = 0;
if (!ReadU24(&datalen, pFile))
break;
uint32_t timestamp = 0;
if (!ReadTime(×tamp, pFile))
break;
uint32_t streamid = 0;
if (!ReadU24(&streamid, pFile))
break;
/*
// 跳過(guò)0x12 Script
if (type != 0x08 && type != 0x09)
{
fseek(pFile, datalen + 4, SEEK_CUR);
continue;
}
*/
RTMPPacket_Alloc(&packet, datalen);
if (fread(packet.m_body, 1, datalen, pFile) != datalen)
break;
// 組織報(bào)文并發(fā)送
packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
packet.m_packetType = type;
packet.m_hasAbsTimestamp = 0;
packet.m_nChannel = 6;
packet.m_nTimeStamp = timestamp;
packet.m_nBodySize = datalen;
if (!RTMP_SendPacket(pRTMP, &packet, 0))
{
printf("Send Error! \n");
break;
}
printf("send type:[%d] timestamp:[%d] datasize:[%d] \n", type, timestamp, datalen);
// 跳過(guò)PreTag
uint32_t pretagsize = 0;
if (!ReadU32(&pretagsize, pFile))
break;
// 延時(shí)广料,避免發(fā)送太快
uint32_t timeago = (RTMP_GetTime() - starttime);
if (timestamp > 1000 && timeago < timestamp - 1000)
{
printf("sleep...\n");
usleep(100000);
}
RTMPPacket_Free(&packet);
}
上面這段代碼片斷演示了使用librtmp推流砾脑,讀取FLV文件,并向上推流的例子艾杏。
拉流操作代碼節(jié)選:
bool bSaveMP3 = true;
FILE* pFile = fopen(bSaveMP3 ? "testrtmp.mp3" : "testrtmp.flv", "wb");
while (RTMP_IsConnected(pRTMP))
{
if (bSaveMP3)
{
RTMPPacket packet;
RTMPPacket_Reset(&packet);
packet.m_body = NULL;
packet.m_chunk = NULL;
b = RTMP_ReadPacket(pRTMP, &packet);
if (!b)
break;
if (!RTMPPacket_IsReady(&packet))
continue;
printf("\t headerType:[%d] \n", packet.m_headerType);
printf("\t packetType:[%d] \n", packet.m_packetType);
printf("\t hasAbsTimestamp:[%d] \n", packet.m_hasAbsTimestamp);
printf("\t nChannel:[%d] \n", packet.m_nChannel);
printf("\t nTimeStamp:[%d] \n", packet.m_nTimeStamp);
printf("\t nInfoField2:[%d] \n", packet.m_nInfoField2);
printf("\t nBodySize:[%d] \n", packet.m_nBodySize);
printf("\t nBytesRead:[%d] \n", packet.m_nBytesRead);
if (packet.m_packetType == 0x08)
{
fwrite(packet.m_body + 1, 1, packet.m_nBodySize - 1, pFile);
}
RTMPPacket_Free(&packet);
}
else
{
char sBuf[4096] = {0};
int bytes = RTMP_Read(pRTMP, sBuf, sizeof(sBuf));
printf("RTMP_Read() ret:[%d] \n", bytes);
if (bytes <= 0)
break;
fwrite(sBuf, 1, bytes, pFile);
}
}
上面這段代碼片斷演示了使用librtmp拉流韧衣,并將音頻保存為MP3,或者將音視頻保存為FLV的例子购桑。
小結(jié):
源代碼解析到這一步畅铭,我想應(yīng)該大部分的關(guān)鍵流程都帶過(guò)一遍了,但是簽于我的時(shí)間和水平的關(guān)系勃蜘,很多細(xì)節(jié)目前都還是淺嘗輒止硕噩,后面若有機(jī)會(huì)再慢慢補(bǔ)上吧,希望朋友們理解缭贡。