Apache Pulsar 之 Functions Debug
在之前的文章 Java Functions楞陷、Python Functions吟榴、Go Functions 中塘幅,向大家詳細介紹了如何基于自己的場景快速編寫并部署 Pulsar Functions骤竹。這篇文章蛾派,會介紹如何利用 Pulsar 提供的 CLI 對 Functions 進行 Debug俄认。
使用 LogTopic
Pulsar Functions 在設計之初个少,就考慮到如何對 Pulsar Functions 進行 Debug。如下圖所示眯杏,Pulsar Functions 允許用戶將自己在 Functions 中定義好的日志信息輸出到指定的 LogTopic 中夜焦,如果有遇到相關的問題需要從日志信息中獲取,用戶可以編寫 consumer 從指定的 LogTopic 中消費消息岂贩,即可查看相關的日志信息茫经。
具體使用如下:
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.slf4j.Logger;
public class LoggingFunction implements Function<String, Void> {
@Override
public void apply(String input, Context context) {
Logger LOG = context.getLogger();
String messageId = new String(context.getMessageId());
if (input.contains("danger")) {
LOG.warn("A warning was received in message {}", messageId);
} else {
LOG.info("Message {} received\nContent: {}", messageId, input);
}
return null;
}
}
如上述代碼所示,用戶可以通過 context.getLogger()
獲取 logger 對象并將其賦值給 slf4j
的 LOG
字段萎津,這樣用戶就可以使用 LOG
字段在 Pulsar Functions 中定義自己想要的日志信息卸伞。當你有日志信息需要輸出時,在啟動過程中锉屈,需要告訴 Pulsar Functions 你期望將日志信息輸出到哪一個 topic 中荤傲,具體使用如下:
$ bin/pulsar-admin functions create \
--log-topic persistent://public/default/logging-function-logs \
# Other function configs
Functions CLI
除了使用 LogTopic 通過自定義日志信息的方式來追蹤 Functions 的問題之外,用戶還以通過使用 Functions CLI 對 Pulsar Functions 進行相關的 Debug 工作部念。
關于 Pulsar Functions 相關的 CLI弃酌,Pulsar 將其集成到 pulsar-admin
下,你可以使用 ./bin/pulsar-admin
和 Functions
告訴 Pulsar儡炼,你要執(zhí)行和 Pulsar Functions 相關的操作妓湘。為了方便用戶定位問題,Pulsar Functions 提供了如下幾種 CLI:
- get
- stats
- trigger
- list
- status
get
使用 get
命令可以獲取當前 Pulsar Functions 的詳細信息乌询,它提供了如下參數(shù)列表供用戶使用:
- FQFN
- name
- tenant
- namespace
需要說明的是榜贴,F(xiàn)QFN(Fully Qualified Functions Name)是由 tenant
、namespace
和 name
這三個字段組成妹田,它可以唯一標識一個 Pulsar Functions唬党,用戶在使用時,可以使用 FQFN 或者同時指定 tenant
鬼佣、namespace
和 name
這三個字段驶拱。示例如下:
$ ./bin/pulsar-admin functions get \
--tenant public \
--namespace default \
--name ExclamationFunctio6
執(zhí)行如上命令之后,輸出如下:
{
"tenant": "public",
"namespace": "default",
"name": "ExclamationFunctio6",
"className": "org.example.test.ExclamationFunction",
"inputSpecs": {
"persistent://public/default/my-topic-1": {
"isRegexPattern": false
}
},
"output": "persistent://public/default/test-1",
"processingGuarantees": "ATLEAST_ONCE",
"retainOrdering": false,
"userConfig": {},
"runtime": "JAVA",
"autoAck": true,
"parallelism": 1
}
通過 get
命令晶衷,你可以獲取到當前 Pulsar Functions 具體的輸入蓝纲、輸出,運行的 rutime 是什么晌纫,當前 Functions 中有多少個 instance 在運行等信息税迷。
list
list
命令可以獲取當前 namespace 下有哪些 Functions 在運行,會返回運行的 Functions 的 name 列表锹漱,它提供了如下參數(shù)列表供用戶使用:
- tenant
- namespace
參考示例如下:
$ ./bin/pulsar-admin functions list \
--tenant public \
--namespace default
執(zhí)行如上命令之后箭养,輸出如下:
ExclamationFunctio1
ExclamationFunctio2
ExclamationFunctio3
上述輸出說明,在 default 這個 namespace 下哥牍,運行了三個 Pulsar Functions毕泌。
trigger
trigger
命令會模擬 Functions 執(zhí)行的過程喝检,可以用來驗證當前 Functions 運行的整個鏈路是否正常,它提供了如下參數(shù)列表供用戶使用:
- FQFN
- tenant
- namespace
- name
- topic
- trigger-file
- trigger-value
參考示例如下:
$ ./bin/pulsar-admin functions trigger \
--tenant public \
--namespace default \
--name ExclamationFunctio6 \
--topic persistent://public/default/my-topic-1 \
--trigger-value "hello pulsar functions"
執(zhí)行成功之后會輸出 This is my function!
注意:在使用 --topic
指定具體的 topic 時撼泛,需要使用 topic 的全稱蛇耀,否則會出現(xiàn)如下錯誤:
Function in trigger function has unidentified topic
Reason: Function in trigger function has unidentified topic
status
使用 status
命令可以查看當前 Functions 運行的狀態(tài)信息,它提供了如下參數(shù)列表供用戶使用:
- FQFN
- tenant
- namespace
- name
- instance-id
參考示例如下:
$ ./bin/pulsar-admin functions status \
--tenant public \
--namespace default \
--name ExclamationFunctio6 \
執(zhí)行上述命令后坎弯,輸出如下:
{
"numInstances" : 1,
"numRunning" : 1,
"instances" : [ {
"instanceId" : 0,
"status" : {
"running" : true,
"error" : "",
"numRestarts" : 0,
"numReceived" : 1,
"numSuccessfullyProcessed" : 1,
"numUserExceptions" : 0,
"latestUserExceptions" : [ ],
"numSystemExceptions" : 0,
"latestSystemExceptions" : [ ],
"averageLatency" : 0.8385,
"lastInvocationTime" : 1557734137987,
"workerId" : "c-standalone-fw-23ccc88ef29b-8080"
}
} ]
}
可以看到在 status
命令中,你可以查看到當前 Functions 是否處于運行狀態(tài)译暂,總共接收了多少條消息抠忘,成功處理了多少條消息,系統(tǒng)發(fā)生過多少次錯誤外永,平均延遲為多少等崎脉。由于一個 Functions 可以運行多個 instance,所以可以通過 --instance-id
來具體查看某一個 instance 的詳細信息伯顶。在上述示例中囚灼,在 ExclamationFunctio6
的 Functions 下只運行了一個 instance。
stats
stats
命令主要用來獲取當前 Functions 的統(tǒng)計信息祭衩,它提供了如下參數(shù)列表供用戶使用:
- FQFN
- tenant
- namespace
- name
- instance-id
參數(shù)列表的含義與 status
命令一致灶体,參考示例如下:
$ ./bin/pulsar-admin functions stats \
--tenant public \
--namespace default \
--name ExclamationFunctio6 \
執(zhí)行上述命令后,輸出如下:
{
"receivedTotal" : 1,
"processedSuccessfullyTotal" : 1,
"systemExceptionsTotal" : 0,
"userExceptionsTotal" : 0,
"avgProcessLatency" : 0.8385,
"1min" : {
"receivedTotal" : 0,
"processedSuccessfullyTotal" : 0,
"systemExceptionsTotal" : 0,
"userExceptionsTotal" : 0,
"avgProcessLatency" : null
},
"lastInvocation" : 1557734137987,
"instances" : [ {
"instanceId" : 0,
"metrics" : {
"receivedTotal" : 1,
"processedSuccessfullyTotal" : 1,
"systemExceptionsTotal" : 0,
"userExceptionsTotal" : 0,
"avgProcessLatency" : 0.8385,
"1min" : {
"receivedTotal" : 0,
"processedSuccessfullyTotal" : 0,
"systemExceptionsTotal" : 0,
"userExceptionsTotal" : 0,
"avgProcessLatency" : null
},
"lastInvocation" : 1557734137987,
"userMetrics" : { }
}
} ]
}
運行 Pulsar Functions 出現(xiàn)問題時掐暮,你可以結合上述 Debug 命令和自定義的日志信息蝎抽,對 Pulsar Functions 的問題進行很好的定位和分析。