Golang微服務(wù)框架Kratos應(yīng)用分布式計(jì)劃任務(wù)隊(duì)列Asynq

Golang微服務(wù)框架Kratos應(yīng)用分布式計(jì)劃任務(wù)隊(duì)列Asynq

任務(wù)隊(duì)列(Task Queue) 一般用于跨線程或跨計(jì)算機(jī)分配工作的一種機(jī)制拄查。其本質(zhì)是生產(chǎn)者消費(fèi)者模型啃炸,生產(chǎn)者發(fā)送任務(wù)到消息隊(duì)列肤舞,消費(fèi)者負(fù)責(zé)處理任務(wù)电禀。

任務(wù)隊(duì)列的輸入是稱(chēng)為任務(wù)(Task)的工作單元扣甲。專(zhuān)用的工作進(jìn)程不斷監(jiān)視任務(wù)隊(duì)列以查找要執(zhí)行的新工作鸽凶。

在Golang語(yǔ)言里面币砂,我們有像AsynqMachinery這樣的類(lèi)似于Celery的分布式任務(wù)隊(duì)列。

什么是任務(wù)隊(duì)列

消息隊(duì)列(Message Queue)玻侥,一般來(lái)說(shuō)知道的人不少决摧。比如常見(jiàn)的:kafka、Rabbitmq凑兰、RocketMQ等掌桩。

任務(wù)隊(duì)列(Task Queue),聽(tīng)說(shuō)過(guò)這個(gè)概念的人不會(huì)太多姑食,清楚它的概念的人怕是更少波岛。

這兩個(gè)概念是有關(guān)系的,他們是怎樣的關(guān)系呢矢门?任務(wù)隊(duì)列(Task Queue)是消息隊(duì)列(Message Queue)的超集盆色。任務(wù)隊(duì)列是構(gòu)建在消息隊(duì)列之上的。消息隊(duì)列是任務(wù)隊(duì)列的一部分祟剔。

提起分布式任務(wù)隊(duì)列(Distributed Task Queue)隔躲,就不得不提PythonCelery。故而物延,下面我們來(lái)看Celery的架構(gòu)圖宣旱,以此來(lái)講解。其他的任務(wù)隊(duì)列也并不會(huì)與之有太大的差異性叛薯,基礎(chǔ)的原理是一致的浑吟。

Celery 的架構(gòu)中,由多臺(tái) Server 發(fā)起異步任務(wù)(Async Task)耗溜,發(fā)送任務(wù)到 Broker 的隊(duì)列中组力,其中的 Celery Beat 進(jìn)程可負(fù)責(zé)發(fā)起定時(shí)任務(wù)。當(dāng) Task 到達(dá) Broker 后抖拴,會(huì)將其分發(fā)給相應(yīng)的 Celery Worker 進(jìn)行處理燎字。當(dāng) Task 處理完成后腥椒,其結(jié)果存儲(chǔ)至 Backend

在上述過(guò)程中的 BrokerBackend候衍,Celery 并沒(méi)有去實(shí)現(xiàn)笼蛛,而是使用了已有的開(kāi)源實(shí)現(xiàn),例如 RabbitMQ 作為 Broker 提供消息隊(duì)列服務(wù)蛉鹿,Redis 作為 Backend 提供結(jié)果存儲(chǔ)服務(wù)滨砍。Celery 就像是抽象了消息隊(duì)列架構(gòu)中 ProducerConsumer 的實(shí)現(xiàn)妖异,將消息隊(duì)列中基本單位“消息”抽象成了任務(wù)隊(duì)列中的“任務(wù)”惋戏,并將異步、定時(shí)任務(wù)的發(fā)起和結(jié)果存儲(chǔ)等操作進(jìn)行了封裝随闺,讓開(kāi)發(fā)者可以忽略 AMQP日川、RabbitMQ 等實(shí)現(xiàn)細(xì)節(jié)蔓腐,為開(kāi)發(fā)帶來(lái)便利矩乐。

綜上所述,Celery 作為任務(wù)隊(duì)列是基于消息隊(duì)列的進(jìn)一步封裝回论,其實(shí)現(xiàn)依賴消息隊(duì)列散罕。

任務(wù)隊(duì)列的應(yīng)用場(chǎng)景

我們現(xiàn)在知道了任務(wù)隊(duì)列是什么,也知道了它的工作原理傀蓉。但是欧漱,我們并不知道它可以用來(lái)做什么。下面葬燎,我們就來(lái)看看误甚,它到底用在什么樣的場(chǎng)景下。

  1. 分布式任務(wù):可以將任務(wù)分發(fā)到多個(gè)工作者進(jìn)程或機(jī)器上執(zhí)行谱净,以提高任務(wù)處理速度窑邦。
  2. 定時(shí)任務(wù):可以在指定時(shí)間執(zhí)行任務(wù)。例如:每天定時(shí)備份數(shù)據(jù)壕探、日志歸檔冈钦、心跳測(cè)試、運(yùn)維巡檢李请。支持 crontab 定時(shí)模式
  3. 后臺(tái)任務(wù):可以在后臺(tái)執(zhí)行耗時(shí)任務(wù)瞧筛,例如圖像處理、數(shù)據(jù)分析等导盅,不影響用戶界面的響應(yīng)较幌。
  4. 解耦任務(wù):可以將任務(wù)與主程序解耦,以提高代碼的可讀性和可維護(hù)性白翻,解耦應(yīng)用程序最直接的好處就是可擴(kuò)展性和并發(fā)性能的提高乍炉。支持并發(fā)執(zhí)行任務(wù),同時(shí)支持自動(dòng)動(dòng)態(tài)擴(kuò)展。
  5. 實(shí)時(shí)處理:可以支持實(shí)時(shí)處理任務(wù)恩急,例如即時(shí)通訊杉畜、消息隊(duì)列等泌辫。

Asynq概述

Asynq是一個(gè)使用Go語(yǔ)言實(shí)現(xiàn)的分布式任務(wù)隊(duì)列和異步處理庫(kù)柠衍,它由Redis提供支持再沧,它提供了輕量級(jí)的尺栖、易于使用的API尚卫,并且具有高可擴(kuò)展性和高可定制化性补疑。其作者Ken Hibino班利,任職于Google洞豁。

Asynq主要由以下幾個(gè)組件組成:

  • 任務(wù)(Task):需要被異步執(zhí)行的操作窗看;
  • 處理器(Processor):負(fù)責(zé)執(zhí)行任務(wù)的工作進(jìn)程茸歧;
  • 隊(duì)列(Queue):存放待執(zhí)行任務(wù)的隊(duì)列;
  • 調(diào)度器(Scheduler):根據(jù)規(guī)則將任務(wù)分配給不同的處理器進(jìn)行執(zhí)行显沈。

通過(guò)使用Asynq软瞎,我們可以非常輕松的實(shí)現(xiàn)異步任務(wù)處理,同時(shí)還可以提供高效率拉讯、高可擴(kuò)展性和高自定義性的處理方案涤浇。

Asynq的特點(diǎn)

  • 保證至少執(zhí)行一次任務(wù)
  • 任務(wù)寫(xiě)入Redis后可以持久化
  • 任務(wù)失敗之后,會(huì)自動(dòng)重試
  • worker崩潰自動(dòng)恢復(fù)
  • 可是實(shí)現(xiàn)任務(wù)的優(yōu)先級(jí)
  • 任務(wù)可以進(jìn)行編排
  • 任務(wù)可以設(shè)定執(zhí)行時(shí)間或者最長(zhǎng)可執(zhí)行的時(shí)間
  • 支持中間件
  • 可以使用 unique-option 來(lái)避免任務(wù)重復(fù)執(zhí)行魔慷,實(shí)現(xiàn)唯一性
  • 支持 Redis Cluster 和 Redis Sentinels 以達(dá)成高可用性
  • 作者提供了Web UI & CLI Tool讓大家查看任務(wù)的執(zhí)行情況

Asynq可視化監(jiān)控

Asynq提供了兩種監(jiān)控手段:CLI和Web UI只锭。

命令行工具CLI

go install github.com/hibiken/asynq/tools/asynq@latest

Web UI

Asynqmon是一個(gè)基于Web的工具,用于監(jiān)視管理Asynq的任務(wù)和隊(duì)列院尔,有關(guān)詳細(xì)的信息可以參閱工具的README蜻展。

Web UI我們可以通過(guò)Docker的方式來(lái)進(jìn)行安裝:

docker pull hibiken/asynqmon:latest

docker run -d \
    --name asynq \
    -p 8080:8080 \
    hibiken/asynqmon:latest --redis-addr=host.docker.internal:6379

安裝好Web UI之后,我們就可以打開(kāi)瀏覽器訪問(wèn)管理后臺(tái)了:http://localhost:8080

  • 儀表盤(pán)
  • 任務(wù)視圖
  • 性能

Kratos下如何應(yīng)用Asynq邀摆?

我們將分布式任務(wù)隊(duì)列以transport.Server的形式整合進(jìn)微服務(wù)框架Kratos纵顾。

目前,go里面有兩個(gè)分布式任務(wù)隊(duì)列可用:

我已經(jīng)對(duì)這兩個(gè)庫(kù)進(jìn)行了支持:

Docker部署依賴組件

因?yàn)樗蕾嘡edis隧熙,因此片挂,我們使用Docker的方式安裝Redis的服務(wù)器:

docker pull bitnami/redis:latest

docker run -itd \
    --name redis-test \
    -p 6379:6379 \
    -e ALLOW_EMPTY_PASSWORD=yes \
    bitnami/redis:latest

安裝依賴庫(kù)

我們需要在項(xiàng)目中安裝Asynq的依賴庫(kù):

go get -u github.com/tx7do/kratos-transport/transport/asynq

創(chuàng)建Kratos服務(wù)端

我們?cè)诖a當(dāng)中引入庫(kù),并且創(chuàng)建出來(lái)Server

首先贞盯,我們要?jiǎng)?chuàng)建Server

package server

import (
    ...
    "github.com/tx7do/kratos-transport/transport/asynq"
)

// NewAsynqServer create a asynq server.
func NewAsynqServer(cfg *conf.Bootstrap, _ log.Logger, svc *service.TaskService) *machinery.Server {
    ctx := context.Background()

    srv := asynq.NewServer(
        asynq.WithAddress(cfg.Server.Asynq.Broker),
    )

    registerAsynqTasks(ctx, srv, svc)

    return srv
}

注冊(cè)任務(wù)回調(diào)

然后音念,把回調(diào)函數(shù)注冊(cè)進(jìn)服務(wù)器:

const (
    testTask1        = "test_task_1"
    testDelayTask    = "test_delay_task"
    testPeriodicTask = "test_periodic_task"
)

type TaskPayload struct {
    Message string `json:"message"`
}

func registerAsynqTasks(ctx context.Context, srv *asynq.Server, svc *service.TaskService) {
    var err error
    err = asynq.RegisterSubscriber(srv, testTask1, svc.HandleTask1)
    err = asynq.RegisterSubscriber(srv, testDelayTask, svc.HandleDelayTask)
    err = asynq.RegisterSubscriber(srv, testPeriodicTask, svc.HandlePeriodicTask)
}

Asynq服務(wù)器注冊(cè)到Kratos

接著,調(diào)用kratos.Server把Asynq服務(wù)器注冊(cè)到Kratos里去:

func newApp(ll log.Logger, rr registry.Registrar, ks *asynq.Server) *kratos.App {
    return kratos.New(
        kratos.ID(Service.GetInstanceId()),
        kratos.Name(Service.Name),
        kratos.Version(Service.Version),
        kratos.Metadata(Service.Metadata),
        kratos.Logger(ll),
        kratos.Server(
            ks,
        ),
        kratos.Registrar(rr),
    )
}

實(shí)現(xiàn)任務(wù)回調(diào)方法

最后躏敢,我們就可以在Service里愉快的玩耍了:

package service

type TaskService struct {
    log          *log.Helper
}

func NewTaskService(
    logger log.Logger,
) *TaskService {
    l := log.NewHelper(log.With(logger, "module", "task/service/logger-service"))
    return &TaskService{
        log:          l,
        statusRepo:   statusRepo,
        realtimeRepo: realtimeRepo,
    }
}

func (s *TaskService) HandleTask1() error {
    fmt.Println("################ 執(zhí)行任務(wù)Task1 #################")
    return nil
}

func (s *TaskService) HandleTask1(taskType string, taskData *TaskPayload) error {
    s.log.Infof("[%s] Task Type: [%s], Payload: [%s]", time.Now().Format("2006-01-02 15:04:05"), taskType, taskData.Message)
    return nil
}

func (s *TaskService) HandleDelayTask(taskType string, taskData *TaskPayload) error {
    s.log.Infof("[%s] Delay Task Type: [%s], Payload: [%s]", time.Now().Format("2006-01-02 15:04:05"), taskType, taskData.Message)
    return nil
}

func (s *TaskService) HandlePeriodicTask(taskType string, taskData *TaskPayload) error {
    s.log.Infof("[%s] Periodic Task Type: [%s], Payload: [%s]", time.Now().Format("2006-01-02 15:04:05"), taskType, taskData.Message)
    return nil
}

創(chuàng)建新任務(wù)

新建任務(wù)闷愤,有兩個(gè)方法:NewTaskNewPeriodicTask,內(nèi)部分別對(duì)應(yīng)著asynq.Clientasynq.Scheduler件余。

NewTask是通過(guò)asynq.Client將任務(wù)直接入了隊(duì)列讥脐。

普通任務(wù)

普通任務(wù)通常是入列后立即執(zhí)行的(如果不需要排隊(duì)的)遭居,下面就是最簡(jiǎn)單的任務(wù),一個(gè)類(lèi)型(Type)旬渠,一個(gè)負(fù)載數(shù)據(jù)(Payload)就構(gòu)成了一個(gè)最簡(jiǎn)單的任務(wù):

err = srv.NewTask(testTask1, 
    &DelayTask{Message: "delay task"},
)

當(dāng)然俱萍,你也可以添加一些的參數(shù),比如重試次數(shù)告丢、超時(shí)時(shí)間枪蘑、過(guò)期時(shí)間等……

// 最多重試3次,10秒超時(shí)岖免,20秒后過(guò)期
err = srv.NewTask(testTask1, 
    &DelayTask{Message: "delay task"},
    asynq.MaxRetry(10),
    asynq.Timeout(10*time.Second),
    asynq.Deadline(time.Now().Add(20*time.Second)),
)

延遲任務(wù)(Delay Task)

延遲任務(wù)岳颇,顧名思義,也就是推遲到指定時(shí)間執(zhí)行的任務(wù)颅湘,我們可以有兩個(gè)參數(shù)可以注入:ProcessAtProcessIn话侧。

ProcessIn指的是從現(xiàn)在開(kāi)始推遲多少時(shí)間執(zhí)行:

// 3秒后執(zhí)行
err = srv.NewTask(testDelayTask,
    &DelayTask{Message: "delay task"},
    asynq.ProcessIn(3*time.Second),
)

ProcessAt指的是在指定的某一個(gè)具體時(shí)間執(zhí)行:

// 1小時(shí)后的時(shí)間點(diǎn)執(zhí)行
oneHourLater := now.Add(time.Hour)
err = srv.NewTask(testDelayTask,
    &DelayTask{Message: "delay task"},
    asynq.ProcessAt(oneHourLater),
)

周期性任務(wù)(Periodic Task)

周期性任務(wù)asynq.Scheduler內(nèi)部是通過(guò)Crontab來(lái)實(shí)現(xiàn)定時(shí)的,定時(shí)器到點(diǎn)之后闯参,就調(diào)度任務(wù)瞻鹏。它默認(rèn)使用的是UTC時(shí)區(qū)。

// 每分鐘執(zhí)行一次
_, err = srv.NewPeriodicTask(
    "*/1 * * * ?",
    testPeriodicTask,
    &DelayTask{Message: "periodic task"},
)

需要注意的是赢赊,若要保證周期性任務(wù)的持續(xù)調(diào)度執(zhí)行乙漓,asynq.Scheduler必須要一直運(yùn)行著级历,否則調(diào)度將不會(huì)發(fā)生释移。調(diào)度器本身不參與任務(wù)的執(zhí)行,但是沒(méi)有它的存在寥殖,調(diào)度將不不復(fù)存在玩讳,也不會(huì)發(fā)生。

示例代碼

示例代碼可以在單元測(cè)試代碼中找到:https://github.com/tx7do/kratos-transport/tree/main/transport/asynq/server_test.go

參考資料

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末粤策,一起剝皮案震驚了整個(gè)濱河市樟澜,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌叮盘,老刑警劉巖秩贰,帶你破解...
    沈念sama閱讀 206,839評(píng)論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異柔吼,居然都是意外死亡毒费,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,543評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門(mén)愈魏,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)觅玻,“玉大人想际,你說(shuō)我怎么就攤上這事∠澹” “怎么了胡本?”我有些...
    開(kāi)封第一講書(shū)人閱讀 153,116評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)畸悬。 經(jīng)常有香客問(wèn)我打瘪,道長(zhǎng),這世上最難降的妖魔是什么傻昙? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,371評(píng)論 1 279
  • 正文 為了忘掉前任闺骚,我火速辦了婚禮,結(jié)果婚禮上妆档,老公的妹妹穿的比我還像新娘僻爽。我一直安慰自己,他們只是感情好贾惦,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,384評(píng)論 5 374
  • 文/花漫 我一把揭開(kāi)白布胸梆。 她就那樣靜靜地躺著,像睡著了一般须板。 火紅的嫁衣襯著肌膚如雪碰镜。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 49,111評(píng)論 1 285
  • 那天习瑰,我揣著相機(jī)與錄音绪颖,去河邊找鬼。 笑死甜奄,一個(gè)胖子當(dāng)著我的面吹牛柠横,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播课兄,決...
    沈念sama閱讀 38,416評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼牍氛,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了烟阐?” 一聲冷哼從身側(cè)響起搬俊,我...
    開(kāi)封第一講書(shū)人閱讀 37,053評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎蜒茄,沒(méi)想到半個(gè)月后唉擂,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,558評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡扩淀,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,007評(píng)論 2 325
  • 正文 我和宋清朗相戀三年楔敌,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片驻谆。...
    茶點(diǎn)故事閱讀 38,117評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡卵凑,死狀恐怖庆聘,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情勺卢,我是刑警寧澤伙判,帶...
    沈念sama閱讀 33,756評(píng)論 4 324
  • 正文 年R本政府宣布,位于F島的核電站黑忱,受9級(jí)特大地震影響宴抚,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜甫煞,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,324評(píng)論 3 307
  • 文/蒙蒙 一菇曲、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧抚吠,春花似錦常潮、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,315評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至萧朝,卻和暖如春岔留,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背检柬。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,539評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工献联, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人厕吉。 一個(gè)月前我還...
    沈念sama閱讀 45,578評(píng)論 2 355
  • 正文 我出身青樓酱固,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親头朱。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,877評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容