用 Pulsar 開發(fā)多人小游戲(六):用 Pulsar Function 制作房間計分板

note:本文是《用 Pulsar 開發(fā)多人在線小游戲》的第三篇,配套源碼和全部文檔參見我的 GitHub 倉庫 play-with-pulsar 以及我的文章列表逸尖。

Pulsar Function 允許你編寫函數(shù)對 topic 中的數(shù)據(jù)進行一些處理,函數(shù)的輸入就是一個或多個 topic 中的消息叽粹,函數(shù)的返回值可以發(fā)送到其他 topic 中。

官網(wǎng)的一張圖就能看明白了:

比方說,發(fā)送到 topicA 中的消息都是英文單詞班缎,我想把這些英文單詞都轉化成大寫并轉發(fā)到 topicB 中各聘,那么就可以寫一個 Pulsar function 做這個事情揣非。

Pulsar Function 還支持 Stateful Storage,簡單來說就是鍵值對的存儲服務躲因。

比如官網(wǎng)給了一個單詞計數(shù)器的例子:

這個 Pulsar Function 會從一個 topic 中讀取句子并切分成單詞早敬,然后統(tǒng)計每個單詞出現(xiàn)的頻率。

單詞頻率其實是以鍵值對的形式存儲在這個 Function 中的大脉,可以通過 admin API 來讀取鍵對應的值搞监,官網(wǎng)文檔:

https://pulsar.apache.org/docs/next/functions-quickstart/#start-stateful-functions

Pulsar Function 可以單獨部署成服務,也可以上傳到 broker 上镰矿,作為 broker 的一部分琐驴。不過目前社區(qū)的建議是部署單獨的 Function 集群。

目前 Pulsar 支持使用 Python、Go棍矛、Java 來開發(fā) Function安疗,API 文檔:

https://pulsar.apache.org/docs/next/functions-develop-api/

文檔給出的例子比較少,可以直接看 Pulsar Function examples够委,直接根據(jù)需求選擇合適的 Function 進行開發(fā)就行了荐类。

本文就以炸彈人游戲為例,利用 Pulsar Function 開發(fā)游戲房間的計分板功能茁帽。

在我們的炸彈人游戲中玉罐,玩家的死亡也會被抽象成事件發(fā)送到 topic 中:

type UserDeadEvent struct {
    // 被炸死的玩家名
    playerName string
    // 殺手玩家名
    killerName string
}

類似單詞計數(shù)器,我們這里也可以實現(xiàn)一個 Pulsar Function潘拨,專門過濾玩家死亡的 UserDeadEvent 事件吊输,然后統(tǒng)計 killerName 出現(xiàn)的次數(shù),就可以作為該玩家的分數(shù)了铁追。

當然季蚂,我們需要實時更新房間內玩家的分數(shù),所以每個游戲房間除了 event topic 和 map topic 之外琅束,我們還需要一個 score topic扭屁,讓 Pulsar Function 把分數(shù)更新事件輸出到 score topic,并且利用 Pulsar client 的 tableview 功能做一個比較好的展現(xiàn)涩禀。

那么現(xiàn)在需要實現(xiàn)的 Pulsar Function 有如下需求:

1料滥、因為玩家產(chǎn)生的事件都發(fā)到了格式為 {roomName}-event-topic 的 topic 中,所以函數(shù)應該接收所有這些 topic 的消息艾船。

2葵腹、讀取這些消息的 Type 字段,過濾出 UserDeadEvent 事件屿岂,并讀取 playerNamekillerName践宴,killerName 出現(xiàn)的次數(shù)就是該玩家獲得的分數(shù)。

3爷怀、還需要把玩家分數(shù)輸出到另一個格式為 {roomName}-score-topic 的 topic 中浴井。

下面開始開發(fā)。

先貼官網(wǎng)文檔:

https://pulsar.apache.org/docs/next/functions-develop-api/#use-sdk-for-javapythongo

首先需要設置 Pulsar Function 開發(fā)相關的 Maven 依賴:

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-functions-api</artifactId>
    <version>${pulsar.version}</version>
</dependency>

然后就可以開始開發(fā)了霉撵,完整的源碼在 function-code 目錄:

public class ScoreboardFunction implements Function<GenericJsonRecord, Void> {

    @Override
    public Void process(GenericJsonRecord input, Context context) {

        String type = (String) input.getField("type");
        if (type.equals("UserDeadEvent")) {
            String player = (String) input.getField("name");
            String killer = (String) input.getField("comment");
           if (player.equals(killer)) {
               // kill himself
               return null;
           }

            // get the source topic of this message
            Optional<String> inputTopic = context.getCurrentRecord().getTopicName();
            if (inputTopic.isEmpty()) {
                return null;
            }
            // calculate the corresponding topic to send score
            Optional<String> outputTopic = changeEventTopicNameToScoreTopicName(inputTopic.get());
            if (outputTopic.isEmpty()) {
                return null;
            }
            // roomName-playerName as the stateful key  /
            // store the score in stateful function
            String killerKey = parseRoomName(inputTopic.get()).get() + "-" + killer;
            context.incrCounter(killerKey, 1);

            // send the score messages to score topic
            long score = context.getCounter(killerKey);
            try {
                // player name as the key, score as the value
                context.newOutputMessage(outputTopic.get(), Schema.STRING)
                        .key(killer)
                        .value(score + "")
                        .send();
            } catch (PulsarClientException e) {
                // todo: ignore error for now
                e.printStackTrace();
            }
        }

        return null;
    }
}

因為我們前文給 topic 中的消息設置了 JSON Schema,所以這里設置 topic 中的消息類型為 GenericJsonRecord洪囤。

這段代碼的邏輯應該不難理解徒坡,input 就是發(fā)到 event topic 的消息,通過 Pulsar Function 的 context 可以拿到這個 event topic 的名字瘤缩,由于 event topic 名字包含游戲房間名喇完,所以只要修改 event topic 名稱后綴即可得到 score topic 的名字。

函數(shù)的主要工作是過濾出 UserDeadEvent剥啤,讀取 killerName锦溪〔桓考慮到不能把不同房間的擊殺事件混在一起,我把 {roomName}-{killerName} 作為 Function 的鍵刻诊,并遞增計數(shù)器記錄玩家的分數(shù)防楷,最后調用 context.newOutputMessage 把玩家的分數(shù)發(fā)送到房間對應的 score topic 中。

Function 的調試

可以參考這篇官網(wǎng)文檔则涯,用 localrun 模式在本地調試 Function:

https://pulsar.apache.org/docs/next/functions-debug-localrun/

localrun 模式相當于直接在本地起了一個 Function worker复局,能夠連接到 Pulsar,并運行我們剛才開發(fā)的 Function 代碼粟判。

完整的源碼在 function-code 目錄亿昏,注意我們要對 Function 進行正確的配置,比如 Function 類以及作為輸入的 topic 名稱等等:

 String inputTopic = ".*-event-topic";
// enable regex support to subscribe multiple topics
HashMap<String, ConsumerConfig> inputSpecs = new HashMap<>();
ConsumerConfig consumerConfig = ConsumerConfig.builder().isRegexPattern(true).build();
inputSpecs.put(inputTopic, consumerConfig);
functionConfig.setInputSpecs(inputSpecs);

functionConfig.setClassName(ScoreboardFunction.class.getName());

配置完 functionConfig 后可以啟動一個本地的 Function worker:

LocalRunner localRunner = LocalRunner.builder()
        .brokerServiceUrl("pulsar://localhost:6650")
        .stateStorageServiceUrl("bk://localhost:4181")
        .functionConfig(functionConfig)
        .build();

localRunner.start(false);

其中 brokerServiceUrl 是 Pulsar broker 的連接地址档礁,stateStorageServiceUrl 是提供 stateStorage 的 bookkeeper 地址角钩,默認情況下在 4181 端口。

這樣呻澜,只要啟動 main 函數(shù)递礼,就會啟動 local runner,并加載我們剛開發(fā)的 Function易迹,把所有后綴為 -event-topic 的 topic 中的消息輸入給 Function宰衙。

計分板的開發(fā)

我們剛才開發(fā)的 Function 會把玩家名稱和該玩家獲得的分數(shù)作為一條消息的鍵和值發(fā)送到 {roomName}-score-topic 中,那么玩家客戶端如何獲取這些信息呢睹欲?這就要用到之前介紹的 tableView 功能了供炼。

可以在游戲客戶端代碼中看到 tableView 的使用:

tableView, err := client.CreateTableView(pulsar.TableViewOptions{
    Topic:           roomName + "-score-topic",
    Schema:          pulsar.NewStringSchema(nil),
    SchemaValueType: reflect.TypeOf(""),
})

我們在游戲數(shù)據(jù)中維護一個名為 scores 的 lru 緩存,存儲最近的最多 5 名玩家的分數(shù)信息窘疮,同時利用 tableView 的 ForEachAndListen 方法更新 lru 緩存:

client.tableView.ForEachAndListen(func(playerName string, i interface{}) error {
    score := *i.(*string)
    g.scores.Add(playerName, score)
    return nil
})

這樣袋哼,當玩家分數(shù)更新時,lru 緩存中的數(shù)據(jù)就會更新闸衫,我們只要把對應的分數(shù)數(shù)據(jù)顯示到游戲界面上即可涛贯。

更多高質量干貨文章,請關注我的微信公眾號 labuladong 和算法博客 labuladong 的算法秘籍蔚出。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末弟翘,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子骄酗,更是在濱河造成了極大的恐慌稀余,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,185評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件趋翻,死亡現(xiàn)場離奇詭異睛琳,居然都是意外死亡,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評論 3 393
  • 文/潘曉璐 我一進店門师骗,熙熙樓的掌柜王于貴愁眉苦臉地迎上來历等,“玉大人,你說我怎么就攤上這事辟癌『停” “怎么了?”我有些...
    開封第一講書人閱讀 163,524評論 0 353
  • 文/不壞的土叔 我叫張陵愿待,是天一觀的道長浩螺。 經(jīng)常有香客問我,道長仍侥,這世上最難降的妖魔是什么要出? 我笑而不...
    開封第一講書人閱讀 58,339評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮农渊,結果婚禮上患蹂,老公的妹妹穿的比我還像新娘。我一直安慰自己砸紊,他們只是感情好传于,可當我...
    茶點故事閱讀 67,387評論 6 391
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著醉顽,像睡著了一般沼溜。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上游添,一...
    開封第一講書人閱讀 51,287評論 1 301
  • 那天系草,我揣著相機與錄音,去河邊找鬼唆涝。 笑死找都,一個胖子當著我的面吹牛,可吹牛的內容都是我干的廊酣。 我是一名探鬼主播能耻,決...
    沈念sama閱讀 40,130評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼亡驰!你這毒婦竟也來了晓猛?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 38,985評論 0 275
  • 序言:老撾萬榮一對情侶失蹤凡辱,失蹤者是張志新(化名)和其女友劉穎鞍帝,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體煞茫,經(jīng)...
    沈念sama閱讀 45,420評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,617評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了续徽。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片蚓曼。...
    茶點故事閱讀 39,779評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖钦扭,靈堂內的尸體忽然破棺而出纫版,到底是詐尸還是另有隱情,我是刑警寧澤客情,帶...
    沈念sama閱讀 35,477評論 5 345
  • 正文 年R本政府宣布其弊,位于F島的核電站,受9級特大地震影響膀斋,放射性物質發(fā)生泄漏梭伐。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,088評論 3 328
  • 文/蒙蒙 一仰担、第九天 我趴在偏房一處隱蔽的房頂上張望糊识。 院中可真熱鬧,春花似錦摔蓝、人聲如沸赂苗。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽拌滋。三九已至,卻和暖如春猜谚,著一層夾襖步出監(jiān)牢的瞬間败砂,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評論 1 269
  • 我被黑心中介騙來泰國打工龄毡, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留吠卷,地道東北人。 一個月前我還...
    沈念sama閱讀 47,876評論 2 370
  • 正文 我出身青樓沦零,卻偏偏與公主長得像祭隔,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子路操,可洞房花燭夜當晚...
    茶點故事閱讀 44,700評論 2 354

推薦閱讀更多精彩內容