這一章節(jié)秧了,我們通過例子學(xué)習(xí)netty的一些高級特性。
1序无、netty客戶端流控
在有些場景下验毡,由于各種原因,會導(dǎo)致客戶端消息發(fā)送積壓帝嗡,進而導(dǎo)致OOM晶通。
- 1、當netty服務(wù)端并發(fā)壓力過大哟玷,超過了服務(wù)端的處理能力時狮辽,channel中的消息服務(wù)端不能及時消費,這時channel堵塞巢寡,客戶端消息就會堆積在發(fā)送隊列中
- 2喉脖、網(wǎng)絡(luò)瓶頸,當客戶端發(fā)送速度超過網(wǎng)絡(luò)鏈路處理能力抑月,會導(dǎo)致客戶端發(fā)送隊列積壓
- 3树叽、當對端讀取速度小于己方發(fā)送速度,導(dǎo)致自身TCP發(fā)送緩沖區(qū)滿谦絮,頻繁發(fā)生write 0字節(jié)時题诵,待發(fā)送消息會在netty發(fā)送隊列中排隊
這三種情況下须误,如果客戶端沒有流控保護,這時候就很容易發(fā)生內(nèi)存泄露仇轻。
原因:
在我們調(diào)用channel的write和writeAndFlush時
io.netty.channel.AbstractChannelHandlerContext#writeAndFlush(java.lang.Object, io.netty.channel.ChannelPromise),如果發(fā)送方為業(yè)務(wù)線程奶甘,則將發(fā)送操作封裝成WriteTask(繼承Runnable)篷店,放到Netty的NioEventLoop中執(zhí)行,當NioEventLoop無法完成如此多的消息的發(fā)送的時候臭家,發(fā)送任務(wù)隊列積壓疲陕,進而導(dǎo)致內(nèi)存泄漏。
解決方案:
為了防止在高并發(fā)場景下钉赁,由于服務(wù)端處理慢導(dǎo)致的客戶端消息積壓蹄殃,客戶端需要做并發(fā)保護,防止自身發(fā)生消息積壓你踩。Netty提供了一個高低水位機制诅岩,可以實現(xiàn)客戶端精準的流控。
io.netty.channel.ChannelConfig#setWriteBufferHighWaterMark 高水位
io.netty.channel.ChannelConfig#setWriteBufferLowWaterMark 低水位
當發(fā)送隊列待發(fā)送的字節(jié)數(shù)組達到高水位時带膜,對應(yīng)的channel就變?yōu)椴豢蓪憼顟B(tài)吩谦,由于高水位并不影響業(yè)務(wù)線程調(diào)用write方法把消息加入到待發(fā)送隊列,因此在消息發(fā)送時要先對channel的狀態(tài)進行判斷(ctx.channel().isWritable)膝藕。
這里涉及到的知識點是netty的消息發(fā)送機制式廷。
netty的消息發(fā)送機制
業(yè)務(wù)調(diào)用write方法后,經(jīng)過ChannelPipeline職責(zé)鏈處理芭挽,消息被投遞到發(fā)送緩沖區(qū)待發(fā)送滑废,調(diào)用flush之后會執(zhí)行真正的發(fā)送操作,底層通過調(diào)用Java NIO的SocketChannel進行非阻塞write操作袜爪,將消息發(fā)送到網(wǎng)絡(luò)上蠕趁,
當用戶線程(業(yè)務(wù)線程)發(fā)起write操作時,Netty會進行判斷饿敲,如果發(fā)現(xiàn)不少NioEventLoop(I/O線程)妻导,則將發(fā)送消息封裝成WriteTask,放入NioEventLoop的任務(wù)隊列怀各,由NioEventLoop線程執(zhí)行倔韭,代碼如下
io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, io.netty.channel.ChannelPromise)
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return promise;
}
write(msg, true, promise);
return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
try {
//這里的executor執(zhí)行的是netty自己實現(xiàn)的SingleThreadEventExecutor#execute方法,
executor.execute(runnable);
} catch (Throwable cause) {
try {
promise.setFailure(cause);
} finally {
if (msg != null) {
ReferenceCountUtil.release(msg);
}
}
}
}
io.netty.util.concurrent.SingleThreadEventExecutor#execute
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
Netty的NioEventLoop線程內(nèi)部維護了一個Queue<Runnable> taskQuue瓢对,除了處理網(wǎng)絡(luò)IO讀寫操作寿酌,同時還負責(zé)執(zhí)行網(wǎng)絡(luò)讀寫相關(guān)的Task,NioEventLoop遍歷taskQueue硕蛹,執(zhí)行消息發(fā)送任務(wù)醇疼,代碼調(diào)用入路徑如下硕并,具體的就不貼了,太長了
io.netty.channel.nio.NioEventLoop#run
-----> io.netty.util.concurrent.SingleThreadEventExecutor#runAllTasks(long)
----->io.netty.util.concurrent.AbstractEventExecutor#safeExecute
這里safeExecute執(zhí)行的task秧荆,就是前面write寫入時包裝的AbstractWriteTask倔毙,AbstractWriteTask的run中
io.netty.channel.AbstractChannelHandlerContext.AbstractWriteTask#run
經(jīng)過一些系統(tǒng)處理操作,最終會調(diào)用io.netty.channel.ChannelOutboundBuffer#addMessage方法乙濒,將發(fā)送消息加入發(fā)送隊列(鏈表)陕赃。
我們上面寫的流程從NioSocketChannel到ChnnelOutbountBuffer,實際上在這個過程中颁股,為了對發(fā)送速度和消息積壓數(shù)進行控制么库,Netty還提供了高低水位機制,當消息隊列中積壓的待發(fā)送消息總字節(jié)數(shù)到達高水位時甘有,修改Channel的狀態(tài)為不可寫诉儒,并發(fā)送通知事件;當消息發(fā)送完成后亏掀,對低水位進行判斷忱反,如果當前積壓的待發(fā)送字節(jié)數(shù)低于低水位時,則修改channel狀態(tài)為可寫滤愕,并發(fā)送通知事件缭受,具體代碼見下
io.netty.channel.ChannelOutboundBuffer#incrementPendingOutboundBytes(long);
io.netty.channel.ChannelOutboundBuffer#decrementPendingOutboundBytes(long)该互;
總結(jié):在實際項目中米者,根據(jù)業(yè)務(wù)QPS規(guī)劃,客戶端處理性能宇智、網(wǎng)絡(luò)帶寬蔓搞、鏈路數(shù)、消息平均碼流大小等綜合因數(shù)随橘,設(shè)置Netty高水位(setWriteBufferHighWaterMark)值喂分,可以防止在發(fā)送隊列處于高水位時繼續(xù)發(fā)送消息,導(dǎo)致積壓更嚴重机蔗,甚至發(fā)生內(nèi)存泄漏蒲祈。在系統(tǒng)中合理利用Netty的高低水位機制做消息發(fā)送的流控,既可以保護自身萝嘁,同時又能減輕服務(wù)端的壓力梆掸,可以提升系統(tǒng)的可靠性。
那么代碼中牙言,怎么使用呢酸钦?
同時在業(yè)務(wù)發(fā)送消息時,添加socketChannel.isWritable()是否可以發(fā)送判斷
public static boolean sendMessage(String clientId,Object message){
if(StringUtils.isEmpty(clientId)){
log.error(" clientId 為空咱枉,找不到客戶端卑硫!");
return false;
}
SocketChannel socketChannel = FactoryMap.getChannelByDevNo(clientId);
if(socketChannel !=null ){
if(socketChannel.isWritable()){
socketChannel.writeAndFlush(message);
//更新數(shù)據(jù)庫中消息狀態(tài)
return true;
}else {
log.error("channel不可寫");
return false;
}
}else {
log.error(" 客戶端未連接服務(wù)器徒恋!發(fā)送消息失敗欢伏!{}",clientId);
}
return false;
}
2入挣、netty服務(wù)端 流量整形
前面講的流控(高低水位控制),主要是根據(jù)發(fā)送消息隊列積壓的大小來控制客戶端channel的寫狀態(tài)硝拧,然后用戶手動根據(jù)channel.isWritable()來控制消息是否發(fā)送财岔,用戶可以手動控制消息不能及時發(fā)送后的處理方案(比如,過期河爹、超時)。通常用在客戶端比較多桐款。
流量整形呢咸这,是一種主動調(diào)整流量輸出速度的措施,一個典型的應(yīng)用是基于下游網(wǎng)絡(luò)節(jié)點的TPS指標控制本地流量的輸出魔眨。大多數(shù)商用系統(tǒng)都由多個網(wǎng)元或者部件組成媳维,例如參與短信互動,會涉及手機遏暴,基站侄刽,短信中心,短信網(wǎng)關(guān)朋凉,SP/CP等網(wǎng)元州丹,不同網(wǎng)元或者部件的處理性能不同,為了防止突發(fā)的業(yè)務(wù)洪峰的 導(dǎo)致下游網(wǎng)元被沖垮杂彭,有時候需要消停提供流量整形功能墓毒。
Netty流量整形的主要作用:
1、防止由于上下游網(wǎng)元性能不均衡導(dǎo)致下游網(wǎng)元被沖垮亲怠,業(yè)務(wù)流程中斷所计;
2、防止由于通信模塊接收消息過快团秽,后端業(yè)務(wù)線程處理不及時主胧,導(dǎo)致出現(xiàn)“撐死”問題。
例如习勤,之前有博客的讀者咨詢過我一個問題踪栋,他們設(shè)備向服務(wù)端不間斷的上報數(shù)據(jù),有1G左右图毕,而服務(wù)端處理不過來這么多數(shù)據(jù)己英,這種情況下,其實就可以使用流量整形來控制接收消息速度吴旋。
原理和使用
原理:攔截channelRead和write方法损肛,計算當前需要發(fā)送的消息大小厢破,對讀取和發(fā)送閾值進行判斷,如果達到了閾值治拿,則暫停讀取和發(fā)送消息摩泪,待下一個周期繼續(xù)處理,以實現(xiàn)在某個周期內(nèi)對消息讀寫速度進行控制劫谅。
使用:將流量整形ChannelHandler添加到業(yè)務(wù)解碼器之前见坑,
注意事項:
全局流量整形實例只需要創(chuàng)建一次
GlobalChannelTrafficShapingHandler 和 GlobalTrafficShapingHandler 是全局共享的,因此實例只需要創(chuàng)建一次捏检,添加到不同的ChannelPipeline即可荞驴,不要創(chuàng)建多個實例,否則流量整形將失效贯城。流量整形參數(shù)調(diào)整不要過于頻繁
消息發(fā)送保護機制
通過流量整形可以控制發(fā)送速度熊楼,但是它的控制原理是將待發(fā)送的消息封裝成Task放入消息隊列,等待執(zhí)行時間到達后繼續(xù)發(fā)送能犯,所以如果業(yè)務(wù)發(fā)送線程不判斷channle的可以狀態(tài)鲫骗,就可能會導(dǎo)致OOM問題。