WebRTC PacedSender 原理分析(一)

PacedSender 的族普關(guān)系

paced_class_uml_001.png
  • PacedSender繼承Module類,實現(xiàn)其Process和TimeUntilNextProcess方法,其中TimeUntilNextProcess的實現(xiàn)便是相隔多少時間Process函數(shù)會被paced_thread回調(diào)一次

  • PacedSender類依賴PacingController類事實上,PacedSender把大部分工作都交給了PacingController

    和PacketRouter

PacedSender 入隊操作

#modules/pacing/paced_sender.cc
void PacedSender::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
  rtc::CritScope cs(&critsect_);
  pacing_controller_.EnqueuePacket(std::move(packet));
}
  • 通過RTPSenderVideo::SendVideoPacket將rtp包通過回調(diào)EnqueuePacket將rtp包存入PacedSender所管理的隊列當中
  • PacedSender::EnqueuePacket把工作交給PacingController
#modules/pacing/pacing_controller.cc
void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
  RTC_DCHECK(pacing_bitrate_ > DataRate::Zero())
      << "SetPacingRate must be called before InsertPacket.";

  Timestamp now = CurrentTime();
  prober_.OnIncomingPacket(packet->payload_size());

  if (packet->capture_time_ms() < 0) {
    packet->set_capture_time_ms(now.ms());
  }

  RTC_CHECK(packet->packet_type());
  int priority = GetPriorityForType(*packet->packet_type());
  packet_queue_.Push(priority, now, packet_counter_++, std::move(packet));
}
  • 如果capture_time_ms小于0,在這里為期初始化時間
  • 獲取優(yōu)先級,kAudio(0),kRetransmission(1),kVideo(2),kPadding(3)
  • 最終根據(jù)優(yōu)先級將packet放入packet_queue_隊列

RoundRobinPacketQueue原理分析

RoundRobinPacketQueue_01.png
  • RoundRobinPacketQueue隊列的核心實現(xiàn)是內(nèi)部管理4個數(shù)據(jù)結(jié)構(gòu)
  • streams_容器用來管理以ssrc為key,以Stream對象為value的容器,依次可以看出,對于不同ssrc的流都會被該容器所管理
  • rtp_packets_列表用來托管真正的rtp流對應(yīng)std::unique_ptr<RtpPacketToSend>,所有的的發(fā)送真實的rtp流都會存到這里,后續(xù)發(fā)送到網(wǎng)絡(luò)通過從該列表中獲得發(fā)送
  • enqueue_times_集合用來記錄每次rtp流入隊列的時間
  • 在每個數(shù)據(jù)包插入到隊列的時候會創(chuàng)建一個QueuedPacket,同時會根據(jù)QueuedPacket的優(yōu)先級創(chuàng)建一個StreamPrioKey對象,并且會以此對象為key,該包的ssrc值為value,將其插入到stream_priorities_集合
#modules/pacing/round_robin_packet_queue.cc
void RoundRobinPacketQueue::Push(int priority,
                                 Timestamp enqueue_time,
                                 uint64_t enqueue_order,
                                 std::unique_ptr<RtpPacketToSend> packet) {
  uint32_t ssrc = packet->Ssrc();
  uint16_t sequence_number = packet->SequenceNumber();
  int64_t capture_time_ms = packet->capture_time_ms();
  DataSize size =
      DataSize::bytes(send_side_bwe_with_overhead_
                          ? packet->size()
                          : packet->payload_size() + packet->padding_size());
  auto type = packet->packet_type();
  RTC_DCHECK(type.has_value());

  rtp_packets_.push_front(std::move(packet));
  Push(QueuedPacket(
      priority, *type, ssrc, sequence_number, capture_time_ms, enqueue_time,
      size, *type == RtpPacketToSend::Type::kRetransmission, enqueue_order,
      enqueue_times_.insert(enqueue_time), rtp_packets_.begin()));
}
  • 首先得到ssrc,sequence_number,capture_time_ms,size(rtp包的大小)
  • 將RtpPacketToSend包通過rtp_packets_.push_front存入rtp_packets_列表
  • 以各參數(shù)創(chuàng)建QueuedPacket,由此可見每個RtpPacketToSend對應(yīng)一個QueuedPacket,但是它并不正在存放RtpPacketToSend數(shù)據(jù),只是記錄了其szie,ssrc,sequence_number,以及rtp_packets_.begin()迭代器頭,因為每次將RtpPacketToSend插入到rtp_packets_列表都是從頭部插入,這里相當于得到其索引,便于后續(xù)發(fā)送到網(wǎng)絡(luò)使用

QueuedPacket數(shù)據(jù)結(jié)構(gòu)的實現(xiàn)

RoundRobinPacketQueue_QueuedPacket.png
  • QueuedPacket重要的成員變量就是packet_it_,它就是真實rtp包的索引所在
  • QueuedPacket提供了如下函數(shù)用于獲取當前QueuedPacket對應(yīng)的RtpPacketToSend包
#modules/pacing/round_robin_packet_queue.cc
std::unique_ptr<RtpPacketToSend>
RoundRobinPacketQueue::QueuedPacket::ReleasePacket() {
  return packet_it_ ? std::move(**packet_it_) : nullptr;
}
  • 與上面分析對應(yīng)通過std::move(**packet_it_)返回

Stream數(shù)據(jù)結(jié)構(gòu)的實現(xiàn)

RoundRobinPacketQueue_Stream.png
  • 以下結(jié)合代碼來分析該數(shù)據(jù)結(jié)構(gòu)
##modules/pacing/round_robin_packet_queue.cc
void RoundRobinPacketQueue::Push(QueuedPacket packet) {
  auto stream_info_it = streams_.find(packet.ssrc());
  if (stream_info_it == streams_.end()) {
    stream_info_it = streams_.emplace(packet.ssrc(), Stream()).first;
    stream_info_it->second.priority_it = stream_priorities_.end();
    stream_info_it->second.ssrc = packet.ssrc();
  }

  Stream* stream = &stream_info_it->second;

  if (stream->priority_it == stream_priorities_.end()) {
    // If the SSRC is not currently scheduled, add it to |stream_priorities_|.
    RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
    stream->priority_it = stream_priorities_.emplace(
        StreamPrioKey(packet.priority(), stream->size), packet.ssrc());
  } else if (packet.priority() < stream->priority_it->first.priority) {
    // If the priority of this SSRC increased, remove the outdated StreamPrioKey
    // and insert a new one with the new priority. Note that |priority_| uses
    // lower ordinal for higher priority.
    stream_priorities_.erase(stream->priority_it);
    stream->priority_it = stream_priorities_.emplace(
        StreamPrioKey(packet.priority(), stream->size), packet.ssrc());
  }
  RTC_CHECK(stream->priority_it != stream_priorities_.end());

  // In order to figure out how much time a packet has spent in the queue while
  // not in a paused state, we subtract the total amount of time the queue has
  // been paused so far, and when the packet is popped we subtract the total
  // amount of time the queue has been paused at that moment. This way we
  // subtract the total amount of time the packet has spent in the queue while
  // in a paused state.
  UpdateQueueTime(packet.enqueue_time());
  packet.SubtractPauseTime(pause_time_sum_);

  size_packets_ += 1;
  size_ += packet.size();

  stream->packet_queue.push(packet);
}
  • 根據(jù)QueuedPacket的ssrc來查詢streams_集合中是否有該ssrc對應(yīng)的Stream對象,如果沒有則根據(jù)該ssrc實例化一個Stream對象并以ssrc為key將其插入到streams_集合
  • 在插入后Stream的成員變量priority_it是指向stream_priorities_.end的
  • 下面的處理如果Stream的成員變量priority_it是指向stream_priorities_.end則為當前的QueuedPacket包通過stream_priorities__.emplace 以StreamPrioKey對象為key,以ssrc為value插入到stream_priorities_集合當中并放回當前迭代器賦值給Stream的成員變量priority_it
  • 假設(shè)同一路stream也就是同一個ssrc,在插入的時候,本次的priority小于上一次的priority(越小優(yōu)先級越高?),那么首先需要將原來stream_priorities_管理的擦除,然后在重新創(chuàng)建StreamPrioKey插入到stream_priorities_
  • 最后通過stream->packet_queue.push(packet)將QueuedPacket插入到Stream管理的packet_queue集合當中
  • 經(jīng)過以上分析大致可得出如下關(guān)系
RoundRobinPacketQueue_Stream_2.png
  • 每一個RtpPacketToSend包對應(yīng)一個QueuedPacket對象
  • 每一路ssrc對應(yīng)的stream對應(yīng)一個Stream,而每一個Stream對象管理著入隊的多個QueuedPacket

PacedSender 出隊操作

  • PacedSender 出隊操作是一個十分復雜的過程,涉及到動態(tài)碼率估計,webrtc經(jīng)過bwe發(fā)送端碼率估計評測出新碼率后會將碼率作用到paced模塊,讓PacedSender按照新的碼率進行數(shù)據(jù)發(fā)送,本文為便于分析不考慮碼率估計進行分析假設(shè)碼率已知
  • PacedSender 出隊操作要從PacedSender派生Module模塊談起,經(jīng)paced_thread_處理,檢測PacedSender重載的TimeUntilNextProcess函數(shù)判斷下一次回調(diào)PacedSender::Process函數(shù)
  • webrtc初始化創(chuàng)建PacedSender過程會通過SetPacingRates設(shè)置初始化碼率
#modules/pacing/paced_sender.cc
int64_t PacedSender::TimeUntilNextProcess() {
  rtc::CritScope cs(&critsect_);

  // When paused we wake up every 500 ms to send a padding packet to ensure
  // we won't get stuck in the paused state due to no feedback being received.
  TimeDelta elapsed_time = pacing_controller_.TimeElapsedSinceLastProcess();
  if (pacing_controller_.IsPaused()) {
    return std::max(PacingController::kPausedProcessInterval - elapsed_time,
                    TimeDelta::Zero())
        .ms();
  }

  auto next_probe = pacing_controller_.TimeUntilNextProbe();
  if (next_probe) {
    return next_probe->ms();
  }

  const TimeDelta min_packet_limit = TimeDelta::ms(5);
  return std::max(min_packet_limit - elapsed_time, TimeDelta::Zero()).ms();
}
  • 首先通過pacing_controller_.TimeElapsedSinceLastProcess()得到已經(jīng)流逝的時間,也就是當前時間和上一次處理時間相減
  • 假設(shè)next_probe為-1或nullptr也就是不做碼率探測
  • 默認最小發(fā)包間隔是5ms,這里將min_packet_limit - elapsed_time和0取最大值,超過5ms則立即執(zhí)行
void PacedSender::Process() {
  rtc::CritScope cs(&critsect_);
  pacing_controller_.ProcessPackets();
}
  • PacedSender將真正的處理交給PacingController
void PacingController::ProcessPackets() {
  Timestamp now = CurrentTime();
  TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now);
  ....
  if (paused_)
    return;

  if (elapsed_time > TimeDelta::Zero()) {
    DataRate target_rate = pacing_bitrate_;
    DataSize queue_size_data = packet_queue_.Size();
    if (queue_size_data > DataSize::Zero()) {
      // Assuming equal size packets and input/output rate, the average packet
      // has avg_time_left_ms left to get queue_size_bytes out of the queue, if
      // time constraint shall be met. Determine bitrate needed for that.
      packet_queue_.UpdateQueueTime(CurrentTime());
      if (drain_large_queues_) {
        TimeDelta avg_time_left =
            std::max(TimeDelta::ms(1),
                     queue_time_limit - packet_queue_.AverageQueueTime());
        DataRate min_rate_needed = queue_size_data / avg_time_left;
        if (min_rate_needed > target_rate) {
          target_rate = min_rate_needed;
          RTC_LOG(LS_VERBOSE) << "bwe:large_pacing_queue pacing_rate_kbps="
                              << target_rate.kbps();
        }
      }
    }

    media_budget_.set_target_rate_kbps(target_rate.kbps());
    UpdateBudgetWithElapsedTime(elapsed_time);
  }

  bool is_probing = prober_.IsProbing();
  PacedPacketInfo pacing_info;
  absl::optional<DataSize> recommended_probe_size;
  if (is_probing) {
    pacing_info = prober_.CurrentCluster();
    recommended_probe_size = DataSize::bytes(prober_.RecommendedMinProbeSize());
  }

  DataSize data_sent = DataSize::Zero();
  // The paused state is checked in the loop since it leaves the critical
  // section allowing the paused state to be changed from other code.
  while (!paused_) {
    auto* packet = GetPendingPacket(pacing_info);
    if (packet == nullptr) {
      // No packet available to send, check if we should send padding.
      DataSize padding_to_add = PaddingToAdd(recommended_probe_size, data_sent);
      if (padding_to_add > DataSize::Zero()) {
        std::vector<std::unique_ptr<RtpPacketToSend>> padding_packets =
            packet_sender_->GeneratePadding(padding_to_add);
        if (padding_packets.empty()) {
          // No padding packets were generated, quite send loop.
          break;
        }
        for (auto& packet : padding_packets) {
          EnqueuePacket(std::move(packet));
        }
        // Continue loop to send the padding that was just added.
        continue;
      }

      // Can't fetch new packet and no padding to send, exit send loop.
      break;
    }

    std::unique_ptr<RtpPacketToSend> rtp_packet = packet->ReleasePacket();
    RTC_DCHECK(rtp_packet);
    packet_sender_->SendRtpPacket(std::move(rtp_packet), pacing_info);

    data_sent += packet->size();
    // Send succeeded, remove it from the queue.
    OnPacketSent(packet);
    if (recommended_probe_size && data_sent > *recommended_probe_size)
      break;
  }

  if (is_probing) {
    probing_send_failure_ = data_sent == DataSize::Zero();
    if (!probing_send_failure_) {
      prober_.ProbeSent(CurrentTime().ms(), data_sent.bytes());
    }
  }
}
  • 獲取流逝的時間病更新上一次處理的時間為當前時間,流逝時間不得大于2s,如果大于2s則elapsed_time為2s
  • 在drain_large_queues_支持的情況下(一次處理可以發(fā)送多個數(shù)據(jù)包?),根據(jù)時間差來計算本次發(fā)送的最小碼率,如果當前的碼率比實際發(fā)送的最小碼率要小則通過media_budget_.set_target_rate_kbps(target_rate.kbps())設(shè)置碼率
  • 如果正在進行碼率探測,則獲取本次碼率探測得出的本次推薦發(fā)送的數(shù)(推薦發(fā)送多少數(shù)據(jù))
  • 進入while循環(huán)通過GetPendingPacket()從RoundRobinPacketQueue中獲取QueuedPacket包,然后通過packet->ReleasePacket()得到RtpPacketToSend,最后通過packet_sender_->SendRtpPacket進行發(fā)送
  • GetPendingPacket如果在網(wǎng)絡(luò)擁塞并且碼率探測其未進入探測的的情況下會返回空,并且會將從RoundRobinPacketQueue彈出的QueuedPacket重新插入到隊列當中,同時跳出循環(huán)
  • 如果RoundRobinPacketQueue為空GetPendingPacket獲取不到數(shù)據(jù)while循環(huán)會跳出
  • 如果成功發(fā)送后recommended_probe_size的值大于0并且實際發(fā)送值已經(jīng)大于或等于recommended_probe_size也會跳出循環(huán)結(jié)束本次process
RoundRobinPacketQueue::QueuedPacket* PacingController::GetPendingPacket(
    const PacedPacketInfo& pacing_info) {
  if (packet_queue_.Empty()) {
    return nullptr;
  }

  // Since we need to release the lock in order to send, we first pop the
  // element from the priority queue but keep it in storage, so that we can
  // reinsert it if send fails.
  RoundRobinPacketQueue::QueuedPacket* packet = packet_queue_.BeginPop();
  bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio;
  bool apply_pacing = !audio_packet || pace_audio_;
  if (apply_pacing && (Congested() || (media_budget_.bytes_remaining() == 0 &&
                                       pacing_info.probe_cluster_id ==
                                           PacedPacketInfo::kNotAProbe))) {
    packet_queue_.CancelPop();
    return nullptr;
  }
  return packet;
}
  • packet_queue_.BeginPop()彈出QueuedPacket
  • packet_queue_.CancelPop()重新將QueuedPacket加入到隊列
  • BeginPop的原理是首先通過GetHighestPriorityStream遍歷stream_priorities_獲取優(yōu)先發(fā)送的流對應(yīng)的ssrc
  • 其次通過對應(yīng)的ssrc查找streams_集合得到Stream,然后通過Stream得到依次要發(fā)送的QueuedPacket
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
禁止轉(zhuǎn)載爵卒,如需轉(zhuǎn)載請通過簡信或評論聯(lián)系作者享扔。
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市瑞眼,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌嗦玖,老刑警劉巖浩销,帶你破解...
    沈念sama閱讀 219,188評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異芹敌,居然都是意外死亡,警方通過查閱死者的電腦和手機垮抗,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,464評論 3 395
  • 文/潘曉璐 我一進店門氏捞,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人冒版,你說我怎么就攤上這事液茎。” “怎么了辞嗡?”我有些...
    開封第一講書人閱讀 165,562評論 0 356
  • 文/不壞的土叔 我叫張陵捆等,是天一觀的道長。 經(jīng)常有香客問我续室,道長栋烤,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,893評論 1 295
  • 正文 為了忘掉前任挺狰,我火速辦了婚禮明郭,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘丰泊。我一直安慰自己薯定,他們只是感情好,可當我...
    茶點故事閱讀 67,917評論 6 392
  • 文/花漫 我一把揭開白布趁耗。 她就那樣靜靜地躺著沉唠,像睡著了一般。 火紅的嫁衣襯著肌膚如雪苛败。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,708評論 1 305
  • 那天径簿,我揣著相機與錄音罢屈,去河邊找鬼。 笑死篇亭,一個胖子當著我的面吹牛缠捌,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播,決...
    沈念sama閱讀 40,430評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼曼月,長吁一口氣:“原來是場噩夢啊……” “哼谊却!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起哑芹,我...
    開封第一講書人閱讀 39,342評論 0 276
  • 序言:老撾萬榮一對情侶失蹤炎辨,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后聪姿,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體碴萧,經(jīng)...
    沈念sama閱讀 45,801評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,976評論 3 337
  • 正文 我和宋清朗相戀三年末购,在試婚紗的時候發(fā)現(xiàn)自己被綠了破喻。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,115評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡盟榴,死狀恐怖曹质,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情擎场,我是刑警寧澤咆繁,帶...
    沈念sama閱讀 35,804評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站顶籽,受9級特大地震影響玩般,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜礼饱,卻給世界環(huán)境...
    茶點故事閱讀 41,458評論 3 331
  • 文/蒙蒙 一坏为、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧镊绪,春花似錦匀伏、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,008評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至榄鉴,卻和暖如春履磨,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背庆尘。 一陣腳步聲響...
    開封第一講書人閱讀 33,135評論 1 272
  • 我被黑心中介騙來泰國打工剃诅, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人驶忌。 一個月前我還...
    沈念sama閱讀 48,365評論 3 373
  • 正文 我出身青樓矛辕,卻偏偏與公主長得像,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子聊品,可洞房花燭夜當晚...
    茶點故事閱讀 45,055評論 2 355