RocketMQ 是一款開源的分布式消息系統(tǒng),基于高可用分布式集群技術(shù)俗他,提供低延時(shí)脖捻、高可靠的消息發(fā)布與訂閱服務(wù)。
這篇文章兆衅,筆者整理了 RocketMQ 源碼中創(chuàng)建線程的幾點(diǎn)技巧地沮,希望大家讀完之后,能夠有所收獲羡亩。
1 創(chuàng)建單線程
首先我們先溫習(xí)下常用的創(chuàng)建單線程的兩種方式:
- 實(shí)現(xiàn) Runnable 接口
- 繼承 Thread 類
▍一摩疑、實(shí)現(xiàn) Runnable 接口
圖中,MyRunnable 類實(shí)現(xiàn)了 Runnable 接口的 run 方法畏铆,run 方法中定義具體的任務(wù)代碼或處理邏輯雷袋,而Runnable 對(duì)象是作為線程構(gòu)造函數(shù)的參數(shù)。
▍二及志、 繼承 Thread 類
線程實(shí)現(xiàn)類直接繼承 Thread 片排,本質(zhì)上也是實(shí)現(xiàn) Runnable 接口的 run 方法。
2 單線程抽象類
創(chuàng)建單線程的兩種方式都很簡(jiǎn)單速侈,但每次創(chuàng)建線程代碼顯得有點(diǎn)冗余率寡,于是 RocketMQ 里實(shí)現(xiàn)了一個(gè)抽象類 ServiceThread 。
我們可以看到抽象類中包含了如下核心方法:
- 定義線程名倚搬;
- 啟動(dòng)線程冶共;
- 關(guān)閉線程。
下圖展示了 RocketMQ 眾多的單線程實(shí)現(xiàn)類每界。
實(shí)現(xiàn)類的編程模版類似 :
我們僅僅需要繼承抽象類捅僵,并實(shí)現(xiàn) getServiceName 和 run 方法即可。啟動(dòng)的時(shí)候眨层,調(diào)用 start 方法 庙楚, 關(guān)閉的時(shí)候調(diào)用 shutdown 方法。
3 線程池原理
線程池是一種基于池化思想管理線程的工具趴樱,線程池維護(hù)著多個(gè)線程馒闷,等待著監(jiān)督管理者分配可并發(fā)執(zhí)行的任務(wù)酪捡。這避免了在處理短時(shí)間任務(wù)時(shí)創(chuàng)建與銷毀線程的代價(jià)。線程池不僅能夠保證內(nèi)核的充分利用纳账,還能防止過分調(diào)度逛薇。
JDK中提供的 ThreadPoolExecutor 類,是我們最常使用的線程池類疏虫。
參數(shù)名 | 作用 |
---|---|
corePoolSize | 隊(duì)列沒滿時(shí)永罚,線程最大并發(fā)數(shù) |
maximumPoolSizes | 隊(duì)列滿后線程能夠達(dá)到的最大并發(fā)數(shù) |
keepAliveTime | 空閑線程過多久被回收的時(shí)間限制 |
unit | keepAliveTime 的時(shí)間單位 |
workQueue | 阻塞的隊(duì)列類型 |
threadPoolFactory | 改變線程的名稱、線程組卧秘、優(yōu)先級(jí)呢袱、守護(hù)進(jìn)程狀態(tài) |
RejectedExecutionHandler | 超出 maximumPoolSizes + workQueue 時(shí),任務(wù)會(huì)交給RejectedExecutionHandler來處理 |
任務(wù)的調(diào)度通過執(zhí)行 execute方法完成斯议,方法的核心流程如下:
- 如果 workerCount < corePoolSize产捞,創(chuàng)建并啟動(dòng)一個(gè)線程來執(zhí)行新提交的任務(wù)醇锚。
- 如果 workerCount >= corePoolSize哼御,且線程池內(nèi)的阻塞隊(duì)列未滿,則將任務(wù)添加到該阻塞隊(duì)列中焊唬。
- 如果 workerCount >= corePoolSize && workerCount < maximumPoolSize恋昼,且線程池內(nèi)的阻塞隊(duì)列已滿,則創(chuàng)建并啟動(dòng)一個(gè)線程來執(zhí)行新提交的任務(wù)赶促。
- 如果 workerCount >= maximumPoolSize液肌,并且線程池內(nèi)的阻塞隊(duì)列已滿, 則根據(jù)拒絕策略來處理該任務(wù), 默認(rèn)的處理方式是直接拋異常。
4 線程池封裝
在 RocketMQ 里 鸥滨,網(wǎng)絡(luò)請(qǐng)求都會(huì)攜帶命令編碼嗦哆,每種命令映射對(duì)應(yīng)的處理器,而處理器又會(huì)注冊(cè)對(duì)應(yīng)的線程池婿滓。
當(dāng)服務(wù)端 Broker 接收到發(fā)送消息命令時(shí)老速,都會(huì)有單獨(dú)的線程池 sendMessageExecutor 來處理這種命令請(qǐng)求。
基于 ThreadPoolExecutor 做了一個(gè)簡(jiǎn)單的封裝 凸主,BrokerFixedThreadPoolExecutor 構(gòu)造函數(shù)包含六個(gè)核心參數(shù):
- 核心線程數(shù)和最大線程數(shù)相同 橘券,數(shù)量是:cpu核數(shù)和4比較后的最小值;
- 空閑線程的回收的時(shí)間限制卿吐,默認(rèn)1分鐘旁舰;
- 發(fā)送消息隊(duì)列,有界隊(duì)列嗡官,默認(rèn)10000箭窜;
- 線程工廠 ThreadFactoryImpl ,定義了線程名前綴:SendMessageThread_ 衍腥。
RocketMQ 實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的線程工廠:ThreadFactoryImpl磺樱,線程工廠可以定義線程名稱芥丧,以及是否是守護(hù)線程 。
開源項(xiàng)目 Cobar 坊罢,Xmemcached续担,Metamorphosis 中都有類似線程工廠的實(shí)現(xiàn) 。
5 線程名很重要
線程名很重要活孩,線程名很重要物遇,線程名很重要 ,重要的事情說三遍憾儒。
我們看到 RocketMQ 中询兴,無論是單線程抽象類還是多線程的封裝都會(huì)配置線程名 ,因?yàn)橥ㄟ^線程名起趾,非常容易定位問題诗舰,從而大大提升解決問題的效率。
定位的媒介常見有兩種:日志文件和堆棧記錄训裆。
▍一眶根、日志文件
經(jīng)常處理業(yè)務(wù)問題的同學(xué),一定都經(jīng)常與日志打交道边琉。
- 查看 ERROR 日志属百,追溯到執(zhí)行線程, 要是線程池隔離做的好变姨,基本可以判斷出哪種業(yè)務(wù)場(chǎng)景出了問題族扰;
- 通過查看線程打印的日志,推斷線程調(diào)度是否正常定欧,比如有的定時(shí)任務(wù)線程打印了開始渔呵,沒有打印結(jié)束,推論當(dāng)前線程可能已經(jīng)掛掉或者阻塞砍鸠。
▍二扩氢、堆棧記錄
jstack 是 java 虛擬機(jī)自帶的一種堆棧跟蹤工具 ,主要用來查看 Java 線程的調(diào)用堆棧睦番,線程快照包含當(dāng)前 java 虛擬機(jī)內(nèi)每一條線程正在執(zhí)行的方法堆棧的集合类茂,可以用來分析線程問題。
jstack -l 進(jìn)程pid
筆者查看線程堆棧托嚣,一般關(guān)注如下幾點(diǎn):
- 當(dāng)前 jvm 進(jìn)程中的線程數(shù)量和線程分類是否在預(yù)期的范圍內(nèi)巩检;
- 系統(tǒng)接口超時(shí)或者定時(shí)任務(wù)停止的異常場(chǎng)景下 ,分析堆棧中是否有鎖未釋放示启,或者線程一直等待網(wǎng)絡(luò)通訊響應(yīng)兢哭;
- 分析 jvm 進(jìn)程中哪個(gè)線程占用的 CPU 最高。
6 總結(jié)
本文是RocketMQ 系列文章的開篇夫嗓,和朋友們簡(jiǎn)單聊聊 RocketMQ 源碼里創(chuàng)建線程的技巧迟螺。
-
單線程抽象類 ServiceThread
使用者只需要實(shí)現(xiàn)業(yè)務(wù)邏輯以及定義線程名即可 冲秽,不需要寫冗余的代碼。
-
線程池封裝
適當(dāng)封裝矩父,定義線程工廠锉桑,并合理配置線程池參數(shù)。
-
線程名很重要
文件日志窍株,堆棧記錄配合線程名能大大提升解決問題的效率民轴。
RocketMQ 的多線程編程技巧很多,比如線程通訊球订,并發(fā)控制后裸,線程模型等等,后續(xù)的文章會(huì)一一為大家展現(xiàn)冒滩。