摘抄一段 PacedSender
簡介伟墙,下面的鏈接對該模塊的工作原理做了詳細的介紹福荸,今天大致看了下這個模塊的代碼,記錄一下
在estimator根據(jù)網(wǎng)絡狀態(tài)決策出新的通信碼率(target bitrate)嫉戚,它會將這個碼率設置到pacer當中唬滑,要求pacer按照新的碼率來計算發(fā)包頻率。因為在視頻通信中域携,單幀視頻可能有上百KB,如果是當視頻幀被編碼器編碼出來后簇秒,就立即進行RTP打包發(fā)送,瞬時會發(fā)送大量的數(shù)據(jù)到網(wǎng)絡上秀鞭,可能會引起網(wǎng)絡衰減和通信惡化趋观。WebRTC引入pacer,pacer會根據(jù)estimator評估出來的碼率锋边,按照最小單位時間(5ms)做時間分片進行遞進發(fā)送數(shù)據(jù)拆内,避免瞬時對網(wǎng)絡的沖擊。pacer的目的就是讓視頻數(shù)據(jù)按照評估碼率均勻的分布在各個時間片里發(fā)送宠默, 所以在弱網(wǎng)的WiFi環(huán)境麸恍,pacer是個非常重要的關鍵步驟
PacedSender 發(fā)送速率設置
GCC 估測的帶寬只會通過 SetEstimatedBitrate
方法設置到 PacedSender
中, pacing_bitrate_kbps_
為 PacedSender
發(fā)送媒體包的速率搀矫,為GCC估測帶寬 乘以了固定系數(shù) kDefaultPaceMultiplier(2.5)
void PacedSender::SetEstimatedBitrate(uint32_t bitrate_bps) {
if (bitrate_bps == 0)
LOG(LS_ERROR) << "PacedSender is not designed to handle 0 bitrate.";
CriticalSectionScoped cs(critsect_.get());
estimated_bitrate_bps_ = bitrate_bps;
padding_budget_->set_target_rate_kbps(estimated_bitrate_bps_ / 1000 );
// 更新 pacing 發(fā)送速率抹沪,為 estimated_bitrate_bps_/1000 * 2.5;
pacing_bitrate_kbps_ =
max(min_send_bitrate_kbps_, estimated_bitrate_bps_ / 1000) *
kDefaultPaceMultiplier;
alr_detector_->SetEstimatedBitrate(bitrate_bps);
}
PacedSender 包緩存隊列
該方法由 rtp_sender
模塊調用,將封裝好的視頻rtp
包的元信息瓤球,如 ssrc
, sequence_number
等封裝成Packet
數(shù)據(jù)結構存儲到隊列中融欧,并未緩存真正的媒體數(shù)據(jù)。發(fā)包時卦羡,PacedSender
會通過這些元信息噪馏,在rtp_sender
中的緩存隊列中找到對應的媒體包數(shù)據(jù)
// 將視頻包元信息,instert到pacer中
void PacedSender::InsertPacket(RtpPacketSender::Priority priority,
uint32_t ssrc,
uint16_t sequence_number,
int64_t capture_time_ms,
size_t bytes,
bool retransmission) {
CriticalSectionScoped cs(critsect_.get());
DCHECK(estimated_bitrate_bps_ > 0)
<< "SetEstimatedBitrate must be called before InsertPacket.";
int64_t now_ms = clock_->TimeInMilliseconds();
prober_->OnIncomingPacket(bytes);
if (capture_time_ms < 0)
capture_time_ms = now_ms;
// 封裝 packet 包绿饵,放到list中 packets_
packets_->Push(paced_sender::Packet(priority, ssrc, sequence_number,
capture_time_ms, now_ms, bytes,
retransmission, packet_counter_++));
}
發(fā)包線程
線程發(fā)包間隔 5ms 計算
PacedSender
發(fā)送線程是調用時間間隔為5ms欠肾,為什么是5ms, 通過模塊中下面的方法計算的:
// 計算線程調用的時間間隔,即下面 process() 方法調用的時間間隔 D馍蕖4烫摇!吸祟!
int64_t PacedSender::TimeUntilNextProcess() {
CriticalSectionScoped cs(critsect_.get());
// 如果正在探測
if (prober_->IsProbing()) {
int64_t ret = prober_->TimeUntilNextProbe(clock_->TimeInMilliseconds());
if (ret > 0 || (ret == 0 && !probing_send_failure_))
return ret;
}
// 當前時間 減去 上一次 process()方法調用的時間瑟慈,得出時間間隔
int64_t elapsed_time_us = clock_->TimeInMicroseconds() - time_last_update_us_;
int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000;
// kMinPacketLimitMs:5ms,即 process() 方法 5m 處理一次
return std::max<int64_t>(kMinPacketLimitMs - elapsed_time_ms, 0);
}
kMinPacketLimitMs
的值為固定 5ms, elapsed_time_ms
從代碼中可以看出屋匕,為上一次調用 process
到現(xiàn)在的時間間隔葛碧,兩個的時間差即為線程要等待的時長,最大5ms过吻,之后線程會再次調用process
方法發(fā)送媒體數(shù)據(jù)进泼。
線程處理方法 process()
下面的 process()
方法中處理真正發(fā)包邏輯, 每次進入 process
都會調用 UpdateBudgetWithElapsedTime(elapsed_time_ms)
, 去更新(增加) media_budget_
中的 bytes_remaining_
,
bytes_remaining_
為每次可以發(fā)送包的最大字節(jié)數(shù):
發(fā)包量 = Δt x GCC反饋帶寬 x 2.5
Δt 即elapsed_time_ms
, 最大 5ms。 在下面的while
循環(huán)中缘琅,只要包隊列不為空粘都,就一直嘗試用SendPacket
去發(fā)包廓推。
int32_t PacedSender::Process() {
int64_t now_us = clock_->TimeInMicroseconds();
CriticalSectionScoped cs(critsect_.get());
// elapsed_time_ms: 上一次process和這次process 之間的時間間隔 elapsed_time_ms ms
int64_t elapsed_time_ms = (now_us - time_last_update_us_ + 500) / 1000;
time_last_update_us_ = now_us;
int target_bitrate_kbps = pacing_bitrate_kbps_; // 拿到最新的 pacer 發(fā)送速率
if (!paused_ && elapsed_time_ms > 0) {
// 緩存的所有的包的大小刷袍,按字節(jié)算
size_t queue_size_bytes = packets_->SizeInBytes();
if (queue_size_bytes > 0) {
// Assuming equal size packets and input/output rate, the average packet
// has avg_time_left_ms left to get queue_size_bytes out of the queue, if
// time constraint shall be met. Determine bitrate needed for that.
packets_->UpdateQueueTime(clock_->TimeInMilliseconds());
int64_t avg_time_left_ms = std::max<int64_t>(
1, kMaxQueueLengthMs - packets_->AverageQueueTimeMs()); // 這個地方應該是 已經(jīng)緩存包時長 = 最大緩存時長 - 剩余隊列時長
int min_bitrate_needed_kbps =
static_cast<int>(queue_size_bytes * 8 / avg_time_left_ms);
if (min_bitrate_needed_kbps > target_bitrate_kbps)
target_bitrate_kbps = min_bitrate_needed_kbps; // 如果計算出來的發(fā)送速率比上面?zhèn)魅氲?目標發(fā)送碼率大,則更新目標發(fā)送碼率
}
// media budget 更新發(fā)送速率
media_budget_->set_target_rate_kbps(target_bitrate_kbps);
// 最大時間間隔 kMaxIntervalTimeMs = 30 ms
elapsed_time_ms = min(kMaxIntervalTimeMs, elapsed_time_ms);
UpdateBudgetWithElapsedTime(elapsed_time_ms);
}
bool is_probing = prober_->IsProbing();
PacedPacketInfo pacing_info;
size_t bytes_sent = 0;
size_t recommended_probe_size = 0;
if (is_probing) {
pacing_info = prober_->CurrentCluster();
recommended_probe_size = prober_->RecommendedMinProbeSize();
}
// 隊列不為空的情況下
while (!packets_->Empty()) {
// Since we need to release the lock in order to send, we first pop the
// element from the priority queue but keep it in storage, so that we can
// reinsert it if send fails.
const paced_sender::Packet& packet = packets_->BeginPop();
// 去除 隊列中的包樊展,并開始發(fā)送
if (SendPacket(packet, pacing_info)) {
// Send succeeded, remove it from the queue.
// 發(fā)送成功呻纹,則記錄發(fā)送的字節(jié)數(shù),并從隊列中將包清除
bytes_sent += packet.bytes;
packets_->FinalizePop(packet);
// 探測期間专缠,或發(fā)送字節(jié)數(shù)超過探測的數(shù)值了雷酪,則跳出循環(huán)不再發(fā)送 這個地方和探測有關,是否有問題涝婉?哥力??
if (is_probing && bytes_sent > recommended_probe_size)
break;
} else {
// Send failed, put it back into the queue.
packets_->CancelPop(packet);
break;
}
}
if (packets_->Empty() && !paused_) {
// We can not send padding unless a normal packet has first been sent. If we
// do, timestamps get messed up.
if (packet_counter_ > 0) {
// 計算是否應該發(fā)送 padding 包
int padding_needed = static_cast<int>(is_probing ? (recommended_probe_size - bytes_sent) : padding_budget_->bytes_remaining());
if (padding_needed > 0) {
bytes_sent += SendPadding(padding_needed, pacing_info);
}
}
}
if (is_probing) {
probing_send_failure_ = bytes_sent == 0;
if (!probing_send_failure_)
prober_->ProbeSent(clock_->TimeInMilliseconds(), bytes_sent);
}
alr_detector_->OnBytesSent(bytes_sent, now_us / 1000);
return 0;
}
發(fā)包方法SendPacket()
該方法由上面的while發(fā)包循環(huán)調用墩弯,發(fā)包后會調用 UpdateBudgetWithBytesSent(packet.bytes)
從 media_budget_
減去packet.bytes
長度的發(fā)包預算, 當發(fā)博包循環(huán)走幾次之后吩跋,media_budget
中的預算長度被消耗完,即 <= 0渔工, 此時 media_budget_->bytes_remaining()
方法會做 max(0, bytes_remaining_)
處理锌钮,即返回0 ,而發(fā)包前會判斷 media_budget_->bytes_remaining() == 0
,滿足條件就return false
不發(fā)了引矩。
bool PacedSender::SendPacket(const paced_sender::Packet& packet,
const PacedPacketInfo& pacing_info) {
// 是否暫停發(fā)包
if (paused_)
return false;
// media budget 剩余預算字節(jié)數(shù)為 0梁丘,停止發(fā)包
if (media_budget_->bytes_remaining() == 0 &&
pacing_info.probe_cluster_id == PacedPacketInfo::kNotAProbe) {
return false;
}
critsect_->Enter();
const bool success = packet_sender_->TimeToSendPacket(
packet.ssrc, packet.sequence_number, packet.capture_time_ms,
packet.retransmission, pacing_info);
critsect_->Leave();
if (success) {
// TODO(holmer): High priority packets should only be accounted for if we
// are allocating bandwidth for audio.
if (packet.priority != kHighPriority) { // 包的優(yōu)先級不為最高優(yōu)先級,更新發(fā)送的字節(jié)數(shù)
// Update media bytes sent.
UpdateBudgetWithBytesSent(packet.bytes);
}
}
return success;
}
SendPacket
方法最終會調用 rtp_sender
中的方法旺韭,將ssrc
氛谜,sequence_number
等參數(shù)傳遞過去,rtp_sender
通過這些值找到真正的視頻媒體包区端,最終發(fā)送到到網(wǎng)絡上混蔼。
media_budget簡介
media_budget
是在 PacedSender
中封裝的一個類,全部代碼如下珊燎,注釋做了解釋:
class IntervalBudget {
public:
explicit IntervalBudget(int initial_target_rate_kbps)
: target_rate_kbps_(initial_target_rate_kbps),
bytes_remaining_(0) {}
void set_target_rate_kbps(int target_rate_kbps) {
//更新發(fā)送速率
target_rate_kbps_ = target_rate_kbps;
bytes_remaining_ =
max(-kWindowMs * target_rate_kbps_ / 8, bytes_remaining_);
}
void IncreaseBudget(int64_t delta_time_ms) {
// 估計在 delta 時間惭嚣, 在帶寬為 target_rate_kbps 的情況可以發(fā)送出去多少字節(jié)
int64_t bytes = target_rate_kbps_ * delta_time_ms / 8;
if (bytes_remaining_ < 0) {
// We overused last interval, compensate this interval.
bytes_remaining_ = bytes_remaining_ + bytes;
} else {
// If we underused last interval we can't use it this interval.
bytes_remaining_ = bytes;
}
}
//更新實際發(fā)送的字節(jié)數(shù), 從bytes_remaining_減去
void UseBudget(size_t bytes) {
bytes_remaining_ = max(bytes_remaining_ - static_cast<int>(bytes),
-kWindowMs * target_rate_kbps_ / 8);
}
// 幾次發(fā)送循環(huán)后悔政,發(fā)送的總字節(jié)數(shù)大于開始的 bytes_remaining_晚吞,bytes_remaining_ <= 0,改方法返回0
size_t bytes_remaining() const {
return static_cast<size_t>(max(0, bytes_remaining_));
}
int target_rate_kbps() const { return target_rate_kbps_; }
private:
static const int kWindowMs = 500; // window 500 ms
int target_rate_kbps_;
int bytes_remaining_;
};
PacedSender 原理總結
到這可以知道PacedSender
工作原理了, 每次發(fā)包前會更新media_budget
中預算bytes_remaining_
的大小谋国,而每次發(fā)送時間(<= 5ms)內(nèi)最多發(fā)送 bytes_remaining_
字節(jié)數(shù)槽地,從而達到限制和平滑帶寬的目的,PacedSender
中 padding
發(fā)送的原理和此類似。