前言
- 在WebRTC GCC帶寬探測(cè)原理(一)一文中主要介紹了
ProbeController
模塊哥谷,該模塊主要是根據(jù)不同的場(chǎng)景,判斷當(dāng)前是否需要進(jìn)行帶寬探測(cè)夫晌,并生成ProbeClusterConfig
源信號(hào)。 - 而最終
ProbeClusterConfig
信息回經(jīng)Pacing
模塊經(jīng)由BitrateProber
模塊處理曙博,將當(dāng)前需要發(fā)送的包標(biāo)識(shí)成探測(cè)包,并進(jìn)行發(fā)送怜瞒。 - 在
Pacing
模塊中存在一個(gè)BitrateProber prober_
的成員變量父泳,專門用來(lái)處理帶寬探測(cè)。 - 本文的重點(diǎn)就是分析
BitrateProber
模塊吴汪。
認(rèn)識(shí)ProbeCluster結(jié)構(gòu)
struct PacedPacketInfo {
PacedPacketInfo();
PacedPacketInfo(int probe_cluster_id,
int probe_cluster_min_probes,
int probe_cluster_min_bytes);
bool operator==(const PacedPacketInfo& rhs) const;
// TODO(srte): Move probing info to a separate, optional struct.
static constexpr int kNotAProbe = -1;
// 當(dāng)前探測(cè)簇的目標(biāo)探測(cè)碼率
int send_bitrate_bps = -1;
// 當(dāng)前探測(cè)簇的探測(cè)id
int probe_cluster_id = kNotAProbe;
// 當(dāng)前探測(cè)簇最小探測(cè)包個(gè)數(shù),默認(rèn)最小5個(gè)
int probe_cluster_min_probes = -1;
// 當(dāng)前探測(cè)簇最小探測(cè)的字節(jié)數(shù) = 目標(biāo)探測(cè)碼率 * 目標(biāo)探測(cè)時(shí)長(zhǎng)(默認(rèn)15ms)
int probe_cluster_min_bytes = -1;
// 當(dāng)前探測(cè)簇實(shí)際發(fā)送的字節(jié)數(shù)
int probe_cluster_bytes_sent = 0;
};
struct ProbeCluster {
PacedPacketInfo pace_info;
// 當(dāng)前探測(cè)簇實(shí)際發(fā)送的包個(gè)數(shù)
int sent_probes = 0;
// 當(dāng)前探測(cè)簇實(shí)際發(fā)送的字節(jié)數(shù)
int sent_bytes = 0;
//ProbeController模塊生成ProbeClusterConfig的時(shí)間
Timestamp requested_at = Timestamp::MinusInfinity();
//當(dāng)前探測(cè)簇發(fā)送的時(shí)間
Timestamp started_at = Timestamp::MinusInfinity();
int retries = 0;
};
ProbeCluster創(chuàng)建
BitrateProber
模塊會(huì)根據(jù)ProbeController
模塊生成的ProbeClusterConfig
結(jié)構(gòu)生成對(duì)應(yīng)的ProbeCluster
結(jié)構(gòu)信息惠窄。-
其實(shí)現(xiàn)流程如下:
001.png 代碼實(shí)現(xiàn)如下:
void PacingController::CreateProbeClusters(
rtc::ArrayView<const ProbeClusterConfig> probe_cluster_configs) {
for (const ProbeClusterConfig probe_cluster_config : probe_cluster_configs) {
prober_.CreateProbeCluster(probe_cluster_config);
}
}
- 循環(huán)遍歷
ProbeController
模塊生成的源信息,調(diào)用BitrateProber::CreateProbeCluster
創(chuàng)建ProbeCluster
結(jié)構(gòu)信息漾橙。
void BitrateProber::CreateProbeCluster(
const ProbeClusterConfig& cluster_config) {
RTC_DCHECK(probing_state_ != ProbingState::kDisabled);
// 當(dāng)新生成一個(gè)探測(cè)簇的時(shí)候杆融,遍歷clusters_隊(duì)列,如果對(duì)頭對(duì)應(yīng)的探測(cè)簇的生成時(shí)間距離現(xiàn)在已經(jīng)超過5秒則認(rèn)為已經(jīng)超時(shí)霜运,則進(jìn)行移除
// 同時(shí)控制探測(cè)簇隊(duì)列的大小在5個(gè)以內(nèi)
while (!clusters_.empty() &&
(cluster_config.at_time - clusters_.front().requested_at >
kProbeClusterTimeout ||
clusters_.size() > kMaxPendingProbeClusters)) {
clusters_.pop();
}
// 構(gòu)造ProbeCluster
ProbeCluster cluster;
// 設(shè)置當(dāng)前探測(cè)簇生成時(shí)間
cluster.requested_at = cluster_config.at_time;
// 設(shè)置當(dāng)前探測(cè)簇最小探測(cè)包數(shù)量(默認(rèn)5個(gè))
cluster.pace_info.probe_cluster_min_probes =
cluster_config.target_probe_count;
// 設(shè)置當(dāng)前探測(cè)簇最小探測(cè)需要發(fā)送的字節(jié)數(shù) = 目標(biāo)探測(cè)碼率 * 目標(biāo)探測(cè)時(shí)長(zhǎng)(默認(rèn)15ms)
cluster.pace_info.probe_cluster_min_bytes =
(cluster_config.target_data_rate * cluster_config.target_duration)
.bytes();
RTC_DCHECK_GE(cluster.pace_info.probe_cluster_min_bytes, 0);
// 設(shè)置當(dāng)前探測(cè)簇目標(biāo)探測(cè)碼率
cluster.pace_info.send_bitrate_bps = cluster_config.target_data_rate.bps();
// 設(shè)置當(dāng)前探測(cè)簇ID
cluster.pace_info.probe_cluster_id = cluster_config.id;
// 插入到隊(duì)列
clusters_.push(cluster);
// 這個(gè)地方看上去只有隊(duì)列為空的時(shí)候才會(huì)存在,但到該步驟隊(duì)列不可能為空了脾歇,所以默認(rèn)這個(gè)條件不可能成立?
// 這里是如果是當(dāng)前模塊已經(jīng)是可以設(shè)置成kActive狀態(tài)了觉渴,則設(shè)置probing_state_為ProbingState::kActive
// 初始狀態(tài)為kInactive,在該模塊的構(gòu)造函數(shù)進(jìn)行了設(shè)置
if (ReadyToSetActiveState(/*packet_size=*/DataSize::Zero())) {
next_probe_time_ = Timestamp::MinusInfinity();
probing_state_ = ProbingState::kActive;
}
.....
}
-
ProbeController
每次可能會(huì)生成多個(gè)探測(cè)源數(shù)據(jù)介劫,其中每個(gè)源數(shù)據(jù)ProbeClusterConfig
對(duì)應(yīng)一個(gè)探測(cè)簇(ProbeCluster
),而每個(gè)探測(cè)簇在pacing
發(fā)送的時(shí)候會(huì)發(fā)送一簇(組)包案淋,按照最小探測(cè)時(shí)長(zhǎng)座韵、最小探測(cè)包數(shù)量、最小探測(cè)字節(jié)數(shù)進(jìn)行探測(cè)包發(fā)送踢京。
BitrateProber狀態(tài)設(shè)置
- 只有當(dāng)
BitrateProber
模塊的狀態(tài)為ProbingState::kActive
的時(shí)候才能發(fā)送探測(cè)包誉碴。 - 該模塊的狀態(tài)更新如下:
void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
....
prober_.OnIncomingPacket(DataSize::Bytes(packet->payload_size()));
....
}
- 當(dāng)各發(fā)送模塊向
Pacing
模塊提交數(shù)據(jù)的時(shí)候,會(huì)調(diào)用BitrateProber::OnIncomingPacket
來(lái)更新BitrateProber
的狀態(tài)瓣距。
void BitrateProber::OnIncomingPacket(DataSize packet_size) {
if (ReadyToSetActiveState(packet_size)) {
next_probe_time_ = Timestamp::MinusInfinity();
probing_state_ = ProbingState::kActive;
}
}
- 入?yún)楫?dāng)前
Rtp
包的大小黔帕,然后通過函數(shù)ReadyToSetActiveState()
來(lái)決策是否將當(dāng)前模塊的狀態(tài)設(shè)置為ProbingState::kActive
。
bool BitrateProber::ReadyToSetActiveState(DataSize packet_size) const {
// 如果隊(duì)列為空直接返回false
if (clusters_.empty()) {
RTC_DCHECK(probing_state_ == ProbingState::kDisabled ||
probing_state_ == ProbingState::kInactive);
return false;
}
// 如果當(dāng)前狀態(tài)為ProbingState::kDisabled或者ProbingState::kActive返回false
switch (probing_state_) {
case ProbingState::kDisabled:
case ProbingState::kActive:
return false;
// 只有在ProbingState::kInactive的情況下才會(huì)進(jìn)行判斷蹈丸,默認(rèn)狀態(tài)就是這個(gè)
case ProbingState::kInactive:
// If config_.min_packet_size > 0, a "large enough" packet must be sent
// first, before a probe can be generated and sent. Otherwise, send the
// probe asap.
// config_.min_packet_size的默認(rèn)值為200Byte
return packet_size >=
std::min(RecommendedMinProbeSize(), config_.min_packet_size.Get());
}
}
- 而
RecommendedMinProbeSize()
最小推進(jìn)的探測(cè)字節(jié)數(shù)實(shí)現(xiàn)如下:
DataSize BitrateProber::RecommendedMinProbeSize() const {
if (clusters_.empty()) {
return DataSize::Zero();
}
// 計(jì)算最小探測(cè)字節(jié)數(shù)=探測(cè)目標(biāo)碼率 * 最小探測(cè)間隔2ms所對(duì)應(yīng)的字節(jié)數(shù)
// 假設(shè)探測(cè)碼率為20Mbps=20000000bps,則推進(jìn)的最小探測(cè)字節(jié)數(shù)量為10000Byte(字節(jié))
DataRate send_rate =
DataRate::BitsPerSec(clusters_.front().pace_info.send_bitrate_bps);
return send_rate * config_.min_probe_delta/*最小探測(cè)間隔2ms*/;
}
- 綜上所述意思就是成黄,通過傳入實(shí)際
Rtp
包的數(shù)據(jù)大小呐芥,在結(jié)合當(dāng)前已有的探測(cè)簇隊(duì)列的首個(gè)探測(cè)簇新對(duì)應(yīng)的探測(cè)碼率和最小探測(cè)間隔算出最小的探測(cè)字節(jié)數(shù)。 - 如果實(shí)際
Rtp
包的數(shù)據(jù)大小大于最小探測(cè)間隔內(nèi)的探測(cè)字節(jié)數(shù)(意思就是這個(gè)rtp
包是否能扮演探測(cè)包需要滿足一定的大小?),則可將該模塊設(shè)置成kActive
狀態(tài)奋岁。 - 超過
200
字節(jié)的包看上去都能滿足思瘟。
Pacing模塊發(fā)送探測(cè)包
void PacingController::ProcessPackets() {
....
PacedPacketInfo pacing_info;
DataSize recommended_probe_size = DataSize::Zero();
bool is_probing = prober_.is_probing();
if (is_probing) {
// Probe timing is sensitive, and handled explicitly by BitrateProber, so
// use actual send time rather than target.
// 1) 獲取目標(biāo)探測(cè)信息
pacing_info = prober_.CurrentCluster(now).value_or(PacedPacketInfo());
if (pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe) {
// 2) 根據(jù)探測(cè)簇信息得到推介探測(cè)字節(jié)數(shù)大小
recommended_probe_size = prober_.RecommendedMinProbeSize();
RTC_DCHECK_GT(recommended_probe_size, DataSize::Zero());
} else {
// No valid probe cluster returned, probe might have timed out.
is_probing = false;
}
}
DataSize data_sent = DataSize::Zero();
int iteration = 0;
int packets_sent = 0;
int padding_packets_generated = 0;
for (; iteration < circuit_breaker_threshold_; ++iteration) {
....
if (is_probing) {
pacing_info.probe_cluster_bytes_sent += packet_size.bytes();
// If we are currently probing, we need to stop the send loop when we
// have reached the send target.
// 3) 每個(gè)包發(fā)送后會(huì)進(jìn)行data_sent累加,當(dāng)總發(fā)的字節(jié)數(shù)大于等于本次推介的探測(cè)字節(jié)數(shù)的時(shí)候,則結(jié)束發(fā)送
if (data_sent >= recommended_probe_size) {
break;
}
}
.....
}
}
if (is_probing) {
probing_send_failure_ = data_sent == DataSize::Zero();
if (!probing_send_failure_) {
//4) 告訴BitrateProber模塊本次探測(cè)發(fā)送的字節(jié)數(shù)
prober_.ProbeSent(CurrentTime(), data_sent);
}
}
.....
}
- 探測(cè)包發(fā)送的邏輯相對(duì)比較簡(jiǎn)單闻伶。
- 1)首先通過
BitrateProber::CurrentCluster(now)
得到探測(cè)簇信息滨攻,就是當(dāng)前需要探測(cè)的目標(biāo)信息,并 - 2)如果步驟1能成功得到探測(cè)目標(biāo)信息,則通過
BitrateProber::RecommendedMinProbeSize()
函數(shù)獲取目標(biāo)探測(cè)的推介發(fā)送的字節(jié)數(shù)量大小蓝翰,記作為recommended_probe_size
光绕,該計(jì)算在上節(jié)中已經(jīng)進(jìn)行了分析說明。 - 3)開始進(jìn)行發(fā)送報(bào)文畜份,并將報(bào)文標(biāo)識(shí)成探測(cè)包诞帐,每次發(fā)送成功后會(huì)對(duì)
data_sent
累加操作,當(dāng)data_sent
大于等于recommended_probe_size
的時(shí)候結(jié)束發(fā)送漂坏。 - 4)調(diào)用
BitrateProber::ProbeSent()
告訴BitrateProber
模塊本次實(shí)際發(fā)送的數(shù)據(jù)量景埃。
CurrentCluster獲取探測(cè)信息
absl::optional<PacedPacketInfo> BitrateProber::CurrentCluster(Timestamp now) {
// 為空或者狀態(tài)不對(duì)直接返回nullopt
if (clusters_.empty() || probing_state_ != ProbingState::kActive) {
return absl::nullopt;
}
// 如果next_probe_time_為有限值,這里對(duì)應(yīng)的是上次計(jì)算出來(lái)的本次應(yīng)要探測(cè)的時(shí)間戳
// 并且當(dāng)前時(shí)間(pacing模塊即將發(fā)包的時(shí)間戳) - 上次計(jì)算出來(lái)的本次應(yīng)要探測(cè)的時(shí)間戳 > 10ms
// 則說明探測(cè)超時(shí)了,此時(shí)需要清除隊(duì)列頭部,并設(shè)置狀態(tài)為kInactive
if (next_probe_time_.IsFinite() &&
now - next_probe_time_ > config_.max_probe_delay.Get()/*默認(rèn)10Ms*/) {
RTC_DLOG(LS_WARNING) << "Probe delay too high"
" (next_ms:"
<< next_probe_time_.ms() << ", now_ms: " << now.ms()
<< "), discarding probe cluster.";
clusters_.pop();
if (clusters_.empty()) {
probing_state_ = ProbingState::kInactive;
return absl::nullopt;
}
}
// 得到首個(gè)探測(cè)簇目標(biāo)信息顶别。
PacedPacketInfo info = clusters_.front().pace_info;
info.probe_cluster_bytes_sent = clusters_.front().sent_bytes;
return info;
}
- 那么
next_probe_time_
的計(jì)算機(jī)制又是什么谷徙?該值直接決定了首個(gè)探測(cè)信號(hào)需要在什么時(shí)候發(fā)送探測(cè)包,并且逾期多長(zhǎng)時(shí)間后將為認(rèn)為是超時(shí)驯绎。
ProbeSent更新探測(cè)信息
void BitrateProber::ProbeSent(Timestamp now, DataSize size) {
RTC_DCHECK(probing_state_ == ProbingState::kActive);
RTC_DCHECK(!size.IsZero());
if (!clusters_.empty()) {
// 得到隊(duì)首探測(cè)簇信息
ProbeCluster* cluster = &clusters_.front();
// 更新探測(cè)簇對(duì)應(yīng)的探測(cè)包發(fā)送時(shí)間完慧,這個(gè)后面需要用于計(jì)算探測(cè)碼率
if (cluster->sent_probes == 0) {
RTC_DCHECK(cluster->started_at.IsInfinite());
cluster->started_at = now;
}
// 更新本次探測(cè)總共發(fā)了多少字節(jié)
cluster->sent_bytes += size.bytes<int>();
// 更新本次探測(cè)簇信息的探測(cè)次數(shù)累加1
cluster->sent_probes += 1;
// 計(jì)算下一次發(fā)送探測(cè)包的時(shí)間
next_probe_time_ = CalculateNextProbeTime(*cluster);
// 如果滿足下面條件,則移除當(dāng)前探測(cè)簇剩失,需要結(jié)束本探測(cè)簇屈尼,比如說探測(cè)次數(shù)達(dá)到5次
// 每次會(huì)發(fā)一組探測(cè)包,該探測(cè)簇對(duì)應(yīng)的已經(jīng)發(fā)了5組包了
if (cluster->sent_bytes >= cluster->pace_info.probe_cluster_min_bytes &&
cluster->sent_probes >= cluster->pace_info.probe_cluster_min_probes) {
clusters_.pop();
}
// 如果隊(duì)列為空則設(shè)置狀態(tài)為kInactive
if (clusters_.empty()) {
probing_state_ = ProbingState::kInactive;
}
}
}
- 當(dāng)
Pacing
模塊發(fā)送完一組探測(cè)包后拴孤,通過上述函數(shù)告訴BitrateProber
模塊脾歧,對(duì)BitrateProber
模塊進(jìn)行更新。 - 主要是對(duì)首個(gè)探測(cè)簇新進(jìn)行更新演熟,如本次探測(cè)發(fā)送了多少的數(shù)據(jù)量鞭执,以便用于
tcc feedback
后計(jì)算探測(cè)碼率。 - 另外乳溝首個(gè)探測(cè)簇已經(jīng)不滿足再發(fā)送探測(cè)包的條件了芒粹,則進(jìn)行移除兄纺。
- 同時(shí)這里有個(gè)十分重要的操作就是調(diào)用
CalculateNextProbeTime()
函數(shù)計(jì)算下一次探測(cè)包發(fā)送的時(shí)間戳,Pacing
模塊每發(fā)送一組數(shù)據(jù)后,需要計(jì)算一次發(fā)包的時(shí)間化漆,這里也一樣估脆,當(dāng)BitrateProber
有探測(cè)任務(wù)的情況下,優(yōu)先會(huì)獲取下一組探測(cè)包發(fā)送的時(shí)間來(lái)作為下一次Pacing
模塊下一次發(fā)送的時(shí)間節(jié)點(diǎn)座云。
CalculateNextProbeTime計(jì)算下一次探測(cè)包發(fā)送時(shí)間
Timestamp BitrateProber::CalculateNextProbeTime(
const ProbeCluster& cluster) const {
RTC_CHECK_GT(cluster.pace_info.send_bitrate_bps, 0);
RTC_CHECK(cluster.started_at.IsFinite());
// Compute the time delta from the cluster start to ensure probe bitrate stays
// close to the target bitrate. Result is in milliseconds.
// 當(dāng)前探測(cè)簇已經(jīng)發(fā)送的字節(jié)數(shù)
DataSize sent_bytes = DataSize::Bytes(cluster.sent_bytes);
// 當(dāng)前探測(cè)簇的探測(cè)目標(biāo)碼率
DataRate send_bitrate =
DataRate::BitsPerSec(cluster.pace_info.send_bitrate_bps);
// 針對(duì)當(dāng)前探測(cè)簇算出一個(gè)假設(shè)是探測(cè)目標(biāo)碼率作為發(fā)送碼率的情況下需要發(fā)送多久
TimeDelta delta = sent_bytes / send_bitrate;
// 計(jì)算下一次的發(fā)送時(shí)間=總共發(fā)送多少數(shù)據(jù)/發(fā)送速率 + 該探測(cè)簇首次發(fā)送探測(cè)包的時(shí)間
return cluster.started_at + delta;
}
總結(jié)
-
BitrateProber
模塊主要是借助Pacing
模塊對(duì)每一個(gè)探測(cè)簇完成探測(cè)包的發(fā)送疙赠。 - 每一個(gè)探測(cè)簇每次都會(huì)發(fā)送一組探測(cè)包付材,可以發(fā)送多次,當(dāng)然最多是發(fā)送
5
次棺聊,并且每次之間的數(shù)據(jù)發(fā)送間隔不能超過10ms
,如果超過10
ms會(huì)被認(rèn)為超時(shí)伞租。 - 其他該模塊的實(shí)現(xiàn)從代碼的角度來(lái)看,相對(duì)不難限佩。
- 接下來(lái)需要分析的是基于
twcc feedback
對(duì)探測(cè)碼率的計(jì)算。