撮合引擎開發(fā):流程的代碼實現(xiàn)

歡迎關(guān)注「Keegan小鋼」公眾號獲取更多文章


撮合引擎開發(fā):開篇

撮合引擎開發(fā):MVP版本

撮合引擎開發(fā):數(shù)據(jù)結(jié)構(gòu)設(shè)計

撮合引擎開發(fā):對接黑箱

撮合引擎開發(fā):解密黑箱流程

撮合引擎開發(fā):流程的代碼實現(xiàn)


程序入口

我們要開始聊代碼實現(xiàn)邏輯了,如果不記得之前講的目錄結(jié)構(gòu),請回去翻看前文腮鞍。聊代碼實現(xiàn)的第一步自然從程序入口開始评汰,核心就兩個函數(shù):init()main()阱穗,其代碼如下:

package main

... //other codes

func init() {
    initViper()
    initLog()

    engine.Init()
    middleware.Init()
    process.Init()
}

func main() {
    mux := http.NewServeMux()
    mux.HandleFunc("/openMatching", handler.OpenMatching)
    mux.HandleFunc("/closeMatching", handler.CloseMatching)
    mux.HandleFunc("/handleOrder", handler.HandleOrder)

    log.Printf("HTTP ListenAndServe at port %s", viper.GetString("server.port"))
    if err := http.ListenAndServe(viper.GetString("server.port"), mux); err != nil {
        panic(err)
    }
}

init() 函數(shù)做了一些初始化的操作稚矿,我來簡單介紹這幾個初始化函數(shù):

  • initViper():配置文件初始化,使用了第三方配置庫 viper闷堡,這是一個被廣泛使用的配置庫骚勘,其 github 地址為 https://github.com/spf13/viper蕴掏。
  • initLog():日志初始化,程序主要使用自己定義的日志包用來輸出日志文件调鲸,該日志包的實現(xiàn)后續(xù)文章再單獨講。
  • engine.Init():引擎包的初始化挽荡,只是初始化了一個 map藐石,用來保存不同交易標(biāo)的的訂單 channel,作為各交易標(biāo)的的定序隊列來用定拟。
  • middleware.Init():中間件的初始化于微,我們用到的中間件就只有 Redis,所以這里其實就是初始化 Redis 連接青自。Redis 客戶端庫方面我選擇的是 go-redis/redis株依。
  • process.Init():這一步主要是從緩存加載和恢復(fù)各交易標(biāo)的引擎的啟動和所有訂單數(shù)據(jù)。

viper 和 redis 的初始化都是參照官方 demo 寫的延窜,這里就不展開說明了恋腕。log 后續(xù)再單獨講。engine 包和 process 包的初始化就需要好好講講逆瑞。

其中荠藤,引擎包的初始化雖然非常簡單伙单,但很關(guān)鍵,其代碼寫在 engine/init.go 文件中哈肖,完整代碼如下:

package engine

var ChanMap map[string]chan Order

func Init() {
    ChanMap = make(map[string]chan Order)
}

這個保存通道的 map吻育,其 Key 是各交易標(biāo)的的 symbol,即是說每個交易標(biāo)的各有一個訂單通道淤井,這些訂單通道將作為每個交易標(biāo)的的定序隊列布疼。

process 包的初始化則如下:

func Init() {
    symbols := cache.GetSymbols()
    for _, symbol := range symbols {
        price := cache.GetPrice(symbol)
        NewEngine(symbol, price)

        orderIds := cache.GetOrderIdsWithAction(symbol)
        for _, orderId := range orderIds {
            mapOrder := cache.GetOrder(symbol, orderId)
            order := engine.Order{}
            order.FromMap(mapOrder)
            engine.ChanMap[order.Symbol] <- order
        }
    }
}

簡單講解下實現(xiàn)邏輯:

  1. 從緩存讀取所有 symbol,即程序重啟之前币狠,已經(jīng)開啟了撮合的所有交易標(biāo)的的 symbol游两;
  2. 從緩存讀取每個 symbol 對應(yīng)的價格,這是程序重啟前的最新成交價格总寻;
  3. 啟動每個 symbol 的撮合引擎器罐;
  4. 從緩存讀取每個 symbol 的所有訂單,這些訂單都是按時間順序排列的渐行;
  5. 按順序?qū)⑦@些訂單添加到對應(yīng) symbol 的訂單通道里去轰坊。

如果對這里面有些設(shè)計邏輯還不太明白的話,也沒關(guān)系祟印,后面講到對應(yīng)模塊時會再詳細說明肴沫。

main() 函數(shù)里,定義了我們之前所說的三個接口蕴忆,分別交由對應(yīng)的 handler 去處理具體的請求颤芬,之后就啟動 http 服務(wù)了。

handler

因為只有幾個接口套鹅,而且也很簡單站蝠,因此,并沒有引入第三方 web 框架卓鹿,handler 都是用原生實現(xiàn)的菱魔。先來看看 OpenMatching 的完整實現(xiàn):

package handler

import (
    "encoding/json"
    "io/ioutil"
    "net/http"
    "strings"

    "matching/errcode"
    "matching/process"

    "github.com/shopspring/decimal"
)

type openMatchingParams struct {
    Symbol string          `json:"symbol"`
    Price  decimal.Decimal `json:"price"`
}

func OpenMatching(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Content-Type", "application/json")
    if r.Method != http.MethodPost {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    body, err := ioutil.ReadAll(r.Body)
    if err != nil {
        w.WriteHeader(http.StatusBadRequest)
        return
    }

    var params openMatchingParams
    if err := json.Unmarshal(body, &params); err != nil {
        w.WriteHeader(http.StatusBadRequest)
        return
    }

    if strings.TrimSpace(params.Symbol) == "" {
        w.Write(errcode.BlankSymbol.ToJson())
        return
    }

    if params.Price.IsNegative() {
        w.Write(errcode.InvalidPrice.ToJson())
        return
    }

    if e := process.NewEngine(params.Symbol, params.Price); !e.IsOK() {
        w.Write(e.ToJson())
        return
    }

    w.Write(errcode.OK.ToJson())
}

邏輯非常簡單,先判斷是否為 POST 請求吟孙,再讀取 body 里的數(shù)據(jù)并轉(zhuǎn)為結(jié)構(gòu)體對象澜倦,接著對參數(shù)做個簡單的檢查,最后就調(diào)用 process.NewEngine(symbol, price) 進入下一步的業(yè)務(wù)邏輯杰妓,如果結(jié)果返回是 OK藻治,也返回 OK 作為請求的響應(yīng)。

另外巷挥,用到了第三方的 decimal.Decimal 類型用來表示價格桩卵,整個程序都統(tǒng)一用 decimal 來表示浮點數(shù)和做精確計算。

CloseMatchingHandleOrder 的實現(xiàn)邏輯也是同理,CloseMatching 最后會調(diào)用 process.CloseEngine(symbol) 函數(shù)進入下一步的處理吸占,HandleOrder 最后則調(diào)用 process.Dispatch(order) 進入下一步晴叨。不過,Order 結(jié)構(gòu)體是定義在 engine 包的矾屯,其結(jié)構(gòu)如下:

type Order struct {
    Action    enum.OrderAction `json:"action"`
    Symbol    string           `json:"symbol"`
    OrderId   string           `json:"orderId"`
    Side      enum.OrderSide   `json:"side"`
    Type      enum.OrderType   `json:"type"`
    Amount    decimal.Decimal  `json:"amount"`
    Price     decimal.Decimal  `json:"price"`
    Timestamp int64            `json:"timestamp"`
}

可以看到兼蕊,其中的字段,除了有 Decimal 類型件蚕,還有 enum 包的幾個類型孙技,這幾個其實是我們程序中自己定義的枚舉類型。Golang 語言本身并沒有提供和其他語言一樣的 enum 關(guān)鍵字來定義枚舉類型排作,所以一般采用類型定義+常量來模擬枚舉類型牵啦,以 enum.OrderAction 為例:

type OrderAction string

const (
    ActionCreate OrderAction = "create"
    ActionCancel OrderAction = "cancel"
)

其他幾個枚舉類型也是這樣定義的。

另外妄痪,為了方便轉(zhuǎn)為字符串和檢驗參數(shù)是否有效哈雏,程序中還為每個枚舉類型分別提供了兩個函數(shù),還是以 OrderAction 為例:

func (o OrderAction) String() string {
    switch o {
    case ActionCreate:
        return "create"
    case ActionCancel:
        return "cancel"
    default:
        return "unknown"
    }
}

func (o OrderAction) Valid() bool {
    if o.String() == "unknown" {
        return false
    }
    return true
}

其他幾個枚舉類型也都定義了類似的兩個函數(shù)衫生,就不再貼代碼了裳瘪。

process 包

來回顧下 process 包有哪些文件:

└── process                  #
    ├── close_engine.go      # 關(guān)閉引擎
    ├── dispatch.go          # 分發(fā)訂單
    ├── init.go              # 初始化
    └── new_engine.go        # 啟動新引擎

init.go 就一個初始化函數(shù),上文已經(jīng)講了罪针。其他三個文件分別定義了上文三個 handler 對應(yīng)的下一步邏輯實現(xiàn)彭羹。

啟動新引擎

先來看看 new_engine.go

package process

import (
    "matching/engine"
    "matching/errcode"
    "matching/middleware/cache"

    "github.com/shopspring/decimal"
)

func NewEngine(symbol string, price decimal.Decimal) *errcode.Errcode {
    if engine.ChanMap[symbol] != nil {
        return errcode.EngineExist
    }

    engine.ChanMap[symbol] = make(chan engine.Order, 100)
    go engine.Run(symbol, price)

    cache.SaveSymbol(symbol)
    cache.SavePrice(symbol, price)

    return errcode.OK
}

邏輯也是比較簡單的,第一步先判斷 ChanMap[symbol] 是否為空泪酱,該 ChanMap 就是上文所說的引擎包初始化時用來保存訂單通道的 map派殷。如果 ChanMap[symbol] 不為空,說明該 symbol 的撮合引擎已經(jīng)啟動過了墓阀,那就返回錯誤毡惜。如果為空,那就初始化這個 symbol 的通道斯撮,從代碼可知虱黄,ChanMap[symbol] 初始化為一個緩沖大小為 100 的訂單通道。

接著吮成,就調(diào)用 engine.Run() 啟動一個 goroutine 了,這行代碼即表示用 goroutine 的方式啟動指定 symbol 的撮合引擎了辜梳。

然后粱甫,就將 symbol 和 price 都緩存起來了。

最后作瞄,返回 OK茶宵,搞定。

2. 分發(fā)訂單

接著宗挥,來看看 Dispatch 的實現(xiàn)又是怎樣的:

func Dispatch(order engine.Order) *errcode.Errcode {
    if engine.ChanMap[order.Symbol] == nil {
        return errcode.EngineNotFound
    }

    if order.Action == enum.ActionCreate {
        if cache.OrderExist(order.Symbol, order.OrderId, order.Action.String()) {
            return errcode.OrderExist
        }
    } else {
        if !cache.OrderExist(order.Symbol, order.OrderId, enum.ActionCreate.String()) {
            return errcode.OrderNotFound
        }
    }

    order.Timestamp = time.Now().UnixNano() / 1e3
    cache.SaveOrder(order.ToMap())
    engine.ChanMap[order.Symbol] <- order

    return errcode.OK
}

第一步乌庶,判斷 ChanMap[order.Symbol] 是否為空种蝶,如果為空,表示引擎沒開啟瞒大,那就無法處理訂單螃征。

第二步,判斷訂單是否存在透敌。如果是 create 訂單盯滚,那緩存中就不應(yīng)該查到訂單,否則說明是重復(fù)請求酗电。如果是 cancel 訂單魄藕,那緩存中如果也查不到訂單,那說明該訂單已經(jīng)全部成交或已經(jīng)成功撤單過了撵术。

第三步背率,將訂單時間設(shè)為當(dāng)前時間,時間單位是 100 納秒嫩与,這可以保證時間戳長度剛好為 16 位寝姿,保存到 Redis 里就不會有精度失真的問題。這點后續(xù)文章講到 Redis 詳細設(shè)計時再說蕴纳。

第四步会油,將訂單緩存。

第五步古毛,將訂單傳入對應(yīng)的訂單通道翻翩,對應(yīng)引擎會從該通道中獲取該訂單進行處理。這一步就實現(xiàn)了訂單的分發(fā)稻薇。

第六步嫂冻,返回 OK。

3. 關(guān)閉引擎

關(guān)閉引擎的實現(xiàn)就非常簡單了塞椎,請看代碼:

func CloseEngine(symbol string) *errcode.Errcode {
    if engine.ChanMap[symbol] == nil {
        return errcode.EngineNotFound
    }

    close(engine.ChanMap[symbol])

    return errcode.OK
}

核心代碼就一行桨仿,將對應(yīng) symbol 的訂單通道關(guān)閉。后續(xù)的處理其實是在引擎里完成的案狠,待會我們再結(jié)合引擎里的代碼來講解這個設(shè)計服傍。

引擎入口的實現(xiàn)

交易引擎 goroutine 的啟動入口就是 engine.Run() 函數(shù),來看看其代碼實現(xiàn):

func Run(symbol string, price decimal.Decimal) {
    lastTradePrice := price

    book := &orderBook{}
    book.init()

    log.Info("engine %s is running", symbol)
    for {
        order, ok := <-ChanMap[symbol]
        if !ok {
            log.Info("engine %s is closed", symbol)
            delete(ChanMap, symbol)
            cache.Clear(symbol)
            return
        }
        log.Info("engine %s receive an order: %s", symbol, order.ToJson())
        switch order.Action {
        case enum.ActionCreate:
            dealCreate(&order, book, &lastTradePrice)
        case enum.ActionCancel:
            dealCancel(&order, book)
        }
    }
}

第一步骂铁,先定義和初始化了一個 book 變量吹零,該變量就是用來保存整個交易委托賬本

接著拉庵,就是一個 for 循環(huán)了灿椅,for 循環(huán)里的第一行就是從對應(yīng) symbol 的訂單通道里讀取出一個訂單,讀取到訂單時,order 變量就會有值茫蛹,且 ok 變量為 true操刀。如果通道里暫時沒有訂單,那就會阻塞在這行代碼婴洼,直到從通道中獲取到訂單或通道已關(guān)閉的消息骨坑。

當(dāng)通道被關(guān)閉之后,最后窃蹋,從通道中讀取到的 ok 變量則為 false卡啰,當(dāng)然,在這之前警没,會先依序讀取完通道里剩下的訂單匈辱。當(dāng) ok 為 false 時,引擎里會執(zhí)行兩步操作:一是從 ChanMap 中刪除該 symbol 對應(yīng)的記錄杀迹,二是清空該 symbol 對應(yīng)的緩存數(shù)據(jù)亡脸。最后用 return 來退出 for 循環(huán),這樣树酪,整個 Run() 函數(shù)就結(jié)束退出了浅碾,意味著該引擎也真正關(guān)閉了。

當(dāng)每讀取到一個訂單续语,就會判斷是下單還是撤單垂谢,然后進行相應(yīng)的邏輯處理了。

我們先來看看撤單的邏輯疮茄,這個比較簡單:

func dealCancel(order *Order, book *orderBook) {
    var ok bool
    switch order.Side {
    case enum.SideBuy:
        ok = book.removeBuyOrder(order)
    case enum.SideSell:
        ok = book.removeSellOrder(order)
    }

    cache.RemoveOrder(order.ToMap())
    mq.SendCancelResult(order.Symbol, order.OrderId, ok)
    log.Info("engine %s, order %s cancel result is %s", order.Symbol, order.OrderId, ok)
}

核心就三個步驟:

  1. 從委托賬本中移除該訂單滥朱;
  2. 從緩存中移除該訂單;
  3. 發(fā)送撤單結(jié)果到 MQ力试。

下單邏輯就比較復(fù)雜了徙邻,需要根據(jù)不同的訂單類型做不同的邏輯處理,請看代碼:

func dealCreate(order *Order, book *orderBook, lastTradePrice *decimal.Decimal) {
    switch order.Type {
    case enum.TypeLimit:
        dealLimit(order, book, lastTradePrice)
    case enum.TypeLimitIoc:
        dealLimitIoc(order, book, lastTradePrice)
    case enum.TypeMarket:
        dealMarket(order, book, lastTradePrice)
    case enum.TypeMarketTop5:
        dealMarketTop5(order, book, lastTradePrice)
    case enum.TypeMarketTop10:
        dealMarketTop10(order, book, lastTradePrice)
    case enum.TypeMarketOpponent:
        dealMarketOpponent(order, book, lastTradePrice)
    }
}

每個類型再分買賣方向處理畸裳,以 dealLimit() 為例:

func dealLimit(order *Order, book *orderBook, lastTradePrice *decimal.Decimal) {
    switch order.Side {
    case enum.SideBuy:
        dealBuyLimit(order, book, lastTradePrice)
    case enum.SideSell:
        dealSellLimit(order, book, lastTradePrice)
    }
}

然后缰犁,再來看看 dealBuyLimit() 的處理邏輯:

func dealBuyLimit(order *Order, book *orderBook, lastTradePrice *decimal.Decimal) {
LOOP:
    headOrder := book.getHeadSellOrder()
    if headOrder == nil || order.Price.LessThan(headOrder.Price) {
        book.addBuyOrder(order)
        log.Info("engine %s, a order has added to the orderbook: %s", order.Symbol, order.ToJson())
    } else {
        matchTrade(headOrder, order, book, lastTradePrice)
        if order.Amount.IsPositive() {
            goto LOOP
        }
    }
}

我來解析下這個處理流程:

  1. 從委托賬本中讀取出賣單隊列的頭部訂單;
  2. 如果頭部訂單為空怖糊,或新訂單(買單)價格小于頭部訂單(賣單)帅容,則無法匹配成交,那就將新訂單添加到委托賬本的買單隊列中去伍伤;
  3. 如果頭部訂單不為空丰嘉,且新訂單(買單)價格大于等于頭部訂單(賣單),則兩個訂單可以匹配成交嚷缭,那就對這兩個訂單進行成交處理;
  4. 如果上一步的成交處理完之后,新訂單的剩余數(shù)量還不為零阅爽,那就繼續(xù)重復(fù)第一步路幸。

其中,匹配成交的記錄會作為一條輸出記錄發(fā)送到 MQ付翁。

對其他類型的處理也是類似的简肴,就不再一一講解了。

那引擎包的實現(xiàn)就先講到這里百侧,后續(xù)文章再聊其他部分的實現(xiàn)砰识。

小結(jié)

本小節(jié)主要還是通過代碼梳理清楚整個數(shù)據(jù)流程,包括一些細節(jié)上的設(shè)計佣渴。理解了本文所列舉的這些代碼辫狼,也就對整個撮合服務(wù)的實現(xiàn)理解一大半了。

這次的思考題:ChanMap 保存的訂單通道是否可以改用無緩沖的通道辛润?用無緩沖的通道和用有緩沖的通道處理邏輯有哪些不同膨处?兩種方案各自的優(yōu)缺點是什么?


掃描以下二維碼即可關(guān)注公眾號(公眾號名稱:Keegan小鋼)

作者的個人博客

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末砂竖,一起剝皮案震驚了整個濱河市真椿,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌乎澄,老刑警劉巖突硝,帶你破解...
    沈念sama閱讀 210,914評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異置济,居然都是意外死亡解恰,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,935評論 2 383
  • 文/潘曉璐 我一進店門舟肉,熙熙樓的掌柜王于貴愁眉苦臉地迎上來修噪,“玉大人,你說我怎么就攤上這事路媚』魄恚” “怎么了?”我有些...
    開封第一講書人閱讀 156,531評論 0 345
  • 文/不壞的土叔 我叫張陵整慎,是天一觀的道長脏款。 經(jīng)常有香客問我,道長裤园,這世上最難降的妖魔是什么撤师? 我笑而不...
    開封第一講書人閱讀 56,309評論 1 282
  • 正文 為了忘掉前任,我火速辦了婚禮拧揽,結(jié)果婚禮上剃盾,老公的妹妹穿的比我還像新娘腺占。我一直安慰自己,他們只是感情好痒谴,可當(dāng)我...
    茶點故事閱讀 65,381評論 5 384
  • 文/花漫 我一把揭開白布衰伯。 她就那樣靜靜地躺著,像睡著了一般积蔚。 火紅的嫁衣襯著肌膚如雪意鲸。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,730評論 1 289
  • 那天尽爆,我揣著相機與錄音怎顾,去河邊找鬼。 笑死漱贱,一個胖子當(dāng)著我的面吹牛槐雾,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播饱亿,決...
    沈念sama閱讀 38,882評論 3 404
  • 文/蒼蘭香墨 我猛地睜開眼蚜退,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了彪笼?” 一聲冷哼從身側(cè)響起钻注,我...
    開封第一講書人閱讀 37,643評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎配猫,沒想到半個月后幅恋,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,095評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡泵肄,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,448評論 2 325
  • 正文 我和宋清朗相戀三年捆交,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片腐巢。...
    茶點故事閱讀 38,566評論 1 339
  • 序言:一個原本活蹦亂跳的男人離奇死亡品追,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出冯丙,到底是詐尸還是另有隱情肉瓦,我是刑警寧澤,帶...
    沈念sama閱讀 34,253評論 4 328
  • 正文 年R本政府宣布胃惜,位于F島的核電站泞莉,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏船殉。R本人自食惡果不足惜鲫趁,卻給世界環(huán)境...
    茶點故事閱讀 39,829評論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望利虫。 院中可真熱鬧挨厚,春花似錦堡僻、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,715評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至慌申,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間理郑,已是汗流浹背蹄溉。 一陣腳步聲響...
    開封第一講書人閱讀 31,945評論 1 264
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留您炉,地道東北人柒爵。 一個月前我還...
    沈念sama閱讀 46,248評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像赚爵,于是被迫代替她去往敵國和親棉胀。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,440評論 2 348

推薦閱讀更多精彩內(nèi)容