goroutine
在go語言中胰舆,我們只需要在需要異步的函數(shù)前面加一個go 關(guān)鍵字即可完成異步
func main() {
for i := 0; i < 1000; i++ {
go func(i int) {
for {
fmt.Printf("Hello from" + " goroutine %d\n", i)
}
}(i)
}
time.Sleep(time.Millisecond)
}
定義
- 任何函數(shù)只需加上go關(guān)鍵字就能送給調(diào)度器運(yùn)行
- 不需要再定義是區(qū)分是否是異步函數(shù)
- 調(diào)度器在合適的點(diǎn)進(jìn)行切換
- 使用-race來檢測數(shù)據(jù)訪問沖突
goroutine 可能切換的點(diǎn)
- I/O, select
- channel
- 等待鎖
- 函數(shù)調(diào)用(有時)
- runtime.Gosched()
- 只是參考盐捷,不能保證切換门坷,不能保證在其他地方不切換
協(xié)程 Coroutine
在go中的并發(fā),是使用協(xié)程來處理的番电,這里的協(xié)程具有以下幾個特點(diǎn)
- 輕量級“線程”
- 非搶占式多任務(wù)處理竟坛,由協(xié)程主動交出控制權(quán)
- 編譯器/解釋器/虛擬機(jī)層面的多任務(wù)
- 多個協(xié)程可能在一個或多個線程上運(yùn)行
channel
下圖為channel和調(diào)度器之間的關(guān)系
channel 是一等公民
下面我們用一段代碼來演示channel在go中是一等公民
func worker(id int, c chan int) {
for n := range c{
fmt.Printf("worker %d received %c \n", id, n)
}
}
func createWorker(id int) chan<- int {
c := make(chan int)
go worker(id, c)
return c
}
func chanDemo() {
var channels [10]chan<- int
for i := 0; i < 10; i++ {
channels[i] = createWorker(i)
}
for i := 0; i < 10; i++ {
channels[i] <- 'a' + I
}
time.Sleep(time.Millisecond)
}
buffered channel
我們可以在make 一個channel時,后面跟一個數(shù)字來表示這個channel的緩存是多少,這里的d就不會被輸出出來
func bufferedChannel() {
c := make(chan int, 3)
go worker(0, c)
c <- 'a'
c <- 'b'
c <- 'c'
c <- 'd'
time.Sleep(time.Millisecond)
}
close channel
在worker函數(shù)中担汤,使用range來判斷channel是否關(guān)閉涎跨,如果沒有關(guān)閉則會在此函數(shù)的生命周期內(nèi)一直循環(huán)
func channelClose() {
c := make(chan int)
go worker(0, c)
c <- 'a'
c <- 'b'
c <- 'c'
c <- 'd'
close(c)
time.Sleep(time.Millisecond)
}
這里的關(guān)閉一定是發(fā)送方來進(jìn)行close,如果不使用range來判斷崭歧,我們還可以用下面的方式來判斷range
n, ok := <- c
if ok {
...
}
WaitGroup
這里我們使用WaitGroup 來創(chuàng)建兩個并發(fā)請求
type worker struct {
in chan int
done func()
}
func doWorker(id int, w worker) {
for n := range w.in{
fmt.Printf("worker %d received %c \n", id, n)
w.done()
}
}
func createWorker(id int, wg *sync.WaitGroup) worker {
w := worker{
in : make(chan int),
done : func() {
wg.Done()
},
}
go doWorker(id, w)
return w
}
func chanDemo() {
var wg sync.WaitGroup
var workers [10] worker
for i := 0; i < 10; i++ {
workers[i] = createWorker(i, &wg)
}
wg.Add(20)
for i, worker := range workers {
worker.in <- 'a' + I
}
for i, worker := range workers {
worker.in <- 'A' + I
}
wg.Wait()
}
func main() {
chanDemo()
}
使用Select 來進(jìn)行調(diào)度
下面我們來實(shí)現(xiàn)一個非租塞式隅很,10秒鐘結(jié)束,中間800ms沒有操作則輸出timeout率碾,如果有操作則輸出的一個例子
func worker(id int, c chan int) {
for n := range c {
fmt.Printf("worker %d received %d \n", id, n)
}
}
func createWorker(id int) chan<- int {
c := make(chan int)
go worker(id, c)
return c
}
func generator() chan int {
out := make(chan int)
go func() {
i := 0
for {
time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
out <- I
I++
}
}()
return out
}
func main() {
var c1, c2 = generator(), generator()
var worker = createWorker(0)
var values []int
// 計(jì)時器
tm := time.After(10 * time.Second)
for {
var activeWorker chan<- int
var activeValue int
if len(values) > 0 {
activeWorker = worker
activeValue = values[0]
}
select {
case n := <-c1:
values = append(values, n)
case n := <-c2:
values = append(values, n)
case activeWorker <- activeValue:
values = values[1:]
case <-time.After(800 * time.Millisecond):
fmt.Println("timeout")
// 10s 后調(diào)用
case <-tm:
fmt.Println("bye")
return
}
}
}
atomic 原子操作
這段代碼如果不加鎖叔营,我們在race時,會告知當(dāng)前值在讀取時所宰,有可能會被寫
type atomicInt struct {
value int
lock sync.Mutex
}
func (a *atomicInt) increment() {
// 這樣保證defer只在這個匿名函數(shù)中執(zhí)行
func() {
a.lock.Lock()
defer a.lock.Unlock()
a.value++
}()
}
func (a *atomicInt) get() int {
a.lock.Lock()
defer a.lock.Unlock()
return a.value
}
func main() {
var a atomicInt
a.increment()
go func() {
a.increment()
}()
time.Sleep(time.Millisecond)
fmt.Println(a.get())
}