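RocketMQ uses Netty as its underlying communication framework. The technical details are as follows:
1. Communication protocol
RocketMQ's communication layer is built on Netty, and the wire format is a custom implementation. The protocol is as follows: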
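(A frame starts with a 4-byte packet length followed by a 4-byte header length. Why are the packet-length and header-length fields the same size? It feels like this could be optimized further.)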
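The layout described above can be sketched with plain `java.nio.ByteBuffer`. This is only a minimal sketch under a few assumptions: the first 4-byte field counts everything after itself (the header-length field, the header bytes, and the body), both fields are big-endian, and the header is an opaque byte array. `FrameSketch`, `encode`, and `decode` are hypothetical names, not RocketMQ classes. As far as I can tell from the RocketMQ source, the high byte of the second field also carries the serialization type, which this sketch ignores.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class FrameSketch {
    // Assumed layout: [4-byte total length][4-byte header length][header][body].
    // Assumption: "total length" counts every byte after the first 4 bytes.
    static byte[] encode(byte[] header, byte[] body) {
        int total = 4 + header.length + body.length;
        ByteBuffer buf = ByteBuffer.allocate(4 + total); // big-endian by default
        buf.putInt(total);
        buf.putInt(header.length);
        buf.put(header);
        buf.put(body);
        return buf.array();
    }

    // Parses one complete frame and returns {header, body}.
    static byte[][] decode(byte[] frame) {
        ByteBuffer buf = ByteBuffer.wrap(frame);
        int total = buf.getInt();
        int headerLength = buf.getInt();
        byte[] header = new byte[headerLength];
        buf.get(header);
        byte[] body = new byte[total - 4 - headerLength];
        buf.get(body);
        return new byte[][] {header, body};
    }

    public static void main(String[] args) {
        byte[] frame = encode("{\"code\":10}".getBytes(StandardCharsets.UTF_8),
                              "hello".getBytes(StandardCharsets.UTF_8));
        byte[][] parts = decode(frame);
        System.out.println(new String(parts[0], StandardCharsets.UTF_8));
        System.out.println(new String(parts[1], StandardCharsets.UTF_8));
    }
}
```

A length-prefixed frame like this is what lets a Netty length-field decoder cut the byte stream into whole messages before any header parsing happens.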
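2. Communication encryption
When connecting over Netty, RocketMQ can encrypt traffic using Netty's SslContext, but this requires the relevant startup parameters (-Dkey=value). The startup parameters are listed below (with the -D prefix omitted):
tls.server.mode=disabled (no encryption) / permissive (default) / enforcing (of course, if no certificate path is detected, encrypted transport is not used either)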
tls.server.certPath=xxx
tls.server.keyPath=xxx
tls.server.trustCertPath=xxx
The three parameters above are all certificate paths, but for the server side. The client has its own equivalents:
tls.client.certPath=xxx
tls.client.keyPath=xxx
tls.client.trustCertPath=xxx
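Because these options are passed as `-D` startup parameters, they end up in the JVM system properties. The following is a minimal sketch of how they might be read, mirroring only what is stated above (default mode `permissive`, no encryption when certificate paths are missing). It is not RocketMQ's actual logic, and `TlsSettingsSketch`, `serverMode`, and `encryptionPossible` are hypothetical names.

```java
import java.util.Locale;
import java.util.Properties;

final class TlsSettingsSketch {
    enum Mode { DISABLED, PERMISSIVE, ENFORCING }

    // Reads tls.server.mode, falling back to "permissive" as noted in the text.
    static Mode serverMode(Properties props) {
        String raw = props.getProperty("tls.server.mode", "permissive");
        return Mode.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }

    // Assumption taken from the remark above: without certificate paths,
    // encrypted transport is not used even if the mode would allow it.
    static boolean encryptionPossible(Properties props) {
        if (serverMode(props) == Mode.DISABLED) {
            return false;
        }
        return props.getProperty("tls.server.certPath") != null
            && props.getProperty("tls.server.keyPath") != null;
    }

    public static void main(String[] args) {
        // Run with e.g. -Dtls.server.mode=enforcing -Dtls.server.certPath=... -Dtls.server.keyPath=...
        Properties props = System.getProperties();
        System.out.println(serverMode(props) + " / encryption possible: " + encryptionPossible(props));
    }
}
```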
3. Default RocketMQ Netty thread counts
RocketMQ uses a master-slave (boss/worker) thread model:
thread-acceptor : 1
thread-boss : 3
thread-worker : 8
4. Configurable RocketMQ parameters
1) netty-server-config parameters
listenPort=8888
serverWorkerThreads=8 (the worker threads)
serverCallbackExecutorThreads=0
serverSelectorThreads=3 (the boss threads)
serverOnewaySemaphoreValue=256
serverAsyncSemaphoreValue=64
serverChannelMaxIdleTimeSeconds=120 (by default a connection is closed after 2 minutes idle)
serverSocketSndBufSize=65535
serverSocketRcvBufSize=65535
serverPooledByteBufAllocatorEnable=true (whether to use the pooled buffer allocator; enabled by default)
useEpollNativeSelector=false (whether to use Netty's own EpollEventLoopGroup)
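To illustrate how key=value settings like the ones above can override defaults, here is a minimal sketch that copies properties onto a config object through its setters. `ServerConfigSketch` and `apply` are hypothetical; only three of the fields listed above are modeled, and I am not claiming this is RocketMQ's own loader.

```java
import java.lang.reflect.Method;
import java.util.Properties;

final class ServerConfigSketch {
    // Defaults taken from the list above.
    private int listenPort = 8888;
    private int serverWorkerThreads = 8;
    private boolean useEpollNativeSelector = false;

    public int getListenPort() { return listenPort; }
    public void setListenPort(int listenPort) { this.listenPort = listenPort; }
    public int getServerWorkerThreads() { return serverWorkerThreads; }
    public void setServerWorkerThreads(int n) { this.serverWorkerThreads = n; }
    public boolean isUseEpollNativeSelector() { return useEpollNativeSelector; }
    public void setUseEpollNativeSelector(boolean b) { this.useEpollNativeSelector = b; }

    // For every setXxx(value) setter, looks up the property "xxx" and applies it.
    // Keys without a matching setter are ignored; missing keys keep the default.
    static void apply(Properties props, Object target) {
        for (Method m : target.getClass().getMethods()) {
            String name = m.getName();
            if (!name.startsWith("set") || name.length() <= 3 || m.getParameterCount() != 1) {
                continue;
            }
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String value = props.getProperty(key);
            if (value == null) {
                continue;
            }
            Class<?> type = m.getParameterTypes()[0];
            try {
                if (type == int.class) {
                    m.invoke(target, Integer.parseInt(value.trim()));
                } else if (type == boolean.class) {
                    m.invoke(target, Boolean.parseBoolean(value.trim()));
                } else if (type == String.class) {
                    m.invoke(target, value);
                }
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot set " + key, e);
            }
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("listenPort", "10911");
        ServerConfigSketch config = new ServerConfigSketch();
        apply(props, config);
        System.out.println(config.getListenPort() + " " + config.getServerWorkerThreads());
    }
}
```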
2) netty-client-config parameters
clientWorkerThreads=4
clientCallbackExecutorThreads=number of CPU cores
clientOnewaySemaphoreValue=65535
clientAsyncSemaphoreValue=65535
connectTimeoutMillis=3000
channelNotActiveInterval=1000 *60
clientChannelMaxIdleTimeSeconds=120
clientSocketSndBufSize=65535
clientSocketRcvBufSize=65535
clientPooledByteBufAllocatorEnable=false
clientCloseSocketIfTimeout=false
useTLS=false
3) nameserver-config parameters
rocketmq.home.dir=xxx (a startup parameter, not a properties entry)
user.home=xxx (a startup parameter; kvConfig.json and namesrv.properties are stored under it)
productEnvName=center (default production environment name)
orderMessageEnable=false (whether ordered messages are enabled)
4) broker-config parameters
aclEnable=false
adminBrokerThreadPoolNums=16
autoCreateSubscriptionGroup=true
autoCreateTopicEnable=true
brokerClusterName=DefaultCluster
brokerFastFailureEnable=true
brokerId=0
brokerIP1=localhost
brokerIP2=localhost
brokerName=localhost
brokerPermission=4 | 2 (read | write)
brokerTopicEnable=true
clientManagerThreadPoolQueueCapacity=1000000
clientManageThreadPoolNums=32
clusterTopicEnable=true
commercialBaseCount=1
commercialBigCount=1
commercialEnable=true
commercialTimerCount=1
commercialTransCount=1
compressedRegister=false
consumerFallbehindThreshold=1024L *1024 *1024 *16
consumerManagerThreadPoolQueueCapacity=1000000
consumerManageThreadPoolNums=32
defaultTopicQueueNums=8
disableConsumeIfConsumerReadSlowly=false
enableCalcFilterBitMap=false
enablePropertyFilter=false
endTransactionPoolQueueCapacity=100000
endTransactionThreadPoolNums=8 + number of CPU cores * 2
expectConsumerNumUseFilter=32
fetchNamesrvAddrByAddressServer=false
filterDataCleanTimeSpan=24 *3600 *1000
filterServerNums=0
filterSupportRetry=false
flushConsumerOffsetHistoryInterval=1000 *60
flushConsumerOffsetInterval=1000 *5
forceRegister=true
heartbeatThreadPoolNums=Math.min(32, number of CPU cores)
heartbeatThreadPoolQueueCapacity=50000
highSpeedMode=false
longPollingEnable=true
maxDelayTime=40
maxErrorRateOfBloomFilter=20
messageStorePlugIn=""
msgTraceTopicName=RMQ_SYS_TRACE_TOPIC
namesrvAddr=localhost:8080;localhost:8080
notifyConsumerIdsChangedEnable=true
pullMessageThreadPoolNums=16 + number of CPU cores * 2
pullThreadPoolQueueCapacity=100000
queryMessageThreadPoolNums=8 + number of CPU cores
queryThreadPoolQueueCapacity=20000
regionId=DefaultRegion
registerBrokerTimeoutMills=6000
registerNameServerPeriod=1000 *30
rejectTransactionMessage=false
rocketmqHome=value of your ROCKETMQ_HOME environment variable
sendMessageThreadPoolNums=1
sendThreadPoolQueueCapacity=10000
shortPollingTimeMills=1000
slaveReadEnable=false
startAcceptSendRequestTimeStamp=0
traceOn=true
traceTopicEnable=false
transactionCheckInterval=60 *1000
transactionCheckMax=15
transactionTimeOut=6 *1000
transferMsgByHeap=true
waitTimeMillsInHeartbeatQueue=31 *1000
waitTimeMillsInPullQueue=5 *1000
waitTimeMillsInSendQueue=200
waitTimeMillsInTransactionQueue=3 *1000
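Several of the broker defaults above are expressions rather than constants. The sketch below evaluates them for a given core count so the real numbers on a machine are easy to see. `BrokerDefaultsSketch` and its method names are hypothetical; the formulas are copied from the list above.

```java
final class BrokerDefaultsSketch {
    // The L suffix matters: in plain int arithmetic 1024 * 1024 * 1024 * 16 overflows to 0.
    static final long CONSUMER_FALLBEHIND_THRESHOLD = 1024L * 1024 * 1024 * 16; // 16 GiB

    static int endTransactionThreads(int cores) { return 8 + cores * 2; }
    static int heartbeatThreads(int cores) { return Math.min(32, cores); }
    static int pullMessageThreads(int cores) { return 16 + cores * 2; }
    static int queryMessageThreads(int cores) { return 8 + cores; }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("cores=" + cores);
        System.out.println("endTransactionThreadPoolNums=" + endTransactionThreads(cores));
        System.out.println("heartbeatThreadPoolNums=" + heartbeatThreads(cores));
        System.out.println("pullMessageThreadPoolNums=" + pullMessageThreads(cores));
        System.out.println("queryMessageThreadPoolNums=" + queryMessageThreads(cores));
        System.out.println("consumerFallbehindThreshold=" + CONSUMER_FALLBEHIND_THRESHOLD);
    }
}
```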
5) message-store-config parameters
accessMessageInMemoryMaxRatio=40
bitMapLengthConsumeQueueExt=64
brokerRole=ASYNC_MASTER
checkCRCOnRecover=true
cleanFileForciblyEnable=true
cleanResourceInterval=10000
commitCommitLogLeastPages=4
commitCommitLogThoroughInterval=200
commitIntervalCommitLog=200
debugLockEnable=false
defaultQueryMaxNum=32
deleteCommitLogFilesInterval=100
deleteConsumeQueueFilesInterval=100
deleteWhen="04" // When to delete,default is at 4 am
destroyMapedFileIntervalForcibly=1000 *120
diskFallRecorded=true
diskMaxUsedSpaceRatio=75
duplicationEnable=false
enableConsumeQueueExt=false
fastFailIfNoBufferInStorePool=false
fileReservedTime=72
flushCommitLogLeastPages=4
flushCommitLogThoroughInterval=1000 *10
flushCommitLogTimed=false
flushConsumeQueueLeastPages=2
flushConsumeQueueThoroughInterval=1000 *60
flushDelayOffsetInterval=1000 *10
flushDiskType=ASYNC_FLUSH
flushIntervalCommitLog=500
flushIntervalConsumeQueue=1000
flushLeastPagesWhenWarmMapedFile=1024 /4 *16
haHousekeepingInterval=1000 *20
haListenPort=10912
haSendHeartbeatInterval=1000 *5
haSlaveFallbehindMax=1024 *1024 *256
haTransferBatchSize=1024 *32
mapedFileSizeCommitLog=1024 *1024 *1024
mappedFileSizeConsumeQueueExt=48 *1024 *1024
maxHashSlotNum=5000000
maxIndexNum=5000000 *4
maxMessageSize=1024 *1024 *4
maxMsgsNumBatch=64
maxTransferBytesOnMessageInDisk=1024 *64
maxTransferBytesOnMessageInMemory=1024 *256
maxTransferCountOnMessageInDisk=8
maxTransferCountOnMessageInMemory=32
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
messageIndexEnable=true
messageIndexSafe=false
offsetCheckInSlave=false
osPageCacheBusyTimeOutMills=1000
putMsgIndexHightWater=600000
redeleteHangedFileInterval=1000 *120
storePathCommitLog=...
storePathRootDir=...
syncFlushTimeout=1000 *5
transientStorePoolEnable=false
transientStorePoolSize=5
useReentrantLockWhenPutMessage=false
warmMapedFileEnable=false
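The `messageDelayLevel` value in the list above is a space-separated string of durations. As a minimal sketch of how such a string can be turned into milliseconds, assuming only the `s`, `m`, and `h` suffixes that appear in the default value and assuming that the first entry corresponds to delay level 1 (`DelayLevelSketch` is a hypothetical name):

```java
import java.util.ArrayList;
import java.util.List;

final class DelayLevelSketch {
    // Converts one token such as "30s", "2m" or "1h" to milliseconds.
    static long toMillis(String token) {
        long value = Long.parseLong(token.substring(0, token.length() - 1));
        switch (token.charAt(token.length() - 1)) {
            case 's': return value * 1000L;
            case 'm': return value * 60_000L;
            case 'h': return value * 3_600_000L;
            default: throw new IllegalArgumentException("unknown unit in " + token);
        }
    }

    // Index 0 of the result is assumed to be delay level 1.
    static List<Long> parse(String levels) {
        List<Long> delays = new ArrayList<>();
        for (String token : levels.trim().split("\\s+")) {
            delays.add(toMillis(token));
        }
        return delays;
    }

    public static void main(String[] args) {
        String levels = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
        List<Long> delays = parse(levels);
        for (int i = 0; i < delays.size(); i++) {
            System.out.println("level " + (i + 1) + " = " + delays.get(i) + " ms");
        }
    }
}
```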
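5. Special parameters
The broker can adjust its name server addresses dynamically: every 2 minutes it fetches the name server address with an HTTP request and updates it, which improves the system's high availability. For a hot-standby broker to connect to the name server this way, startup parameters are required rather than configuration-file entries. The startup parameters are: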
-Drocketmq.namesrv.domain=jmenv.tbsite.net
-Drocketmq.namesrv.domain.subgroup=nsaddr
Default: jmenv.tbsite.net:8080/rocketmq/nsaddr
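The periodic lookup described above can be sketched as follows. This is an illustration only: the port 8080 and the `/rocketmq/` path are taken from the default shown above, the assumption that the response body is a single line of addresses is mine, and `NamesrvAddrSketch` with its methods is hypothetical rather than RocketMQ's own code.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class NamesrvAddrSketch {
    // Builds the lookup URL from the two startup parameters.
    static String endpoint(String domain, String subgroup) {
        return "http://" + domain + ":8080/rocketmq/" + subgroup;
    }

    // Falls back to the defaults quoted in the text when the -D options are absent.
    static String endpointFromSystemProperties() {
        return endpoint(System.getProperty("rocketmq.namesrv.domain", "jmenv.tbsite.net"),
                        System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr"));
    }

    // Assumption: the response body is one line containing the address list.
    static String fetch(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) URI.create(url).toURL().openConnection();
        conn.setConnectTimeout(3000);
        conn.setReadTimeout(3000);
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            String line = reader.readLine();
            return line == null ? "" : line.trim();
        } finally {
            conn.disconnect();
        }
    }

    // Fetches immediately, then every 2 minutes; the caller shuts the executor down.
    static ScheduledExecutorService startRefreshing(String url, Consumer<String> onUpdate) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(() -> {
            try {
                onUpdate.accept(fetch(url));
            } catch (Exception e) {
                // Keep the previous address list if a fetch fails.
                System.err.println("name server lookup failed: " + e.getMessage());
            }
        }, 0, 2, TimeUnit.MINUTES);
        return timer;
    }

    public static void main(String[] args) {
        System.out.println(endpointFromSystemProperties());
    }
}
```

Swallowing the failure inside the task matters here: an exception escaping a `scheduleAtFixedRate` task cancels all later runs, which would silently stop the refresh.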