為了提升發(fā)送時的吞吐量,本文采用類似Nagle算法的發(fā)送方式:
- 數(shù)據(jù)先存入發(fā)送緩沖區(qū)
- 若緩沖區(qū)已滿,則立即發(fā)送
- 若緩沖區(qū)未滿态罪,則等待指定時間發(fā)送
package main
import (
"fmt"
"sync"
"time"
)
type Message struct {
ch chan int
buf []int
size int
timeout <-chan time.Time
once sync.Once
}
func newMessage(size int) *Message {
return &Message{
ch: make(chan int, size),
buf: make([]int, 0, size),
size: size,
timeout: time.After(1 * time.Second),
}
}
/**
* 此方法僅執(zhí)行一次黎茎,所以不需要加鎖
*/
func (m *Message) Send() {
m.once.Do(func() {
fmt.Println("once")
for {
select {
case d := <-m.ch:
m.buf = append(m.buf, d)
if len(m.buf) >= m.size {
fmt.Println("full buffer, send data...", m.buf)
m.buf = m.buf[:0]
}
case <-m.timeout:
if len(m.buf) > 0 {
fmt.Println("timeout and send data...", m.buf)
m.buf = m.buf[:0]
}
m.timeout = time.After(1 * time.Second)
}
}
})
}
func (m *Message) Receive(val int) {
fmt.Println("receive data:", val)
m.ch <- val
time.Sleep(time.Second)
}
func main() {
msg := newMessage(100)
go msg.Send()
for i := 0; i < 100; i++ {
go msg.Receive(i)
}
time.Sleep(5 * time.Minute)
}