Golang微服務(wù)框架Kratos應(yīng)用分布式計(jì)劃任務(wù)隊(duì)列Asynq
任務(wù)隊(duì)列(Task Queue) 一般用于跨線程或跨計(jì)算機(jī)分配工作的一種機(jī)制拄查。其本質(zhì)是生產(chǎn)者消費(fèi)者模型啃炸,生產(chǎn)者發(fā)送任務(wù)到消息隊(duì)列肤舞,消費(fèi)者負(fù)責(zé)處理任務(wù)电禀。
任務(wù)隊(duì)列的輸入是稱(chēng)為任務(wù)(Task)
的工作單元扣甲。專(zhuān)用的工作進(jìn)程不斷監(jiān)視任務(wù)隊(duì)列以查找要執(zhí)行的新工作鸽凶。
在Golang語(yǔ)言里面币砂,我們有像Asynq和Machinery這樣的類(lèi)似于Celery
的分布式任務(wù)隊(duì)列。
什么是任務(wù)隊(duì)列
消息隊(duì)列(Message Queue)玻侥,一般來(lái)說(shuō)知道的人不少决摧。比如常見(jiàn)的:kafka、Rabbitmq凑兰、RocketMQ等掌桩。
任務(wù)隊(duì)列(Task Queue),聽(tīng)說(shuō)過(guò)這個(gè)概念的人不會(huì)太多姑食,清楚它的概念的人怕是更少波岛。
這兩個(gè)概念是有關(guān)系的,他們是怎樣的關(guān)系呢矢门?任務(wù)隊(duì)列(Task Queue)是消息隊(duì)列(Message Queue)的超集盆色。任務(wù)隊(duì)列是構(gòu)建在消息隊(duì)列之上的。消息隊(duì)列是任務(wù)隊(duì)列的一部分祟剔。
提起分布式任務(wù)隊(duì)列(Distributed Task Queue)隔躲,就不得不提Python
的Celery。故而物延,下面我們來(lái)看Celery的架構(gòu)圖宣旱,以此來(lái)講解。其他的任務(wù)隊(duì)列也并不會(huì)與之有太大的差異性叛薯,基礎(chǔ)的原理是一致的浑吟。
在 Celery
的架構(gòu)中,由多臺(tái) Server 發(fā)起異步任務(wù)(Async Task)
耗溜,發(fā)送任務(wù)到 Broker
的隊(duì)列中组力,其中的 Celery Beat
進(jìn)程可負(fù)責(zé)發(fā)起定時(shí)任務(wù)。當(dāng) Task
到達(dá) Broker
后抖拴,會(huì)將其分發(fā)給相應(yīng)的 Celery Worker
進(jìn)行處理燎字。當(dāng) Task
處理完成后腥椒,其結(jié)果存儲(chǔ)至 Backend
。
在上述過(guò)程中的 Broker
和 Backend
候衍,Celery
并沒(méi)有去實(shí)現(xiàn)笼蛛,而是使用了已有的開(kāi)源實(shí)現(xiàn),例如 RabbitMQ
作為 Broker
提供消息隊(duì)列服務(wù)蛉鹿,Redis
作為 Backend
提供結(jié)果存儲(chǔ)服務(wù)滨砍。Celery 就像是抽象了消息隊(duì)列架構(gòu)中 Producer
、Consumer
的實(shí)現(xiàn)妖异,將消息隊(duì)列中基本單位“消息”
抽象成了任務(wù)隊(duì)列中的“任務(wù)”惋戏,并將異步、定時(shí)任務(wù)的發(fā)起和結(jié)果存儲(chǔ)等操作進(jìn)行了封裝随闺,讓開(kāi)發(fā)者可以忽略 AMQP日川、RabbitMQ 等實(shí)現(xiàn)細(xì)節(jié)蔓腐,為開(kāi)發(fā)帶來(lái)便利矩乐。
綜上所述,Celery 作為任務(wù)隊(duì)列是基于消息隊(duì)列的進(jìn)一步封裝回论,其實(shí)現(xiàn)依賴消息隊(duì)列散罕。
任務(wù)隊(duì)列的應(yīng)用場(chǎng)景
我們現(xiàn)在知道了任務(wù)隊(duì)列是什么,也知道了它的工作原理傀蓉。但是欧漱,我們并不知道它可以用來(lái)做什么。下面葬燎,我們就來(lái)看看误甚,它到底用在什么樣的場(chǎng)景下。
- 分布式任務(wù):可以將任務(wù)分發(fā)到多個(gè)工作者進(jìn)程或機(jī)器上執(zhí)行谱净,以提高任務(wù)處理速度窑邦。
- 定時(shí)任務(wù):可以在指定時(shí)間執(zhí)行任務(wù)。例如:每天定時(shí)備份數(shù)據(jù)壕探、日志歸檔冈钦、心跳測(cè)試、運(yùn)維巡檢李请。支持 crontab 定時(shí)模式
- 后臺(tái)任務(wù):可以在后臺(tái)執(zhí)行耗時(shí)任務(wù)瞧筛,例如圖像處理、數(shù)據(jù)分析等导盅,不影響用戶界面的響應(yīng)较幌。
- 解耦任務(wù):可以將任務(wù)與主程序解耦,以提高代碼的可讀性和可維護(hù)性白翻,解耦應(yīng)用程序最直接的好處就是可擴(kuò)展性和并發(fā)性能的提高乍炉。支持并發(fā)執(zhí)行任務(wù),同時(shí)支持自動(dòng)動(dòng)態(tài)擴(kuò)展。
- 實(shí)時(shí)處理:可以支持實(shí)時(shí)處理任務(wù)恩急,例如即時(shí)通訊杉畜、消息隊(duì)列等泌辫。
Asynq概述
Asynq是一個(gè)使用Go語(yǔ)言實(shí)現(xiàn)的分布式任務(wù)隊(duì)列和異步處理庫(kù)柠衍,它由Redis提供支持再沧,它提供了輕量級(jí)的尺栖、易于使用的API尚卫,并且具有高可擴(kuò)展性和高可定制化性补疑。其作者Ken Hibino班利,任職于Google洞豁。
Asynq主要由以下幾個(gè)組件組成:
- 任務(wù)(Task):需要被異步執(zhí)行的操作窗看;
- 處理器(Processor):負(fù)責(zé)執(zhí)行任務(wù)的工作進(jìn)程茸歧;
- 隊(duì)列(Queue):存放待執(zhí)行任務(wù)的隊(duì)列;
- 調(diào)度器(Scheduler):根據(jù)規(guī)則將任務(wù)分配給不同的處理器進(jìn)行執(zhí)行显沈。
通過(guò)使用Asynq软瞎,我們可以非常輕松的實(shí)現(xiàn)異步任務(wù)處理,同時(shí)還可以提供高效率拉讯、高可擴(kuò)展性和高自定義性的處理方案涤浇。
Asynq的特點(diǎn)
- 保證至少執(zhí)行一次任務(wù)
- 任務(wù)寫(xiě)入Redis后可以持久化
- 任務(wù)失敗之后,會(huì)自動(dòng)重試
- worker崩潰自動(dòng)恢復(fù)
- 可是實(shí)現(xiàn)任務(wù)的優(yōu)先級(jí)
- 任務(wù)可以進(jìn)行編排
- 任務(wù)可以設(shè)定執(zhí)行時(shí)間或者最長(zhǎng)可執(zhí)行的時(shí)間
- 支持中間件
- 可以使用 unique-option 來(lái)避免任務(wù)重復(fù)執(zhí)行魔慷,實(shí)現(xiàn)唯一性
- 支持 Redis Cluster 和 Redis Sentinels 以達(dá)成高可用性
- 作者提供了Web UI & CLI Tool讓大家查看任務(wù)的執(zhí)行情況
Asynq可視化監(jiān)控
Asynq提供了兩種監(jiān)控手段:CLI和Web UI只锭。
命令行工具CLI
go install github.com/hibiken/asynq/tools/asynq@latest
Web UI
Asynqmon是一個(gè)基于Web的工具,用于監(jiān)視管理Asynq的任務(wù)和隊(duì)列院尔,有關(guān)詳細(xì)的信息可以參閱工具的README蜻展。
Web UI我們可以通過(guò)Docker的方式來(lái)進(jìn)行安裝:
docker pull hibiken/asynqmon:latest
docker run -d \
--name asynq \
-p 8080:8080 \
hibiken/asynqmon:latest --redis-addr=host.docker.internal:6379
安裝好Web UI之后,我們就可以打開(kāi)瀏覽器訪問(wèn)管理后臺(tái)了:http://localhost:8080
- 儀表盤(pán)
- 任務(wù)視圖
- 性能
Kratos下如何應(yīng)用Asynq邀摆?
我們將分布式任務(wù)隊(duì)列以transport.Server
的形式整合進(jìn)微服務(wù)框架Kratos
纵顾。
目前,go里面有兩個(gè)分布式任務(wù)隊(duì)列可用:
我已經(jīng)對(duì)這兩個(gè)庫(kù)進(jìn)行了支持:
Docker部署依賴組件
因?yàn)樗蕾嘡edis隧熙,因此片挂,我們使用Docker的方式安裝Redis的服務(wù)器:
docker pull bitnami/redis:latest
docker run -itd \
--name redis-test \
-p 6379:6379 \
-e ALLOW_EMPTY_PASSWORD=yes \
bitnami/redis:latest
安裝依賴庫(kù)
我們需要在項(xiàng)目中安裝Asynq的依賴庫(kù):
go get -u github.com/tx7do/kratos-transport/transport/asynq
創(chuàng)建Kratos服務(wù)端
我們?cè)诖a當(dāng)中引入庫(kù),并且創(chuàng)建出來(lái)Server
:
首先贞盯,我們要?jiǎng)?chuàng)建Server
:
package server
import (
...
"github.com/tx7do/kratos-transport/transport/asynq"
)
// NewAsynqServer create a asynq server.
func NewAsynqServer(cfg *conf.Bootstrap, _ log.Logger, svc *service.TaskService) *machinery.Server {
ctx := context.Background()
srv := asynq.NewServer(
asynq.WithAddress(cfg.Server.Asynq.Broker),
)
registerAsynqTasks(ctx, srv, svc)
return srv
}
注冊(cè)任務(wù)回調(diào)
然后音念,把回調(diào)函數(shù)注冊(cè)進(jìn)服務(wù)器:
const (
testTask1 = "test_task_1"
testDelayTask = "test_delay_task"
testPeriodicTask = "test_periodic_task"
)
type TaskPayload struct {
Message string `json:"message"`
}
func registerAsynqTasks(ctx context.Context, srv *asynq.Server, svc *service.TaskService) {
var err error
err = asynq.RegisterSubscriber(srv, testTask1, svc.HandleTask1)
err = asynq.RegisterSubscriber(srv, testDelayTask, svc.HandleDelayTask)
err = asynq.RegisterSubscriber(srv, testPeriodicTask, svc.HandlePeriodicTask)
}
Asynq服務(wù)器注冊(cè)到Kratos
接著,調(diào)用kratos.Server
把Asynq服務(wù)器注冊(cè)到Kratos里去:
func newApp(ll log.Logger, rr registry.Registrar, ks *asynq.Server) *kratos.App {
return kratos.New(
kratos.ID(Service.GetInstanceId()),
kratos.Name(Service.Name),
kratos.Version(Service.Version),
kratos.Metadata(Service.Metadata),
kratos.Logger(ll),
kratos.Server(
ks,
),
kratos.Registrar(rr),
)
}
實(shí)現(xiàn)任務(wù)回調(diào)方法
最后躏敢,我們就可以在Service
里愉快的玩耍了:
package service
type TaskService struct {
log *log.Helper
}
func NewTaskService(
logger log.Logger,
) *TaskService {
l := log.NewHelper(log.With(logger, "module", "task/service/logger-service"))
return &TaskService{
log: l,
statusRepo: statusRepo,
realtimeRepo: realtimeRepo,
}
}
func (s *TaskService) HandleTask1() error {
fmt.Println("################ 執(zhí)行任務(wù)Task1 #################")
return nil
}
func (s *TaskService) HandleTask1(taskType string, taskData *TaskPayload) error {
s.log.Infof("[%s] Task Type: [%s], Payload: [%s]", time.Now().Format("2006-01-02 15:04:05"), taskType, taskData.Message)
return nil
}
func (s *TaskService) HandleDelayTask(taskType string, taskData *TaskPayload) error {
s.log.Infof("[%s] Delay Task Type: [%s], Payload: [%s]", time.Now().Format("2006-01-02 15:04:05"), taskType, taskData.Message)
return nil
}
func (s *TaskService) HandlePeriodicTask(taskType string, taskData *TaskPayload) error {
s.log.Infof("[%s] Periodic Task Type: [%s], Payload: [%s]", time.Now().Format("2006-01-02 15:04:05"), taskType, taskData.Message)
return nil
}
創(chuàng)建新任務(wù)
新建任務(wù)闷愤,有兩個(gè)方法:NewTask
和NewPeriodicTask
,內(nèi)部分別對(duì)應(yīng)著asynq.Client
和asynq.Scheduler
件余。
NewTask
是通過(guò)asynq.Client
將任務(wù)直接入了隊(duì)列讥脐。
普通任務(wù)
普通任務(wù)通常是入列后立即執(zhí)行的(如果不需要排隊(duì)的)遭居,下面就是最簡(jiǎn)單的任務(wù),一個(gè)類(lèi)型(Type)旬渠,一個(gè)負(fù)載數(shù)據(jù)(Payload)就構(gòu)成了一個(gè)最簡(jiǎn)單的任務(wù):
err = srv.NewTask(testTask1,
&DelayTask{Message: "delay task"},
)
當(dāng)然俱萍,你也可以添加一些的參數(shù),比如重試次數(shù)告丢、超時(shí)時(shí)間枪蘑、過(guò)期時(shí)間等……
// 最多重試3次,10秒超時(shí)岖免,20秒后過(guò)期
err = srv.NewTask(testTask1,
&DelayTask{Message: "delay task"},
asynq.MaxRetry(10),
asynq.Timeout(10*time.Second),
asynq.Deadline(time.Now().Add(20*time.Second)),
)
延遲任務(wù)(Delay Task)
延遲任務(wù)岳颇,顧名思義,也就是推遲到指定時(shí)間執(zhí)行的任務(wù)颅湘,我們可以有兩個(gè)參數(shù)可以注入:ProcessAt
和ProcessIn
话侧。
ProcessIn
指的是從現(xiàn)在開(kāi)始推遲多少時(shí)間執(zhí)行:
// 3秒后執(zhí)行
err = srv.NewTask(testDelayTask,
&DelayTask{Message: "delay task"},
asynq.ProcessIn(3*time.Second),
)
ProcessAt
指的是在指定的某一個(gè)具體時(shí)間執(zhí)行:
// 1小時(shí)后的時(shí)間點(diǎn)執(zhí)行
oneHourLater := now.Add(time.Hour)
err = srv.NewTask(testDelayTask,
&DelayTask{Message: "delay task"},
asynq.ProcessAt(oneHourLater),
)
周期性任務(wù)(Periodic Task)
周期性任務(wù)asynq.Scheduler
內(nèi)部是通過(guò)Crontab來(lái)實(shí)現(xiàn)定時(shí)的,定時(shí)器到點(diǎn)之后闯参,就調(diào)度任務(wù)瞻鹏。它默認(rèn)使用的是UTC時(shí)區(qū)。
// 每分鐘執(zhí)行一次
_, err = srv.NewPeriodicTask(
"*/1 * * * ?",
testPeriodicTask,
&DelayTask{Message: "periodic task"},
)
需要注意的是赢赊,若要保證周期性任務(wù)的持續(xù)調(diào)度執(zhí)行乙漓,asynq.Scheduler
必須要一直運(yùn)行著级历,否則調(diào)度將不會(huì)發(fā)生释移。調(diào)度器本身不參與任務(wù)的執(zhí)行,但是沒(méi)有它的存在寥殖,調(diào)度將不不復(fù)存在玩讳,也不會(huì)發(fā)生。
示例代碼
示例代碼可以在單元測(cè)試代碼中找到:https://github.com/tx7do/kratos-transport/tree/main/transport/asynq/server_test.go
參考資料
- Asynq - Github
- Celery - Github
- Celery 簡(jiǎn)介
- 分布式任務(wù)隊(duì)列Celery的實(shí)踐
- 分布式任務(wù)隊(duì)列 Celery
- Asynq: Golang distributed task queue library
- 異步任務(wù)處理系統(tǒng)嚼贡,如何解決業(yè)務(wù)長(zhǎng)耗時(shí)熏纯、高并發(fā)難題?
- Asynq: simple, reliable & efficient distributed task queue for your next Go project
- Asynq: Golang distributed task queue library