messageID 消息唯一標(biāo)識(shí)
func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error){
msg := NewMessage(topic.GenerateID(), messageBody)
}
// ID生成器
func (t *Topic) GenerateID() MessageID {
retry:
id, err := t.idFactory.NewGUID()
if err != nil {
time.Sleep(time.Millisecond)
goto retry
}
return id.Hex()
}
func (f *guidFactory) NewGUID() (guid, error) {
f.Lock()
// divide by 1048576, giving pseudo-milliseconds, 1ms 約等于 1048576
// 獲取當(dāng)前時(shí)間為多少毫秒, 2^20 = 1048576
ts := time.Now().UnixNano() >> 20
// 最新消息時(shí)間毫秒數(shù) > 當(dāng)前键耕,報(bào)錯(cuò)
if ts < f.lastTimestamp {
f.Unlock()
return 0, ErrTimeBackwards
}
// 最新消息時(shí)間毫秒數(shù) = 當(dāng)前锌半,根據(jù)sequence計(jì)數(shù)器區(qū)分
if f.lastTimestamp == ts {
f.sequence = (f.sequence + 1) & sequenceMask
if f.sequence == 0 {
f.Unlock()
return 0, ErrSequenceExpired
}
} else {
f.sequence = 0
}
f.lastTimestamp = ts
// id = [ 22位ts + 10位 workerId + 12位 sequence ]
id := guid(((ts - twepoch) << timestampShift) |
(f.nodeID << nodeIDShift) |
f.sequence)
if id <= f.lastID {
f.Unlock()
return 0, ErrIDBackwards
}
f.lastID = id
f.Unlock()
return id, nil
}
最后編輯于 :
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者