note:本文是《用 Pulsar 開發(fā)多人在線小游戲》的第三篇,配套源碼和全部文檔參見我的 GitHub 倉庫 play-with-pulsar 以及我的文章列表逸尖。
Pulsar Function 允許你編寫函數(shù)對 topic 中的數(shù)據(jù)進行一些處理,函數(shù)的輸入就是一個或多個 topic 中的消息叽粹,函數(shù)的返回值可以發(fā)送到其他 topic 中。
比方說,發(fā)送到 topicA
中的消息都是英文單詞班缎,我想把這些英文單詞都轉化成大寫并轉發(fā)到 topicB
中各聘,那么就可以寫一個 Pulsar function 做這個事情揣非。
Pulsar Function 還支持 Stateful Storage,簡單來說就是鍵值對的存儲服務躲因。
這個 Pulsar Function 會從一個 topic 中讀取句子并切分成單詞早敬,然后統(tǒng)計每個單詞出現(xiàn)的頻率。
單詞頻率其實是以鍵值對的形式存儲在這個 Function 中的大脉,可以通過 admin API 來讀取鍵對應的值搞监,官網(wǎng)文檔:
Pulsar Function 可以單獨部署成服務,也可以上傳到 broker 上镰矿,作為 broker 的一部分琐驴。不過目前社區(qū)的建議是部署單獨的 Function 集群。
目前 Pulsar 支持使用 Python、Go棍矛、Java 來開發(fā) Function安疗,API 文檔:
文檔給出的例子比較少,可以直接看 Pulsar Function examples够委,直接根據(jù)需求選擇合適的 Function 進行開發(fā)就行了荐类。
本文就以炸彈人游戲為例,利用 Pulsar Function 開發(fā)游戲房間的計分板功能茁帽。
在我們的炸彈人游戲中玉罐,玩家的死亡也會被抽象成事件發(fā)送到 topic 中:
type UserDeadEvent struct {
// 被炸死的玩家名
playerName string
// 殺手玩家名
killerName string
類似單詞計數(shù)器,我們這里也可以實現(xiàn)一個 Pulsar Function潘拨,專門過濾玩家死亡的 UserDeadEvent
事件吊输,然后統(tǒng)計 killerName
當然季蚂,我們需要實時更新房間內玩家的分數(shù),所以每個游戲房間除了 event topic 和 map topic 之外琅束,我們還需要一個 score topic扭屁,讓 Pulsar Function 把分數(shù)更新事件輸出到 score topic,并且利用 Pulsar client 的 tableview 功能做一個比較好的展現(xiàn)涩禀。
那么現(xiàn)在需要實現(xiàn)的 Pulsar Function 有如下需求:
1料滥、因為玩家產(chǎn)生的事件都發(fā)到了格式為 {roomName}-event-topic
的 topic 中,所以函數(shù)應該接收所有這些 topic 的消息艾船。
2葵腹、讀取這些消息的 Type
字段,過濾出 UserDeadEvent
事件屿岂,并讀取 playerName
和 killerName
3爷怀、還需要把玩家分數(shù)輸出到另一個格式為 {roomName}-score-topic
的 topic 中浴井。
首先需要設置 Pulsar Function 開發(fā)相關的 Maven 依賴:
然后就可以開始開發(fā)了霉撵,完整的源碼在 function-code 目錄:
public class ScoreboardFunction implements Function<GenericJsonRecord, Void> {
public Void process(GenericJsonRecord input, Context context) {
String type = (String) input.getField("type");
if (type.equals("UserDeadEvent")) {
String player = (String) input.getField("name");
String killer = (String) input.getField("comment");
if (player.equals(killer)) {
// kill himself
return null;
// get the source topic of this message
Optional<String> inputTopic = context.getCurrentRecord().getTopicName();
if (inputTopic.isEmpty()) {
return null;
// calculate the corresponding topic to send score
Optional<String> outputTopic = changeEventTopicNameToScoreTopicName(inputTopic.get());
if (outputTopic.isEmpty()) {
return null;
// roomName-playerName as the stateful key /
// store the score in stateful function
String killerKey = parseRoomName(inputTopic.get()).get() + "-" + killer;
context.incrCounter(killerKey, 1);
// send the score messages to score topic
long score = context.getCounter(killerKey);
try {
// player name as the key, score as the value
context.newOutputMessage(outputTopic.get(), Schema.STRING)
.value(score + "")
} catch (PulsarClientException e) {
// todo: ignore error for now
return null;
因為我們前文給 topic 中的消息設置了 JSON Schema,所以這里設置 topic 中的消息類型為 GenericJsonRecord
就是發(fā)到 event topic 的消息,通過 Pulsar Function 的 context
可以拿到這個 event topic 的名字瘤缩,由于 event topic 名字包含游戲房間名喇完,所以只要修改 event topic 名稱后綴即可得到 score topic 的名字。
函數(shù)的主要工作是過濾出 UserDeadEvent
剥啤,讀取 killerName
锦溪〔桓考慮到不能把不同房間的擊殺事件混在一起,我把 {roomName}-{killerName}
作為 Function 的鍵刻诊,并遞增計數(shù)器記錄玩家的分數(shù)防楷,最后調用 context.newOutputMessage
把玩家的分數(shù)發(fā)送到房間對應的 score topic 中。
Function 的調試
可以參考這篇官網(wǎng)文檔则涯,用 localrun 模式在本地調試 Function:
localrun 模式相當于直接在本地起了一個 Function worker复局,能夠連接到 Pulsar,并運行我們剛才開發(fā)的 Function 代碼粟判。
完整的源碼在 function-code 目錄亿昏,注意我們要對 Function 進行正確的配置,比如 Function 類以及作為輸入的 topic 名稱等等:
String inputTopic = ".*-event-topic";
// enable regex support to subscribe multiple topics
HashMap<String, ConsumerConfig> inputSpecs = new HashMap<>();
ConsumerConfig consumerConfig = ConsumerConfig.builder().isRegexPattern(true).build();
inputSpecs.put(inputTopic, consumerConfig);
配置完 functionConfig
后可以啟動一個本地的 Function worker:
LocalRunner localRunner = LocalRunner.builder()
其中 brokerServiceUrl
是 Pulsar broker 的連接地址档礁,stateStorageServiceUrl
是提供 stateStorage 的 bookkeeper 地址角钩,默認情況下在 4181 端口。
這樣呻澜,只要啟動 main 函數(shù)递礼,就會啟動 local runner,并加載我們剛開發(fā)的 Function易迹,把所有后綴為 -event-topic
的 topic 中的消息輸入給 Function宰衙。
我們剛才開發(fā)的 Function 會把玩家名稱和該玩家獲得的分數(shù)作為一條消息的鍵和值發(fā)送到 {roomName}-score-topic
中,那么玩家客戶端如何獲取這些信息呢睹欲?這就要用到之前介紹的 tableView 功能了供炼。
可以在游戲客戶端代碼中看到 tableView 的使用:
tableView, err := client.CreateTableView(pulsar.TableViewOptions{
Topic: roomName + "-score-topic",
Schema: pulsar.NewStringSchema(nil),
SchemaValueType: reflect.TypeOf(""),
我們在游戲數(shù)據(jù)中維護一個名為 scores
的 lru 緩存,存儲最近的最多 5 名玩家的分數(shù)信息窘疮,同時利用 tableView 的 ForEachAndListen
方法更新 lru 緩存:
client.tableView.ForEachAndListen(func(playerName string, i interface{}) error {
score := *i.(*string)
g.scores.Add(playerName, score)
return nil
這樣袋哼,當玩家分數(shù)更新時,lru 緩存中的數(shù)據(jù)就會更新闸衫,我們只要把對應的分數(shù)數(shù)據(jù)顯示到游戲界面上即可涛贯。
更多高質量干貨文章,請關注我的微信公眾號 labuladong 和算法博客 labuladong 的算法秘籍蔚出。