WebRTC中PacedSender工作原理和代碼分析

摘抄一段 PacedSender 簡介伟墙,下面的鏈接對該模塊的工作原理做了詳細的介紹福荸,今天大致看了下這個模塊的代碼,記錄一下

在estimator根據(jù)網(wǎng)絡狀態(tài)決策出新的通信碼率(target bitrate)嫉戚,它會將這個碼率設置到pacer當中唬滑,要求pacer按照新的碼率來計算發(fā)包頻率。因為在視頻通信中域携,單幀視頻可能有上百KB,如果是當視頻幀被編碼器編碼出來后簇秒,就立即進行RTP打包發(fā)送,瞬時會發(fā)送大量的數(shù)據(jù)到網(wǎng)絡上秀鞭,可能會引起網(wǎng)絡衰減和通信惡化趋观。WebRTC引入pacer,pacer會根據(jù)estimator評估出來的碼率锋边,按照最小單位時間(5ms)做時間分片進行遞進發(fā)送數(shù)據(jù)拆内,避免瞬時對網(wǎng)絡的沖擊。pacer的目的就是讓視頻數(shù)據(jù)按照評估碼率均勻的分布在各個時間片里發(fā)送宠默, 所以在弱網(wǎng)的WiFi環(huán)境麸恍,pacer是個非常重要的關鍵步驟

via: http://livevideostack.com/portal.php?mod=view&aid=201

PacedSender 發(fā)送速率設置

GCC 估測的帶寬只會通過 SetEstimatedBitrate 方法設置到 PacedSender 中, pacing_bitrate_kbps_PacedSender 發(fā)送媒體包的速率搀矫,為GCC估測帶寬 乘以了固定系數(shù) kDefaultPaceMultiplier(2.5)

void PacedSender::SetEstimatedBitrate(uint32_t bitrate_bps) {
  if (bitrate_bps == 0)
    LOG(LS_ERROR) << "PacedSender is not designed to handle 0 bitrate.";
  CriticalSectionScoped cs(critsect_.get());
  estimated_bitrate_bps_ = bitrate_bps;
  padding_budget_->set_target_rate_kbps(estimated_bitrate_bps_ / 1000 );

  // 更新 pacing 發(fā)送速率抹沪,為 estimated_bitrate_bps_/1000 * 2.5;
  pacing_bitrate_kbps_ =
      max(min_send_bitrate_kbps_, estimated_bitrate_bps_ / 1000) *
      kDefaultPaceMultiplier;
  alr_detector_->SetEstimatedBitrate(bitrate_bps);
}

PacedSender 包緩存隊列

該方法由 rtp_sender 模塊調用,將封裝好的視頻rtp包的元信息瓤球,如 ssrc, sequence_number等封裝成Packet數(shù)據(jù)結構存儲到隊列中融欧,并未緩存真正的媒體數(shù)據(jù)。發(fā)包時卦羡,PacedSender 會通過這些元信息噪馏,在rtp_sender中的緩存隊列中找到對應的媒體包數(shù)據(jù)

// 將視頻包元信息,instert到pacer中
void PacedSender::InsertPacket(RtpPacketSender::Priority priority,
                               uint32_t ssrc,
                               uint16_t sequence_number,
                               int64_t capture_time_ms,
                               size_t bytes,
                               bool retransmission) {
  CriticalSectionScoped cs(critsect_.get());
  DCHECK(estimated_bitrate_bps_ > 0)
        << "SetEstimatedBitrate must be called before InsertPacket.";

  int64_t now_ms = clock_->TimeInMilliseconds();
  prober_->OnIncomingPacket(bytes);

  if (capture_time_ms < 0)
    capture_time_ms = now_ms;

  // 封裝 packet 包绿饵,放到list中 packets_
  packets_->Push(paced_sender::Packet(priority, ssrc, sequence_number,
                                      capture_time_ms, now_ms, bytes,
                                      retransmission, packet_counter_++));
}

發(fā)包線程

線程發(fā)包間隔 5ms 計算

PacedSender 發(fā)送線程是調用時間間隔為5ms欠肾,為什么是5ms, 通過模塊中下面的方法計算的:

// 計算線程調用的時間間隔,即下面 process() 方法調用的時間間隔 D馍蕖4烫摇!吸祟!
int64_t PacedSender::TimeUntilNextProcess() {
  CriticalSectionScoped cs(critsect_.get());
    //  如果正在探測
    if (prober_->IsProbing()) {
    int64_t ret = prober_->TimeUntilNextProbe(clock_->TimeInMilliseconds());
    if (ret > 0 || (ret == 0 && !probing_send_failure_))
      return ret;
  }
  // 當前時間 減去 上一次 process()方法調用的時間瑟慈,得出時間間隔
  int64_t elapsed_time_us = clock_->TimeInMicroseconds() - time_last_update_us_;
  int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000;
    
  // kMinPacketLimitMs:5ms,即 process() 方法 5m 處理一次
  return std::max<int64_t>(kMinPacketLimitMs - elapsed_time_ms, 0);
}

kMinPacketLimitMs 的值為固定 5ms, elapsed_time_ms從代碼中可以看出屋匕,為上一次調用 process到現(xiàn)在的時間間隔葛碧,兩個的時間差即為線程要等待的時長,最大5ms过吻,之后線程會再次調用process 方法發(fā)送媒體數(shù)據(jù)进泼。

線程處理方法 process()

下面的 process() 方法中處理真正發(fā)包邏輯, 每次進入 process 都會調用 UpdateBudgetWithElapsedTime(elapsed_time_ms), 去更新(增加) media_budget_中的 bytes_remaining_ ,

bytes_remaining_ 為每次可以發(fā)送包的最大字節(jié)數(shù):

發(fā)包量 = Δt x GCC反饋帶寬 x 2.5

Δt 即elapsed_time_ms, 最大 5ms。 在下面的while 循環(huán)中缘琅,只要包隊列不為空粘都,就一直嘗試用SendPacket去發(fā)包廓推。

int32_t PacedSender::Process() {
  int64_t now_us = clock_->TimeInMicroseconds();
  CriticalSectionScoped cs(critsect_.get());
    // elapsed_time_ms: 上一次process和這次process 之間的時間間隔 elapsed_time_ms  ms
  int64_t elapsed_time_ms = (now_us - time_last_update_us_ + 500) / 1000;
  time_last_update_us_ = now_us;
  int target_bitrate_kbps = pacing_bitrate_kbps_; // 拿到最新的 pacer 發(fā)送速率
  if (!paused_ && elapsed_time_ms > 0) {
    // 緩存的所有的包的大小刷袍,按字節(jié)算
    size_t queue_size_bytes = packets_->SizeInBytes();
    if (queue_size_bytes > 0) {
      // Assuming equal size packets and input/output rate, the average packet
      // has avg_time_left_ms left to get queue_size_bytes out of the queue, if
      // time constraint shall be met. Determine bitrate needed for that.
      packets_->UpdateQueueTime(clock_->TimeInMilliseconds());
      int64_t avg_time_left_ms = std::max<int64_t>(
          1, kMaxQueueLengthMs - packets_->AverageQueueTimeMs()); // 這個地方應該是 已經(jīng)緩存包時長 = 最大緩存時長 - 剩余隊列時長
      int min_bitrate_needed_kbps =
          static_cast<int>(queue_size_bytes * 8 / avg_time_left_ms);
      if (min_bitrate_needed_kbps > target_bitrate_kbps)
        target_bitrate_kbps = min_bitrate_needed_kbps; // 如果計算出來的發(fā)送速率比上面?zhèn)魅氲?目標發(fā)送碼率大,則更新目標發(fā)送碼率
    }
    //  media budget 更新發(fā)送速率
    media_budget_->set_target_rate_kbps(target_bitrate_kbps);
    // 最大時間間隔 kMaxIntervalTimeMs = 30 ms
    elapsed_time_ms = min(kMaxIntervalTimeMs, elapsed_time_ms);
    UpdateBudgetWithElapsedTime(elapsed_time_ms);
  }

  bool is_probing = prober_->IsProbing();
  PacedPacketInfo pacing_info;
  size_t bytes_sent = 0;
  size_t recommended_probe_size = 0;
  if (is_probing) {
    pacing_info = prober_->CurrentCluster();
    recommended_probe_size = prober_->RecommendedMinProbeSize();
  }
    
  // 隊列不為空的情況下
  while (!packets_->Empty()) {
    // Since we need to release the lock in order to send, we first pop the
    // element from the priority queue but keep it in storage, so that we can
    // reinsert it if send fails.
    const paced_sender::Packet& packet = packets_->BeginPop();
    // 去除 隊列中的包樊展,并開始發(fā)送
    if (SendPacket(packet, pacing_info)) {
      // Send succeeded, remove it from the queue.
      // 發(fā)送成功呻纹,則記錄發(fā)送的字節(jié)數(shù),并從隊列中將包清除
      bytes_sent += packet.bytes;
      packets_->FinalizePop(packet);
//        探測期間专缠,或發(fā)送字節(jié)數(shù)超過探測的數(shù)值了雷酪,則跳出循環(huán)不再發(fā)送   這個地方和探測有關,是否有問題涝婉?哥力??
      if (is_probing && bytes_sent > recommended_probe_size)
        break;
    } else {
      // Send failed, put it back into the queue.
      packets_->CancelPop(packet);
      break;
    }
  }

  if (packets_->Empty() && !paused_) {
    // We can not send padding unless a normal packet has first been sent. If we
    // do, timestamps get messed up.
    if (packet_counter_ > 0) {
    // 計算是否應該發(fā)送 padding 包
    int padding_needed = static_cast<int>(is_probing ? (recommended_probe_size - bytes_sent) : padding_budget_->bytes_remaining());
        
        if (padding_needed > 0) {
          bytes_sent += SendPadding(padding_needed, pacing_info);
        }
    }
  }
    
  if (is_probing) {
    probing_send_failure_ = bytes_sent == 0;
    if (!probing_send_failure_)
      prober_->ProbeSent(clock_->TimeInMilliseconds(), bytes_sent);
  }
  alr_detector_->OnBytesSent(bytes_sent, now_us / 1000);

  return 0;
}

發(fā)包方法SendPacket()

該方法由上面的while發(fā)包循環(huán)調用墩弯,發(fā)包后會調用 UpdateBudgetWithBytesSent(packet.bytes)media_budget_ 減去packet.bytes長度的發(fā)包預算, 當發(fā)博包循環(huán)走幾次之后吩跋,media_budget中的預算長度被消耗完,即 <= 0渔工, 此時 media_budget_->bytes_remaining() 方法會做 max(0, bytes_remaining_) 處理锌钮,即返回0 ,而發(fā)包前會判斷 media_budget_->bytes_remaining() == 0 ,滿足條件就return false不發(fā)了引矩。

bool PacedSender::SendPacket(const paced_sender::Packet& packet,
                             const PacedPacketInfo& pacing_info) {
    // 是否暫停發(fā)包
    if (paused_)
    return false;
    
    
    //  media  budget 剩余預算字節(jié)數(shù)為 0梁丘,停止發(fā)包
  if (media_budget_->bytes_remaining() == 0 &&
      pacing_info.probe_cluster_id == PacedPacketInfo::kNotAProbe) {
    return false;
  }

  critsect_->Enter();
  const bool success = packet_sender_->TimeToSendPacket(
      packet.ssrc, packet.sequence_number, packet.capture_time_ms,
      packet.retransmission, pacing_info);
  critsect_->Leave();

  if (success) {
    // TODO(holmer): High priority packets should only be accounted for if we
    // are allocating bandwidth for audio.
    if (packet.priority != kHighPriority) { // 包的優(yōu)先級不為最高優(yōu)先級,更新發(fā)送的字節(jié)數(shù)
      // Update media bytes sent.
      UpdateBudgetWithBytesSent(packet.bytes);
    }
  }

  return success;
}

SendPacket 方法最終會調用 rtp_sender中的方法旺韭,將ssrc氛谜,sequence_number 等參數(shù)傳遞過去,rtp_sender通過這些值找到真正的視頻媒體包区端,最終發(fā)送到到網(wǎng)絡上混蔼。

media_budget簡介

media_budget 是在 PacedSender中封裝的一個類,全部代碼如下珊燎,注釋做了解釋:

class IntervalBudget {
 public:
  explicit IntervalBudget(int initial_target_rate_kbps)
      : target_rate_kbps_(initial_target_rate_kbps),
        bytes_remaining_(0) {}

  void set_target_rate_kbps(int target_rate_kbps) {
    //更新發(fā)送速率
    target_rate_kbps_ = target_rate_kbps;
      bytes_remaining_ =
        max(-kWindowMs * target_rate_kbps_ / 8, bytes_remaining_);
  }

  void IncreaseBudget(int64_t delta_time_ms) {
    // 估計在 delta 時間惭嚣, 在帶寬為 target_rate_kbps 的情況可以發(fā)送出去多少字節(jié)
    int64_t bytes = target_rate_kbps_ * delta_time_ms / 8;
      
    if (bytes_remaining_ < 0) {
      // We overused last interval, compensate this interval.
      bytes_remaining_ = bytes_remaining_ + bytes;
    } else {
      // If we underused last interval we can't use it this interval.
      bytes_remaining_ = bytes;
    }
  }

  //更新實際發(fā)送的字節(jié)數(shù), 從bytes_remaining_減去
  void UseBudget(size_t bytes) {
    bytes_remaining_ = max(bytes_remaining_ - static_cast<int>(bytes),
                                -kWindowMs * target_rate_kbps_ / 8);
  }
  // 幾次發(fā)送循環(huán)后悔政,發(fā)送的總字節(jié)數(shù)大于開始的 bytes_remaining_晚吞,bytes_remaining_ <= 0,改方法返回0
  size_t bytes_remaining() const {
    return static_cast<size_t>(max(0, bytes_remaining_));
  }

  int target_rate_kbps() const { return target_rate_kbps_; }

 private:
  static const int kWindowMs = 500; // window 500 ms

  int target_rate_kbps_;
  int bytes_remaining_;
};

PacedSender 原理總結

到這可以知道PacedSender工作原理了, 每次發(fā)包前會更新media_budget中預算bytes_remaining_ 的大小谋国,而每次發(fā)送時間(<= 5ms)內(nèi)最多發(fā)送 bytes_remaining_ 字節(jié)數(shù)槽地,從而達到限制和平滑帶寬的目的,PacedSenderpadding發(fā)送的原理和此類似。

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末捌蚊,一起剝皮案震驚了整個濱河市集畅,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌缅糟,老刑警劉巖挺智,帶你破解...
    沈念sama閱讀 216,651評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異窗宦,居然都是意外死亡赦颇,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,468評論 3 392
  • 文/潘曉璐 我一進店門赴涵,熙熙樓的掌柜王于貴愁眉苦臉地迎上來媒怯,“玉大人,你說我怎么就攤上這事髓窜∩劝” “怎么了?”我有些...
    開封第一講書人閱讀 162,931評論 0 353
  • 文/不壞的土叔 我叫張陵寄纵,是天一觀的道長鳖敷。 經(jīng)常有香客問我,道長擂啥,這世上最難降的妖魔是什么哄陶? 我笑而不...
    開封第一講書人閱讀 58,218評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮哺壶,結果婚禮上屋吨,老公的妹妹穿的比我還像新娘。我一直安慰自己山宾,他們只是感情好至扰,可當我...
    茶點故事閱讀 67,234評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著资锰,像睡著了一般敢课。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上绷杜,一...
    開封第一講書人閱讀 51,198評論 1 299
  • 那天直秆,我揣著相機與錄音,去河邊找鬼鞭盟。 笑死圾结,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的齿诉。 我是一名探鬼主播筝野,決...
    沈念sama閱讀 40,084評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼晌姚,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了歇竟?” 一聲冷哼從身側響起挥唠,我...
    開封第一講書人閱讀 38,926評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎焕议,沒想到半個月后宝磨,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,341評論 1 311
  • 正文 獨居荒郊野嶺守林人離奇死亡号坡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,563評論 2 333
  • 正文 我和宋清朗相戀三年懊烤,在試婚紗的時候發(fā)現(xiàn)自己被綠了梯醒。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片宽堆。...
    茶點故事閱讀 39,731評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖茸习,靈堂內(nèi)的尸體忽然破棺而出畜隶,到底是詐尸還是另有隱情,我是刑警寧澤号胚,帶...
    沈念sama閱讀 35,430評論 5 343
  • 正文 年R本政府宣布籽慢,位于F島的核電站,受9級特大地震影響猫胁,放射性物質發(fā)生泄漏箱亿。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,036評論 3 326
  • 文/蒙蒙 一弃秆、第九天 我趴在偏房一處隱蔽的房頂上張望届惋。 院中可真熱鬧,春花似錦菠赚、人聲如沸脑豹。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,676評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽瘩欺。三九已至,卻和暖如春拌牲,著一層夾襖步出監(jiān)牢的瞬間俱饿,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,829評論 1 269
  • 我被黑心中介騙來泰國打工塌忽, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留拍埠,地道東北人。 一個月前我還...
    沈念sama閱讀 47,743評論 2 368
  • 正文 我出身青樓砚婆,卻偏偏與公主長得像械拍,于是被迫代替她去往敵國和親突勇。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,629評論 2 354

推薦閱讀更多精彩內(nèi)容