PacedSender 的族普關(guān)系
paced_class_uml_001.png
PacedSender繼承Module類,實現(xiàn)其Process和TimeUntilNextProcess方法,其中TimeUntilNextProcess的實現(xiàn)便是相隔多少時間Process函數(shù)會被paced_thread回調(diào)一次
-
PacedSender類依賴PacingController類事實上,PacedSender把大部分工作都交給了PacingController
和PacketRouter
PacedSender 入隊操作
#modules/pacing/paced_sender.cc
void PacedSender::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
rtc::CritScope cs(&critsect_);
pacing_controller_.EnqueuePacket(std::move(packet));
}
- 通過RTPSenderVideo::SendVideoPacket將rtp包通過回調(diào)EnqueuePacket將rtp包存入PacedSender所管理的隊列當中
- PacedSender::EnqueuePacket把工作交給PacingController
#modules/pacing/pacing_controller.cc
void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
RTC_DCHECK(pacing_bitrate_ > DataRate::Zero())
<< "SetPacingRate must be called before InsertPacket.";
Timestamp now = CurrentTime();
prober_.OnIncomingPacket(packet->payload_size());
if (packet->capture_time_ms() < 0) {
packet->set_capture_time_ms(now.ms());
}
RTC_CHECK(packet->packet_type());
int priority = GetPriorityForType(*packet->packet_type());
packet_queue_.Push(priority, now, packet_counter_++, std::move(packet));
}
- 如果capture_time_ms小于0,在這里為期初始化時間
- 獲取優(yōu)先級,kAudio(0),kRetransmission(1),kVideo(2),kPadding(3)
- 最終根據(jù)優(yōu)先級將packet放入packet_queue_隊列
RoundRobinPacketQueue原理分析
RoundRobinPacketQueue_01.png
- RoundRobinPacketQueue隊列的核心實現(xiàn)是內(nèi)部管理4個數(shù)據(jù)結(jié)構(gòu)
- streams_容器用來管理以ssrc為key,以Stream對象為value的容器,依次可以看出,對于不同ssrc的流都會被該容器所管理
- rtp_packets_列表用來托管真正的rtp流對應(yīng)std::unique_ptr<RtpPacketToSend>,所有的的發(fā)送真實的rtp流都會存到這里,后續(xù)發(fā)送到網(wǎng)絡(luò)通過從該列表中獲得發(fā)送
- enqueue_times_集合用來記錄每次rtp流入隊列的時間
- 在每個數(shù)據(jù)包插入到隊列的時候會創(chuàng)建一個QueuedPacket,同時會根據(jù)QueuedPacket的優(yōu)先級創(chuàng)建一個StreamPrioKey對象,并且會以此對象為key,該包的ssrc值為value,將其插入到stream_priorities_集合
#modules/pacing/round_robin_packet_queue.cc
void RoundRobinPacketQueue::Push(int priority,
Timestamp enqueue_time,
uint64_t enqueue_order,
std::unique_ptr<RtpPacketToSend> packet) {
uint32_t ssrc = packet->Ssrc();
uint16_t sequence_number = packet->SequenceNumber();
int64_t capture_time_ms = packet->capture_time_ms();
DataSize size =
DataSize::bytes(send_side_bwe_with_overhead_
? packet->size()
: packet->payload_size() + packet->padding_size());
auto type = packet->packet_type();
RTC_DCHECK(type.has_value());
rtp_packets_.push_front(std::move(packet));
Push(QueuedPacket(
priority, *type, ssrc, sequence_number, capture_time_ms, enqueue_time,
size, *type == RtpPacketToSend::Type::kRetransmission, enqueue_order,
enqueue_times_.insert(enqueue_time), rtp_packets_.begin()));
}
- 首先得到ssrc,sequence_number,capture_time_ms,size(rtp包的大小)
- 將RtpPacketToSend包通過rtp_packets_.push_front存入rtp_packets_列表
- 以各參數(shù)創(chuàng)建QueuedPacket,由此可見每個RtpPacketToSend對應(yīng)一個QueuedPacket,但是它并不正在存放RtpPacketToSend數(shù)據(jù),只是記錄了其szie,ssrc,sequence_number,以及rtp_packets_.begin()迭代器頭,因為每次將RtpPacketToSend插入到rtp_packets_列表都是從頭部插入,這里相當于得到其索引,便于后續(xù)發(fā)送到網(wǎng)絡(luò)使用
QueuedPacket數(shù)據(jù)結(jié)構(gòu)的實現(xiàn)
RoundRobinPacketQueue_QueuedPacket.png
- QueuedPacket重要的成員變量就是packet_it_,它就是真實rtp包的索引所在
- QueuedPacket提供了如下函數(shù)用于獲取當前QueuedPacket對應(yīng)的RtpPacketToSend包
#modules/pacing/round_robin_packet_queue.cc
std::unique_ptr<RtpPacketToSend>
RoundRobinPacketQueue::QueuedPacket::ReleasePacket() {
return packet_it_ ? std::move(**packet_it_) : nullptr;
}
- 與上面分析對應(yīng)通過std::move(**packet_it_)返回
Stream數(shù)據(jù)結(jié)構(gòu)的實現(xiàn)
RoundRobinPacketQueue_Stream.png
- 以下結(jié)合代碼來分析該數(shù)據(jù)結(jié)構(gòu)
##modules/pacing/round_robin_packet_queue.cc
void RoundRobinPacketQueue::Push(QueuedPacket packet) {
auto stream_info_it = streams_.find(packet.ssrc());
if (stream_info_it == streams_.end()) {
stream_info_it = streams_.emplace(packet.ssrc(), Stream()).first;
stream_info_it->second.priority_it = stream_priorities_.end();
stream_info_it->second.ssrc = packet.ssrc();
}
Stream* stream = &stream_info_it->second;
if (stream->priority_it == stream_priorities_.end()) {
// If the SSRC is not currently scheduled, add it to |stream_priorities_|.
RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
stream->priority_it = stream_priorities_.emplace(
StreamPrioKey(packet.priority(), stream->size), packet.ssrc());
} else if (packet.priority() < stream->priority_it->first.priority) {
// If the priority of this SSRC increased, remove the outdated StreamPrioKey
// and insert a new one with the new priority. Note that |priority_| uses
// lower ordinal for higher priority.
stream_priorities_.erase(stream->priority_it);
stream->priority_it = stream_priorities_.emplace(
StreamPrioKey(packet.priority(), stream->size), packet.ssrc());
}
RTC_CHECK(stream->priority_it != stream_priorities_.end());
// In order to figure out how much time a packet has spent in the queue while
// not in a paused state, we subtract the total amount of time the queue has
// been paused so far, and when the packet is popped we subtract the total
// amount of time the queue has been paused at that moment. This way we
// subtract the total amount of time the packet has spent in the queue while
// in a paused state.
UpdateQueueTime(packet.enqueue_time());
packet.SubtractPauseTime(pause_time_sum_);
size_packets_ += 1;
size_ += packet.size();
stream->packet_queue.push(packet);
}
- 根據(jù)QueuedPacket的ssrc來查詢streams_集合中是否有該ssrc對應(yīng)的Stream對象,如果沒有則根據(jù)該ssrc實例化一個Stream對象并以ssrc為key將其插入到streams_集合
- 在插入后Stream的成員變量priority_it是指向stream_priorities_.end的
- 下面的處理如果Stream的成員變量priority_it是指向stream_priorities_.end則為當前的QueuedPacket包通過stream_priorities__.emplace 以StreamPrioKey對象為key,以ssrc為value插入到stream_priorities_集合當中并放回當前迭代器賦值給Stream的成員變量priority_it
- 假設(shè)同一路stream也就是同一個ssrc,在插入的時候,本次的priority小于上一次的priority(越小優(yōu)先級越高?),那么首先需要將原來stream_priorities_管理的擦除,然后在重新創(chuàng)建StreamPrioKey插入到stream_priorities_
- 最后通過stream->packet_queue.push(packet)將QueuedPacket插入到Stream管理的packet_queue集合當中
- 經(jīng)過以上分析大致可得出如下關(guān)系
RoundRobinPacketQueue_Stream_2.png
- 每一個RtpPacketToSend包對應(yīng)一個QueuedPacket對象
- 每一路ssrc對應(yīng)的stream對應(yīng)一個Stream,而每一個Stream對象管理著入隊的多個QueuedPacket
PacedSender 出隊操作
- PacedSender 出隊操作是一個十分復雜的過程,涉及到動態(tài)碼率估計,webrtc經(jīng)過bwe發(fā)送端碼率估計評測出新碼率后會將碼率作用到paced模塊,讓PacedSender按照新的碼率進行數(shù)據(jù)發(fā)送,本文為便于分析不考慮碼率估計進行分析假設(shè)碼率已知
- PacedSender 出隊操作要從PacedSender派生Module模塊談起,經(jīng)paced_thread_處理,檢測PacedSender重載的TimeUntilNextProcess函數(shù)判斷下一次回調(diào)PacedSender::Process函數(shù)
- webrtc初始化創(chuàng)建PacedSender過程會通過SetPacingRates設(shè)置初始化碼率
#modules/pacing/paced_sender.cc
int64_t PacedSender::TimeUntilNextProcess() {
rtc::CritScope cs(&critsect_);
// When paused we wake up every 500 ms to send a padding packet to ensure
// we won't get stuck in the paused state due to no feedback being received.
TimeDelta elapsed_time = pacing_controller_.TimeElapsedSinceLastProcess();
if (pacing_controller_.IsPaused()) {
return std::max(PacingController::kPausedProcessInterval - elapsed_time,
TimeDelta::Zero())
.ms();
}
auto next_probe = pacing_controller_.TimeUntilNextProbe();
if (next_probe) {
return next_probe->ms();
}
const TimeDelta min_packet_limit = TimeDelta::ms(5);
return std::max(min_packet_limit - elapsed_time, TimeDelta::Zero()).ms();
}
- 首先通過pacing_controller_.TimeElapsedSinceLastProcess()得到已經(jīng)流逝的時間,也就是當前時間和上一次處理時間相減
- 假設(shè)next_probe為-1或nullptr也就是不做碼率探測
- 默認最小發(fā)包間隔是5ms,這里將min_packet_limit - elapsed_time和0取最大值,超過5ms則立即執(zhí)行
void PacedSender::Process() {
rtc::CritScope cs(&critsect_);
pacing_controller_.ProcessPackets();
}
- PacedSender將真正的處理交給PacingController
void PacingController::ProcessPackets() {
Timestamp now = CurrentTime();
TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now);
....
if (paused_)
return;
if (elapsed_time > TimeDelta::Zero()) {
DataRate target_rate = pacing_bitrate_;
DataSize queue_size_data = packet_queue_.Size();
if (queue_size_data > DataSize::Zero()) {
// Assuming equal size packets and input/output rate, the average packet
// has avg_time_left_ms left to get queue_size_bytes out of the queue, if
// time constraint shall be met. Determine bitrate needed for that.
packet_queue_.UpdateQueueTime(CurrentTime());
if (drain_large_queues_) {
TimeDelta avg_time_left =
std::max(TimeDelta::ms(1),
queue_time_limit - packet_queue_.AverageQueueTime());
DataRate min_rate_needed = queue_size_data / avg_time_left;
if (min_rate_needed > target_rate) {
target_rate = min_rate_needed;
RTC_LOG(LS_VERBOSE) << "bwe:large_pacing_queue pacing_rate_kbps="
<< target_rate.kbps();
}
}
}
media_budget_.set_target_rate_kbps(target_rate.kbps());
UpdateBudgetWithElapsedTime(elapsed_time);
}
bool is_probing = prober_.IsProbing();
PacedPacketInfo pacing_info;
absl::optional<DataSize> recommended_probe_size;
if (is_probing) {
pacing_info = prober_.CurrentCluster();
recommended_probe_size = DataSize::bytes(prober_.RecommendedMinProbeSize());
}
DataSize data_sent = DataSize::Zero();
// The paused state is checked in the loop since it leaves the critical
// section allowing the paused state to be changed from other code.
while (!paused_) {
auto* packet = GetPendingPacket(pacing_info);
if (packet == nullptr) {
// No packet available to send, check if we should send padding.
DataSize padding_to_add = PaddingToAdd(recommended_probe_size, data_sent);
if (padding_to_add > DataSize::Zero()) {
std::vector<std::unique_ptr<RtpPacketToSend>> padding_packets =
packet_sender_->GeneratePadding(padding_to_add);
if (padding_packets.empty()) {
// No padding packets were generated, quite send loop.
break;
}
for (auto& packet : padding_packets) {
EnqueuePacket(std::move(packet));
}
// Continue loop to send the padding that was just added.
continue;
}
// Can't fetch new packet and no padding to send, exit send loop.
break;
}
std::unique_ptr<RtpPacketToSend> rtp_packet = packet->ReleasePacket();
RTC_DCHECK(rtp_packet);
packet_sender_->SendRtpPacket(std::move(rtp_packet), pacing_info);
data_sent += packet->size();
// Send succeeded, remove it from the queue.
OnPacketSent(packet);
if (recommended_probe_size && data_sent > *recommended_probe_size)
break;
}
if (is_probing) {
probing_send_failure_ = data_sent == DataSize::Zero();
if (!probing_send_failure_) {
prober_.ProbeSent(CurrentTime().ms(), data_sent.bytes());
}
}
}
- 獲取流逝的時間病更新上一次處理的時間為當前時間,流逝時間不得大于2s,如果大于2s則elapsed_time為2s
- 在drain_large_queues_支持的情況下(一次處理可以發(fā)送多個數(shù)據(jù)包?),根據(jù)時間差來計算本次發(fā)送的最小碼率,如果當前的碼率比實際發(fā)送的最小碼率要小則通過media_budget_.set_target_rate_kbps(target_rate.kbps())設(shè)置碼率
- 如果正在進行碼率探測,則獲取本次碼率探測得出的本次推薦發(fā)送的數(shù)(推薦發(fā)送多少數(shù)據(jù))
- 進入while循環(huán)通過GetPendingPacket()從RoundRobinPacketQueue中獲取QueuedPacket包,然后通過packet->ReleasePacket()得到RtpPacketToSend,最后通過packet_sender_->SendRtpPacket進行發(fā)送
- GetPendingPacket如果在網(wǎng)絡(luò)擁塞并且碼率探測其未進入探測的的情況下會返回空,并且會將從RoundRobinPacketQueue彈出的QueuedPacket重新插入到隊列當中,同時跳出循環(huán)
- 如果RoundRobinPacketQueue為空GetPendingPacket獲取不到數(shù)據(jù)while循環(huán)會跳出
- 如果成功發(fā)送后recommended_probe_size的值大于0并且實際發(fā)送值已經(jīng)大于或等于recommended_probe_size也會跳出循環(huán)結(jié)束本次process
RoundRobinPacketQueue::QueuedPacket* PacingController::GetPendingPacket(
const PacedPacketInfo& pacing_info) {
if (packet_queue_.Empty()) {
return nullptr;
}
// Since we need to release the lock in order to send, we first pop the
// element from the priority queue but keep it in storage, so that we can
// reinsert it if send fails.
RoundRobinPacketQueue::QueuedPacket* packet = packet_queue_.BeginPop();
bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio;
bool apply_pacing = !audio_packet || pace_audio_;
if (apply_pacing && (Congested() || (media_budget_.bytes_remaining() == 0 &&
pacing_info.probe_cluster_id ==
PacedPacketInfo::kNotAProbe))) {
packet_queue_.CancelPop();
return nullptr;
}
return packet;
}
- packet_queue_.BeginPop()彈出QueuedPacket
- packet_queue_.CancelPop()重新將QueuedPacket加入到隊列
- BeginPop的原理是首先通過GetHighestPriorityStream遍歷stream_priorities_獲取優(yōu)先發(fā)送的流對應(yīng)的ssrc
- 其次通過對應(yīng)的ssrc查找streams_集合得到Stream,然后通過Stream得到依次要發(fā)送的QueuedPacket