note:本文是《用 Pulsar 開發(fā)多人在線小游戲》的第三篇,配套源碼和全部文檔參見我的 GitHub 倉庫 play-with-pulsar 以及我的文章列表逸尖。
Pulsar Function 允許你編寫函數(shù)對 topic 中的數(shù)據(jù)進行一些處理,函數(shù)的輸入就是一個或多個 topic 中的消息叽粹,函數(shù)的返回值可以發(fā)送到其他 topic 中。
官網(wǎng)的一張圖就能看明白了:
比方說,發(fā)送到 topicA
中的消息都是英文單詞班缎,我想把這些英文單詞都轉化成大寫并轉發(fā)到 topicB
中各聘,那么就可以寫一個 Pulsar function 做這個事情揣非。
Pulsar Function 還支持 Stateful Storage,簡單來說就是鍵值對的存儲服務躲因。
比如官網(wǎng)給了一個單詞計數(shù)器的例子:
這個 Pulsar Function 會從一個 topic 中讀取句子并切分成單詞早敬,然后統(tǒng)計每個單詞出現(xiàn)的頻率。
單詞頻率其實是以鍵值對的形式存儲在這個 Function 中的大脉,可以通過 admin API 來讀取鍵對應的值搞监,官網(wǎng)文檔:
https://pulsar.apache.org/docs/next/functions-quickstart/#start-stateful-functions
Pulsar Function 可以單獨部署成服務,也可以上傳到 broker 上镰矿,作為 broker 的一部分琐驴。不過目前社區(qū)的建議是部署單獨的 Function 集群。
目前 Pulsar 支持使用 Python、Go棍矛、Java 來開發(fā) Function安疗,API 文檔:
https://pulsar.apache.org/docs/next/functions-develop-api/
文檔給出的例子比較少,可以直接看 Pulsar Function examples够委,直接根據(jù)需求選擇合適的 Function 進行開發(fā)就行了荐类。
本文就以炸彈人游戲為例,利用 Pulsar Function 開發(fā)游戲房間的計分板功能茁帽。
在我們的炸彈人游戲中玉罐,玩家的死亡也會被抽象成事件發(fā)送到 topic 中:
type UserDeadEvent struct {
// 被炸死的玩家名
playerName string
// 殺手玩家名
killerName string
}
類似單詞計數(shù)器,我們這里也可以實現(xiàn)一個 Pulsar Function潘拨,專門過濾玩家死亡的 UserDeadEvent
事件吊输,然后統(tǒng)計 killerName
出現(xiàn)的次數(shù),就可以作為該玩家的分數(shù)了铁追。
當然季蚂,我們需要實時更新房間內玩家的分數(shù),所以每個游戲房間除了 event topic 和 map topic 之外琅束,我們還需要一個 score topic扭屁,讓 Pulsar Function 把分數(shù)更新事件輸出到 score topic,并且利用 Pulsar client 的 tableview 功能做一個比較好的展現(xiàn)涩禀。
那么現(xiàn)在需要實現(xiàn)的 Pulsar Function 有如下需求:
1料滥、因為玩家產(chǎn)生的事件都發(fā)到了格式為 {roomName}-event-topic
的 topic 中,所以函數(shù)應該接收所有這些 topic 的消息艾船。
2葵腹、讀取這些消息的 Type
字段,過濾出 UserDeadEvent
事件屿岂,并讀取 playerName
和 killerName
践宴,killerName
出現(xiàn)的次數(shù)就是該玩家獲得的分數(shù)。
3爷怀、還需要把玩家分數(shù)輸出到另一個格式為 {roomName}-score-topic
的 topic 中浴井。
下面開始開發(fā)。
先貼官網(wǎng)文檔:
https://pulsar.apache.org/docs/next/functions-develop-api/#use-sdk-for-javapythongo
首先需要設置 Pulsar Function 開發(fā)相關的 Maven 依賴:
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-functions-api</artifactId>
<version>${pulsar.version}</version>
</dependency>
然后就可以開始開發(fā)了霉撵,完整的源碼在 function-code 目錄:
public class ScoreboardFunction implements Function<GenericJsonRecord, Void> {
@Override
public Void process(GenericJsonRecord input, Context context) {
String type = (String) input.getField("type");
if (type.equals("UserDeadEvent")) {
String player = (String) input.getField("name");
String killer = (String) input.getField("comment");
if (player.equals(killer)) {
// kill himself
return null;
}
// get the source topic of this message
Optional<String> inputTopic = context.getCurrentRecord().getTopicName();
if (inputTopic.isEmpty()) {
return null;
}
// calculate the corresponding topic to send score
Optional<String> outputTopic = changeEventTopicNameToScoreTopicName(inputTopic.get());
if (outputTopic.isEmpty()) {
return null;
}
// roomName-playerName as the stateful key /
// store the score in stateful function
String killerKey = parseRoomName(inputTopic.get()).get() + "-" + killer;
context.incrCounter(killerKey, 1);
// send the score messages to score topic
long score = context.getCounter(killerKey);
try {
// player name as the key, score as the value
context.newOutputMessage(outputTopic.get(), Schema.STRING)
.key(killer)
.value(score + "")
.send();
} catch (PulsarClientException e) {
// todo: ignore error for now
e.printStackTrace();
}
}
return null;
}
}
因為我們前文給 topic 中的消息設置了 JSON Schema,所以這里設置 topic 中的消息類型為 GenericJsonRecord
洪囤。
這段代碼的邏輯應該不難理解徒坡,input
就是發(fā)到 event topic 的消息,通過 Pulsar Function 的 context
可以拿到這個 event topic 的名字瘤缩,由于 event topic 名字包含游戲房間名喇完,所以只要修改 event topic 名稱后綴即可得到 score topic 的名字。
函數(shù)的主要工作是過濾出 UserDeadEvent
剥啤,讀取 killerName
锦溪〔桓考慮到不能把不同房間的擊殺事件混在一起,我把 {roomName}-{killerName}
作為 Function 的鍵刻诊,并遞增計數(shù)器記錄玩家的分數(shù)防楷,最后調用 context.newOutputMessage
把玩家的分數(shù)發(fā)送到房間對應的 score topic 中。
Function 的調試
可以參考這篇官網(wǎng)文檔则涯,用 localrun 模式在本地調試 Function:
https://pulsar.apache.org/docs/next/functions-debug-localrun/
localrun 模式相當于直接在本地起了一個 Function worker复局,能夠連接到 Pulsar,并運行我們剛才開發(fā)的 Function 代碼粟判。
完整的源碼在 function-code 目錄亿昏,注意我們要對 Function 進行正確的配置,比如 Function 類以及作為輸入的 topic 名稱等等:
String inputTopic = ".*-event-topic";
// enable regex support to subscribe multiple topics
HashMap<String, ConsumerConfig> inputSpecs = new HashMap<>();
ConsumerConfig consumerConfig = ConsumerConfig.builder().isRegexPattern(true).build();
inputSpecs.put(inputTopic, consumerConfig);
functionConfig.setInputSpecs(inputSpecs);
functionConfig.setClassName(ScoreboardFunction.class.getName());
配置完 functionConfig
后可以啟動一個本地的 Function worker:
LocalRunner localRunner = LocalRunner.builder()
.brokerServiceUrl("pulsar://localhost:6650")
.stateStorageServiceUrl("bk://localhost:4181")
.functionConfig(functionConfig)
.build();
localRunner.start(false);
其中 brokerServiceUrl
是 Pulsar broker 的連接地址档礁,stateStorageServiceUrl
是提供 stateStorage 的 bookkeeper 地址角钩,默認情況下在 4181 端口。
這樣呻澜,只要啟動 main 函數(shù)递礼,就會啟動 local runner,并加載我們剛開發(fā)的 Function易迹,把所有后綴為 -event-topic
的 topic 中的消息輸入給 Function宰衙。
計分板的開發(fā)
我們剛才開發(fā)的 Function 會把玩家名稱和該玩家獲得的分數(shù)作為一條消息的鍵和值發(fā)送到 {roomName}-score-topic
中,那么玩家客戶端如何獲取這些信息呢睹欲?這就要用到之前介紹的 tableView 功能了供炼。
可以在游戲客戶端代碼中看到 tableView 的使用:
tableView, err := client.CreateTableView(pulsar.TableViewOptions{
Topic: roomName + "-score-topic",
Schema: pulsar.NewStringSchema(nil),
SchemaValueType: reflect.TypeOf(""),
})
我們在游戲數(shù)據(jù)中維護一個名為 scores
的 lru 緩存,存儲最近的最多 5 名玩家的分數(shù)信息窘疮,同時利用 tableView 的 ForEachAndListen
方法更新 lru 緩存:
client.tableView.ForEachAndListen(func(playerName string, i interface{}) error {
score := *i.(*string)
g.scores.Add(playerName, score)
return nil
})
這樣袋哼,當玩家分數(shù)更新時,lru 緩存中的數(shù)據(jù)就會更新闸衫,我們只要把對應的分數(shù)數(shù)據(jù)顯示到游戲界面上即可涛贯。
更多高質量干貨文章,請關注我的微信公眾號 labuladong 和算法博客 labuladong 的算法秘籍蔚出。