歡迎關(guān)注「Keegan小鋼」公眾號獲取更多文章
撮合引擎開發(fā):數(shù)據(jù)結(jié)構(gòu)設(shè)計
程序入口
我們要開始聊代碼實現(xiàn)邏輯了,如果不記得之前講的目錄結(jié)構(gòu),請回去翻看前文腮鞍。聊代碼實現(xiàn)的第一步自然從程序入口開始评汰,核心就兩個函數(shù):init() 和 main()阱穗,其代碼如下:
package main
... //other codes
func init() {
initViper()
initLog()
engine.Init()
middleware.Init()
process.Init()
}
func main() {
mux := http.NewServeMux()
mux.HandleFunc("/openMatching", handler.OpenMatching)
mux.HandleFunc("/closeMatching", handler.CloseMatching)
mux.HandleFunc("/handleOrder", handler.HandleOrder)
log.Printf("HTTP ListenAndServe at port %s", viper.GetString("server.port"))
if err := http.ListenAndServe(viper.GetString("server.port"), mux); err != nil {
panic(err)
}
}
init() 函數(shù)做了一些初始化的操作稚矿,我來簡單介紹這幾個初始化函數(shù):
- initViper():配置文件初始化,使用了第三方配置庫 viper闷堡,這是一個被廣泛使用的配置庫骚勘,其 github 地址為 https://github.com/spf13/viper蕴掏。
- initLog():日志初始化,程序主要使用自己定義的日志包用來輸出日志文件调鲸,該日志包的實現(xiàn)后續(xù)文章再單獨講。
- engine.Init():引擎包的初始化挽荡,只是初始化了一個 map藐石,用來保存不同交易標(biāo)的的訂單 channel,作為各交易標(biāo)的的定序隊列來用定拟。
- middleware.Init():中間件的初始化于微,我們用到的中間件就只有 Redis,所以這里其實就是初始化 Redis 連接青自。Redis 客戶端庫方面我選擇的是 go-redis/redis株依。
- process.Init():這一步主要是從緩存加載和恢復(fù)各交易標(biāo)的引擎的啟動和所有訂單數(shù)據(jù)。
viper 和 redis 的初始化都是參照官方 demo 寫的延窜,這里就不展開說明了恋腕。log 后續(xù)再單獨講。engine 包和 process 包的初始化就需要好好講講逆瑞。
其中荠藤,引擎包的初始化雖然非常簡單伙单,但很關(guān)鍵,其代碼寫在 engine/init.go 文件中哈肖,完整代碼如下:
package engine
var ChanMap map[string]chan Order
func Init() {
ChanMap = make(map[string]chan Order)
}
這個保存通道的 map吻育,其 Key 是各交易標(biāo)的的 symbol,即是說每個交易標(biāo)的各有一個訂單通道淤井,這些訂單通道將作為每個交易標(biāo)的的定序隊列布疼。
process 包的初始化則如下:
func Init() {
symbols := cache.GetSymbols()
for _, symbol := range symbols {
price := cache.GetPrice(symbol)
NewEngine(symbol, price)
orderIds := cache.GetOrderIdsWithAction(symbol)
for _, orderId := range orderIds {
mapOrder := cache.GetOrder(symbol, orderId)
order := engine.Order{}
order.FromMap(mapOrder)
engine.ChanMap[order.Symbol] <- order
}
}
}
簡單講解下實現(xiàn)邏輯:
- 從緩存讀取所有 symbol,即程序重啟之前币狠,已經(jīng)開啟了撮合的所有交易標(biāo)的的 symbol游两;
- 從緩存讀取每個 symbol 對應(yīng)的價格,這是程序重啟前的最新成交價格总寻;
- 啟動每個 symbol 的撮合引擎器罐;
- 從緩存讀取每個 symbol 的所有訂單,這些訂單都是按時間順序排列的渐行;
- 按順序?qū)⑦@些訂單添加到對應(yīng) symbol 的訂單通道里去轰坊。
如果對這里面有些設(shè)計邏輯還不太明白的話,也沒關(guān)系祟印,后面講到對應(yīng)模塊時會再詳細說明肴沫。
main() 函數(shù)里,定義了我們之前所說的三個接口蕴忆,分別交由對應(yīng)的 handler 去處理具體的請求颤芬,之后就啟動 http 服務(wù)了。
handler
因為只有幾個接口套鹅,而且也很簡單站蝠,因此,并沒有引入第三方 web 框架卓鹿,handler 都是用原生實現(xiàn)的菱魔。先來看看 OpenMatching 的完整實現(xiàn):
package handler
import (
"encoding/json"
"io/ioutil"
"net/http"
"strings"
"matching/errcode"
"matching/process"
"github.com/shopspring/decimal"
)
type openMatchingParams struct {
Symbol string `json:"symbol"`
Price decimal.Decimal `json:"price"`
}
func OpenMatching(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
body, err := ioutil.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
var params openMatchingParams
if err := json.Unmarshal(body, ¶ms); err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
if strings.TrimSpace(params.Symbol) == "" {
w.Write(errcode.BlankSymbol.ToJson())
return
}
if params.Price.IsNegative() {
w.Write(errcode.InvalidPrice.ToJson())
return
}
if e := process.NewEngine(params.Symbol, params.Price); !e.IsOK() {
w.Write(e.ToJson())
return
}
w.Write(errcode.OK.ToJson())
}
邏輯非常簡單,先判斷是否為 POST 請求吟孙,再讀取 body 里的數(shù)據(jù)并轉(zhuǎn)為結(jié)構(gòu)體對象澜倦,接著對參數(shù)做個簡單的檢查,最后就調(diào)用 process.NewEngine(symbol, price) 進入下一步的業(yè)務(wù)邏輯杰妓,如果結(jié)果返回是 OK藻治,也返回 OK 作為請求的響應(yīng)。
另外巷挥,用到了第三方的 decimal.Decimal 類型用來表示價格桩卵,整個程序都統(tǒng)一用 decimal 來表示浮點數(shù)和做精確計算。
CloseMatching 和 HandleOrder 的實現(xiàn)邏輯也是同理,CloseMatching 最后會調(diào)用 process.CloseEngine(symbol) 函數(shù)進入下一步的處理吸占,HandleOrder 最后則調(diào)用 process.Dispatch(order) 進入下一步晴叨。不過,Order 結(jié)構(gòu)體是定義在 engine 包的矾屯,其結(jié)構(gòu)如下:
type Order struct {
Action enum.OrderAction `json:"action"`
Symbol string `json:"symbol"`
OrderId string `json:"orderId"`
Side enum.OrderSide `json:"side"`
Type enum.OrderType `json:"type"`
Amount decimal.Decimal `json:"amount"`
Price decimal.Decimal `json:"price"`
Timestamp int64 `json:"timestamp"`
}
可以看到兼蕊,其中的字段,除了有 Decimal 類型件蚕,還有 enum 包的幾個類型孙技,這幾個其實是我們程序中自己定義的枚舉類型。Golang 語言本身并沒有提供和其他語言一樣的 enum 關(guān)鍵字來定義枚舉類型排作,所以一般采用類型定義+常量來模擬枚舉類型牵啦,以 enum.OrderAction 為例:
type OrderAction string
const (
ActionCreate OrderAction = "create"
ActionCancel OrderAction = "cancel"
)
其他幾個枚舉類型也是這樣定義的。
另外妄痪,為了方便轉(zhuǎn)為字符串和檢驗參數(shù)是否有效哈雏,程序中還為每個枚舉類型分別提供了兩個函數(shù),還是以 OrderAction 為例:
func (o OrderAction) String() string {
switch o {
case ActionCreate:
return "create"
case ActionCancel:
return "cancel"
default:
return "unknown"
}
}
func (o OrderAction) Valid() bool {
if o.String() == "unknown" {
return false
}
return true
}
其他幾個枚舉類型也都定義了類似的兩個函數(shù)衫生,就不再貼代碼了裳瘪。
process 包
來回顧下 process 包有哪些文件:
└── process #
├── close_engine.go # 關(guān)閉引擎
├── dispatch.go # 分發(fā)訂單
├── init.go # 初始化
└── new_engine.go # 啟動新引擎
init.go 就一個初始化函數(shù),上文已經(jīng)講了罪针。其他三個文件分別定義了上文三個 handler 對應(yīng)的下一步邏輯實現(xiàn)彭羹。
啟動新引擎
先來看看 new_engine.go:
package process
import (
"matching/engine"
"matching/errcode"
"matching/middleware/cache"
"github.com/shopspring/decimal"
)
func NewEngine(symbol string, price decimal.Decimal) *errcode.Errcode {
if engine.ChanMap[symbol] != nil {
return errcode.EngineExist
}
engine.ChanMap[symbol] = make(chan engine.Order, 100)
go engine.Run(symbol, price)
cache.SaveSymbol(symbol)
cache.SavePrice(symbol, price)
return errcode.OK
}
邏輯也是比較簡單的,第一步先判斷 ChanMap[symbol] 是否為空泪酱,該 ChanMap 就是上文所說的引擎包初始化時用來保存訂單通道的 map派殷。如果 ChanMap[symbol] 不為空,說明該 symbol 的撮合引擎已經(jīng)啟動過了墓阀,那就返回錯誤毡惜。如果為空,那就初始化這個 symbol 的通道斯撮,從代碼可知虱黄,ChanMap[symbol] 初始化為一個緩沖大小為 100 的訂單通道。
接著吮成,就調(diào)用 engine.Run() 啟動一個 goroutine 了,這行代碼即表示用 goroutine 的方式啟動指定 symbol 的撮合引擎了辜梳。
然后粱甫,就將 symbol 和 price 都緩存起來了。
最后作瞄,返回 OK茶宵,搞定。
2. 分發(fā)訂單
接著宗挥,來看看 Dispatch 的實現(xiàn)又是怎樣的:
func Dispatch(order engine.Order) *errcode.Errcode {
if engine.ChanMap[order.Symbol] == nil {
return errcode.EngineNotFound
}
if order.Action == enum.ActionCreate {
if cache.OrderExist(order.Symbol, order.OrderId, order.Action.String()) {
return errcode.OrderExist
}
} else {
if !cache.OrderExist(order.Symbol, order.OrderId, enum.ActionCreate.String()) {
return errcode.OrderNotFound
}
}
order.Timestamp = time.Now().UnixNano() / 1e3
cache.SaveOrder(order.ToMap())
engine.ChanMap[order.Symbol] <- order
return errcode.OK
}
第一步乌庶,判斷 ChanMap[order.Symbol] 是否為空种蝶,如果為空,表示引擎沒開啟瞒大,那就無法處理訂單螃征。
第二步,判斷訂單是否存在透敌。如果是 create 訂單盯滚,那緩存中就不應(yīng)該查到訂單,否則說明是重復(fù)請求酗电。如果是 cancel 訂單魄藕,那緩存中如果也查不到訂單,那說明該訂單已經(jīng)全部成交或已經(jīng)成功撤單過了撵术。
第三步背率,將訂單時間設(shè)為當(dāng)前時間,時間單位是 100 納秒嫩与,這可以保證時間戳長度剛好為 16 位寝姿,保存到 Redis 里就不會有精度失真的問題。這點后續(xù)文章講到 Redis 詳細設(shè)計時再說蕴纳。
第四步会油,將訂單緩存。
第五步古毛,將訂單傳入對應(yīng)的訂單通道翻翩,對應(yīng)引擎會從該通道中獲取該訂單進行處理。這一步就實現(xiàn)了訂單的分發(fā)稻薇。
第六步嫂冻,返回 OK。
3. 關(guān)閉引擎
關(guān)閉引擎的實現(xiàn)就非常簡單了塞椎,請看代碼:
func CloseEngine(symbol string) *errcode.Errcode {
if engine.ChanMap[symbol] == nil {
return errcode.EngineNotFound
}
close(engine.ChanMap[symbol])
return errcode.OK
}
核心代碼就一行桨仿,將對應(yīng) symbol 的訂單通道關(guān)閉。后續(xù)的處理其實是在引擎里完成的案狠,待會我們再結(jié)合引擎里的代碼來講解這個設(shè)計服傍。
引擎入口的實現(xiàn)
交易引擎 goroutine 的啟動入口就是 engine.Run() 函數(shù),來看看其代碼實現(xiàn):
func Run(symbol string, price decimal.Decimal) {
lastTradePrice := price
book := &orderBook{}
book.init()
log.Info("engine %s is running", symbol)
for {
order, ok := <-ChanMap[symbol]
if !ok {
log.Info("engine %s is closed", symbol)
delete(ChanMap, symbol)
cache.Clear(symbol)
return
}
log.Info("engine %s receive an order: %s", symbol, order.ToJson())
switch order.Action {
case enum.ActionCreate:
dealCreate(&order, book, &lastTradePrice)
case enum.ActionCancel:
dealCancel(&order, book)
}
}
}
第一步骂铁,先定義和初始化了一個 book 變量吹零,該變量就是用來保存整個交易委托賬本。
接著拉庵,就是一個 for 循環(huán)了灿椅,for 循環(huán)里的第一行就是從對應(yīng) symbol 的訂單通道里讀取出一個訂單,讀取到訂單時,order 變量就會有值茫蛹,且 ok 變量為 true操刀。如果通道里暫時沒有訂單,那就會阻塞在這行代碼婴洼,直到從通道中獲取到訂單或通道已關(guān)閉的消息骨坑。
當(dāng)通道被關(guān)閉之后,最后窃蹋,從通道中讀取到的 ok 變量則為 false卡啰,當(dāng)然,在這之前警没,會先依序讀取完通道里剩下的訂單匈辱。當(dāng) ok 為 false 時,引擎里會執(zhí)行兩步操作:一是從 ChanMap 中刪除該 symbol 對應(yīng)的記錄杀迹,二是清空該 symbol 對應(yīng)的緩存數(shù)據(jù)亡脸。最后用 return 來退出 for 循環(huán),這樣树酪,整個 Run() 函數(shù)就結(jié)束退出了浅碾,意味著該引擎也真正關(guān)閉了。
當(dāng)每讀取到一個訂單续语,就會判斷是下單還是撤單垂谢,然后進行相應(yīng)的邏輯處理了。
我們先來看看撤單的邏輯疮茄,這個比較簡單:
func dealCancel(order *Order, book *orderBook) {
var ok bool
switch order.Side {
case enum.SideBuy:
ok = book.removeBuyOrder(order)
case enum.SideSell:
ok = book.removeSellOrder(order)
}
cache.RemoveOrder(order.ToMap())
mq.SendCancelResult(order.Symbol, order.OrderId, ok)
log.Info("engine %s, order %s cancel result is %s", order.Symbol, order.OrderId, ok)
}
核心就三個步驟:
- 從委托賬本中移除該訂單滥朱;
- 從緩存中移除該訂單;
- 發(fā)送撤單結(jié)果到 MQ力试。
下單邏輯就比較復(fù)雜了徙邻,需要根據(jù)不同的訂單類型做不同的邏輯處理,請看代碼:
func dealCreate(order *Order, book *orderBook, lastTradePrice *decimal.Decimal) {
switch order.Type {
case enum.TypeLimit:
dealLimit(order, book, lastTradePrice)
case enum.TypeLimitIoc:
dealLimitIoc(order, book, lastTradePrice)
case enum.TypeMarket:
dealMarket(order, book, lastTradePrice)
case enum.TypeMarketTop5:
dealMarketTop5(order, book, lastTradePrice)
case enum.TypeMarketTop10:
dealMarketTop10(order, book, lastTradePrice)
case enum.TypeMarketOpponent:
dealMarketOpponent(order, book, lastTradePrice)
}
}
每個類型再分買賣方向處理畸裳,以 dealLimit() 為例:
func dealLimit(order *Order, book *orderBook, lastTradePrice *decimal.Decimal) {
switch order.Side {
case enum.SideBuy:
dealBuyLimit(order, book, lastTradePrice)
case enum.SideSell:
dealSellLimit(order, book, lastTradePrice)
}
}
然后缰犁,再來看看 dealBuyLimit() 的處理邏輯:
func dealBuyLimit(order *Order, book *orderBook, lastTradePrice *decimal.Decimal) {
LOOP:
headOrder := book.getHeadSellOrder()
if headOrder == nil || order.Price.LessThan(headOrder.Price) {
book.addBuyOrder(order)
log.Info("engine %s, a order has added to the orderbook: %s", order.Symbol, order.ToJson())
} else {
matchTrade(headOrder, order, book, lastTradePrice)
if order.Amount.IsPositive() {
goto LOOP
}
}
}
我來解析下這個處理流程:
- 從委托賬本中讀取出賣單隊列的頭部訂單;
- 如果頭部訂單為空怖糊,或新訂單(買單)價格小于頭部訂單(賣單)帅容,則無法匹配成交,那就將新訂單添加到委托賬本的買單隊列中去伍伤;
- 如果頭部訂單不為空丰嘉,且新訂單(買單)價格大于等于頭部訂單(賣單),則兩個訂單可以匹配成交嚷缭,那就對這兩個訂單進行成交處理;
- 如果上一步的成交處理完之后,新訂單的剩余數(shù)量還不為零阅爽,那就繼續(xù)重復(fù)第一步路幸。
其中,匹配成交的記錄會作為一條輸出記錄發(fā)送到 MQ付翁。
對其他類型的處理也是類似的简肴,就不再一一講解了。
那引擎包的實現(xiàn)就先講到這里百侧,后續(xù)文章再聊其他部分的實現(xiàn)砰识。
小結(jié)
本小節(jié)主要還是通過代碼梳理清楚整個數(shù)據(jù)流程,包括一些細節(jié)上的設(shè)計佣渴。理解了本文所列舉的這些代碼辫狼,也就對整個撮合服務(wù)的實現(xiàn)理解一大半了。
這次的思考題:ChanMap 保存的訂單通道是否可以改用無緩沖的通道辛润?用無緩沖的通道和用有緩沖的通道處理邏輯有哪些不同膨处?兩種方案各自的優(yōu)缺點是什么?
掃描以下二維碼即可關(guān)注公眾號(公眾號名稱:Keegan小鋼)