When tracing Kafka's method call paths, breakpoint debugging and log printing feel very heavy-handed. Once you learn to use Arthas, exploring the Kafka source code becomes much easier.
Scenario 1
I know that Kafka is about to execute the following method, KafkaApis#handleProduceRequest, but I want to see exactly which branches the next invocation takes.
// kafka.server.KafkaApis#handleProduceRequest
def handleProduceRequest(request: RequestChannel.Request) {
  val produceRequest = request.body[ProduceRequest]
  val numBytesAppended = request.header.toStruct.sizeOf + request.sizeOfBodyInBytes
  ...
}
Use the trace command to follow every method invoked inside it:
[arthas@51687]$ trace kafka.server.KafkaApis handleProduceRequest
Press Q or Ctrl+C to abort.
Affect(class-cnt:1 , method-cnt:1) cost in 360 ms.
`---ts=2019-10-23 13:20:46;thread_name=kafka-request-handler-4;id=37;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@764c12b6
`---[3.661559ms] kafka.server.KafkaApis:handleProduceRequest()
+---[0.010396ms] scala.reflect.ClassTag$:apply() #367
+---[9.6E-4ms] scala.Predef$$eq$colon$eq$:tpEquals() #367
+---[5.53E-4ms] kafka.utils.NotNothing$:notNothingEvidence() #367
+---[5.49E-4ms] kafka.network.RequestChannel$Request:body() #367
+---[4.53E-4ms] kafka.network.RequestChannel$Request:header() #368
+---[0.008506ms] org.apache.kafka.common.requests.RequestHeader:toStruct() #368
+---[0.001567ms] org.apache.kafka.common.protocol.types.Struct:sizeOf() #368
+---[0.003086ms] kafka.network.RequestChannel$Request:sizeOfBodyInBytes() #368
+---[0.002294ms] org.apache.kafka.common.requests.ProduceRequest:hasTransactionalRecords() #370
+---[0.002079ms] org.apache.kafka.common.requests.ProduceRequest:hasIdempotentRecords() #379
+---[0.009205ms] scala.collection.mutable.Map$:apply() #384
+---[9.72E-4ms] scala.collection.mutable.Map$:apply() #385
+---[9.21E-4ms] scala.collection.mutable.Map$:apply() #386
+---[0.00206ms] org.apache.kafka.common.requests.ProduceRequest:partitionRecordsOrFail() #388
+---[7.84E-4ms] scala.collection.JavaConverters$:mapAsScalaMapConverter() #388
+---[0.001015ms] scala.collection.convert.Decorators$AsScala:asScala() #388
+---[0.003071ms] kafka.server.KafkaApis$$anonfun$handleProduceRequest$1:<init>() #388
+---[0.001735ms] scala.collection.TraversableLike:withFilter() #388
+---[0.005349ms] kafka.server.KafkaApis$$anonfun$handleProduceRequest$2:<init>() #388
+---[0.013075ms] scala.collection.generic.FilterMonadic:foreach() #388
+---[0.006955ms] scala.collection.mutable.Map:isEmpty() #454
+---[4.05E-4ms] kafka.network.RequestChannel$Request:header() #457
+---[5.25E-4ms] org.apache.kafka.common.requests.RequestHeader:clientId() #457
+---[0.024344ms] kafka.admin.AdminUtils$:AdminClientId() #457
+---[8.02E-4ms] java.lang.Object:equals() #457
+---[0.002155ms] org.apache.kafka.common.requests.ProduceRequest:timeout() #461
+---[0.009585ms] org.apache.kafka.common.requests.ProduceRequest:acks() #462
+---[0.00303ms] kafka.server.KafkaApis$$anonfun$12:<init>() #466
+---[0.002418ms] kafka.server.KafkaApis$$anonfun$13:<init>() #467
+---[min=5.63E-4ms,max=7.45E-4ms,total=0.001308ms,count=2] kafka.server.KafkaApis:replicaManager() #460
+---[0.0033ms] kafka.server.ReplicaManager:appendRecords$default$7() #460
+---[3.247819ms] kafka.server.ReplicaManager:appendRecords() #460
`---[0.004697ms] org.apache.kafka.common.requests.ProduceRequest:clearPartitionRecords() #471
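If the method is called frequently and you only care about slow invocations, trace also accepts an OGNL condition expression and a capture limit: #cost is the invocation time in milliseconds and -n stops after the given number of matches. A minimal sketch (the 1 ms threshold and the count of 5 are arbitrary):
[arthas@51687]$ trace kafka.server.KafkaApis handleProduceRequest '#cost > 1' -n 5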
Scenario 2
I know that Kafka will eventually execute the following method, LogSegment#append, but I want to know how it gets called:
[arthas@51687]$ stack kafka.log.LogSegment append
Press Q or Ctrl+C to abort.
Affect(class-cnt:1 , method-cnt:1) cost in 69 ms.
ts=2019-10-23 13:27:26;thread_name=kafka-request-handler-4;id=37;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@764c12b6
@kafka.log.Log$$anonfun$append$2.apply()
at kafka.log.Log$$anonfun$append$2.apply(Log.scala:626)
at kafka.log.Log.maybeHandleIOException(Log.scala:1691)
at kafka.log.Log.append(Log.scala:626)
at kafka.log.Log.appendAsLeader(Log.scala:597)
at kafka.cluster.Partition$$anonfun$13.apply(Partition.scala:533)
at kafka.cluster.Partition$$anonfun$13.apply(Partition.scala:521)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:223)
at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:520)
at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:719)
at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:703)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:703)
at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:453)
at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:460)
at kafka.server.KafkaApis.handle(KafkaApis.scala:98)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:65)
at java.lang.Thread.run(Thread.java:748)
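stack accepts the same kind of options; for a hot path like LogSegment#append it is worth capping the number of captured stacks. A sketch (the count is arbitrary):
[arthas@51687]$ stack kafka.log.LogSegment append -n 1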
Scenario 3
Check what kinds of requests Kafka is receiving right now, i.e. inspect the type of the Request passed to def handle(request: RequestChannel.Request):
[arthas@12587]$ watch kafka.server.KafkaApis handle params[0].header.apiKey
Press Q or Ctrl+C to abort.
Affect(class-cnt:1 , method-cnt:1) cost in 98 ms.
ts=2019-10-30 20:39:45; [cost=0.901428ms] result=@[API_VERSIONS]
ts=2019-10-30 20:39:45; [cost=0.947624ms] result=@[API_VERSIONS]
ts=2019-10-30 20:39:45; [cost=2.340937ms] result=@ApiKeys[METADATA]
ts=2019-10-30 20:39:45; [cost=1.204087ms] result=@ApiKeys[FIND_COORDINATOR]
ts=2019-10-30 20:39:45; [cost=0.402322ms] result=@ApiKeys[METADATA]
ts=2019-10-30 20:39:45; [cost=0.880728ms] result=@ApiKeys[FIND_COORDINATOR]
ts=2019-10-30 20:39:45; [cost=0.201719ms] result=@[API_VERSIONS]
ts=2019-10-30 20:39:45; [cost=0.116254ms] result=@[API_VERSIONS]
ts=2019-10-30 20:39:45; [cost=2.556618ms] result=@ApiKeys[JOIN_GROUP]
ts=2019-10-30 20:39:45; [cost=10.631961ms] result=@ApiKeys[SYNC_GROUP]
ts=2019-10-30 20:39:45; [cost=1.172244ms] result=@ApiKeys[OFFSET_FETCH]
ts=2019-10-30 20:39:45; [cost=0.508714ms] result=@ApiKeys[JOIN_GROUP]
ts=2019-10-30 20:39:45; [cost=5.21916ms] result=@ApiKeys[SYNC_GROUP]
ts=2019-10-30 20:39:45; [cost=0.269761ms] result=@ApiKeys[OFFSET_FETCH]
ts=2019-10-30 20:39:45; [cost=0.139089ms] result=@[API_VERSIONS]
ts=2019-10-30 20:39:45; [cost=0.235017ms] result=@[API_VERSIONS]
ts=2019-10-30 20:39:45; [cost=2.174569ms] result=@ApiKeys[FETCH]
ts=2019-10-30 20:39:45; [cost=1.251027ms] result=@ApiKeys[FETCH]
ts=2019-10-30 20:39:48; [cost=1.538105ms] result=@ApiKeys[OFFSET_COMMIT]
ts=2019-10-30 20:39:48; [cost=1.54736ms] result=@ApiKeys[OFFSET_COMMIT]
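When the broker is serving many request types at once, a condition expression narrows the watch to the one you care about. A sketch, assuming OGNL's == does an equals comparison on the ApiKeys name and using PRODUCE purely as an example filter:
[arthas@12587]$ watch kafka.server.KafkaApis handle params[0].header.apiKey 'params[0].header.apiKey.name() == "PRODUCE"' -n 5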
However, if what you want to watch is kafka.server.KafkaApis#authorize, entering the following in Arthas reports an error, and the hint is clear: the method does not exist.
[arthas@12587]$ watch kafka.server.KafkaApis authorize params[0]
No class or method is affected, try:
1. sm CLASS_NAME METHOD_NAME to make sure the method you are tracing actually exists (it might be in your parent class).
2. reset CLASS_NAME and try again, your method body might be too large.
3. check arthas log: /Users/ericfei/logs/arthas/arthas.log
4. visit https://github.com/alibaba/arthas/issues/47 for more details.
Following the hint, use sm to find out what the authorize method is actually called:
[arthas@12587]$ sm kafka.server.KafkaApis
kafka.server.KafkaApis kafka$server$KafkaApis$$authorize(Lkafka/network/RequestChannel$Session;Lkafka/security/auth/Operation;Lkafka/security/auth/Resource;)Z
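sm also matches wildcards, so the full expanded name is not required to locate the method, and -d prints details such as the declaring class and modifiers. A sketch (the pattern is illustrative):
[arthas@12587]$ sm -d kafka.server.KafkaApis *authorize*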
The Scala compiler expands the names of private methods that are referenced from nested closures, which is why authorize shows up as kafka$server$KafkaApis$$authorize. With the exact name in hand, we can watch it:
[arthas@12587]$ watch kafka.server.KafkaApis kafka$server$KafkaApis$$authorize params[0]
Press Q or Ctrl+C to abort.
Affect(class-cnt:1 , method-cnt:1) cost in 61 ms.
ts=2019-10-30 20:46:01; [cost=0.187904ms] result=@Session[
principal=@KafkaPrincipal[User:ANONYMOUS],
clientAddress=@Inet4Address[/172.16.115.163],
sanitizedUser=@String[ANONYMOUS],
]
ts=2019-10-30 20:46:01; [cost=0.056613ms] result=@Session[
principal=@KafkaPrincipal[User:ANONYMOUS],
clientAddress=@Inet4Address[/172.16.115.163],
sanitizedUser=@String[ANONYMOUS],
]
ts=2019-10-30 20:46:01; [cost=0.038524ms] result=@Session[
principal=@KafkaPrincipal[User:ANONYMOUS],
clientAddress=@Inet4Address[/172.16.115.163],
sanitizedUser=@String[ANONYMOUS],
]
ts=2019-10-30 20:46:01; [cost=0.060588ms] result=@Session[
principal=@KafkaPrincipal[User:ANONYMOUS],
clientAddress=@Inet4Address[/172.16.115.163],
sanitizedUser=@String[ANONYMOUS],
]
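To see whether any request is actually being denied, the same watch can capture both the Session and the return value and filter on failures. A sketch, assuming the Boolean return type shown by sm above; the {params[0], returnObj} expression and the condition are illustrative:
[arthas@12587]$ watch kafka.server.KafkaApis kafka$server$KafkaApis$$authorize '{params[0], returnObj}' 'returnObj == false' -x 2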