Windows 開發(fā)日記
背景: 把一個多線程網(wǎng)絡軟件移植到Windows上。
基本簡介:
- 軟件類似代理炎疆。轉(zhuǎn)發(fā)上層應用UDP數(shù)據(jù)包,僅添加一些字段在數(shù)據(jù)頭。
- 每一條服務端發(fā)過來的都是消息种呐,有邊界,不是字節(jié)流弃甥。
- 線程A負責接收數(shù)據(jù)(libpcap抓包)爽室,線程B負責發(fā)送數(shù)據(jù)和處理數(shù)據(jù)。線程A收到的數(shù)據(jù)要傳給線程B處理淆攻。數(shù)據(jù)流向:線程A->線程B->上層應用阔墩。
問題:
- Mac/Linux上速度很快,Windows上面速度極其慢瓶珊。非常詭異的是啸箫,通過流量監(jiān)控,發(fā)現(xiàn)軟件下載速度非成∏郏快忘苛,但上層應用收到的數(shù)據(jù)的速度極其慢蝉娜。
過程:
1. UDP
在linux上,線程A和B數(shù)據(jù)收發(fā)是通過unix域套接字扎唾。
在Windows召川,由于消息是有邊界的,所以自然想到了同步方式是UDP胸遇,自然走loopback比較合適荧呐。。沒有選擇命名管道狐榔、完成端口的原因是:軟件的消息隊列用的是libuv坛增,對這兩種SOCK_DGRAM方式的通信不支持。
使用UDP后發(fā)現(xiàn)在Windows上薄腻,軟件速度極其慢收捣。為了比較,在linux上把通信方式也切換為UDP庵楷,發(fā)現(xiàn)linux上速度仍舊很快罢艾。
2. Winpcap不支持某些無線網(wǎng)卡抓包
考慮到有些無線網(wǎng)卡不支持抓包,所以造成沒有速度尽纽。改用有線連接的時候咐蚯,發(fā)現(xiàn)果真能抓數(shù)據(jù)了。但速度仍然很慢弄贿。通過網(wǎng)絡監(jiān)控發(fā)現(xiàn)春锋,軟件的網(wǎng)絡速度非常快差凹,但應用層基本沒速度期奔。
3. Windows loopback上的套接字緩沖很小
線程A發(fā)送一個數(shù)據(jù),我就把cnt1增1危尿,線程B收到一個數(shù)據(jù)我就把cnt2增1. 運行一會兒就發(fā)現(xiàn)呐萌,cnt2 只有cnt1的10%不到。意思是丟包率超過90%谊娇。
由于以前從來沒有在Windows上開發(fā)過肺孤,實在想不通為什么丟包率這么高。
折騰了幾天找不出原因后济欢,網(wǎng)上問了一下赠堵,有人說Windows loopback隊列很小。問題簡單了法褥,把兩個udp socket的buf都增大顾腊,直接設(shè)置成了10M。情況好多了挖胃,但運行久了杂靶,軟件接受數(shù)據(jù)變快以后梆惯,丟包又開始上升。這說明通過簡單的擴大buf size行不通吗垮。
4. 線程A和線程B使用可靠傳輸
既然在loopback下udp丟包率這么高垛吗,自然而然的想到了使用TCP。問題是TCP是面向流的烁登,是沒有消息邊界的怯屉。
TCP消息分界有如下幾種方式,解決這個問題:
- 發(fā)送固定大小的長度N饵沧。消息的長度在數(shù)據(jù)的最前端锨络。接收端先讀取長度len(比如4個字節(jié)的int),然后讀取N - 4個字節(jié)狼牺。后一次讀取的len字節(jié)就是數(shù)據(jù)羡儿。數(shù)據(jù)以后的字節(jié)全部丟棄。
- 用分界符是钥。比如用x12341234來區(qū)分消息掠归。如果數(shù)據(jù)中本身包含分界符,那么再用一個分界符去轉(zhuǎn)移悄泥。這種在以太網(wǎng)中也有應用虏冻。
- 按
數(shù)據(jù)長度+數(shù)據(jù)組成
來發(fā)送,大小不固定弹囚。
第一種方式厨相,比較容易,但容易浪費帶寬鸥鹉。
第二種方式蛮穿,比較費時間,每一次都要遍歷數(shù)據(jù)宋舷,不可取绪撵。
第三種方式瓢姻,邏輯比較復雜祝蝠,比如讀取消息可能不完整,讀取了幾個消息幻碱。任何一次出錯會造成后面所有的數(shù)據(jù)都出錯绎狭。要求正確率100%。
最終選擇了第三種方式褥傍。然后再也沒有出現(xiàn)軟件速度飛快儡嘶,上層APP沒速度的情況了。
5. 實現(xiàn)
下面是部分代碼恍风。
發(fā)送端:
int PacketStream::Send(int nread, const rbuf_t &rbuf) {
if (nread > 0) {
char buf[MAX_BUF_SIZE];
*(COUNT_TYPE *)buf = nread;
char *p = buf + COUNT_SIZE;
memcpy(p, rbuf.base, nread);
assert(nread + COUNT_SIZE <= MAX_BUF_SIZE);
return doSend(buf, COUNT_SIZE + nread);
}
return nread;
}
int PacketStream::doSend(const char *buf, int nread) {
if (nread > 0) {
// LOGD << "nread: " << nread;
assert(nread < OM_MAX_PKT_SIZE);
int nret = send(mWriteSock, buf, nread, 0);
LOGE_IF(nret < 0) << "send error: " << strerror(errno) << ", nret: " << nret;
return nret;
}
return nread;
}
接收端:
int PacketStream::rawInput(int nread, const char *buf) {
if (nread > 0) {
assert(mNextLen <= MAX_BUF_SIZE);
if (nread > (RSOCK_UV_MAX_BUF + MAX_BUF_SIZE)) {
LOGE << "nread too large: " << nread;
assert(0);
}
const int totLen = nread + mBufLen; // data in buf and in mTempBuf, not including length(4) of mBufLen
//LOGD << "mNextLen: " << mNextLen << ", mBufLen: " << mBufLen << ", nread: " << nread << ", totLen: " << totLen;
if (totLen > mNextLen) { // totLen > mNextLen. copy two buf into one
char *pbuf = mLargeBuf; // mLargeBuf is large enough to store both mTempBuf and buf
int left = totLen;
if (mNextLen > 0) { // if nextLen valid. prepend it to buf.
*(COUNT_TYPE*)pbuf = mNextLen;
pbuf += COUNT_SIZE;
left += COUNT_SIZE; // DON'T FORGET THIS !!!!
}
memcpy(pbuf, mTempBuf, mBufLen);
pbuf += mBufLen;
memcpy(pbuf, buf, nread);
const char *p = mLargeBuf;
int nret = 0;
std::vector<int> head;
// now mLargeBuf always starts with len information
while (left >= 0) {
if (left < COUNT_SIZE) { // left is only part of int, prevent invalid memory access.
markTempBuf(0, p, left);
// LOGD << "mNextLen: " << mNextLen << ", left: " << left << ", mBufLen: " << mBufLen;
break;
}
COUNT_TYPE nextLen = *(COUNT_TYPE *)p;
//LOGD << "nextLen: " << nextLen << ", left: " << left;
head.push_back(nextLen);
assert(nextLen > 0 && nextLen < MAX_BUF_SIZE);
p += COUNT_SIZE;
left -= COUNT_SIZE;
if (nextLen <= left) {
nret = Input(nextLen, new_buf(nextLen, p, nullptr));
left -= nextLen;
p += nextLen;
//LOGD << "nret: " << nret << ", nextLen: " << nextLen << ", left: " << left << ", p - mLargeBuf: " << (p - mLargeBuf);
if (left >= COUNT_SIZE) {
COUNT_TYPE temp = *(COUNT_TYPE*)p;
if (temp <= 0 || temp > OM_MAX_PKT_SIZE) {
LOGE << "temp error: " << temp;
}
}
} else {
markTempBuf(nextLen, p, left);
//LOGD << "nextLen: " << nextLen << ", left: " << left;
break;
}
}
return nret;
} else if (totLen < mNextLen) { // if not enough data. just copy
memcpy(mTempBuf + mBufLen, buf, nread);
mBufLen += nread;
//LOGD << "less than: mNextLen: " << mNextLen << ", mBufLen: " << mBufLen << ", nread: " << nread;
return 0;
} else { // equal: (totLen == mNextLen)
memcpy(mTempBuf + mBufLen, buf, nread);
// LOGD << "input: " << totLen;
int n = Input(totLen, new_buf(totLen, mTempBuf, nullptr));
markTempBuf(0, nullptr, 0);
//LOGD << "euqal, after input: mNextLen: " << mNextLen << ", mBufLen: " << mBufLen << ", nread: " << nread;
return n;
}
} else if (nread == UV_EOF) {
markTempBuf(0, nullptr, 0); // todo: when error occurs, should close this synconn and create a new one
LOGE << "nread = EOF";
} else if (nread < 0) {
LOGE << "error: " << uv_strerror(nread);
assert(0); // todo: when error occurs, should close this synconn and create a new one
}
return nread;
}