斷續(xù)器
計(jì)時(shí)器 是當(dāng)想在未來(lái)做一些事情 - tickers 是用于定期做一些事情跛蛋。這里是一個(gè)例行程序登馒,周期性執(zhí)行直到停止式曲。
代碼使用與計(jì)時(shí)器的機(jī)制類似:發(fā)送值到通道义矛。這里我們將使用通道上的一個(gè)范圍內(nèi)來(lái)迭代发笔,每隔500ms發(fā)送一次。
代碼可以像定時(shí)器一樣停止凉翻,當(dāng)代碼停止后了讨,它不會(huì)再其通道上接收值。
package main
import (
"time"
"fmt"
)
func main(){
ticker := time.NewTicker(time.Millisecond * 500)
go func() {
for t := range ticker.C{
fmt.Println("Tick at ",t)
}
}()
time.Sleep(time.Millisecond * 1600 )
ticker.Stop()
fmt.Println("Ticker stopped")
}
Go 工作池
在這個(gè)例子中制轰,我們將實(shí)現(xiàn)如何使用 goroutine 和 channel 實(shí)現(xiàn)一個(gè)工作池量蕊。
這里是工作程序(worker),我們將運(yùn)行幾個(gè)并發(fā)實(shí)例艇挨。這些工作程序(worker)將在工作 chan 上接收工作残炮,并將發(fā)送相應(yīng)的結(jié)果。這里使用 延時(shí)1s的方式模擬工作的過(guò)程缩滨。
為了使用工作程序(worker)池势就,需要向他們發(fā)送任務(wù)并收集相關(guān)結(jié)果。這里實(shí)現(xiàn)的時(shí)候使用了兩個(gè)通道脉漏。這啟動(dòng)了 3 個(gè)worker苞冯,最初被阻止,因?yàn)闆](méi)有任務(wù)侧巨。
然后手機(jī)作業(yè)的所有結(jié)果舅锄。
package main
import (
"fmt"
"time"
)
//worker本體函數(shù)
func worker(id int,job <-chan int, result chan<- int){
for j:=range job{
fmt.Println("worker",id,"started job",j)
time.Sleep(time.Second)
fmt.Println("worker",id,"finished job",j)
result<- j*2
}
}
func main(){
jobs:= make(chan int,100)
results := make(chan int,100)
//創(chuàng)建3個(gè)worker
for w:=1 ; w<= 3;w++{
go worker(w,jobs,results)
}
//分配5個(gè)任務(wù)
for j:=1 ;j<= 5 ; j++{
jobs <- j
}
close(jobs)
//等待所有工作完成
for a :=1 ; a<=5 ; a++{
<- results
}
}
Go 速率限制
速率限制是控制資源利用和維持服務(wù)質(zhì)量的重要機(jī)制。通過(guò) goroutines司忱,channel皇忿,ticker 都可以優(yōu)雅的支持速率限制。
首先我們來(lái)看一下基本速率限制坦仍。假設(shè)想限制對(duì)傳入請(qǐng)求的處理鳍烁。我們需要在同一個(gè)通道上處理。
這個(gè)限制器通道將 2000ms 接收一個(gè)值繁扎。這是速率限制方案中的調(diào)節(jié)器幔荒。
通過(guò)在服務(wù)每個(gè)請(qǐng)求之前阻塞來(lái)自限制器信道的接收糊闽,我們限制自己每200ms接收一個(gè)請(qǐng)求。
我們可能希望在速率限制方案中允許端脈沖串請(qǐng)求爹梁,同時(shí)保持總體速率限制右犹。可以通過(guò)緩沖的限制器通道來(lái)實(shí)現(xiàn)姚垃。這個(gè) burstyLimiter通道將允許最多 3 個(gè)事件的突發(fā)傀履。
填充通道以表示允許突發(fā)。
每2000ms莉炉,將嘗試向 burstyLimiter添加一個(gè)新值,最大限制為 3 〔耆現(xiàn)在模擬 5個(gè)更多的傳入請(qǐng)求絮宁。這些傳入請(qǐng)求的前三個(gè)未超過(guò)burstyLimiter 值。
package main
import (
"time"
"fmt"
)
func main(){
requests := make(chan int , 5)
for i:= 1 ; i<= 5 ; i++{
requests <- i
}
close(requests)
limiter := time.Tick(time.Millisecond * 2000)
for req := range requests{
<- limiter
fmt.Println("request",req,time.Now())
}
burstyLimiter := make(chan time.Time , 3)
for i:= 0 ; i<3;i++{
burstyLimiter <- time.Now()
}
go func() {
for t:= range time.Tick(time.Millisecond * 2000){
burstyLimiter <- t
}
}()
burstyRequests := make(chan int , 5)
for i:=1 ; i<= 5 ; i++{
burstyRequests <- i
}
close(burstyRequests)
for req := range burstyRequests{
<- burstyLimiter
fmt.Println("request",req,time.Now())
}
}
Go原子計(jì)數(shù)器
go語(yǔ)言中管理狀態(tài)的主要機(jī)制是通過(guò)通道進(jìn)行通信服协。在過(guò)去的文章中绍昂,我們已經(jīng)看到了這一點(diǎn),例如工作池偿荷。還有一些其他選項(xiàng)用于管理狀態(tài)窘游。這里我們將使用 sync/atomic 包來(lái)實(shí)現(xiàn)由多個(gè) goroutine 訪問(wèn)的原子計(jì)數(shù)器。
使用一個(gè)無(wú)符號(hào)整數(shù)表示計(jì)數(shù)器(正數(shù))
為了模擬并發(fā)更新跳纳,將啟動(dòng) 50個(gè) goroutine 忍饰, 每個(gè)增量計(jì)數(shù)器大學(xué)是 1ms。
為了原子地遞增計(jì)數(shù)器寺庄,這里使用 AddUint64() 函數(shù)艾蓝,在 ops 計(jì)數(shù)器的內(nèi)存地址上使用 & 語(yǔ)法。
為了安全地使用計(jì)數(shù)器斗塘,同時(shí)它任然被其他 goroutine 更新赢织。通過(guò) LoadUint64提取一個(gè)當(dāng)前值的副本到 opsFinal。如上所述馍盟,需要獲取值的內(nèi)存地址 &ops 給這個(gè)函數(shù)于置。
運(yùn)行程序顯示執(zhí)行了大約 40000次操作。根據(jù)自己機(jī)器性能可以嘗試其他更nice的操作贞岭。
package main
import (
"sync/atomic"
"time"
"fmt"
)
func main(){
var ops uint64 = 0
for i:= 0 ; i< 50 ; i++{
go func() {
for{
atomic.AddUint64(&ops,1)
time.Sleep(time.Millisecond * 1 )
}
}()
}
time.Sleep(time.Second * 10)
opsFinal := atomic.LoadUint64(&ops)
fmt.Println("ops",opsFinal)
}