本文為
CocoaAsyncSocket
Write钧萍,主要介紹GCDAsyncSpecialPacket
和GCDAsyncWritePacket
類型數(shù)據(jù)的處理风瘦,還有核心寫入方法doWriteData
三種不同方式的寫入
注:由于該框架源碼篇幅過大万搔,且有大部分相對抽象的數(shù)據(jù)操作邏輯瞬雹,盡管樓主竭力想要簡單的去陳述相關(guān)內(nèi)容呢诬,但是閱讀起來仍會有一定的難度馅巷。如果不是誠心想學(xué)習(xí)IM
相關(guān)知識钓猬,在這里就可以離場了...
iOS- CocoaAsyncSocket源碼解析(Connect 上)
iOS- CocoaAsyncSocket源碼解析(Connect 下)
iOS- CocoaAsyncSocket源碼解析(Read 上)
iOS- CocoaAsyncSocket源碼解析(Read 下)
注:文中涉及代碼比較多,建議大家結(jié)合源碼一起閱讀比較容易能加深理解澳迫。這里有樓主標(biāo)注好注釋的源碼橄登,有需要的可以作為參照:CocoaAsyncSocket源碼注釋
我們切入口
//寫數(shù)據(jù)對外方法
- (void)writeData:(NSData *)data withTimeout:(NSTimeInterval)timeout tag:(long)tag
{
if ([data length] == 0) return;
//初始化寫包
GCDAsyncWritePacket *packet = [[GCDAsyncWritePacket alloc] initWithData:data timeout:timeout tag:tag];
dispatch_async(socketQueue, ^{ @autoreleasepool {
LogTrace();
if ((flags & kSocketStarted) && !(flags & kForbidReadsWrites))
{
[writeQueue addObject:packet];
//離隊執(zhí)行
[self maybeDequeueWrite];
}
}});
// Do not rely on the block being run in order to release the packet,
// as the queue might get released without the block completing.
}
寫法類似Read
- 初始化寫包 :
GCDAsyncWritePacket
- 寫入包放入我們的寫入隊列(數(shù)組)
[writeQueue addObject:packet];
- 離隊執(zhí)行
[self maybeDequeueWrite];
寫入包谣妻,添加隊列沒什么講的了蹋半,不太清楚的小伙伴可以參考
iOS- CocoaAsyncSocket源碼解析(Read 上)
iOS- CocoaAsyncSocket源碼解析(Read 下)
下面重點解析maybeDequeueWrite
- (void)maybeDequeueWrite
{
LogTrace();
NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
// If we're not currently processing a write AND we have an available write stream
if ((currentWrite == nil) && (flags & kConnected))
{
if ([writeQueue count] > 0)
{
// Dequeue the next object in the write queue
currentWrite = [writeQueue objectAtIndex:0];
[writeQueue removeObjectAtIndex:0];
//TLS
if ([currentWrite isKindOfClass:[GCDAsyncSpecialPacket class]])
{
LogVerbose(@"Dequeued GCDAsyncSpecialPacket");
// Attempt to start TLS
flags |= kStartingWriteTLS;
// This method won't do anything unless both kStartingReadTLS and kStartingWriteTLS are set
[self maybeStartTLS];
}
else
{
LogVerbose(@"Dequeued GCDAsyncWritePacket");
// Setup write timer (if needed)
[self setupWriteTimerWithTimeout:currentWrite->timeout];
// Immediately write, if possible
[self doWriteData];
}
}
//寫超時導(dǎo)致的錯誤
else if (flags & kDisconnectAfterWrites)
{
//如果沒有可讀任務(wù),直接關(guān)閉socket
if (flags & kDisconnectAfterReads)
{
if (([readQueue count] == 0) && (currentRead == nil))
{
[self closeWithError:nil];
}
}
else
{
[self closeWithError:nil];
}
}
}
}
- 我們首先做了一些是否連接充坑,寫入隊列任務(wù)是否大于0等等一些判斷
- 接著我們從全局的
writeQueue
中减江,拿到第一條任務(wù),去做讀取捻爷,我們來判斷這個任務(wù)的類型辈灼,如果是GCDAsyncSpecialPacket
類型的役衡,我們將開啟TLS認(rèn)證 - 如果是是我們之前加入隊列中的
GCDAsyncWritePacket
類型茵休,我們則開始讀取操作薪棒,調(diào)用doWriteData
- 如果沒有可讀任務(wù)手蝎,直接關(guān)閉socket
其中 maybeStartTLS
我們解析過了,我們就只要來看看核心寫入方法:doWriteData
- (void)doWriteData
{
LogTrace();
// This method is called by the writeSource via the socketQueue
//錯誤俐芯,不寫
if ((currentWrite == nil) || (flags & kWritesPaused))
{
LogVerbose(@"No currentWrite or kWritesPaused");
// Unable to write at this time
//
if ([self usingCFStreamForTLS])
{
// CFWriteStream only fires once when there is available data.
// It won't fire again until we've invoked CFWriteStreamWrite.
}
else
{
// If the writeSource is firing, we need to pause it
// or else it will continue to fire over and over again.
//如果socket中可接受寫數(shù)據(jù)棵介,防止反復(fù)觸發(fā)寫source,掛起
if (flags & kSocketCanAcceptBytes)
{
[self suspendWriteSource];
}
}
return;
}
//如果當(dāng)前socket無法在寫數(shù)據(jù)了
if (!(flags & kSocketCanAcceptBytes))
{
LogVerbose(@"No space available to write...");
// No space available to write.
//如果不是cfstream
if (![self usingCFStreamForTLS])
{
// Need to wait for writeSource to fire and notify us of
// available space in the socket's internal write buffer.
//則恢復(fù)寫source吧史,當(dāng)有空間去寫的時候邮辽,會觸發(fā)回來
[self resumeWriteSource];
}
return;
}
//如果正在進(jìn)行TLS認(rèn)證
if (flags & kStartingWriteTLS)
{
LogVerbose(@"Waiting for SSL/TLS handshake to complete");
// The writeQueue is waiting for SSL/TLS handshake to complete.
if (flags & kStartingReadTLS)
{
//如果是安全通道,并且I/O阻塞贸营,那么重新去握手
if ([self usingSecureTransportForTLS] && lastSSLHandshakeError == errSSLWouldBlock)
{
// We are in the process of a SSL Handshake.
// We were waiting for available space in the socket's internal OS buffer to continue writing.
[self ssl_continueSSLHandshake];
}
}
//說明不走`TLS`了吨述,因為只支持寫的TLS
else
{
// We are still waiting for the readQueue to drain and start the SSL/TLS process.
// We now know we can write to the socket.
//掛起寫source
if (![self usingCFStreamForTLS])
{
// Suspend the write source or else it will continue to fire nonstop.
[self suspendWriteSource];
}
}
return;
}
// Note: This method is not called if currentWrite is a GCDAsyncSpecialPacket (startTLS packet)
//開始寫數(shù)據(jù)
BOOL waiting = NO;
NSError *error = nil;
size_t bytesWritten = 0;
//安全連接
if (flags & kSocketSecure)
{
//CFStreamForTLS
if ([self usingCFStreamForTLS])
{
#if TARGET_OS_IPHONE
//
// Writing data using CFStream (over internal TLS)
//
const uint8_t *buffer = (const uint8_t *)[currentWrite->buffer bytes] + currentWrite->bytesDone;
//寫的長度為buffer長度-已寫長度
NSUInteger bytesToWrite = [currentWrite->buffer length] - currentWrite->bytesDone;
if (bytesToWrite > SIZE_MAX) // NSUInteger may be bigger than size_t (write param 3)
{
bytesToWrite = SIZE_MAX;
}
//往writeStream中寫入數(shù)據(jù), bytesToWrite寫入的長度
CFIndex result = CFWriteStreamWrite(writeStream, buffer, (CFIndex)bytesToWrite);
LogVerbose(@"CFWriteStreamWrite(%lu) = %li", (unsigned long)bytesToWrite, result);
//寫錯誤
if (result < 0)
{
error = (__bridge_transfer NSError *)CFWriteStreamCopyError(writeStream);
}
else
{
//拿到已寫字節(jié)數(shù)
bytesWritten = (size_t)result;
// We always set waiting to true in this scenario.
//我們經(jīng)常設(shè)置等待來信任這個方案
// CFStream may have altered our underlying socket to non-blocking.
//CFStream很可能修改socket為非阻塞
// Thus if we attempt to write without a callback, we may end up blocking our queue.
//因此钞脂,我們嘗試去寫揣云,而不用回調(diào)。 我們可能終止我們的隊列冰啃。
waiting = YES;
}
#endif
}
//SSL寫的方式
else
{
// We're going to use the SSLWrite function.
//
// OSStatus SSLWrite(SSLContextRef context, const void *data, size_t dataLength, size_t *processed)
//
// Parameters:
// context - An SSL session context reference.
// data - A pointer to the buffer of data to write.
// dataLength - The amount, in bytes, of data to write.
// processed - On return, the length, in bytes, of the data actually written.
//
// It sounds pretty straight-forward,
//看起來相當(dāng)直觀邓夕,但是這里警告你應(yīng)注意。
// but there are a few caveats you should be aware of.
//
// The SSLWrite method operates in a non-obvious (and rather annoying) manner.
// According to the documentation:
// 這個SSLWrite方法使用著一個不明顯的方法(相當(dāng)討厭)導(dǎo)致了下面這些事阎毅。
// Because you may configure the underlying connection to operate in a non-blocking manner,
//因為你要辨別出下層連接 操縱 非阻塞的方法焚刚,一個寫的操作將返回errSSLWouldBlock,表明需要寫的數(shù)據(jù)少了扇调。
// a write operation might return errSSLWouldBlock, indicating that less data than requested
// was actually transferred. In this case, you should repeat the call to SSLWrite until some
//在這種情況下你應(yīng)該重復(fù)調(diào)用SSLWrite矿咕,直到一些其他結(jié)果被返回
// other result is returned.
// This sounds perfect, but when our SSLWriteFunction returns errSSLWouldBlock,
//這樣聽起來很完美,但是當(dāng)SSLWriteFunction返回errSSLWouldBlock,SSLWrite返回但是卻設(shè)置了進(jìn)度長度碳柱?
// then the SSLWrite method returns (with the proper errSSLWouldBlock return value),
// but it sets processed to dataLength !!
//
// In other words, if the SSLWrite function doesn't completely write all the data we tell it to,
//另外雌团,SSLWrite方法沒有完整的寫完我們給的所有數(shù)據(jù),因此它沒有告訴我們到底寫了多少數(shù)據(jù)士聪,
// then it doesn't tell us how many bytes were actually written. So, for example, if we tell it to
//因此锦援。舉個例子,如果我們告訴它去寫256個字節(jié)剥悟,它可能只寫了128個字節(jié)灵寺,但是告訴我們寫了0個字節(jié)
// write 256 bytes then it might actually write 128 bytes, but then report 0 bytes written.
//
// You might be wondering:
//你可能會覺得奇怪,如果這個方法不告訴我們寫了多少字節(jié)区岗,那么該如何去更新參數(shù)來應(yīng)對下一次的SSLWrite略板?
// If the SSLWrite function doesn't tell us how many bytes were written,
// then how in the world are we supposed to update our parameters (buffer & bytesToWrite)
// for the next time we invoke SSLWrite?
//
// The answer is that SSLWrite cached all the data we told it to write,
//答案就是,SSLWrite緩存了所有的數(shù)據(jù)我們要它寫的慈缔。并且拉出這些數(shù)據(jù)叮称,只要我們下次調(diào)用SSLWrite。
// and it will push out that data next time we call SSLWrite.
// If we call SSLWrite with new data, it will push out the cached data first, and then the new data.
//如果我們用新的data調(diào)用SSLWrite,它會拉出這些緩存的數(shù)據(jù)藐鹤,然后才輪到新數(shù)據(jù)
// If we call SSLWrite with empty data, then it will simply push out the cached data.
// 如果我們調(diào)用SSLWrite用一個空的數(shù)據(jù)瓤檐,則它僅僅會拉出緩存數(shù)據(jù)。
// For this purpose we're going to break large writes into a series of smaller writes.
//為了這個目的娱节,我們?nèi)シ珠_一個大數(shù)據(jù)寫成一連串的小數(shù)據(jù)挠蛉,它允許我們?nèi)蟾孢M(jìn)度給代理。
// This allows us to report progress back to the delegate.
OSStatus result;
//SSL緩存的寫的數(shù)據(jù)
BOOL hasCachedDataToWrite = (sslWriteCachedLength > 0);
//是否有新數(shù)據(jù)要寫
BOOL hasNewDataToWrite = YES;
if (hasCachedDataToWrite)
{
size_t processed = 0;
//去寫空指針肄满,就是拉取了所有的緩存SSL數(shù)據(jù)
result = SSLWrite(sslContext, NULL, 0, &processed);
//如果寫成功
if (result == noErr)
{
//拿到寫的緩存長度
bytesWritten = sslWriteCachedLength;
//置空緩存長度
sslWriteCachedLength = 0;
//判斷當(dāng)前需要寫的buffer長度谴古,是否和已寫的大小+緩存 大小相等
if ([currentWrite->buffer length] == (currentWrite->bytesDone + bytesWritten))
{
// We've written all data for the current write.
//相同則不需要再寫新數(shù)據(jù)了
hasNewDataToWrite = NO;
}
}
//有錯
else
{
//IO阻塞,等待
if (result == errSSLWouldBlock)
{
waiting = YES;
}
//報錯
else
{
error = [self sslError:result];
}
// Can't write any new data since we were unable to write the cached data.
//如果讀寫cache出錯稠歉,我們暫時不能去讀后面的數(shù)據(jù)
hasNewDataToWrite = NO;
}
}
//如果還有數(shù)據(jù)去讀
if (hasNewDataToWrite)
{
//拿到buffer偏移位置
const uint8_t *buffer = (const uint8_t *)[currentWrite->buffer bytes]
+ currentWrite->bytesDone
+ bytesWritten;
//得到需要讀的長度
NSUInteger bytesToWrite = [currentWrite->buffer length] - currentWrite->bytesDone - bytesWritten;
//如果大于最大值,就等于最大值
if (bytesToWrite > SIZE_MAX) // NSUInteger may be bigger than size_t (write param 3)
{
bytesToWrite = SIZE_MAX;
}
size_t bytesRemaining = bytesToWrite;
//循環(huán)值
BOOL keepLooping = YES;
while (keepLooping)
{
//最大寫的字節(jié)數(shù)掰担?
const size_t sslMaxBytesToWrite = 32768;
//得到二者小的,得到需要寫的字節(jié)數(shù)
size_t sslBytesToWrite = MIN(bytesRemaining, sslMaxBytesToWrite);
//已寫字節(jié)數(shù)
size_t sslBytesWritten = 0;
//將結(jié)果從buffer中寫到socket上(經(jīng)由了這個函數(shù)怒炸,數(shù)據(jù)就加密了)
result = SSLWrite(sslContext, buffer, sslBytesToWrite, &sslBytesWritten);
//如果寫成功
if (result == noErr)
{
//buffer指針偏移
buffer += sslBytesWritten;
//加上些的數(shù)量
bytesWritten += sslBytesWritten;
//減去仍需寫的數(shù)量
bytesRemaining -= sslBytesWritten;
//判斷是否需要繼續(xù)循環(huán)
keepLooping = (bytesRemaining > 0);
}
else
{
//IO阻塞
if (result == errSSLWouldBlock)
{
waiting = YES;
//得到緩存的大写ァ(后續(xù)長度會被自己寫到SSL緩存去)
sslWriteCachedLength = sslBytesToWrite;
}
else
{
error = [self sslError:result];
}
//跳出循環(huán)
keepLooping = NO;
}
} // while (keepLooping)
} // if (hasNewDataToWrite)
}
}
//普通socket
else
{
//
// Writing data directly over raw socket
//
//拿到當(dāng)前socket
int socketFD = (socket4FD != SOCKET_NULL) ? socket4FD : (socket6FD != SOCKET_NULL) ? socket6FD : socketUN;
//得到指針偏移
const uint8_t *buffer = (const uint8_t *)[currentWrite->buffer bytes] + currentWrite->bytesDone;
NSUInteger bytesToWrite = [currentWrite->buffer length] - currentWrite->bytesDone;
if (bytesToWrite > SIZE_MAX) // NSUInteger may be bigger than size_t (write param 3)
{
bytesToWrite = SIZE_MAX;
}
//直接寫
ssize_t result = write(socketFD, buffer, (size_t)bytesToWrite);
LogVerbose(@"wrote to socket = %zd", result);
// Check results
if (result < 0)
{
//IO阻塞
if (errno == EWOULDBLOCK)
{
waiting = YES;
}
else
{
error = [self errnoErrorWithReason:@"Error in write() function"];
}
}
else
{
//得到寫的大小
bytesWritten = result;
}
}
// We're done with our writing.
// If we explictly ran into a situation where the socket told us there was no room in the buffer,
// then we immediately resume listening for notifications.
//
// We must do this before we dequeue another write,
// as that may in turn invoke this method again.
//
// Note that if CFStream is involved, it may have maliciously put our socket in blocking mode.
//注意,如果用CFStream,很可能會被惡意的放置數(shù)據(jù) 阻塞socket
//如果等待横媚,則恢復(fù)寫source
if (waiting)
{
//把socket可接受數(shù)據(jù)的標(biāo)記去掉
flags &= ~kSocketCanAcceptBytes;
if (![self usingCFStreamForTLS])
{
//恢復(fù)寫source
[self resumeWriteSource];
}
}
// Check our results
//判斷是否完成
BOOL done = NO;
//判斷已寫大小
if (bytesWritten > 0)
{
// Update total amount read for the current write
//更新當(dāng)前總共寫的大小
currentWrite->bytesDone += bytesWritten;
LogVerbose(@"currentWrite->bytesDone = %lu", (unsigned long)currentWrite->bytesDone);
// Is packet done?
//判斷當(dāng)前寫包是否寫完
done = (currentWrite->bytesDone == [currentWrite->buffer length]);
}
//如果完成了
if (done)
{
//完成操作
[self completeCurrentWrite];
if (!error)
{
dispatch_async(socketQueue, ^{ @autoreleasepool{
//開始下一次的讀取任務(wù)
[self maybeDequeueWrite];
}});
}
}
//未完成
else
{
// We were unable to finish writing the data,
// so we're waiting for another callback to notify us of available space in the lower-level output buffer.
//如果不是等待 而且沒有出錯
if (!waiting && !error)
{
// This would be the case if our write was able to accept some data, but not all of it.
//這是我們寫了一部分?jǐn)?shù)據(jù)的情況纠炮。
//去掉可接受數(shù)據(jù)的標(biāo)記
flags &= ~kSocketCanAcceptBytes;
//再去等讀source觸發(fā)
if (![self usingCFStreamForTLS])
{
[self resumeWriteSource];
}
}
//如果已寫大于0
if (bytesWritten > 0)
{
// We're not done with the entire write, but we have written some bytes
__strong id theDelegate = delegate;
//調(diào)用寫的進(jìn)度代理
if (delegateQueue && [theDelegate respondsToSelector:@selector(socket:didWritePartialDataOfLength:tag:)])
{
long theWriteTag = currentWrite->tag;
dispatch_async(delegateQueue, ^{ @autoreleasepool {
[theDelegate socket:self didWritePartialDataOfLength:bytesWritten tag:theWriteTag];
}});
}
}
}
// Check for errors
//如果有錯,則報錯斷開連接
if (error)
{
[self closeWithError:[self errnoErrorWithReason:@"Error in write() function"]];
}
// Do not add any code here without first adding a return statement in the error case above.
}
- 這里不同
doRead
的是沒有提前通過flush寫入鏈路層 - 如果socket中可接受寫數(shù)據(jù)灯蝴,防止反復(fù)觸發(fā)寫source恢口,掛起
- 如果當(dāng)前socket無法在寫數(shù)據(jù)了,則恢復(fù)寫source,當(dāng)有空間去寫的時候穷躁,會觸發(fā)回來
-
如果正在進(jìn)行TLS認(rèn)證 如果是安全通道耕肩,并且I/O阻塞因妇,那么重新去握手
-
下面是寫入的三種方式
CFStreamForTLS
- SSL寫的方式
if (hasNewDataToWrite)
{
//拿到buffer偏移位置
const uint8_t *buffer = (const uint8_t *)[currentWrite->buffer bytes]
+ currentWrite->bytesDone
+ bytesWritten;
//得到需要讀的長度
NSUInteger bytesToWrite = [currentWrite->buffer length] - currentWrite->bytesDone - bytesWritten;
//如果大于最大值,就等于最大值
if (bytesToWrite > SIZE_MAX) // NSUInteger may be bigger than size_t (write param 3)
{
bytesToWrite = SIZE_MAX;
}
size_t bytesRemaining = bytesToWrite;
//循環(huán)值
BOOL keepLooping = YES;
while (keepLooping)
{
//最大寫的字節(jié)數(shù)?
const size_t sslMaxBytesToWrite = 32768;
//得到二者小的猿诸,得到需要寫的字節(jié)數(shù)
size_t sslBytesToWrite = MIN(bytesRemaining, sslMaxBytesToWrite);
//已寫字節(jié)數(shù)
size_t sslBytesWritten = 0;
//將結(jié)果從buffer中寫到socket上(經(jīng)由了這個函數(shù)婚被,數(shù)據(jù)就加密了)
result = SSLWrite(sslContext, buffer, sslBytesToWrite, &sslBytesWritten);
//如果寫成功
if (result == noErr)
{
//buffer指針偏移
buffer += sslBytesWritten;
//加上些的數(shù)量
bytesWritten += sslBytesWritten;
//減去仍需寫的數(shù)量
bytesRemaining -= sslBytesWritten;
//判斷是否需要繼續(xù)循環(huán)
keepLooping = (bytesRemaining > 0);
}
else
{
//IO阻塞
if (result == errSSLWouldBlock)
{
waiting = YES;
//得到緩存的大小(后續(xù)長度會被自己寫到SSL緩存去)
sslWriteCachedLength = sslBytesToWrite;
}
else
{
error = [self sslError:result];
}
//跳出循環(huán)
keepLooping = NO;
}
} // while (keepLooping)
這里還有對殘余數(shù)據(jù)的處理:是通過指針buffer獲取我們的keepLooping
循環(huán)值梳虽,循環(huán)進(jìn)行寫入
//將結(jié)果從buffer中寫到socket上(經(jīng)由了這個函數(shù)址芯,數(shù)據(jù)就加密了)
result = SSLWrite(sslContext, buffer, sslBytesToWrite, &sslBytesWritten);
- 普通socket寫入
- 也做了完成判斷
//判斷是否完成
BOOL done = NO;
//判斷已寫大小
if (bytesWritten > 0)
{
// Update total amount read for the current write
//更新當(dāng)前總共寫的大小
currentWrite->bytesDone += bytesWritten;
LogVerbose(@"currentWrite->bytesDone = %lu", (unsigned long)currentWrite->bytesDone);
// Is packet done?
//判斷當(dāng)前寫包是否寫完
done = (currentWrite->bytesDone == [currentWrite->buffer length]);
}
同樣為的也是三種數(shù)據(jù)包:一次性包,粘包窜觉,斷包
//如果完成了
if (done)
{
//完成操作
[self completeCurrentWrite];
if (!error)
{
dispatch_async(socketQueue, ^{ @autoreleasepool{
//開始下一次的讀取任務(wù)
[self maybeDequeueWrite];
}});
}
}
//未完成
else
{
// We were unable to finish writing the data,
// so we're waiting for another callback to notify us of available space in the lower-level output buffer.
//如果不是等待 而且沒有出錯
if (!waiting && !error)
{
// This would be the case if our write was able to accept some data, but not all of it.
//這是我們寫了一部分?jǐn)?shù)據(jù)的情況谷炸。
//去掉可接受數(shù)據(jù)的標(biāo)記
flags &= ~kSocketCanAcceptBytes;
//再去等讀source觸發(fā)
if (![self usingCFStreamForTLS])
{
[self resumeWriteSource];
}
}
//如果已寫大于0
if (bytesWritten > 0)
{
// We're not done with the entire write, but we have written some bytes
__strong id theDelegate = delegate;
//調(diào)用寫的進(jìn)度代理
if (delegateQueue && [theDelegate respondsToSelector:@selector(socket:didWritePartialDataOfLength:tag:)])
{
long theWriteTag = currentWrite->tag;
dispatch_async(delegateQueue, ^{ @autoreleasepool {
[theDelegate socket:self didWritePartialDataOfLength:bytesWritten tag:theWriteTag];
}});
}
}
}
那么整個 CocoaAsyncSocket Wirte的解析就到這里完成了,當(dāng)你讀完前面幾篇禀挫,再來看這篇就跟喝水一樣旬陡,故:知識在于積累