librtmp源碼分析之核心實(shí)現(xiàn)解讀

librtmp是我們平常工作中進(jìn)行推拉流開(kāi)發(fā)的重要工具沙廉,官方提供的版本是基于C/C++技術(shù)棧的,但是有不少的其它高級(jí)語(yǔ)言技術(shù)棧也都提供了相應(yīng)的包裝或移植版本。

RTMP協(xié)議非常復(fù)雜辜御,網(wǎng)上又鮮有較為完整的文檔,再加上它的一些設(shè)計(jì)理念比較奇特(奇特的事件會(huì)剌激我們的反向思維屈张,讓我們對(duì)現(xiàn)有認(rèn)知產(chǎn)生否定)擒权,以至于我們完全理解它需要耗費(fèi)非常多的時(shí)間和腦力袱巨。
從RTMP的實(shí)現(xiàn)版本也側(cè)面驗(yàn)證了這點(diǎn),現(xiàn)有的主流實(shí)現(xiàn)仍然遵循TLV范式(不了解的話請(qǐng)百度)碳抄,協(xié)議層面基本拋棄了消息頭的建議愉老,鏈路層復(fù)用只使用塊頭,塊頭之上就是負(fù)載剖效,對(duì)指令來(lái)說(shuō)負(fù)載就是AMF嫉入,對(duì)媒體來(lái)說(shuō)負(fù)載就是編碼幀,清爽干凈璧尸!

本文不講解RTMP協(xié)議規(guī)范的相關(guān)知識(shí)咒林,讀者朋友們請(qǐng)自行百度。本文的主要目的是配合大家一起閱讀librtmp的核心源碼爷光,但是在開(kāi)始之前建議大家有一些RTMP協(xié)議的基本認(rèn)識(shí)垫竞,我希望在完成本文的閱讀之后,大家對(duì)librtmp的使用能上一個(gè)更高的層次蛀序。

下載源碼:

在閱讀源碼前欢瞪,請(qǐng)大家到官方網(wǎng)址下載:http://rtmpdump.mplayerhq.hu/download
最新的是2.3版本,作者已經(jīng)很多年沒(méi)有更新了徐裸,說(shuō)明還是比較穩(wěn)定的遣鼓。

下源源碼后,解壓重贺,文件樹(shù)結(jié)構(gòu)如下譬正,部分文件我做了說(shuō)明:

.
├── ChangeLog
├── COPYING
├── librtmp   // rtmp協(xié)議的一個(gè)客戶端庫(kù)實(shí)現(xiàn)
│?? ├── amf.c    // AMF序列化實(shí)現(xiàn)
│?? ├── amf.h   // AMF序列化頭文件
│?? ├── bytes.h
│?? ├── COPYING
│?? ├── dhgroups.h
│?? ├── dh.h    // DH非對(duì)稱加密算法調(diào)用封裝
│?? ├── handshake.h    // 加密版本的握手實(shí)現(xiàn)
│?? ├── hashswf.c    // SWF哈希校驗(yàn)相關(guān)實(shí)現(xiàn)
│?? ├── http.h    // HTTP請(qǐng)求頭文件定義
│?? ├── librtmp.3
│?? ├── librtmp.3.html
│?? ├── librtmp.pc.in
│?? ├── log.c    // 日志輸出實(shí)現(xiàn)
│?? ├── log.h    // 日志輸出定義
│?? ├── Makefile
│?? ├── parseurl.c    // RTMP網(wǎng)址解析實(shí)現(xiàn)
│?? ├── rtmp.c    // RTMP主邏輯實(shí)現(xiàn)
│?? ├── rtmp.h    // RTMP主邏輯頭文件
│?? └── rtmp_sys.h
├── Makefile
├── README
├── rtmpdump.1
├── rtmpdump.1.html
├── rtmpdump.c    // 一個(gè)很強(qiáng)大的FLV文件解析、拉流的例子
├── rtmpgw.8
├── rtmpgw.8.html
├── rtmpgw.c    // 一個(gè)HTTP服務(wù)代理的實(shí)現(xiàn)
├── rtmpsrv.c    // 一個(gè)簡(jiǎn)單的RTMP服務(wù)器的實(shí)現(xiàn)的基本框架
├── rtmpsuck.c
├── thread.c    // 線程封裝
└── thread.h    // 線程封裝頭文件

編譯的話一般執(zhí)行make就可以了檬姥,需要openssl依賴曾我,生成的庫(kù)在librtmp子目錄下。

主要結(jié)構(gòu)定義:

在rtmp.h中(以下除非特殊說(shuō)明健民,均指rtmp.h或rtmp.c)抒巢,定義了4種塊頭格式:

#define RTMP_PACKET_SIZE_LARGE    0    // 對(duì)應(yīng)于基本塊頭的0格式
#define RTMP_PACKET_SIZE_MEDIUM   1    // 對(duì)應(yīng)于基本塊頭的1格式
#define RTMP_PACKET_SIZE_SMALL    2    // 對(duì)應(yīng)于基本塊頭的2格式
#define RTMP_PACKET_SIZE_MINIMUM  3    // 對(duì)應(yīng)于基本塊頭的3格式

4種格式的塊頭大小:

static const int packetSize[] = { 12, 8, 4, 1 };

整個(gè)塊頭的大小是不固定的秉犹,最小為1字節(jié)(基本塊頭為格式3蛉谜,且塊流ID大于1小于64時(shí)),最大為18字節(jié)(基本塊頭為格式0崇堵,且塊流ID大于319型诚,且時(shí)間戳大于0x00ffffff時(shí))。這里packetSize定義的是4種塊頭格式的消息塊頭大小+1字節(jié)的基本塊頭鸳劳。

塊頭結(jié)構(gòu)定義:

  typedef struct RTMPPacket
  {
    uint8_t m_headerType;    // 塊頭格式
    uint8_t m_packetType;    // 命令類型
    uint8_t m_hasAbsTimestamp;   // 是否絕對(duì)時(shí)間
    int m_nChannel;    // 塊流ID
    uint32_t m_nTimeStamp;     // 時(shí)間戳
    int32_t m_nInfoField2;    // 特殊字段狰贯,通常用于保存0x14號(hào)遠(yuǎn)程調(diào)用時(shí)的流ID
    uint32_t m_nBodySize;    // 負(fù)載大小
    uint32_t m_nBytesRead;    // 當(dāng)前讀取的負(fù)載大小,合包處理時(shí)使用
    RTMPChunk *m_chunk;    // 保存原始的chunk數(shù)據(jù)流
    char *m_body;    // 負(fù)載數(shù)據(jù)指針
  } RTMPPacket;

RTMPPacket非常強(qiáng)大,它負(fù)責(zé)處理發(fā)送和接收過(guò)程中的協(xié)議解析涵紊、分包傍妒、合包等復(fù)雜邏輯。

RTMP套接字上下文:

  typedef struct RTMPSockBuf
  {
    int sb_socket;    // 套接字
    int sb_size;    // 緩沖區(qū)可讀大小
    char *sb_start;    // 緩沖區(qū)讀取位置
    char sb_buf[RTMP_BUFFER_CACHE_SIZE];    // 套接字讀取緩沖區(qū)
    int sb_timedout;    // 超時(shí)標(biāo)志
    void *sb_ssl;    // TLS上下文
  } RTMPSockBuf;

RTMP在與底層套接口通訊時(shí)摸柄,使用了這個(gè)與邏輯無(wú)關(guān)的讀緩沖颤练。

RTMP協(xié)議層連接上下文:

  typedef struct RTMP_LNK
  {
    AVal hostname;    // 目標(biāo)主機(jī)地址
    AVal sockshost;    // socks代理地址

    // 連接和推拉流涉及的一些參數(shù)信息
    AVal playpath0;     /* parsed from URL */
    AVal playpath;      /* passed in explicitly */
    AVal tcUrl;
    AVal swfUrl;
    AVal pageUrl;
    AVal app;
    AVal auth;
    AVal flashVer;
    AVal subscribepath;
    AVal token;
    AMFObject extras;
    int edepth;

    int seekTime;    // 播放流的開(kāi)始時(shí)間
    int stopTime;    // 播放流的停止時(shí)間

#define RTMP_LF_AUTH    0x0001  /* using auth param */
#define RTMP_LF_LIVE    0x0002  /* stream is live */
#define RTMP_LF_SWFV    0x0004  /* do SWF verification */
#define RTMP_LF_PLST    0x0008  /* send playlist before play */
#define RTMP_LF_BUFX    0x0010  /* toggle stream on BufferEmpty msg */
#define RTMP_LF_FTCU    0x0020  /* free tcUrl on close */
    int lFlags;

    int swfAge;

    int protocol;    // 連接使用的協(xié)議
    int timeout;    // 連接超時(shí)時(shí)間

    unsigned short socksport;    // socks代理端口
    unsigned short port;    // 目標(biāo)主機(jī)端口

#ifdef CRYPTO
#define RTMP_SWF_HASHLEN        32
    void *dh;                   /* for encryption */
    void *rc4keyIn;
    void *rc4keyOut;

    uint32_t SWFSize;
    uint8_t SWFHash[RTMP_SWF_HASHLEN];
    char SWFVerificationResponse[RTMP_SWF_HASHLEN+10];
#endif
  } RTMP_LNK;

RTMP_LNK包含了連接的服務(wù)器地址,以及推拉流所需的各種參數(shù)信息驱负,一些需要在連接前進(jìn)行設(shè)置嗦玖。

RTMP_Read()操作的附加上下文:

  typedef struct RTMP_READ
  {
    char *buf;    // 讀取緩沖區(qū)
    char *bufpos;    // 緩沖區(qū)讀取位置
    unsigned int buflen;  // 當(dāng)前緩沖區(qū)數(shù)據(jù)的長(zhǎng)度
    uint32_t timestamp;    // 讀取的最新時(shí)間戳
    uint8_t dataType;    // 讀取到的元數(shù)據(jù)媒體類型
    uint8_t flags;    // 讀取標(biāo)志集合
#define RTMP_READ_HEADER        0x01
#define RTMP_READ_RESUME        0x02
#define RTMP_READ_NO_IGNORE     0x04
#define RTMP_READ_GOTKF         0x08
#define RTMP_READ_GOTFLVK       0x10
#define RTMP_READ_SEEKING       0x20
    int8_t status;    // 當(dāng)前讀取的狀態(tài)
#define RTMP_READ_COMPLETE      -3
#define RTMP_READ_ERROR -2
#define RTMP_READ_EOF   -1
#define RTMP_READ_IGNORE        0

    /* if bResume == TRUE */
    uint8_t initialFrameType;
    uint32_t nResumeTS;
    char *metaHeader;
    char *initialFrame;
    uint32_t nMetaHeaderSize;
    uint32_t nInitialFrameSize;
    uint32_t nIgnoredFrameCounter;
    uint32_t nIgnoredFlvFrameCounter;
  } RTMP_READ;

RTMP_READ結(jié)構(gòu)定義了RTMP_Read()函數(shù)工作時(shí)需要的附加上下文和緩沖區(qū)。
RTMP_Read()與RTMP_ReadPacket()的主要區(qū)別是跃脊,RTMP_Read()返回的是FLV格式的流踏揣,它需要做兩層操作,首先是解RTMP協(xié)議匾乓,其次是編碼為FLV格式捞稿,而RTMP_ReadPacket()只需要執(zhí)行一步。

0x14命令遠(yuǎn)程過(guò)程調(diào)用隊(duì)列子項(xiàng):

  typedef struct RTMP_METHOD
  {
    AVal name;    // 當(dāng)前的調(diào)用過(guò)程名稱
    int num;    // 操作流水號(hào)拼缝,初使為1娱局,自增
  } RTMP_METHOD;

處理所有RTMP操作的連接上下文:

  typedef struct RTMP
  {
    int m_inChunkSize;    // 最大接收塊大小
    int m_outChunkSize;    // 最大發(fā)送塊大小
    int m_nBWCheckCounter;    // 帶寬檢測(cè)計(jì)數(shù)器
    int m_nBytesIn;    // 接收數(shù)據(jù)計(jì)數(shù)器
    int m_nBytesInSent;    // 當(dāng)前數(shù)據(jù)已回應(yīng)計(jì)數(shù)器
    int m_nBufferMS;    // 當(dāng)前緩沖的時(shí)間長(zhǎng)度,以MS為單位
    int m_stream_id;    // 當(dāng)前連接的流ID
    int m_mediaChannel;    // 當(dāng)前連接媒體使用的塊流ID
    uint32_t m_mediaStamp;    // 當(dāng)前連接媒體最新的時(shí)間戳
    uint32_t m_pauseStamp;    // 當(dāng)前連接媒體暫停時(shí)的時(shí)間戳
    int m_pausing;    // 是否暫停狀態(tài)
    int m_nServerBW;    // 服務(wù)器帶寬
    int m_nClientBW;    // 客戶端帶寬
    uint8_t m_nClientBW2;    // 客戶端帶寬調(diào)節(jié)方式
    uint8_t m_bPlaying;    // 當(dāng)前是否推流或連接中
    uint8_t m_bSendEncoding;    // 連接服務(wù)器時(shí)發(fā)送編碼
    uint8_t m_bSendCounter;    // 設(shè)置是否向服務(wù)器發(fā)送接收字節(jié)應(yīng)答

    int m_numInvokes;    // 0x14命令遠(yuǎn)程過(guò)程調(diào)用計(jì)數(shù)
    int m_numCalls;    // 0x14命令遠(yuǎn)程過(guò)程請(qǐng)求隊(duì)列數(shù)量
    RTMP_METHOD *m_methodCalls;    // 遠(yuǎn)程過(guò)程調(diào)用請(qǐng)求隊(duì)列

    RTMPPacket *m_vecChannelsIn[RTMP_CHANNELS];    // 對(duì)應(yīng)塊流ID上一次接收的報(bào)文
    RTMPPacket *m_vecChannelsOut[RTMP_CHANNELS];    // 對(duì)應(yīng)塊流ID上一次發(fā)送的報(bào)文
    int m_channelTimestamp[RTMP_CHANNELS];    // 對(duì)應(yīng)塊流ID媒體的最新時(shí)間戳

    double m_fAudioCodecs;    // 音頻編碼器代碼
    double m_fVideoCodecs;    // 視頻編碼器代碼
    double m_fEncoding;         /* AMF0 or AMF3 */

    double m_fDuration;    // 當(dāng)前媒體的時(shí)長(zhǎng)

    int m_msgCounter;    // 使用HTTP協(xié)議發(fā)送請(qǐng)求的計(jì)數(shù)器
    int m_polling;    // 使用HTTP協(xié)議接收消息主體時(shí)的位置
    int m_resplen;    // 使用HTTP協(xié)議接收消息主體時(shí)的未讀消息計(jì)數(shù)
    int m_unackd;    // 使用HTTP協(xié)議處理時(shí)無(wú)響應(yīng)的計(jì)數(shù)
    AVal m_clientID;    // 使用HTTP協(xié)議處理時(shí)的身份ID

    RTMP_READ m_read;    // RTMP_Read()操作的上下文
    RTMPPacket m_write;    // RTMP_Write()操作使用的可復(fù)用報(bào)文對(duì)象
    RTMPSockBuf m_sb;    // RTMP_ReadPacket()讀包操作的上下文
    RTMP_LNK Link;    // RTMP連接上下文
  } RTMP;

RTMP做為整個(gè)推拉流操作的上下文咧七,從握手開(kāi)始到關(guān)閉連接衰齐,它慣穿了整個(gè)會(huì)話的生存期。

主要函數(shù)實(shí)現(xiàn):

報(bào)文操作:

RTMPPacket是librtmp收發(fā)報(bào)文的關(guān)鍵結(jié)構(gòu)继阻,基中m_body緩沖區(qū)是塊頭和負(fù)載公用的耻涛,這里有個(gè)技巧,請(qǐng)看代碼:

// 為報(bào)文結(jié)構(gòu)分配指定負(fù)載大小的內(nèi)存
int
RTMPPacket_Alloc(RTMPPacket *p, int nSize)
{
  // 這里多分配了18個(gè)字節(jié)的內(nèi)存
  char *ptr = calloc(1, nSize + RTMP_MAX_HEADER_SIZE);
  if (!ptr)
    return FALSE;
  // 讓負(fù)載指向內(nèi)存的第19字節(jié)
  p->m_body = ptr + RTMP_MAX_HEADER_SIZE;
  p->m_nBytesRead = 0; 
  return TRUE;
}

// 釋放負(fù)載內(nèi)存
void
RTMPPacket_Free(RTMPPacket *p)
{
  if (p->m_body)
    {    
      free(p->m_body - RTMP_MAX_HEADER_SIZE);
      p->m_body = NULL;
    }    
}

// 重置除負(fù)載內(nèi)存以外的其它字段
void
RTMPPacket_Reset(RTMPPacket *p)
{
  p->m_headerType = 0; 
  p->m_packetType = 0; 
  p->m_nChannel = 0; 
  p->m_nTimeStamp = 0; 
  p->m_nInfoField2 = 0; 
  p->m_hasAbsTimestamp = FALSE;
  p->m_nBodySize = 0; 
  p->m_nBytesRead = 0; 
}

RTMPPacket_Alloc()多分配18個(gè)字節(jié)的內(nèi)存瘟檩,其好處在于抹缕,發(fā)送報(bào)文時(shí),頭部和負(fù)載可以序列化在一段連續(xù)的緩沖區(qū)墨辛,理想情況下只需要執(zhí)行一個(gè)Write調(diào)用卓研。

RTMP上下文的初使化操作:

librtmp提供RTMP上下文內(nèi)存分配、初使化睹簇、釋放的函數(shù)奏赘,雖然也不復(fù)雜,但使用標(biāo)準(zhǔn)接口是個(gè)好習(xí)慣太惠。

// 分配RTMP內(nèi)存
RTMP *
RTMP_Alloc()
{
  return calloc(1, sizeof(RTMP));
}

// 釋放RTMP內(nèi)存
void
RTMP_Free(RTMP *r)
{
  free(r);
}

// 初使化RTMP內(nèi)存
void
RTMP_Init(RTMP *r)
{
#ifdef CRYPTO
  if (!RTMP_TLS_ctx)
    RTMP_TLS_Init();
#endif

  memset(r, 0, sizeof(RTMP));    // 這里將所有的內(nèi)存置0
  r->m_sb.sb_socket = -1;
  r->m_inChunkSize = RTMP_DEFAULT_CHUNKSIZE;    // 默認(rèn)最大接收塊限制128字節(jié)
  r->m_outChunkSize = RTMP_DEFAULT_CHUNKSIZE;    // 默認(rèn)最大發(fā)送塊限制128字節(jié)
  r->m_nBufferMS = 30000;    // 默認(rèn)最大時(shí)長(zhǎng)緩沖設(shè)置磨淌,需通知服務(wù)器
  r->m_nClientBW = 2500000;    // 默認(rèn)最大客戶端帶寬
  r->m_nClientBW2 = 2;    // 默認(rèn)最大客戶端帶寬的調(diào)整方式
  r->m_nServerBW = 2500000;    // 默認(rèn)最大服務(wù)器帶寬
  r->m_fAudioCodecs = 3191.0;
  r->m_fVideoCodecs = 252.0;
  r->Link.timeout = 30;    // 默認(rèn)連接超時(shí)
  r->Link.swfAge = 30;
}
參數(shù)設(shè)置:

大多數(shù)參數(shù)設(shè)置函數(shù)必須在連接服務(wù)器即RTMP_Connect()之前調(diào)用,否則可能不會(huì)生效凿渊。

// 設(shè)置推流操作選項(xiàng)梁只,這樣在RTMP_ConnectStream()操作內(nèi)部缚柳,將使用推流請(qǐng)求代替拉流請(qǐng)求。
void
RTMP_EnableWrite(RTMP *r)
{
  r->Link.protocol |= RTMP_FEATURE_WRITE;
}

// 設(shè)置服務(wù)器緩存的流時(shí)間長(zhǎng)度
void
RTMP_SetBufferMS(RTMP *r, int size)
{
  r->m_nBufferMS = size;
}

// 設(shè)置RTMP推拉流的完整地址
int RTMP_SetupURL(RTMP *r, char *url)
{
  ......
  // 解析流地址
  ret = RTMP_ParseURL(url, &r->Link.protocol, &r->Link.hostname,
        &port, &r->Link.playpath0, &r->Link.app);
  if (!ret)
    return ret;
  r->Link.port = port;
  r->Link.playpath = r->Link.playpath0;
  ......
  // 解析其他KV參數(shù)
  while (ptr) {
    *ptr++ = '\0';
    p1 = ptr;
    p2 = strchr(p1, '=');
    if (!p2)
      break;
    opt.av_val = p1;
    opt.av_len = p2 - p1;
    *p2++ = '\0';
    arg.av_val = p2;
    ptr = strchr(p2, ' ');
    if (ptr) {
      *ptr = '\0';
      arg.av_len = ptr - p2;
      /* skip repeated spaces */
      while(ptr[1] == ' ')
        *ptr++ = '\0';
    } else {
      arg.av_len = strlen(p2);
    }

    arg.av_len = p2 - arg.av_val;

    ret = RTMP_SetOpt(r, &opt, &arg);
    if (!ret)
      return ret;
  }
  ......
}
連接和握手操作:

在完成選項(xiàng)設(shè)置后敛纲,接下來(lái)調(diào)用RTMP_Connect()進(jìn)行RTMP的握手操作。握手完成后剂癌,客戶端還需要主動(dòng)發(fā)送connect遠(yuǎn)程過(guò)程調(diào)用淤翔,這些操作都封裝在RTMP_Connect(),請(qǐng)看代碼:

// 對(duì)用戶開(kāi)放的RTMP連接接口
int
RTMP_Connect(RTMP *r, RTMPPacket *cp)
{
  struct sockaddr_in service;
  if (!r->Link.hostname.av_len)
    return FALSE;

  memset(&service, 0, sizeof(struct sockaddr_in));
  service.sin_family = AF_INET;

  // 設(shè)置直接連接的服務(wù)器地址
  if (r->Link.socksport)
    {
      /* Connect via SOCKS */
      if (!add_addr_info(&service, &r->Link.sockshost, r->Link.socksport))
        return FALSE;
    }
  else
    {
      /* Connect directly */
      if (!add_addr_info(&service, &r->Link.hostname, r->Link.port))
        return FALSE;
    }

  // 發(fā)起網(wǎng)絡(luò)連接
  if (!RTMP_Connect0(r, (struct sockaddr *)&service))
    return FALSE;

  r->m_bSendCounter = TRUE;

  // 發(fā)起握手協(xié)商
  return RTMP_Connect1(r, cp);
}

// 執(zhí)行基礎(chǔ)網(wǎng)絡(luò)連接
int
RTMP_Connect0(RTMP *r, struct sockaddr * service)
{
  int on = 1;
  r->m_sb.sb_timedout = FALSE;
  r->m_pausing = 0;
  r->m_fDuration = 0.0;

  // 創(chuàng)建套接字
  r->m_sb.sb_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  if (r->m_sb.sb_socket != -1)
    {
      // 連接對(duì)端
      if (connect(r->m_sb.sb_socket, service, sizeof(struct sockaddr)) < 0)
        {
          int err = GetSockError();
          RTMP_Log(RTMP_LOGERROR, "%s, failed to connect socket. %d (%s)",
              __FUNCTION__, err, strerror(err));
          RTMP_Close(r);
          return FALSE;
        }
 
      // 執(zhí)行Socks協(xié)商
      if (r->Link.socksport)
        {
          RTMP_Log(RTMP_LOGDEBUG, "%s ... SOCKS negotiation", __FUNCTION__);
          if (!SocksNegotiate(r))
            {
              RTMP_Log(RTMP_LOGERROR, "%s, SOCKS negotiation failed.", __FUNCTION__);
              RTMP_Close(r);
              return FALSE;
            }
        }
    }
  else
    {
      RTMP_Log(RTMP_LOGERROR, "%s, failed to create socket. Error: %d", __FUNCTION__,
          GetSockError());
      return FALSE;
    }

  // 設(shè)置接收網(wǎng)絡(luò)超時(shí)
  {
    SET_RCVTIMEO(tv, r->Link.timeout);
    if (setsockopt
        (r->m_sb.sb_socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(tv)))
      {
        RTMP_Log(RTMP_LOGERROR, "%s, Setting socket timeout to %ds failed!",
            __FUNCTION__, r->Link.timeout);
      }
  }

  setsockopt(r->m_sb.sb_socket, IPPROTO_TCP, TCP_NODELAY, (char *) &on, sizeof(on));

  return TRUE;
}

// 繼續(xù)執(zhí)行SSL或HTTP協(xié)商佩谷,以及RTMP握手
int
RTMP_Connect1(RTMP *r, RTMPPacket *cp)
{
  // SSL握手處理
  if (r->Link.protocol & RTMP_FEATURE_SSL)
    {
#if defined(CRYPTO) && !defined(NO_SSL)
      TLS_client(RTMP_TLS_ctx, r->m_sb.sb_ssl);
      TLS_setfd(r->m_sb.sb_ssl, r->m_sb.sb_socket);
      if (TLS_connect(r->m_sb.sb_ssl) < 0)
        {
          RTMP_Log(RTMP_LOGERROR, "%s, TLS_Connect failed", __FUNCTION__);
          RTMP_Close(r);
          return FALSE;
        }
#else
      RTMP_Log(RTMP_LOGERROR, "%s, no SSL/TLS support", __FUNCTION__);
      RTMP_Close(r);
      return FALSE;

#endif
    }
  // HTTP代理協(xié)商
  if (r->Link.protocol & RTMP_FEATURE_HTTP)
    {
      r->m_msgCounter = 1;
      r->m_clientID.av_val = NULL;
      r->m_clientID.av_len = 0;
      HTTP_Post(r, RTMPT_OPEN, "", 1);
      HTTP_read(r, 1);
      r->m_msgCounter = 0;
    }
  RTMP_Log(RTMP_LOGDEBUG, "%s, ... connected, handshaking", __FUNCTION__);
  // RTMP握手
  if (!HandShake(r, TRUE))
    {
      RTMP_Log(RTMP_LOGERROR, "%s, handshake failed.", __FUNCTION__);
      RTMP_Close(r);
      return FALSE;
    }
  RTMP_Log(RTMP_LOGDEBUG, "%s, handshaked", __FUNCTION__);

  // 發(fā)送第一個(gè)連接報(bào)文
  if (!SendConnectPacket(r, cp))
    {
      RTMP_Log(RTMP_LOGERROR, "%s, RTMP connect failed.", __FUNCTION__);
      RTMP_Close(r);
      return FALSE;
    }
  return TRUE;
}

// SOCKS協(xié)商處理
static int
SocksNegotiate(RTMP *r)
{
  unsigned long addr;
  struct sockaddr_in service;
  memset(&service, 0, sizeof(struct sockaddr_in));

  add_addr_info(&service, &r->Link.hostname, r->Link.port);
  addr = htonl(service.sin_addr.s_addr);

  {
    char packet[] = {
      4, 1,                     /* SOCKS 4, connect */
      (r->Link.port >> 8) & 0xFF,
      (r->Link.port) & 0xFF,
      (char)(addr >> 24) & 0xFF, (char)(addr >> 16) & 0xFF,
      (char)(addr >> 8) & 0xFF, (char)addr & 0xFF,
      0
    };                          /* NULL terminate */

    WriteN(r, packet, sizeof packet);

    if (ReadN(r, packet, 8) != 8)
      return FALSE;

    if (packet[0] == 0 && packet[1] == 90)
      {
        return TRUE;
      }
    else
      {
        RTMP_Log(RTMP_LOGERROR, "%s, SOCKS returned error code %d", packet[1]);
        return FALSE;
      }
  }
}
connect遠(yuǎn)程調(diào)用:

在前面RTMP_Connect1()的最后一步旁壮,調(diào)用了SendConnectPacket()這個(gè)函數(shù),實(shí)際上的用途就是發(fā)起0x14命令connect遠(yuǎn)程調(diào)用谐檀,代碼摘錄并簡(jiǎn)化如下:

static int
SendConnectPacket(RTMP *r, RTMPPacket *cp)
{
  RTMPPacket packet;
  char pbuf[4096], *pend = pbuf + sizeof(pbuf);
  char *enc;

  // 填寫(xiě)塊頭字段
  packet.m_nChannel = 0x03;     /* control channel (invoke) */
  packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
  packet.m_packetType = 0x14;   /* INVOKE */
  packet.m_nTimeStamp = 0;
  packet.m_nInfoField2 = 0;
  packet.m_hasAbsTimestamp = 0;
  packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE;
  enc = packet.m_body;

  // 壓入connect命令和操作流水號(hào)
  enc = AMF_EncodeString(enc, pend, &av_connect);
  enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes);
  *enc++ = AMF_OBJECT;
  
  // 壓入對(duì)象的各個(gè)屬性
  enc = AMF_EncodeNamedString(enc, pend, &av_app, &r->Link.app);
  if (!enc)
    return FALSE;
  if (r->Link.protocol & RTMP_FEATURE_WRITE)
    {
      enc = AMF_EncodeNamedString(enc, pend, &av_type, &av_nonprivate);
      if (!enc)
        return FALSE;
    }
  if (r->Link.flashVer.av_len)
    {
      enc = AMF_EncodeNamedString(enc, pend, &av_flashVer, &r->Link.flashVer);
      if (!enc)
        return FALSE;
    }
  ......
  // 壓入屬性結(jié)束標(biāo)記
  *enc++ = 0;
  *enc++ = 0;                   /* end of object - 0x00 0x00 0x09 */
  *enc++ = AMF_OBJECT_END;
  packet.m_nBodySize = enc - packet.m_body;

  // 發(fā)送報(bào)文抡谐,并記入應(yīng)答隊(duì)列
  return RTMP_SendPacket(r, &packet, TRUE);
}

然后,我們?cè)倏纯碦TMP_SendPacket()的實(shí)現(xiàn)桐猬,代碼摘錄并簡(jiǎn)化如下(相關(guān)邏輯的解釋添加在源碼中):

int
RTMP_SendPacket(RTMP *r, RTMPPacket *packet, int queue)
{
  // 取出對(duì)應(yīng)塊流ID上一次發(fā)送的報(bào)文
  const RTMPPacket *prevPacket = r->m_vecChannelsOut[packet->m_nChannel];
  uint32_t last = 0;
  int nSize;
  int hSize, cSize;
  char *header, *hptr, *hend, hbuf[RTMP_MAX_HEADER_SIZE], c;
  ......
  // 嘗試對(duì)非LARGE報(bào)文進(jìn)行字段壓縮
  if (prevPacket && packet->m_headerType != RTMP_PACKET_SIZE_LARGE)
    {
      // MEDIUM報(bào)文可以嘗試壓縮為SMALL報(bào)文
      if (prevPacket->m_nBodySize == packet->m_nBodySize
          && prevPacket->m_packetType == packet->m_packetType
          && packet->m_headerType == RTMP_PACKET_SIZE_MEDIUM)
        packet->m_headerType = RTMP_PACKET_SIZE_SMALL;

      // MALL報(bào)文可以嘗試壓縮為MINIMUM報(bào)文
      if (prevPacket->m_nTimeStamp == packet->m_nTimeStamp
          && packet->m_headerType == RTMP_PACKET_SIZE_SMALL)
        packet->m_headerType = RTMP_PACKET_SIZE_MINIMUM;

      last = prevPacket->m_nTimeStamp;
    }
  ......
  // 根據(jù)壓縮后的報(bào)文類型預(yù)設(shè)報(bào)頭大小
  nSize = packetSize[packet->m_headerType];
  hSize = nSize; cSize = 0;
  t = packet->m_nTimeStamp - last;
  // 預(yù)設(shè)報(bào)頭的緩沖區(qū)
  if (packet->m_body)
    {
      header = packet->m_body - nSize;
      hend = packet->m_body;
    }
  else
    {
      header = hbuf + 6;
      hend = hbuf + sizeof(hbuf);
    }

  // 計(jì)算基本頭的擴(kuò)充大小
  if (packet->m_nChannel > 319)
    cSize = 2;
  else if (packet->m_nChannel > 63)
    cSize = 1;
  if (cSize)
    {
      header -= cSize;
      hSize += cSize;
    }
  // 根據(jù)時(shí)間戳計(jì)算是否需要擴(kuò)充頭大小
  if (nSize > 1 && t >= 0xffffff)
    {
      header -= 4;
      hSize += 4;
    }

  // 向緩沖區(qū)壓入基本頭
  hptr = header;
  c = packet->m_headerType << 6;
  switch (cSize)
    {
    case 0:
      c |= packet->m_nChannel;
      break;
    case 1:
      break;
    case 2:
      c |= 1;
      break;
    }
  *hptr++ = c;
  if (cSize)
    {
      int tmp = packet->m_nChannel - 64;
      *hptr++ = tmp & 0xff;
      if (cSize == 2)
        *hptr++ = tmp >> 8;
    }

  // 向緩沖區(qū)壓入時(shí)間戳
  if (nSize > 1)
    {
      hptr = AMF_EncodeInt24(hptr, hend, t > 0xffffff ? 0xffffff : t);
    }
  // 向緩沖區(qū)壓入負(fù)載大小和報(bào)文類型
  if (nSize > 4)
    {
      hptr = AMF_EncodeInt24(hptr, hend, packet->m_nBodySize);
      *hptr++ = packet->m_packetType;
    }
  // 向緩沖區(qū)壓入流ID
  if (nSize > 8)
    hptr += EncodeInt32LE(hptr, packet->m_nInfoField2);
  // 向緩沖區(qū)壓入擴(kuò)展時(shí)間戳
  if (nSize > 1 && t >= 0xffffff)
    hptr = AMF_EncodeInt32(hptr, hend, t);

  nSize = packet->m_nBodySize;
  buffer = packet->m_body;
  nChunkSize = r->m_outChunkSize;
  // 當(dāng)數(shù)據(jù)未發(fā)送完成時(shí)
  while (nSize + hSize)
    {
      int wrote;
      // 一次發(fā)送的最大負(fù)載限制為塊大小
      if (nSize < nChunkSize)
        nChunkSize = nSize;
      // 發(fā)送一個(gè)塊
      wrote = WriteN(r, header, nChunkSize + hSize);
      if (!wrote)
         return FALSE;
      // 可能有分塊麦撵,只有部分負(fù)載發(fā)送成功
      nSize -= nChunkSize;
      buffer += nChunkSize;
      hSize = 0;
      // 若只有部分負(fù)載發(fā)送成功,則需繼續(xù)構(gòu)造塊再次發(fā)送
      if (nSize > 0)
        {
          // 只需要構(gòu)造3號(hào)類型的塊頭
          header = buffer - 1;
          hSize = 1;
          if (cSize)
            {
              header -= cSize;
              hSize += cSize;
            }
          *header = (0xc0 | c);
          if (cSize)
            {
              int tmp = packet->m_nChannel - 64;
              header[1] = tmp & 0xff;
              if (cSize == 2)
                header[2] = tmp >> 8;
            }
        }
    }
  // 如果是0x14遠(yuǎn)程調(diào)用溃肪,則需要解出調(diào)用名稱免胃,加入等待響應(yīng)的隊(duì)列中
  if (packet->m_packetType == 0x14)
    {
      AVal method;
      char *ptr;
      ptr = packet->m_body + 1;
      AMF_DecodeString(ptr, &method);
      if (queue) {
        int txn;
        ptr += 3 + method.av_len;
        txn = (int)AMF_DecodeNumber(ptr);
        AV_queue(&r->m_methodCalls, &r->m_numCalls, &method, txn);
      }
    }
  
  // 記錄這個(gè)塊流ID剛剛發(fā)送的報(bào)文,但是應(yīng)忽略負(fù)載
  if (!r->m_vecChannelsOut[packet->m_nChannel])
    r->m_vecChannelsOut[packet->m_nChannel] = malloc(sizeof(RTMPPacket));
  memcpy(r->m_vecChannelsOut[packet->m_nChannel], packet, sizeof(RTMPPacket));
  return TRUE;
}
connect遠(yuǎn)程調(diào)用響應(yīng)_result:

connect報(bào)文發(fā)出后惫撰,這時(shí)客戶端會(huì)陷入等待狀態(tài)羔沙,必須接收到服務(wù)器的_result響應(yīng)才能執(zhí)行后繼的流程。librtmp庫(kù)將等待響應(yīng)的過(guò)程封裝了厨钻,對(duì)使用者展現(xiàn)一個(gè)RTMP_ConnectStream()函數(shù)調(diào)用扼雏,代碼如下:

int
RTMP_ConnectStream(RTMP *r, int seekTime)
{
  RTMPPacket packet = { 0 };

  // 設(shè)置起始時(shí)間定位
  if (seekTime > 0)
    r->Link.seekTime = seekTime;

  r->m_mediaChannel = 0;

  // 循環(huán)讀取報(bào)文并等待完成推流或拉流的交互準(zhǔn)備工作
  while (!r->m_bPlaying && RTMP_IsConnected(r) && RTMP_ReadPacket(r, &packet))
    {
      // 報(bào)文可讀
      if (RTMPPacket_IsReady(&packet))
        {
          if (!packet.m_nBodySize)
            continue;

          // 在所有的交互操作準(zhǔn)備好之前,過(guò)濾非法的音視頻報(bào)文
          if ((packet.m_packetType == RTMP_PACKET_TYPE_AUDIO) ||
              (packet.m_packetType == RTMP_PACKET_TYPE_VIDEO) ||
              (packet.m_packetType == RTMP_PACKET_TYPE_INFO))
            {
              RTMP_Log(RTMP_LOGWARNING, "Received FLV packet before play()! Ignoring.");
              RTMPPacket_Free(&packet);
              continue;
            }

          // 進(jìn)行準(zhǔn)備工作期間的報(bào)文的分派處理
          RTMP_ClientPacket(r, &packet);
          RTMPPacket_Free(&packet);
        }
    }

  // 返回是否準(zhǔn)備好推拉流
  return r->m_bPlaying;
}

在RTMP_ConnectStream()處理交互準(zhǔn)備的過(guò)程中夯膀,有兩個(gè)重要函數(shù):RTMP_ReadPacket()負(fù)責(zé)接收?qǐng)?bào)文诗充,RTMP_ClientPacket()負(fù)責(zé)邏輯的分派處理。先看RTMP_ReadPacket()代碼:

int
RTMP_ReadPacket(RTMP *r, RTMPPacket *packet)
{
  uint8_t hbuf[RTMP_MAX_HEADER_SIZE] = { 0 };
  char *header = (char *)hbuf;
  int nSize, hSize, nToRead, nChunk;
  int didAlloc = FALSE;

  // 讀取基本塊頭的首個(gè)字節(jié)
  if (ReadN(r, (char *)hbuf, 1) == 0)
    {
      RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header", __FUNCTION__);
      return FALSE;
    }

  // 解析基本塊頭的首個(gè)字節(jié)诱建,取得報(bào)頭類型和塊流ID
  packet->m_headerType = (hbuf[0] & 0xc0) >> 6;
  packet->m_nChannel = (hbuf[0] & 0x3f);
  header++;
  if (packet->m_nChannel == 0)
    {
      // 2字節(jié)基本頭其障,繼續(xù)讀取1個(gè)字節(jié)
      if (ReadN(r, (char *)&hbuf[1], 1) != 1)
        {
          RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header 2nd byte",
              __FUNCTION__);
          return FALSE;
        }
      packet->m_nChannel = hbuf[1];
      packet->m_nChannel += 64;
      header++;
    }
  else if (packet->m_nChannel == 1)
    {
      // 3字節(jié)基本頭,繼續(xù)讀取2個(gè)字節(jié)
      int tmp;
      if (ReadN(r, (char *)&hbuf[1], 2) != 2)
        {
          RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header 3nd byte",
              __FUNCTION__);
          return FALSE;
        }
      tmp = (hbuf[2] << 8) + hbuf[1];
      packet->m_nChannel = tmp + 64;
      RTMP_Log(RTMP_LOGDEBUG, "%s, m_nChannel: %0x", __FUNCTION__, packet->m_nChannel);
      header += 2;
    }

  // 根據(jù)報(bào)頭類型取得塊頭長(zhǎng)度
  nSize = packetSize[packet->m_headerType];

  // 如果是標(biāo)準(zhǔn)大頭涂佃,設(shè)置時(shí)間戳為絕對(duì)的
  if (nSize == RTMP_LARGE_HEADER_SIZE)
    packet->m_hasAbsTimestamp = TRUE;
  // 如果非標(biāo)準(zhǔn)大頭励翼,首次嘗試拷貝上一次的報(bào)頭
  else if (nSize < RTMP_LARGE_HEADER_SIZE)
    {
      // 這里的拷貝操作有可能取得上次的分塊報(bào)文,然后繼續(xù)后續(xù)塊的接收合并工作
      if (r->m_vecChannelsIn[packet->m_nChannel])
        memcpy(packet, r->m_vecChannelsIn[packet->m_nChannel],
               sizeof(RTMPPacket));
    }

  // 計(jì)算消息塊頭(主體塊頭)大小
  nSize--;

  // 讀取消息塊頭
  if (nSize > 0 && ReadN(r, header, nSize) != nSize)
    {
      RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header. type: %x",
          __FUNCTION__, (unsigned int)hbuf[0]);
      return FALSE;
    }
  // 計(jì)算基本塊頭+消息塊頭的大小
  hSize = nSize + (header - (char *)hbuf);

  if (nSize >= 3)
    {
      // 解析時(shí)間戳
      packet->m_nTimeStamp = AMF_DecodeInt24(header);

      if (nSize >= 6)
        {
          // 解析負(fù)載長(zhǎng)度
          packet->m_nBodySize = AMF_DecodeInt24(header + 3);
          packet->m_nBytesRead = 0;
          RTMPPacket_Free(packet);

          if (nSize > 6)
            {
              // 解析包類型
              packet->m_packetType = header[6];
              // 解析流ID
              if (nSize == 11)
                packet->m_nInfoField2 = DecodeInt32LE(header + 7);
            }
        }
      // 讀取擴(kuò)展時(shí)間戳并解析
      if (packet->m_nTimeStamp == 0xffffff)
        {
          if (ReadN(r, header + nSize, 4) != 4)
            {
              RTMP_Log(RTMP_LOGERROR, "%s, failed to read extended timestamp",
                  __FUNCTION__);
              return FALSE;
            }
          packet->m_nTimeStamp = AMF_DecodeInt32(header + nSize);
          hSize += 4;
        }
    }

  // 負(fù)載非0辜荠,需要分配內(nèi)存汽抚,或第一個(gè)分塊的初使化工作
  if (packet->m_nBodySize > 0 && packet->m_body == NULL)
    {
      if (!RTMPPacket_Alloc(packet, packet->m_nBodySize))
        {
          RTMP_Log(RTMP_LOGDEBUG, "%s, failed to allocate packet", __FUNCTION__);
          return FALSE;
        }
      didAlloc = TRUE;
      packet->m_headerType = (hbuf[0] & 0xc0) >> 6;
    }

  // 準(zhǔn)備讀取的數(shù)據(jù)和塊大小
  nToRead = packet->m_nBodySize - packet->m_nBytesRead;
  nChunk = r->m_inChunkSize;
  if (nToRead < nChunk)
    nChunk = nToRead;

  // 如果packet->m_chunk非空,拷貝當(dāng)前塊的相關(guān)信息
  if (packet->m_chunk)
    {
      packet->m_chunk->c_headerSize = hSize;
      memcpy(packet->m_chunk->c_header, hbuf, hSize);
      packet->m_chunk->c_chunk = packet->m_body + packet->m_nBytesRead;
      packet->m_chunk->c_chunkSize = nChunk;
    }

  // 讀取負(fù)載到緩沖區(qū)中
  if (ReadN(r, packet->m_body + packet->m_nBytesRead, nChunk) != nChunk)
    {
      RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet body. len: %lu",
          __FUNCTION__, packet->m_nBodySize);
      return FALSE;
    }

  RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)packet->m_body + packet->m_nBytesRead, nChunk);

  packet->m_nBytesRead += nChunk;

  // 保存當(dāng)前塊流ID最新的報(bào)文伯病,與RTMP_SendPacket()不同的是造烁,負(fù)載部分也被保存了否过,以應(yīng)對(duì)不完整的分塊報(bào)文
  if (!r->m_vecChannelsIn[packet->m_nChannel])
    r->m_vecChannelsIn[packet->m_nChannel] = malloc(sizeof(RTMPPacket));
  memcpy(r->m_vecChannelsIn[packet->m_nChannel], packet, sizeof(RTMPPacket));

  // 若報(bào)文負(fù)載接收完整
  if (RTMPPacket_IsReady(packet))
    {
      // 處理增量時(shí)間戳
      if (!packet->m_hasAbsTimestamp)
        packet->m_nTimeStamp += r->m_channelTimestamp[packet->m_nChannel]; 

      // 保存當(dāng)前塊流ID的時(shí)間戳
      r->m_channelTimestamp[packet->m_nChannel] = packet->m_nTimeStamp;

      // 清理上下文中當(dāng)前塊流ID最新的報(bào)文的負(fù)載信息
      r->m_vecChannelsIn[packet->m_nChannel]->m_body = NULL;
      r->m_vecChannelsIn[packet->m_nChannel]->m_nBytesRead = 0;
      r->m_vecChannelsIn[packet->m_nChannel]->m_hasAbsTimestamp = FALSE;
    }
  else
    {
      // 若報(bào)文不完整,不將分片負(fù)載向上拋給應(yīng)用惭蟋,以免引起使用誤解
      packet->m_body = NULL;
    }

  return TRUE;
}

再看RTMP_ClientPacket()源碼:

int
RTMP_ClientPacket(RTMP *r, RTMPPacket *packet)
{
  // 根據(jù)命令類型進(jìn)行分派處理
  int bHasMediaPacket = 0;
  switch (packet->m_packetType)
    {
    case 0x01:
      // 更新接收處理時(shí)的塊限制
      HandleChangeChunkSize(r, packet);
      break;

    case 0x03:
      // 對(duì)端反饋的已讀大小
      RTMP_Log(RTMP_LOGDEBUG, "%s, received: bytes read report", __FUNCTION__);
      break;

    case 0x04:
      // 處理對(duì)端發(fā)送的控制報(bào)文
      HandleCtrl(r, packet);
      break;

    case 0x05:
      // 處理對(duì)端發(fā)送的應(yīng)答窗口大小苗桂,這里由服務(wù)器發(fā)送,即告之客戶端收到對(duì)應(yīng)大小的數(shù)據(jù)后應(yīng)發(fā)送反饋
      HandleServerBW(r, packet);
      break;

    case 0x06:
      // 處理對(duì)端發(fā)送的設(shè)置發(fā)送帶寬大小告组,這里由服務(wù)器發(fā)送煤伟,即設(shè)置客戶端的發(fā)送帶寬
      HandleClientBW(r, packet);
      break;

    case 0x08:
      // 處理音頻數(shù)據(jù)
      HandleAudio(r, packet);
      bHasMediaPacket = 1;
      if (!r->m_mediaChannel)
        r->m_mediaChannel = packet->m_nChannel;
      if (!r->m_pausing)
        r->m_mediaStamp = packet->m_nTimeStamp;
      break;

    case 0x09:
      // 處理視頻數(shù)據(jù)
      HandleVideo(r, packet);
      bHasMediaPacket = 1;
      if (!r->m_mediaChannel)
        r->m_mediaChannel = packet->m_nChannel;
      if (!r->m_pausing)
        r->m_mediaStamp = packet->m_nTimeStamp;
      break;

    case 0x12:
      // 處理媒體元數(shù)據(jù)
      if (HandleMetadata(r, packet->m_body, packet->m_nBodySize))
        bHasMediaPacket = 1;
      break;

    case 0x14:
      // 處理遠(yuǎn)程調(diào)用
      if (HandleInvoke(r, packet->m_body, packet->m_nBodySize) == 1)
        bHasMediaPacket = 2;
      break;
    }

  // 返回值為1表示推拉流正在正作中,為2表示已經(jīng)停止
  return bHasMediaPacket;
}

在RTMP_ConnectStream()中木缝,RTMP_ReadPacket()接收的報(bào)文便锨,交給RTMP_ClientPacket()進(jìn)行分派。connect遠(yuǎn)程調(diào)用發(fā)出后的_result響應(yīng)我碟,也屬于0x14命令放案,我們需要繼續(xù)解析HandleInvoke()這個(gè)函數(shù),代碼如下:

static int
HandleInvoke(RTMP *r, const char *body, unsigned int nBodySize)
{
  AMFObject obj;
  AVal method;
  int txn;
  int ret = 0, nRes;

  // 確保響應(yīng)報(bào)文是0x14的命令字
  if (body[0] != 0x02)
    {
      RTMP_Log(RTMP_LOGWARNING, "%s, Sanity failed. no string method in invoke packet",
          __FUNCTION__);
      return 0;
    }

  // 將各參數(shù)以無(wú)名稱的對(duì)象屬性方式進(jìn)行解析
  nRes = AMF_Decode(&obj, body, nBodySize, FALSE);
  if (nRes < 0)
    {
      RTMP_Log(RTMP_LOGERROR, "%s, error decoding invoke packet", __FUNCTION__);
      return 0;
    }

  AMF_Dump(&obj);

  // 獲取過(guò)程名稱和流水號(hào)
  AMFProp_GetString(AMF_GetProp(&obj, NULL, 0), &method);
  txn = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 1));
  RTMP_Log(RTMP_LOGDEBUG, "%s, server invoking <%s>", __FUNCTION__, method.av_val);

  // 過(guò)程名稱為_(kāi)result
  if (AVMATCH(&method, &av__result))
    {
      AVal methodInvoked = {0};
      int i;

      // 刪除請(qǐng)求隊(duì)列中的流水項(xiàng)
      for (i=0; i<r->m_numCalls; i++) {
        if (r->m_methodCalls[i].num == txn) {
          methodInvoked = r->m_methodCalls[i].name;
          AV_erase(r->m_methodCalls, &r->m_numCalls, i, FALSE);
          break;
        }
      }
      if (!methodInvoked.av_val) {
        RTMP_Log(RTMP_LOGDEBUG, "%s, received result id %d without matching request",
          __FUNCTION__, txn);
        goto leave;
      }

      RTMP_Log(RTMP_LOGDEBUG, "%s, received result for method call <%s>", __FUNCTION__,
          methodInvoked.av_val);

      // 找到了連接請(qǐng)求矫俺,確認(rèn)是連接響應(yīng)
      if (AVMATCH(&methodInvoked, &av_connect))
        {
          // 客戶端推流
          if (r->Link.protocol & RTMP_FEATURE_WRITE)
            {
              // 通知服務(wù)器釋放流通道和清理推流資源
              SendReleaseStream(r);
              SendFCPublish(r);
            }
          // 客戶端拉流
          else
            {
              // 設(shè)置服務(wù)器的應(yīng)答窗口大小
              RTMP_SendServerBW(r);
              RTMP_SendCtrl(r, 3, 0, 300);
            }
          // 發(fā)送創(chuàng)建流通道請(qǐng)求
          RTMP_SendCreateStream(r);
        }
      // 找到了創(chuàng)建流請(qǐng)求吱殉,確認(rèn)是創(chuàng)建流的響應(yīng)
      else if (AVMATCH(&methodInvoked, &av_createStream))
        {
          // 從響應(yīng)中取流ID
          r->m_stream_id = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 3));

          // 客戶端推流
          if (r->Link.protocol & RTMP_FEATURE_WRITE)
            {
              // 發(fā)送推流點(diǎn)
              SendPublish(r);
            }
          // 客戶端拉流
          else
            {
              // 發(fā)送拉流點(diǎn)
              SendPlay(r);
              // 發(fā)送拉流緩沖時(shí)長(zhǎng)
              RTMP_SendCtrl(r, 3, r->m_stream_id, r->m_nBufferMS);
            }
        }
      // 找到了推流和拉流請(qǐng)求,確認(rèn)是它們的響應(yīng)
      else if (AVMATCH(&methodInvoked, &av_play) ||
        AVMATCH(&methodInvoked, &av_publish))
        {
          // 標(biāo)識(shí)已經(jīng)進(jìn)入流狀態(tài)
          r->m_bPlaying = TRUE;
        }
      free(methodInvoked.av_val);
    }
  // 過(guò)程名稱為ping
  else if (AVMATCH(&method, &av_ping))
    {
      // 發(fā)送pong響應(yīng)
      SendPong(r, txn);
    }
  // 過(guò)程名稱為_(kāi)error
  else if (AVMATCH(&method, &av__error))
    {
      RTMP_Log(RTMP_LOGERROR, "rtmp server sent error");
    }
  // 過(guò)程名稱為close
  else if (AVMATCH(&method, &av_close))
    {
      RTMP_Log(RTMP_LOGERROR, "rtmp server requested close");
      RTMP_Close(r);
    }
  // 過(guò)程名稱為onStatus
  else if (AVMATCH(&method, &av_onStatus))
    {
      // 獲取返回對(duì)象及其主要屬性
      AMFObject obj2;
      AVal code, level;
      AMFProp_GetObject(AMF_GetProp(&obj, NULL, 3), &obj2);
      AMFProp_GetString(AMF_GetProp(&obj2, &av_code, -1), &code);
      AMFProp_GetString(AMF_GetProp(&obj2, &av_level, -1), &level);

      RTMP_Log(RTMP_LOGDEBUG, "%s, onStatus: %s", __FUNCTION__, code.av_val);

      // 出錯(cuò)返回
      if (AVMATCH(&code, &av_NetStream_Failed)
          || AVMATCH(&code, &av_NetStream_Play_Failed)
          || AVMATCH(&code, &av_NetStream_Play_StreamNotFound)
          || AVMATCH(&code, &av_NetConnection_Connect_InvalidApp))
        {
          r->m_stream_id = -1;
          RTMP_Close(r);
          RTMP_Log(RTMP_LOGERROR, "Closing connection: %s", code.av_val);
        }

      // 啟動(dòng)拉流成功
      else if (AVMATCH(&code, &av_NetStream_Play_Start))
        {
          int i;
          r->m_bPlaying = TRUE;
          for (i = 0; i < r->m_numCalls; i++)
            {
              if (AVMATCH(&r->m_methodCalls[i].name, &av_play))
                {
                  AV_erase(r->m_methodCalls, &r->m_numCalls, i, TRUE);
                  break;
                }
            }
        }

      // 啟動(dòng)推流成功
      else if (AVMATCH(&code, &av_NetStream_Publish_Start))
        {
          int i;
          r->m_bPlaying = TRUE;
          for (i = 0; i < r->m_numCalls; i++)
            {
              if (AVMATCH(&r->m_methodCalls[i].name, &av_publish))
                {
                  AV_erase(r->m_methodCalls, &r->m_numCalls, i, TRUE);
                  break;
                }
            }
        }

      // 通知流完成或結(jié)束
      else if (AVMATCH(&code, &av_NetStream_Play_Complete)
          || AVMATCH(&code, &av_NetStream_Play_Stop)
          || AVMATCH(&code, &av_NetStream_Play_UnpublishNotify))
        {
          RTMP_Close(r);
          ret = 1;
        }

      // 通知流暫停
      else if (AVMATCH(&code, &av_NetStream_Pause_Notify))
        {
          if (r->m_pausing == 1 || r->m_pausing == 2)
          {
            RTMP_SendPause(r, FALSE, r->m_pauseStamp);
            r->m_pausing = 3;
          }
        }
    }
leave:
  AMF_Reset(&obj);
  return ret;
}

上面的注釋顯示厘托,在HandleInvoke()函數(shù)的處理分支中考婴,有涉及connect調(diào)用的_result處理。在收到_result響應(yīng)后催烘,又根據(jù)推流或拉流的標(biāo)志沥阱,繼續(xù)后續(xù)的流程。
HandleInvoke()函數(shù)非常強(qiáng)大伊群,除了對(duì)各種_result進(jìn)行處理外考杉,它還支持onStatus操作,后續(xù)的createStream響應(yīng)舰始,publish和play推拉流的狀態(tài)反饋崇棠,都會(huì)集中在這里處理。

connect遠(yuǎn)程調(diào)用的其它交互:

connect和_result交互是必須的丸卷,事實(shí)上枕稀,在_result返回之前,服務(wù)器還可以先返回一些可選的設(shè)置報(bào)文谜嫉,例如:

服務(wù)器設(shè)置客戶端的應(yīng)答窗口大形馈:

int
RTMP_ClientPacket(RTMP *r, RTMPPacket *packet)
{
    ......
  switch (packet->m_packetType)
    {
    ......
    case 0x05:
      /* server bw */
      HandleServerBW(r, packet);
      break;
    ......
}

static void
HandleServerBW(RTMP *r, const RTMPPacket *packet)
{
  r->m_nServerBW = AMF_DecodeInt32(packet->m_body);
  RTMP_Log(RTMP_LOGDEBUG, "%s: server BW = %d", __FUNCTION__, r->m_nServerBW);
}

服務(wù)器設(shè)置客戶端的發(fā)送帶寬大小:

int
RTMP_ClientPacket(RTMP *r, RTMPPacket *packet)
{
    ......
  switch (packet->m_packetType)
    {
    ......
    case 0x06:
      /* client bw */
      HandleClientBW(r, packet);
      break;
    ......
}

static void
HandleClientBW(RTMP *r, const RTMPPacket *packet)
{
  r->m_nClientBW = AMF_DecodeInt32(packet->m_body);
  if (packet->m_nBodySize > 4)
    r->m_nClientBW2 = packet->m_body[4];
  else
    r->m_nClientBW2 = -1;
  RTMP_Log(RTMP_LOGDEBUG, "%s: client BW = %d %d", __FUNCTION__, r->m_nClientBW,
      r->m_nClientBW2);
}

服務(wù)器設(shè)置客戶端的接收塊大秀謇肌:

int
RTMP_ClientPacket(RTMP *r, RTMPPacket *packet)
{
    ......
  switch (packet->m_packetType)
    {
    case 0x01:
      /* chunk size */
      HandleChangeChunkSize(r, packet);
      break;
    ......
}

static void
HandleChangeChunkSize(RTMP *r, const RTMPPacket *packet)
{
  if (packet->m_nBodySize >= 4)
    {
      r->m_inChunkSize = AMF_DecodeInt32(packet->m_body);
      RTMP_Log(RTMP_LOGDEBUG, "%s, received: chunk size change to %d", __FUNCTION__,
          r->m_inChunkSize);
    }
}
createStream遠(yuǎn)程調(diào)用:

在成功處理connect的_result響應(yīng)之后哆档,即表示服務(wù)器接收了客戶端的第一步的地址請(qǐng)求,接下來(lái)客戶端需要根據(jù)推流或拉流的場(chǎng)景住闯,發(fā)起后續(xù)的請(qǐng)求了瓜浸,精簡(jiǎn)HandleInvoke()后的代碼如下:

static int
HandleInvoke(RTMP *r, const char *body, unsigned int nBodySize)
{
  if (AVMATCH(&method, &av__result))
    {
      ......
      if (AVMATCH(&methodInvoked, &av_connect))
        {
          // 判斷推流標(biāo)志是否設(shè)置
          if (r->Link.protocol & RTMP_FEATURE_WRITE)
            {
              // 推流準(zhǔn)備(必須)
              SendReleaseStream(r);
              SendFCPublish(r);
            }
          else
            {
              // 拉流準(zhǔn)備(可選)
              RTMP_SendServerBW(r);
              RTMP_SendCtrl(r, 3, 0, 300);
            }
          
          // 創(chuàng)建流通道
          RTMP_SendCreateStream(r);
        }
    }
      ......
}

上面的代碼和注釋顯示澳淑,無(wú)論是推流還是拉流,都需要?jiǎng)?chuàng)建流通道插佛,但是在之前吏饿,有一些不同的額外操作要設(shè)置歼郭。

推流的額外操作:

推流之前含滴,需要先釋放掉當(dāng)前的推流點(diǎn)的資源甥材,并且準(zhǔn)備好新的推流點(diǎn)。

// 發(fā)送釋放推流點(diǎn)請(qǐng)求
static int
SendReleaseStream(RTMP *r)
{
  RTMPPacket packet;
  char pbuf[1024], *pend = pbuf + sizeof(pbuf);
  char *enc;

  packet.m_nChannel = 0x03;     /* control channel (invoke) */
  packet.m_headerType = RTMP_PACKET_SIZE_MEDIUM;
  packet.m_packetType = 0x14;   /* INVOKE */
  packet.m_nTimeStamp = 0;
  packet.m_nInfoField2 = 0;
  packet.m_hasAbsTimestamp = 0;
  packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE;

  // 壓入遠(yuǎn)程過(guò)程調(diào)用的參數(shù)谢床,尤其是推流點(diǎn)
 enc = packet.m_body;
  enc = AMF_EncodeString(enc, pend, &av_releaseStream);
  enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes);
  *enc++ = AMF_NULL;
  enc = AMF_EncodeString(enc, pend, &r->Link.playpath);    // 推流點(diǎn)
  if (!enc)
    return FALSE;

  packet.m_nBodySize = enc - packet.m_body;

  return RTMP_SendPacket(r, &packet, FALSE);
}

// 發(fā)送準(zhǔn)備推流點(diǎn)請(qǐng)求
static int
SendFCPublish(RTMP *r)
{
  RTMPPacket packet;
  char pbuf[1024], *pend = pbuf + sizeof(pbuf);
  char *enc;

  packet.m_nChannel = 0x03;     /* control channel (invoke) */
  packet.m_headerType = RTMP_PACKET_SIZE_MEDIUM;
  packet.m_packetType = 0x14;   /* INVOKE */
  packet.m_nTimeStamp = 0;
  packet.m_nInfoField2 = 0;
  packet.m_hasAbsTimestamp = 0;
  packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE;

  // 壓入遠(yuǎn)程過(guò)程調(diào)用的參數(shù)兄一,尤其是推流點(diǎn)
  enc = packet.m_body;
  enc = AMF_EncodeString(enc, pend, &av_FCPublish);
  enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes);
  *enc++ = AMF_NULL;
  enc = AMF_EncodeString(enc, pend, &r->Link.playpath);    // 推流點(diǎn)
  if (!enc)
    return FALSE;

  packet.m_nBodySize = enc - packet.m_body;

  return RTMP_SendPacket(r, &packet, FALSE);
}
拉流的額外操作:

通知服務(wù)器收到指定大小的客戶端數(shù)據(jù)后厘线,需要發(fā)送應(yīng)答识腿。拉流時(shí)客戶端基本不發(fā)送流量,個(gè)人感覺(jué)這步有些多余造壮。

int
RTMP_SendServerBW(RTMP *r)
{
  RTMPPacket packet;
  char pbuf[256], *pend = pbuf + sizeof(pbuf);

  packet.m_nChannel = 0x02;     /* control channel (invoke) */
  packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
  packet.m_packetType = 0x05;   /* Server BW */
  packet.m_nTimeStamp = 0;
  packet.m_nInfoField2 = 0;
  packet.m_hasAbsTimestamp = 0;
  packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE;

  packet.m_nBodySize = 4;

  // 壓入4字節(jié)帶寬
  AMF_EncodeInt32(packet.m_body, pend, r->m_nServerBW);
  return RTMP_SendPacket(r, &packet, FALSE);
}

// 控制報(bào)文的含義比較多渡讼,具體也需要查看手冊(cè),對(duì)應(yīng)拉流來(lái)說(shuō)耳璧,主要是設(shè)置服務(wù)器的緩存時(shí)間成箫。
int
RTMP_SendCtrl(RTMP *r, short nType, unsigned int nObject, unsigned int nTime)
{
  RTMPPacket packet;
  char pbuf[256], *pend = pbuf + sizeof(pbuf);
  int nSize;
  char *buf;

  packet.m_nChannel = 0x02;     /* control channel (ping) */
  packet.m_headerType = RTMP_PACKET_SIZE_MEDIUM;
  packet.m_packetType = 0x04;   /* ctrl */
  packet.m_nTimeStamp = 0;      /* RTMP_GetTime(); */
  packet.m_nInfoField2 = 0;
  packet.m_hasAbsTimestamp = 0;
  packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE;

  switch(nType) {
  case 0x03: nSize = 10; break; /* buffer time */
  case 0x1A: nSize = 3; break;  /* SWF verify request */
  case 0x1B: nSize = 44; break; /* SWF verify response */
  default: nSize = 6; break;
  }

  packet.m_nBodySize = nSize;

  buf = packet.m_body;
  buf = AMF_EncodeInt16(buf, pend, nType);

  if (nType == 0x1B)
    {
    ......
    }
  else if (nType == 0x1A)
    {
    ......
   }
  else
    {
      if (nSize > 2)
        buf = AMF_EncodeInt32(buf, pend, nObject);

      if (nSize > 6)
        buf = AMF_EncodeInt32(buf, pend, nTime);
    }

  return RTMP_SendPacket(r, &packet, FALSE);
}
創(chuàng)建流通道操作:
// 發(fā)送創(chuàng)建流請(qǐng)求
int
RTMP_SendCreateStream(RTMP *r)
{
  RTMPPacket packet;
  char pbuf[256], *pend = pbuf + sizeof(pbuf);
  char *enc;

  packet.m_nChannel = 0x03;     /* control channel (invoke) */
  packet.m_headerType = RTMP_PACKET_SIZE_MEDIUM;
  packet.m_packetType = 0x14;   /* INVOKE */
  packet.m_nTimeStamp = 0;
  packet.m_nInfoField2 = 0;
  packet.m_hasAbsTimestamp = 0;
  packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE;

  // 壓入遠(yuǎn)程過(guò)程調(diào)用的參數(shù)
  enc = packet.m_body;
  enc = AMF_EncodeString(enc, pend, &av_createStream);
  enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes);
  *enc++ = AMF_NULL;            /* NULL */

  packet.m_nBodySize = enc - packet.m_body;

  return RTMP_SendPacket(r, &packet, TRUE);
}
createStream遠(yuǎn)程調(diào)用響應(yīng):

服務(wù)器收到createStream請(qǐng)求后,若無(wú)錯(cuò)誤旨枯,則返回流ID蹬昌,有了流ID客戶端就可以進(jìn)行后續(xù)的推流或拉流了,邏輯如下:

static int
HandleInvoke(RTMP *r, const char *body, unsigned int nBodySize)
{
  if (AVMATCH(&method, &av__result))
    {
      ......
      else if (AVMATCH(&methodInvoked, &av_createStream))
        {
          // 保存流ID
          r->m_stream_id = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 3));

          if (r->Link.protocol & RTMP_FEATURE_WRITE)
            {
              // 開(kāi)始推流
              SendPublish(r);
            }
          else
            {
              // 開(kāi)始拉流
              SendPlay(r);
              // 控制該流的緩沖時(shí)長(zhǎng)
              RTMP_SendCtrl(r, 3, r->m_stream_id, r->m_nBufferMS);
            }
        }
      ......
    }
}

上面的代碼和注釋顯示攀隔,推流和拉流皂贩,分別進(jìn)行不同的邏輯處理。

推流處理publish調(diào)用:

對(duì)推流來(lái)說(shuō)昆汹,客戶端需要請(qǐng)求服務(wù)器將流ID和推送點(diǎn)進(jìn)行關(guān)聯(lián)明刷。請(qǐng)求代碼如下:

static int
SendPublish(RTMP *r)
{
  RTMPPacket packet;
  char pbuf[1024], *pend = pbuf + sizeof(pbuf);
  char *enc;

  packet.m_nChannel = 0x04;     /* source channel (invoke) */
  packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
  packet.m_packetType = 0x14;   /* INVOKE */
  packet.m_nTimeStamp = 0;
  packet.m_nInfoField2 = r->m_stream_id;    // 指定流ID
  packet.m_hasAbsTimestamp = 0;
  packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE;

  enc = packet.m_body;
  enc = AMF_EncodeString(enc, pend, &av_publish);
  enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes);
  *enc++ = AMF_NULL;
  enc = AMF_EncodeString(enc, pend, &r->Link.playpath);    // 指定推送點(diǎn)
  if (!enc)
    return FALSE;

  /* FIXME: should we choose live based on Link.lFlags & RTMP_LF_LIVE? */
  enc = AMF_EncodeString(enc, pend, &av_live);
  if (!enc)
    return FALSE;

  packet.m_nBodySize = enc - packet.m_body;

  return RTMP_SendPacket(r, &packet, TRUE);    // 需要反饋
}
拉流處理play調(diào)用:

對(duì)拉流來(lái)說(shuō),客戶端需要請(qǐng)求服務(wù)器將流ID和播放點(diǎn)進(jìn)行關(guān)聯(lián)满粗。請(qǐng)求代碼如下:

static int
SendPlay(RTMP *r)
{
  RTMPPacket packet;
  char pbuf[1024], *pend = pbuf + sizeof(pbuf);
  char *enc;

  packet.m_nChannel = 0x08;     /* we make 8 our stream channel */
  packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
  packet.m_packetType = 0x14;   /* INVOKE */
  packet.m_nTimeStamp = 0;
  packet.m_nInfoField2 = r->m_stream_id;        /*0x01000000; */    // 指定流ID
  packet.m_hasAbsTimestamp = 0;
  packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE;

  enc = packet.m_body;
  enc = AMF_EncodeString(enc, pend, &av_play);
  enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes);
  *enc++ = AMF_NULL;

  RTMP_Log(RTMP_LOGDEBUG, "%s, seekTime=%d, stopTime=%d, sending play: %s",
      __FUNCTION__, r->Link.seekTime, r->Link.stopTime,
      r->Link.playpath.av_val);
  enc = AMF_EncodeString(enc, pend, &r->Link.playpath);    // 指定推送點(diǎn)
  if (!enc)
    return FALSE;

  // 指定開(kāi)始時(shí)間
  /* Optional parameters start and len.
   *
   * start: -2, -1, 0, positive number
   *  -2: looks for a live stream, then a recorded stream,
   *      if not found any open a live stream
   *  -1: plays a live stream
   * >=0: plays a recorded streams from 'start' milliseconds
   */
  if (r->Link.lFlags & RTMP_LF_LIVE)
    enc = AMF_EncodeNumber(enc, pend, -1000.0);
  else
    {
      if (r->Link.seekTime > 0.0)
        enc = AMF_EncodeNumber(enc, pend, r->Link.seekTime);    /* resume from here */
      else
        enc = AMF_EncodeNumber(enc, pend, 0.0); /*-2000.0);*/ /* recorded as default, -2000.0 is not reliable since that freezes the
 player if the stream is not found */
    }
  if (!enc)
    return FALSE;

  // 指點(diǎn)播放時(shí)長(zhǎng)
  /* len: -1, 0, positive number
   *  -1: plays live or recorded stream to the end (default)
   *   0: plays a frame 'start' ms away from the beginning
   *  >0: plays a live or recoded stream for 'len' milliseconds
   */
  /*enc += EncodeNumber(enc, -1.0); */ /* len */
  if (r->Link.stopTime)
    {
      enc = AMF_EncodeNumber(enc, pend, r->Link.stopTime - r->Link.seekTime);
      if (!enc)
        return FALSE;
    }

  packet.m_nBodySize = enc - packet.m_body;

  return RTMP_SendPacket(r, &packet, TRUE);    // 需要反饋
}
publish或play狀態(tài)反饋:

在前面完成publish或play過(guò)程調(diào)用后辈末,客戶端需要等待響應(yīng)或結(jié)果反饋。繼續(xù)看HandleInvoke()函數(shù)精簡(jiǎn)后的相關(guān)代碼:

static int
HandleInvoke(RTMP *r, const char *body, unsigned int nBodySize)
{
  ......
  else if (AVMATCH(&method, &av_onStatus))
    {
      AMFObject obj2;
      AVal code, level;
      AMFProp_GetObject(AMF_GetProp(&obj, NULL, 3), &obj2);
      AMFProp_GetString(AMF_GetProp(&obj2, &av_code, -1), &code);
      AMFProp_GetString(AMF_GetProp(&obj2, &av_level, -1), &level);

      RTMP_Log(RTMP_LOGDEBUG, "%s, onStatus: %s", __FUNCTION__, code.av_val);

      // 判斷是否出錯(cuò)映皆,出錯(cuò)需中止流程
      if (AVMATCH(&code, &av_NetStream_Failed)
          || AVMATCH(&code, &av_NetStream_Play_Failed)
          || AVMATCH(&code, &av_NetStream_Play_StreamNotFound)
          || AVMATCH(&code, &av_NetConnection_Connect_InvalidApp))
        {
          r->m_stream_id = -1;
          RTMP_Close(r);
          RTMP_Log(RTMP_LOGERROR, "Closing connection: %s", code.av_val);
        }
      ......
      // 啟動(dòng)推流成功
      else if (AVMATCH(&code, &av_NetStream_Publish_Start))
        {
          int i;
          r->m_bPlaying = TRUE;    // 設(shè)置可以推流標(biāo)志
          for (i = 0; i < r->m_numCalls; i++)
            {
              if (AVMATCH(&r->m_methodCalls[i].name, &av_publish))
                {
                  AV_erase(r->m_methodCalls, &r->m_numCalls, i, TRUE);
                  break;
                }
            }
        }
      // 啟動(dòng)拉流成功
      else if (AVMATCH(&code, &av_NetStream_Play_Start))
        {
          int i;
          r->m_bPlaying = TRUE;    // 設(shè)置可以拉流標(biāo)志
          for (i = 0; i < r->m_numCalls; i++)
            {
              if (AVMATCH(&r->m_methodCalls[i].name, &av_play))
                {
                  AV_erase(r->m_methodCalls, &r->m_numCalls, i, TRUE);
                  break;
                }
            }
        }
      ......
    }
  ......
}

讓我們回到RTMP_ConnectStream()函數(shù)挤聘,我們留意其中的m_bPlaying標(biāo)志,由于此時(shí)標(biāo)志為TRUE捅彻,循環(huán)條件不成立檬洞,函數(shù)退出。簡(jiǎn)化代碼如下:

int
RTMP_ConnectStream(RTMP *r, int seekTime)
{
  ......
  while (!r->m_bPlaying && RTMP_IsConnected(r) && RTMP_ReadPacket(r, &packet))
    {
        ......
    }
  return r->m_bPlaying;    // 返回推流或拉流準(zhǔn)許的狀態(tài)
}

當(dāng)RTMP_ConnectStream()返回TRUE的時(shí)候沟饥,librtmp庫(kù)的使用者添怔,就可以在自己的代碼層次湾戳,進(jìn)行后續(xù)的推流或拉流操作了。

推流操作代碼節(jié)選:
    // 初使化RTMP報(bào)文
    RTMPPacket packet;
    RTMPPacket_Reset(&packet);
    packet.m_body = NULL;
    packet.m_chunk = NULL;

    packet.m_nInfoField2 = pRTMP->m_stream_id;

    uint32_t starttime = RTMP_GetTime();

    while (true)
    {
        // 讀取TAG頭

        uint8_t type = 0;
        if (!ReadU8(&type, pFile))
            break;

        uint32_t datalen = 0;
        if (!ReadU24(&datalen, pFile))
            break;

        uint32_t timestamp = 0;
        if (!ReadTime(&timestamp, pFile))
            break;

        uint32_t streamid = 0;
        if (!ReadU24(&streamid, pFile))
            break;

/*
        // 跳過(guò)0x12 Script
        if (type != 0x08 && type != 0x09)
        {
            fseek(pFile, datalen + 4, SEEK_CUR);
            continue;
        }
*/

        RTMPPacket_Alloc(&packet, datalen);

        if (fread(packet.m_body, 1, datalen, pFile) != datalen)
            break;

        // 組織報(bào)文并發(fā)送
        packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
        packet.m_packetType = type;
        packet.m_hasAbsTimestamp = 0;
        packet.m_nChannel = 6;
        packet.m_nTimeStamp = timestamp;
        packet.m_nBodySize = datalen;

        if (!RTMP_SendPacket(pRTMP, &packet, 0))
        {
            printf("Send Error! \n");
            break;
        }

        printf("send type:[%d] timestamp:[%d] datasize:[%d] \n", type, timestamp, datalen);

        // 跳過(guò)PreTag
        uint32_t pretagsize = 0;
        if (!ReadU32(&pretagsize, pFile))
            break;

        // 延時(shí)广料,避免發(fā)送太快
        uint32_t timeago = (RTMP_GetTime() - starttime);
        if (timestamp > 1000 && timeago < timestamp - 1000)
        {
            printf("sleep...\n");
            usleep(100000);
        }

        RTMPPacket_Free(&packet);
    }

上面這段代碼片斷演示了使用librtmp推流砾脑,讀取FLV文件,并向上推流的例子艾杏。

拉流操作代碼節(jié)選:
    bool bSaveMP3 = true;
    FILE* pFile = fopen(bSaveMP3 ? "testrtmp.mp3" : "testrtmp.flv", "wb");

    while (RTMP_IsConnected(pRTMP))
    {
        if (bSaveMP3)
        {
            RTMPPacket packet;
            RTMPPacket_Reset(&packet);
            packet.m_body = NULL;
            packet.m_chunk = NULL;
            b = RTMP_ReadPacket(pRTMP, &packet);
            if (!b)
                break;

            if (!RTMPPacket_IsReady(&packet))
                continue;

            printf("\t headerType:[%d] \n", packet.m_headerType);
            printf("\t packetType:[%d] \n", packet.m_packetType);
            printf("\t hasAbsTimestamp:[%d] \n", packet.m_hasAbsTimestamp);
            printf("\t nChannel:[%d] \n", packet.m_nChannel);
            printf("\t nTimeStamp:[%d] \n", packet.m_nTimeStamp);
            printf("\t nInfoField2:[%d] \n", packet.m_nInfoField2);
            printf("\t nBodySize:[%d] \n", packet.m_nBodySize);
            printf("\t nBytesRead:[%d] \n", packet.m_nBytesRead);

            if (packet.m_packetType == 0x08)
            {
                fwrite(packet.m_body + 1, 1, packet.m_nBodySize - 1, pFile);
            }

            RTMPPacket_Free(&packet);
        }
        else
        {
            char sBuf[4096] = {0};
            int bytes = RTMP_Read(pRTMP, sBuf, sizeof(sBuf));
            printf("RTMP_Read() ret:[%d] \n", bytes);

            if (bytes <= 0)
                break;

            fwrite(sBuf, 1, bytes, pFile);
        }
    }

上面這段代碼片斷演示了使用librtmp拉流韧衣,并將音頻保存為MP3,或者將音視頻保存為FLV的例子购桑。

小結(jié):

源代碼解析到這一步畅铭,我想應(yīng)該大部分的關(guān)鍵流程都帶過(guò)一遍了,但是簽于我的時(shí)間和水平的關(guān)系勃蜘,很多細(xì)節(jié)目前都還是淺嘗輒止硕噩,后面若有機(jī)會(huì)再慢慢補(bǔ)上吧,希望朋友們理解缭贡。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末炉擅,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子阳惹,更是在濱河造成了極大的恐慌谍失,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,324評(píng)論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件莹汤,死亡現(xiàn)場(chǎng)離奇詭異快鱼,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)纲岭,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,356評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門抹竹,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人荒勇,你說(shuō)我怎么就攤上這事柒莉。” “怎么了沽翔?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,328評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵兢孝,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我仅偎,道長(zhǎng)跨蟹,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,147評(píng)論 1 292
  • 正文 為了忘掉前任橘沥,我火速辦了婚禮窗轩,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘座咆。我一直安慰自己痢艺,他們只是感情好仓洼,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,160評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著堤舒,像睡著了一般色建。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上舌缤,一...
    開(kāi)封第一講書(shū)人閱讀 51,115評(píng)論 1 296
  • 那天箕戳,我揣著相機(jī)與錄音,去河邊找鬼国撵。 笑死陵吸,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的介牙。 我是一名探鬼主播壮虫,決...
    沈念sama閱讀 40,025評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼耻瑟!你這毒婦竟也來(lái)了旨指?” 一聲冷哼從身側(cè)響起赏酥,我...
    開(kāi)封第一講書(shū)人閱讀 38,867評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤喳整,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后裸扶,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體框都,經(jīng)...
    沈念sama閱讀 45,307評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,528評(píng)論 2 332
  • 正文 我和宋清朗相戀三年呵晨,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了魏保。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,688評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡摸屠,死狀恐怖谓罗,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情季二,我是刑警寧澤檩咱,帶...
    沈念sama閱讀 35,409評(píng)論 5 343
  • 正文 年R本政府宣布,位于F島的核電站胯舷,受9級(jí)特大地震影響刻蚯,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜桑嘶,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,001評(píng)論 3 325
  • 文/蒙蒙 一炊汹、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧逃顶,春花似錦讨便、人聲如沸充甚。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,657評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)津坑。三九已至,卻和暖如春傲霸,著一層夾襖步出監(jiān)牢的瞬間疆瑰,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,811評(píng)論 1 268
  • 我被黑心中介騙來(lái)泰國(guó)打工昙啄, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留穆役,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,685評(píng)論 2 368
  • 正文 我出身青樓梳凛,卻偏偏與公主長(zhǎng)得像耿币,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子韧拒,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,573評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容