在響應應用請求的過程中, 有時候會遇到比較耗時的任務, 比如給用戶發(fā)送郵件, 耗時任務時間不可能控, 很可能超過 1s, 為了給用戶比較好的體驗, 一般會控制請求響應時間(RT, response time)在300ms內(不考慮網絡波動), 甚至在 200ms 內. 面對這樣的工作場景, 就需要使用異步任務進行處理.
Go協(xié)程與異步
從一段簡單的代碼開始:
func TestTask(t *testing.T) {
task()
log.Print("req done")
}
func task() {
// 模擬耗時任務
time.Sleep(time.Second)
log.Print("task done")
}
- 代碼在 Goland 中編寫, 同時也推薦使用 Goland 進行 Go 開發(fā)
- 這里使用單測(
test
)演示代碼:- 輸入
test
就可以快速生成代碼(Goland 中稱之為live templates
, 其實就是預設好的代碼片段) - 在單測點擊可以執(zhí)行: 1. 點擊左側(gutter icon)的運行圖標; 2. 函數上右鍵菜單鍵; 3. 快捷鍵
ctl-shift-R
- 輸入
上面使用 task()
模擬耗時 1s 的任務, 整個test代表一次請求, 執(zhí)行如下:
=== RUN TestTask
2022/11/17 20:11:15 task done
2022/11/17 20:11:15 req done
--- PASS: TestTask (1.00s)
PASS
Go基礎知識: 天生并發(fā), 使用
go
關鍵字就可以開新協(xié)程, 將代碼放到新協(xié)程中執(zhí)行
func TestTask(t *testing.T) {
go task()
log.Print("req done")
}
func task() {
// 模擬耗時任務
time.Sleep(time.Second)
log.Print("task done")
}
- 只需要在
task()
前添加go
關鍵字, 就可以新開一個協(xié)程, 將task()
在新協(xié)程中執(zhí)行
不過在這里, 并沒有得到預期的結果:
=== RUN TestTask
2022/11/17 20:16:08 req done
--- PASS: TestTask (0.00s)
PASS
- 輸出顯示:
task()
中的日志沒有輸出, 看起來像沒有執(zhí)行
Go基礎知識: Go的代碼都在協(xié)程中執(zhí)行, 入口
main()
函數是主協(xié)程, 之后使用go
關鍵詞開的協(xié)程都是子協(xié)程, 主協(xié)程退出后, 程序會終止(exit)
也就是說上面的 TestTask()
(主協(xié)程) 和 go task()
(子協(xié)程)都執(zhí)行了, 但是主協(xié)程執(zhí)行完, 程序退出了, 子協(xié)程沒執(zhí)行完(或者沒調度到), 就被強制退出了
簡單 Go 并發(fā): 任務編排
上面的例子, 常見有 3 種解決方案:
- 方案1: 等子協(xié)程執(zhí)行完
func TestTask(t *testing.T) {
go task()
time.Sleep(time.Second) // 等待子協(xié)程執(zhí)行完
log.Print("req done")
}
func task() {
// 模擬耗時任務
time.Sleep(time.Second)
log.Print("task done")
}
- 方案2: 使用
WaitGroup
func TestTask(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
task()
wg.Done()
}()
wg.Wait()
log.Print("req done")
}
func task() {
// 模擬耗時任務
time.Sleep(time.Second)
log.Print("task done")
}
WaitGroup
其實很好理解, 就是同時等待一組任務完成, 它分為 3 步: 1. Add
: 總共有多少任務; 2. Done()
: 表示任務執(zhí)行完; 3. Wait()
: 等待所有任務完成
- 方案3: 使用 Go 的并發(fā)語言
chan
func TestTask(t *testing.T) {
ch := make(chan struct{}) // 初始化 chan
go func() {
task()
ch <- struct{}{} // 發(fā)送到 chan
}()
<-ch // 從 chan 獲取
log.Print("req done")
}
func task() {
// 模擬耗時任務
time.Sleep(time.Second)
log.Print("task done")
}
Go基礎知識: 通過
chan T
就可以申明T
類型的 chan, 供協(xié)程間進行通信;struct{}
是 Go 中0 memory use
(0內存占用)類型, 適合上面使用 chan 進行 控制 而不需要 數據 進行通信的情況
雖然只是3個簡單的 demo code, Go 提供的 2 種并發(fā)能力都有展示:
- 傳統(tǒng)并發(fā)原語: 大部分集中在
sync
包下, 上面案例2中的sync.WaitGroup
就是其中之一 - Go 基于 CSP 的并發(fā)編程范式: 包括
go chan select
, 上面的案例3中展示了go+chan
的基本用法
簡單 Go 并發(fā)講完了, 那任務編排又是啥? 其實, 某等程度上, 任務編排=異步, 任務需要 分工 完成時, 也就是一個任務相對于另一個任務需要 異步處理. 而任務編排, 恰恰是 Go 語言中基于 chan 進行并發(fā)編程的強項.
Go 中有一個大的方向贰逾,就是任務編排用 Channel,共享資源保護用傳統(tǒng)并發(fā)原語。
回到最初的代碼, 在實際使用的使用, 到底使用的是哪種方案呢? 答案是 方案1. 看看接近真實場景的代碼
func TestTrace(t *testing.T) {
for { // 服務以 daemon 的方式持續(xù)運行
// 不斷處理用戶的請求
{
go task()
log.Print("req done")
}
}
}
func task() {
// 模擬耗時任務
time.Sleep(time.Second)
log.Print("task done")
}
也就是真實場景下, 主協(xié)程所在的 server 會一直常駐, 請求(request)所有的子協(xié)程不用擔心還沒執(zhí)行完就被強制退出了
避坑: 野生 Goroutine
在繼續(xù)講解之前, 一定要提一下使用 go
開協(xié)程的一個坑, 或者說一個非常重要的基礎知識:
Go基礎知識: panic只對當前goroutine的defer有效
Go中出現 panic()
, 程序會立即終止:
func TestPanic(t *testing.T) {
panic("panic")
log.Print("end")
}
=== RUN TestPanic
--- FAIL: TestPanic (0.00s)
panic: panic [recovered]
panic: panic
goroutine 118 [running]:
testing.tRunner.func1.2({0x103e15940, 0x10405c208})
/opt/homebrew/opt/go/libexec/src/testing/testing.go:1396 +0x1c8
testing.tRunner.func1()
/opt/homebrew/opt/go/libexec/src/testing/testing.go:1399 +0x378
panic({0x103e15940, 0x10405c208})
/opt/homebrew/opt/go/libexec/src/runtime/panic.go:884 +0x204
...
...
testing.tRunner(0x14000603040, 0x104058678)
/opt/homebrew/opt/go/libexec/src/testing/testing.go:1446 +0x10c
created by testing.(*T).Run
/opt/homebrew/opt/go/libexec/src/testing/testing.go:1493 +0x300
Process finished with the exit code 1
- 可以看到,
panic
后程序直接退出,panic
后的log.Print("end")
并沒有執(zhí)行
當然, 想要程序健壯一些, panic
是可以 吃掉
的:
func TestPanic(t *testing.T) {
defer func() {
if r := recover(); r != nil {
log.Print(r)
}
}()
panic("panic")
log.Print("end")
}
=== RUN TestPanic
2022/11/17 22:25:08 panic
--- PASS: TestPanic (0.00s)
PASS
使用 recover()
對 panic()
進行恢復, 程序就不會崩掉(exit)
但是, 一定要注意
panic只對當前goroutine的defer有效!
panic只對當前goroutine的defer有效!
panic只對當前goroutine的defer有效!
重要的事情說三遍.
func TestPanic(t *testing.T) {
defer func() {
if r := recover(); r != nil {
log.Print(r)
}
}()
go func() {
panic("panic")
}()
log.Print("end")
}
=== RUN TestPanic
panic: panic
goroutine 88 [running]:
...
...
...
Process finished with the exit code 1
而 go 里面開協(xié)程又是如此的方便, 簡單一個 go
關鍵字即可, 所以大家給這種情況起了個外號: 野生 Goroutine. 最簡單的做法就是對協(xié)程進行一次封裝, 比如這樣:
package gox
// Run start with a goroutine
func Run(fn func()) {
go func() {
defer func() {
if r := recover(); r != nil {
log.Print(r)
}
}()
fn()
}()
}
原本的 go task()
, 使用 gox.Run(task)
進行替換, 就可以 task 出現 panic 的時候, 程序還能恢復
Trace: 異步任務還能進行鏈路追蹤么?
隨著可觀測技術的不斷演進, 基建上的不斷提升, 鏈路追蹤技術也進行了演進
- trace1.0: opentracing jaeger
- trace2.0: otel
當用戶請求進來時, 可以通過 traceId
串聯(lián)起用戶的完成調用鏈, 監(jiān)控和排查問題能力大大增強!
{
"code": 200,
"status": 200,
"msg": "成功",
"errors": null,
"data": "env-t0",
"timestamp": 1668696256,
"traceId": "..."
}
trace 通過請求(request)中的 context
, 不斷向下傳遞, 從而將當前請求的所用調用通過同一個 traceId 串聯(lián)起來
func TestTrace(t *testing.T) {
Op1(ctx) // 比如操作了 DB
Op2(ctx) // 比如操作了 cache
Task(ctx)
log.Print("req done")
}
func Task(ctx context.Context) {
// 使用自定義span, 將當前操作上報到trace
_, span := otel.GetTracerProvider().Tracer("task").Start(ctx, "xxxTask")
defer span.End()
// 模擬耗時任務
time.Sleep(time.Second)
log.Print("task done")
}
如同上面演示的 demo code 演示:
- 通過
ctx
, 將當前請求(request)的所有操作使用同一個 traceId 串起來 - otel 默認了很多操作的 trace 上報, 比如 mysql/redis/kafka 等等, 也可以使用自定義
span
的方式進行新增
如果要進行耗時任務異步處理, 直覺上直接 go
一下:
func TestTrace(t *testing.T) {
Op1(ctx) // 比如操作了 DB
Op2(ctx) // 比如操作了 cache
go Task(ctx)
log.Print("req done")
}
這時候腦海中陡然蹦出一個聲音: 野生Goroutine
func TestTrace(t *testing.T) {
Op1(ctx) // 比如操作了 DB
Op2(ctx) // 比如操作了 cache
gox.RunCtx(ctx, Task) // 在 gox.Run 的基礎上, 添加 ctx 支持
log.Print("req done")
}
可是等測試一下, 就會發(fā)現, task()
并沒有執(zhí)行!
細心的小伙伴就會發(fā)現, 這和開始的例子有點像呀, 而且對比下就會知道, 此處多了一個 ctx
:
func TestTask(t *testing.T) {
go task(ctx)
log.Print("req done")
}
func task() {
// 模擬耗時任務
time.Sleep(time.Second)
log.Print("task done")
}
- 沒有 ctx 的時候, 因為主協(xié)程一直在, 子協(xié)程可以處理完任務在退出, 也就是子協(xié)程的生命周期都在主協(xié)程內
- 有 ctx 的時候, 由于 ctx 的存在, 請求(request)中主協(xié)程需要接受 ctx 控制, 異步處理后, 請求也就結束了(上面
log.Print("req done")
模擬的部分), 這是 ctx 就會控制子協(xié)程一起結束掉, 也就是子協(xié)程的生命周期都在當前請求的協(xié)程內
于是, 又有了 2 種處理辦法:
- 簡單做法, 就像上面一樣, 沒有 ctx, 就沒有問題了嘛. 如果用一句話來概括這種方法:
面試官: 你可以回家等消息了
- 既然又要執(zhí)行異步任務, 又要有 trace, 那把 trace 繼續(xù)傳下, 用一個新的 ctx 就好了嘛
上代碼:
- 復制 ctx, 把 trace 繼續(xù)傳下去
package ctxkit
// Clone 復制 ctx 中對應 key 的值润绵,移除父級 cancel毯炮。
func Clone(preCtx context.Context, keys ...interface{}) context.Context {
newCtx := context.Background()
// 從 pctx 開啟一個子 span她肯,來傳遞 traceId
_, ospan := otel.GetTracerProvider().
Tracer(trace_in.InstrumentationPrefix+"/ctxkit").
Start(preCtx, "ctxkit.Clone", otel_trace.WithAttributes(
trace_attr.AttrAsyncFlag.Int(1), // 標記為異步
))
defer ospan.End()
newCtx = trace.ContextWithSpan(newCtx, ospan)
return ctxClone(newCtx, preCtx, keys...)
}
// CloneWithoutSpan 功能同 Clone撤逢,但不會創(chuàng)建 trace span窒朋,建議在大批數據 for 循環(huán)之前使用争拐,避免 span 鏈路過長腋粥。
func CloneWithoutSpan(preCtx context.Context, keys ...interface{}) context.Context {
tid := trace_in.GetOtelTraceId(preCtx)
if tid == "" {
tid = trace_in.FakeTraceId()
}
newCtx := context.WithValue(context.Background(), ictx.CtxKeyFakeTraceId, tid)
return ctxClone(newCtx, preCtx, keys...)
}
func ctxClone(baseCtx, preCtx context.Context, keys ...interface{}) context.Context {
for _, key := range _ctxKeys {
if v := preCtx.Value(key); v != nil {
baseCtx = context.WithValue(baseCtx, key, v)
}
}
keys = append(keys, _strKeys...)
for _, key := range keys {
if v := preCtx.Value(key); v != nil {
baseCtx = context.WithValue(baseCtx, key, v) //nolint
}
}
return baseCtx
}
- 實際使用
func TestTask(t *testing.T) {
nexCtx := ctxkit.Clone(ctx)
go task(newCtx)
log.Print("req done")
}
func task() {
// 模擬耗時任務
time.Sleep(time.Second)
log.Print("task done")
}
異步任務: 能否更優(yōu)雅點
如果是從請求過來的, 請求中自帶 trace, 并會在請求(request)的初始化的時候建 trace 寫入到請求的 ctx 中, 那如果直接執(zhí)行一個異步任務呢?
那就需要手動初始化 trace 了.
上代碼:
- 封裝異步任務(job): 封裝trace -> clone ctx -> 指標收集(jobMetricsWrap) -> 野生Goroutine捕獲
package job
// AsyncJob 異步任務。
// name: 任務名架曹。
// return: waitFunc灯抛,調用可以等待任務完成。
func AsyncJob(ctx context.Context, name string, fn func(ctx context.Context) error, opts ...Option) func() {
ctx = tel_in.CtxAdjuster(ctx) // 初始化 trace
newCtx := ctxkit.Clone(ctx)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
// 指標收集
jobMetricsWrap(newCtx, fn, applyOption(name, true, opts...))
}()
return wg.Wait
}
- 實際使用:
func TestJob(t *testing.T) {
ctx := context.Background()
// 異步任務
// 邏輯在協(xié)程中執(zhí)行音瓷,已包裝 recover 邏輯
wait := job.AsyncJob(ctx, "your_task_name", func(ctx context.Context) error {
// 內部處理使用傳入的 ctx对嚼,已經執(zhí)行過 citkit.Clone
return doAsyncTask(ctx)
})
wait() // 如果需要等待任務結束則調用 wait,不需要則忽略返回值
}
func doAsyncTask(ctx context.Context) error {
logs.InfoCtx(ctx, "async task done")
return nil
}
=== RUN TestJob
2022-11-18T10:18:39.014+0800 INFO tests/async_job_test.go:250 async task done {"traceId": "..."}
--- PASS: TestJob (0.00s)
PASS
PS: 這里需要查看效果, 所以調用了
wait()
等待異步任務結束, 實際使用可以直接使用job.AsyncJob()
或者_ = job.AsyncJob()
最后一起來看看 trace 使用的效果:
todo: img
Asynq: 專業(yè)異步任務框架
如果只是 異步一下, 上面講解的內容也基本夠用了; 如果有重度異步任務使用, 就得考慮專業(yè)的異步任務隊列框架了, Go 中可以選擇 Async
Features
- Guaranteed at least one execution of a task
- Scheduling of tasks
- Retries of failed tasks
- Automatic recovery of tasks in the event of a worker crash
- Weighted priority queues
- Strict priority queues
- Low latency to add a task since writes are fast in Redis
- De-duplication of tasks using unique option
- Allow timeout and deadline per task
- Allow aggregating group of tasks to batch multiple successive operations
- Flexible handler interface with support for middlewares
-
Ability to pause queue
to stop processing tasks from the queue - Periodic Tasks
- Support Redis Cluster for automatic sharding and high availability
- Support Redis Sentinels for high availability
- Integration with Prometheus to collect and visualize queue metrics
-
Web UI
to inspect and remote-control queues and tasks -
CLI
to inspect and remote-control queues and tasks
整體架構圖
todo
實際使用
使用的 demo 就不貼了, asynq 的文檔很詳細, 說一下具體實踐中遇到的 2個 case:
- 使用
web UI
: 處于安全考慮, 設置了ReadOnly
h := asynqmon.New(asynqmon.Options{
RootPath: "/monitoring", // RootPath specifies the root for asynqmon app
RedisConnOpt: tasks.GetRedis(),
ReadOnly: true, // admin web can't operation
})
r := mux.NewRouter()
r.PathPrefix(h.RootPath()).Handler(h)
srv := &http.Server{
Handler: r,
Addr: ":8080",
}
PS: 使用
web UI
由于涉及到使用新的端口, 而應用部署已經上 k8s 了, 如何順利訪問就需要一系列運維操作, 留個坑, 以后有機會再填
- 測試環(huán)境OK, 線上報錯:
recoverer: could not move task to archive: INTERNAL_ERROR: redis eval error: ERR 'asynq:{}:t:' and 'asynq:{}:active' not in the same slot
對比發(fā)現, 是測試和線上使用的不同類型的 redis 實例導致的, 搜索云服務的幫助文檔:
todo
集群架構實例的命令限制: 如需在集群架構實例中執(zhí)行下述受限制的命令绳慎,請使用hash tag確保命令所要操作的key都分布在1個hash slot中
但是查看 asqnq 源碼: 以 enqueue
操作為例, lua 操作中的部分 key 無法通過外部添加 hash tag
// github.com/hibiken/asynq/internal/rdb/rdb.go
// enqueueCmd enqueues a given task message.
//
// Input:
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
// KEYS[2] -> asynq:{<qname>}:pending
// --
// ARGV[1] -> task message data
// ARGV[2] -> task ID
// ARGV[3] -> current unix time in nsec
//
// Output:
// Returns 1 if successfully enqueued
// Returns 0 if task ID already exists
var enqueueCmd = redis.NewScript(`
if redis.call("EXISTS", KEYS[1]) == 1 then
return 0
end
redis.call("HSET", KEYS[1],
"msg", ARGV[1],
"state", "pending",
"pending_since", ARGV[3])
redis.call("LPUSH", KEYS[2], ARGV[2])
return 1
`)
最終, 通過使用線上另一臺主從版redis解決問題
寫在最后
到這里, 工作用Go: 異步任務怎么寫
就暫時告一段落了, 這個過程中:
- 一些計算機基礎概念的理解: 同步與異步, 異步與任務編排, 協(xié)程與異步, 協(xié)程與生命周期
- 一些 Go 語言的基礎知識以及基礎不牢地動山搖的坑: 野生Goroutine, panic&recover
- 可觀測的實踐之一: trace
- 專業(yè)的異步任務框架 Asynq 以及踩坑記
一起擁抱變化, 直面問題和挑戰(zhàn), 不斷精進, 我們下個話題再見????.