SQP擁塞控制算法介紹
一種為低延遲強(qiáng)交互視頻流設(shè)計(jì)的擁塞控制算法尿赚, google paper:https://arxiv.org/pdf/2207.11857.pdf
在交互視頻應(yīng)用場景煎谍,需要傳輸高碼率的視頻率,并且保持極低的端到端時(shí)延鹃锈,比如AR流和云游戲,SQP就是為這種場景而設(shè)計(jì)的擁塞控制算法。SQP采用基于幀的整形數(shù)據(jù)包來采樣網(wǎng)絡(luò)帶寬,并用自適應(yīng)的單向采樣時(shí)延測量方法從網(wǎng)絡(luò)隊(duì)列中恢復(fù),從而維持極低的隊(duì)列時(shí)延蔬将。SQP快速適應(yīng)網(wǎng)絡(luò)帶寬變化,確保高帶寬利用率和低幀時(shí)延央星,并且當(dāng)存在競爭流時(shí)也能在保證可接受的時(shí)延內(nèi)擁有合理的帶寬份額霞怀。SQP有不錯(cuò)的公平性,網(wǎng)絡(luò)上有影子緩存時(shí)也工作得挺好莉给。
低延時(shí)交互流媒體應(yīng)用生成固定幀率的裸幀毙石。視頻碼率由ABR(Adaptive bitrate algorithm 碼率自適應(yīng)算法)算法決定,而ABR算法又是由CCA(Congestion control algorithm颓遏,擁塞控制算法)產(chǎn)生的信號來決定徐矩, 從而管理幀時(shí)延、網(wǎng)絡(luò)擁塞州泊、帶寬利用率丧蘸。壓縮的幀通過網(wǎng)絡(luò)傳輸,然后在客戶端設(shè)備上解碼和顯示。
低時(shí)延視頻流架構(gòu):
SQP內(nèi)部架構(gòu):
我是為低時(shí)延交互流媒體應(yīng)用設(shè)計(jì)的擁塞控制算法力喷, 比如云游戲刽漂,云VR等應(yīng)用。 這一套擁塞控制算法的目標(biāo)是: 1. 提供實(shí)時(shí)的帶寬估計(jì)弟孟,以確保盡可能大的帶寬利用率和盡可能低的端到端幀時(shí)延贝咙;2. 與其它基于網(wǎng)絡(luò)列隊(duì)的擁塞控制算法共存時(shí),也可以提供有競爭力的吞吐量拂募。
SQP算法實(shí)現(xiàn)
Feeback: 包級別的網(wǎng)絡(luò)反饋 + 幀級別的網(wǎng)絡(luò)反饋
#include "sqp_feedback_adaptor.h"
#include <algorithm>
#include <cstdlib>
#include "common/logger.h"
#include "common_time.h"
namespace BCC {
#define HISTORY_CACHE_MS 60000
#define SRTT_FACTOR 0.6
SQPFeedbackAdaptor::SQPFeedbackAdaptor() {
int i;
for (i = 0; i < FEEDBACK_RTT_WIN_SIZE; ++i) {
rtts_[i] = -1;
}
min_feedback_rtt_ = 10;
num_ = 0;
index_ = 0;
hist_ = new SQPSenderHistory(HISTORY_CACHE_MS);
}
SQPFeedbackAdaptor::~SQPFeedbackAdaptor() {
if (hist_) {
delete hist_;
hist_ = nullptr;
}
for (auto const& pair : frame_map_) {
delete pair.second;
}
frame_map_.clear();
}
void SQPFeedbackAdaptor::AddPacket(uint16_t seq, size_t size) {
PacketFeedBackItem packet;
packet.arrival_ts = -1;
packet.create_ts = packet.send_ts = GET_SYS_MS();
packet.payload_size = size;
packet.sequence_number = seq;
hist_->Add(&packet);
}
void SQPFeedbackAdaptor::AddFrame(uint32_t frame_idx, size_t packet_size, bool first_packet, bool last_packet) {
FrameFeedBackItem* frame_ptr;
int64_t now_ts = GET_SYS_MS();
if (frame_map_.find(frame_idx) != frame_map_.end()) {
frame_ptr = frame_map_[frame_idx];
}
else {
frame_ptr = new FrameFeedBackItem();
frame_ptr->frame_index = frame_idx;
frame_map_[frame_idx] = frame_ptr;
}
frame_ptr->frame_size += packet_size;
if (first_packet) {
frame_ptr->send_start_ts = now_ts;
}
if (last_packet) {
frame_ptr->send_end_ts = now_ts;
hist_->AddFrame(frame_ptr);
delete frame_ptr;
frame_map_.erase(frame_idx);
}
}
int SQPFeedbackAdaptor::OnFeedback(BCC_SQP::FeedBackMsgItem* msg) {
int32_t i = 0, feedback_rtt = 0;
int64_t now_ts = GET_SYS_MS();
int64_t delta_ts = 0;
feedback_rtt = -1;
num_ = 0;
for (i = 0; i < msg->samples_num; i++) {
//根據(jù)反饋的SEQ獲取對應(yīng)的報(bào)文發(fā)送信息庭猩,計(jì)算反饋RTT,更新報(bào)文到達(dá)時(shí)刻
if (hist_->Get(msg->samples[i].seq, &packets_[num_], 1) == 0) {
//計(jì)算反饋RTT
if (packets_[num_].send_ts > 0) {
feedback_rtt = (std::max)(now_ts - packets_[num_].send_ts, (int64_t)feedback_rtt);
rtts_[index_++ % FEEDBACK_RTT_WIN_SIZE] = feedback_rtt;
srtt_ = SRTT_FACTOR * srtt_ + (1 - SRTT_FACTOR) * feedback_rtt;
}
//更新到達(dá)的值
packets_[num_].arrival_ts = msg->samples[i].ts;
delta_ts = packets_[num_].arrival_ts - packets_[num_].send_ts;
if (new_min_one_way_delay_ == 0 || new_min_one_way_delay_ > delta_ts) {
new_min_one_way_delay_ = delta_ts;
}
num_++;
//更新時(shí)間窗內(nèi)的最小one way delay的值
if (now_ts - last_one_way_delay_update_ts_ > srtt_ * 2) {
last_one_way_delay_update_ts_ = now_ts;
min_one_way_delay_ = new_min_one_way_delay_;
new_min_one_way_delay_ = 0;
}
}
}
frame_num_ = 0;
for (i = 0; i < msg->frame_samples_num; i++) {
//根據(jù)反饋的SEQ獲取對應(yīng)的報(bào)文發(fā)送信息,計(jì)算反饋RTT,更新報(bào)文到達(dá)時(shí)刻
if (hist_->GetFrame(msg->frame_samples[i].frame_index, &frames_[frame_num_], 1) == 0) {
//更新到達(dá)的值
frames_[frame_num_].arrival_start_ts = msg->frame_samples[i].arrival_start_ts;
frames_[frame_num_].arrival_end_ts = msg->frame_samples[i].arrival_end_ts;
if (max_frame_size_ < frames_[frame_num_].frame_size) {
max_frame_size_ = frames_[frame_num_].frame_size;
}
frame_num_++;
}
}
//更新報(bào)文與反饋的rtt最小值
if (feedback_rtt > 0) {
min_feedback_rtt_ = rtts_[0];
for (i = 1; i < FEEDBACK_RTT_WIN_SIZE; i++) {
if (min_feedback_rtt_ > rtts_[i] && rtts_[i] > 0) {
min_feedback_rtt_ = rtts_[i];
LOGD("[bcc][feedback] min feed back rtt update {}", min_feedback_rtt_);
}
}
}
//進(jìn)行按到達(dá)時(shí)間的先后順序進(jìn)行排序
FeedbackQsort();
FrameFeedbackQsort();
return num_;
}
} // namespace BCC
帶寬估計(jì)和發(fā)送速率控制:
#include "sqp_congestion_control.h"
#include "common/logger.h"
namespace BCC {
SQPCongestionControl::SQPCongestionControl(uint32_t min_bitrate, uint32_t max_bitrate) {
min_bitrate_ = min_bitrate;
max_bitrate_ = max_bitrate;
bandwidth_ = min_bitrate_;
}
SQPCongestionControl::~SQPCongestionControl() {
}
void SQPCongestionControl::UpdateBandwidthEstimator(uint32_t frame_size, uint32_t max_frame_size, int64_t frame_send_start_ts,
int64_t frame_send_end_ts, int64_t frame_recv_start_ts, int64_t frame_recv_end_ts, int32_t one_way_delay) {
double bandwidth_sample = 0.0f;
double gama = ((double)max_frame_size) / frame_size;
if (frame_recv_end_ts == frame_send_start_ts && frame_recv_end_ts == frame_recv_start_ts) {
return;
}
bandwidth_sample = frame_size * 8 * gama * 1000 / (frame_recv_end_ts - frame_send_start_ts - one_way_delay + (frame_recv_end_ts - frame_recv_start_ts) * (gama - 1));
bandwidth_sample *= T_;
if (bandwidth_ <= 0) {
bandwidth_ = bandwidth_sample;
}
else {
bandwidth_ = bandwidth_ + delta_ * (r_ * (bandwidth_sample / bandwidth_ - 1) - (bandwidth_ / bandwidth_sample - 1));
}
}
uint32_t SQPCongestionControl::ComputePacingRate() {
return (uint32_t)(bandwidth_ * m_);
return 0;
}
};
對比實(shí)驗(yàn):
-
采用panthoen實(shí)驗(yàn)平臺
-
安裝panthoen: 以下三個(gè)主要組件
Local 和 remote兩種網(wǎng)絡(luò)模式
-
編譯llama-sqp生成sender和receiver, 拷貝到panthoen/third_party/llama_sqp下
修改配置:
- 生成src/wrappers/llama_sqp.py:
#!/usr/bin/env python
'''REMOVE ME: Example file to add a new congestion control scheme.
Use Python 2.7 and conform to PEP8.
Use snake_case as file name and make this file executable.
'''
from os import path
from subprocess import check_call
import arg_parser
import context
def main():
# use 'arg_parser' to ensure a common test interface
args = arg_parser.receiver_first() # or 'arg_parser.sender_first()'
# paths to the sender and receiver executables, etc.
cc_repo = path.join(context.third_party_dir, 'llama_sqp')
send_src = path.join(cc_repo, 'transport_sender')
recv_src = path.join(cc_repo, 'transport_receiver')
# [optional] dependencies of Debian packages
if args.option == 'deps':
print 'example_dep_1 example_dep_2'
return
# [optional] persistent setup that only needs to be run once
if args.option == 'setup':
# avoid running as root here
return
# [optional] non-persistent setup that should be performed on every reboot
if args.option == 'setup_after_reboot':
# avoid running as root here
return
# [required] run the first side on port 'args.port'
if args.option == 'receiver':
cmd = [recv_src, args.port]
check_call(cmd)
return
# [required] run the other side to connect to the first side on 'args.ip'
if args.option == 'sender':
cmd = [send_src, args.ip, args.port]
check_call(cmd)
return
if __name__ == '__main__':
main()
配置src/config.yml添加llama_sqp
運(yùn)行實(shí)驗(yàn):
src/experiments/setup.py --install-deps --schemes "bbr copa cubic vivace llama_bcc llama_sqp"
src/experiments/setup.py --schemes "bbr copa cubic vivace llama_bcc llama_sqp"
src/experiments/test.py local --schemes "bbr copa cubic vivace llama_bcc llama_sqp"
src/analysis/analyze.py --data-dir src/experiments/data/
實(shí)驗(yàn)報(bào)告:
-
本地網(wǎng)絡(luò)實(shí)驗(yàn):
Llama-SQP時(shí)延最低陈症,但吞吐量只有72.3%蔼水,遠(yuǎn)沒有利用好帶寬, 待改進(jìn)录肯。
以下是詳細(xì)的實(shí)驗(yàn)數(shù)據(jù):
- 遠(yuǎn)程網(wǎng)絡(luò)實(shí)驗(yàn)