項目背景
? ? ? ?由于項目由之前單一地區(qū)的推廣還算順利元莫,因此后面規(guī)劃是面向全國范圍推廣氮昧,之前的小步快走的開發(fā)模式?jīng)Q定了初期的項目架構(gòu)使用了mysql的單庫框杜,隨著全國范圍的推廣肯定單庫的高可用模式會面臨問題,因此要求進行數(shù)據(jù)的分庫分表拆分袖肥,項目一開始主鍵使用的自增主鍵咪辱,自增主鍵的N多好處不用再強調(diào)(索引查找效率、范圍查找椎组,巴拉巴拉)但是如何分庫分表以后就會面臨自增主鍵的不適用油狂,因此考慮引入分布式id生成組件"leaf",開源的有很多百度,滴滴寸癌,美團都有专筷。最終比較后選擇了美團。
源碼分析的意義與價值
? ? ? ?如果不能把控的中間件引入是災(zāi)難性的蒸苇,因此在引入leaf之前功課也是要做足的磷蛹,包括后面可能存在的定制化需求,當(dāng)初選擇阿里開源的RocketMq的原因也是因為是java語言溪烤,團隊成員起碼可以有些問題從源碼入手味咳。
? ? ? ?好了開始分析源碼庇勃,leaf提供了兩種id生成方式一種是基于mysql分段雙buffer模式,一種是基于zookeeper的槽驶。
分段id生成器
? ? ? ?宏觀上先說明一下责嚷,這里不得不引申一下一個概念方便理解,就是并發(fā)控制的本質(zhì)到底是什么掂铐,并發(fā)控制的本質(zhì)個人總結(jié)如下罕拂,多個線程(可能來自一個java進程也就是同一個java虛擬機,也可能來自于不同的機器上面不同的虛擬機)那這里面其實就是兩個不同概念堡纬,一個是并發(fā)控制聂受,另一個是分布式并發(fā)控制,他們的道理類似烤镐,在實現(xiàn)上都是通過鎖定一塊共享區(qū)域某一個共享區(qū)域蛋济,往往中間件作者為了性能會采用內(nèi)存來進行鎖定,比如之前文章的redis分布式鎖炮叶,比如Zookeeper里面系統(tǒng)并發(fā)控制也是通過DataTree里面dataNode的parent的對象頭進行鎖定的碗旅。這些無外乎要么利用的redis的單線程的特性,亦或者利用了zookeeper一主多從情況下的寫入安全性來完成的各種并發(fā)控制镜悉。
? ? ? ?美團實現(xiàn)的分段id生成器使用的則是mysql數(shù)據(jù)庫完成數(shù)據(jù)的共享祟辟,并通過update語句加鎖完成并發(fā)控制。接下來分析一下分段id生成的核心代碼就一個類侣肄。
package com.sankuai.inf.leaf.segment;
import com.sankuai.inf.leaf.IDGen;
import com.sankuai.inf.leaf.common.Result;
import com.sankuai.inf.leaf.common.Status;
import com.sankuai.inf.leaf.segment.dao.IDAllocDao;
import com.sankuai.inf.leaf.segment.model.*;
import org.perf4j.StopWatch;
import org.perf4j.slf4j.Slf4JStopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
public class SegmentIDGenImpl implements IDGen {
private static final Logger logger = LoggerFactory.getLogger(SegmentIDGenImpl.class);
/**
* IDCache未初始化成功時的異常碼
*/
private static final long EXCEPTION_ID_IDCACHE_INIT_FALSE = -1;
/**
* key不存在時的異常碼
*/
private static final long EXCEPTION_ID_KEY_NOT_EXISTS = -2;
/**
* SegmentBuffer中的兩個Segment均未從DB中裝載時的異常碼
*/
private static final long EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL = -3;
/**
* 最大步長不超過100,0000
*/
private static final int MAX_STEP = 1000000;
/**
* 一個Segment維持時間為15分鐘
* 這里指的是一個系統(tǒng)默認(rèn)認(rèn)為的合理時間旧困,主要用于調(diào)整buffer里面步長的大小,如果當(dāng)前次更新距離上次更新時間超過15分鐘的話
* 那么步長就會動態(tài)調(diào)整為二分之一之前的長度稼锅,如果說當(dāng)次更新時間距離上次更新時間未超過15分鐘那么說明系統(tǒng)壓力大吼具,那么就適當(dāng)調(diào)整步長到2倍直到最大步長
* MAX_STEP
*/
private static final long SEGMENT_DURATION = 15 * 60 * 1000L;
/**
* 用來更新本地的buffer的線程池,用來更新每個tag對應(yīng)的segmentBufferr里面?zhèn)溆玫膕egment
*/
private ExecutorService service = new ThreadPoolExecutor(5, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new UpdateThreadFactory());
/**
* 初始化狀態(tài)矩距,主要用來標(biāo)記mysql中tag是否初次被同步進入內(nèi)存中
*/
private volatile boolean initOK = false;
/**
* 用來保存每個tag對應(yīng)的segmentBuffer肠牲,業(yè)務(wù)通過tag進行隔離毫蚓,并且此處使用了并發(fā)安全的容器,主要是防止在刷新tag的時候出現(xiàn)線程不安全的問題
*/
private Map<String, SegmentBuffer> cache = new ConcurrentHashMap<String, SegmentBuffer>();
/**
* 主要是用來與mysql打交道膨报,加載tag腹鹉,加載step檀咙,更新maxid
*/
private IDAllocDao dao;
/**
* 作者比較優(yōu)秀凸丸,為了刷新線程起一個比較好聽的名字特意寫了個一個工廠很钓,哈哈并且內(nèi)部做了一個線程計數(shù)的變量
*/
public static class UpdateThreadFactory implements ThreadFactory {
private static int threadInitNumber = 0;
private static synchronized int nextThreadNum() {
return threadInitNumber++;
}
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "Thread-Segment-Update-" + nextThreadNum());
}
}
/**
* 暴露給外部調(diào)用用來初始化分段id生成器的功能,主要包括更新所有的tag進入到內(nèi)存中允趟,并且啟動一個單線程的守護線程去做定時刷新這些tag的操作恼策,
* 間隔60秒,這里之所以用單線程的線程池我個人的判斷是為了充分利用阻塞的特性拼窥,因為在極端的情況下60秒加載不完那么就阻塞著在哪里戏蔑,當(dāng)然,絕大多數(shù)業(yè)務(wù)
* 一分鐘肯定是能夠加載完的鲁纠。
*/
@Override
public boolean init() {
logger.info("Init ...");
// 確保加載到kv后才初始化成功
updateCacheFromDb();
initOK = true;
updateCacheFromDbAtEveryMinute();
return initOK;
}
/**
* 刷新緩存的方法总棵。單線程每隔60秒刷新一次tag,與mysql 同步一次tag的信息
*/
private void updateCacheFromDbAtEveryMinute() {
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("check-idCache-thread");
t.setDaemon(true);
return t;
}
});
service.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
updateCacheFromDb();
}
}, 60, 60, TimeUnit.SECONDS);
}
/**
* 從mysql中同步tag的信息進入內(nèi)存中改含,這里作者做的也很巧妙情龄,并不著急立馬就去加載segment我們看到SegmentBuffer里面有一個初始化是否成功的標(biāo)志字段
* initOk 他標(biāo)志著目前這個segmentBuffer是否可用,但是這個方法里面默認(rèn)是false的捍壤,作者在這里巧妙的利用了懶加載的方式骤视,將max的值的更改延后,因為我們思考一種弄場景
* leaf在美團中可能是全集團公用的鹃觉,可能部署了上百個節(jié)點专酗,那么很有可能這些服務(wù)會面臨重啟,如果每次重啟都會默認(rèn)更新mysql的話盗扇,一方面會浪費非常多的step的id祷肯,另外一方面很有可能
* 就算浪費了id也可能會用不到,因此這里面用戶使用了懶加載的思想只是先進行占位疗隶,當(dāng)用戶在真正使用的時候再去查詢并填充segment并更新mysql佑笋,因此這里面有個細(xì)節(jié)就是
* 如果系統(tǒng)極端在乎平滑性,那么在leaf在對外提供服務(wù)前斑鼻,先手動調(diào)用一次蒋纬,以確保segment被填充完善,降低延時性坚弱。
*/
private void updateCacheFromDb() {
logger.info("update cache from db");
StopWatch sw = new Slf4JStopWatch();
try {
List<String> dbTags = dao.getAllTags();
if (dbTags == null || dbTags.isEmpty()) {
return;
}
List<String> cacheTags = new ArrayList<String>(cache.keySet());
Set<String> insertTagsSet = new HashSet<>(dbTags);
Set<String> removeTagsSet = new HashSet<>(cacheTags);
//db中新加的tags灌進cache
for(int i = 0; i < cacheTags.size(); i++){
String tmp = cacheTags.get(i);
if(insertTagsSet.contains(tmp)){
insertTagsSet.remove(tmp);
}
}
for (String tag : insertTagsSet) {
SegmentBuffer buffer = new SegmentBuffer();
buffer.setKey(tag);
Segment segment = buffer.getCurrent();
segment.setValue(new AtomicLong(0));
segment.setMax(0);
segment.setStep(0);
cache.put(tag, buffer);
logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer);
}
//cache中已失效的tags從cache刪除
for(int i = 0; i < dbTags.size(); i++){
String tmp = dbTags.get(i);
if(removeTagsSet.contains(tmp)){
removeTagsSet.remove(tmp);
}
}
for (String tag : removeTagsSet) {
cache.remove(tag);
logger.info("Remove tag {} from IdCache", tag);
}
} catch (Exception e) {
logger.warn("update cache from db exception", e);
} finally {
sw.stop("updateCacheFromDb");
}
}
/**
* 主力接口蜀备,用于對外界提供id
* 判斷當(dāng)前tag緩存是否已經(jīng)就緒,如果未就緒直接報錯史汗,因此要求調(diào)用方應(yīng)該先調(diào)用init(),進行基礎(chǔ)環(huán)境的就緒
* 緩存就緒成功琼掠,從緩存中查看客戶端請求的key是否存在,不存在的可能有兩種停撞,一種是mysql中沒有瓷蛙,這個需要等大概60秒才會刷新,因此在leaf使用過程中應(yīng)該提前就緒好mysql戈毒,讓讓多個leaf服務(wù)都能刷新到相應(yīng)的key
* 另外一種可能就是mysql中也沒有艰猬,當(dāng)然也會造成cache中沒有,兩種情況造成的緩存中沒有埋市,系統(tǒng)都會返回key不存在冠桃,id生成失敗
* 如果緩存中也恰好查到了有key,那么就會因為懶加載的原因造成可能segmentBuffer沒有初始化道宅,(任何事情都有兩面性)
* 我們看到美團的處理方式是通過鎖定對應(yīng)的segmentBuffer的對象頭食听,可以說也是無所不用其極的減低鎖粒度胸蛛,不得不說一句nice
* 另外我們看到使用了雙重檢查,防止并發(fā)問題樱报,這里多啰嗦一句為什么回出現(xiàn)并發(fā)問題葬项,兩個線程都到了synchronized的臨界區(qū)后,一個線程拿到了buffer的頭鎖迹蛤,進入可能去更新mysql了民珍,如果他執(zhí)行完他會放開頭鎖
* 但是如果不通過判斷那么他也會繼續(xù)執(zhí)行更新mysql的操作,因此造成不滿足我們預(yù)期的事情發(fā)生了盗飒,所以這里通過一個initok的一個標(biāo)志進行雙重判定嚷量,那么就算是第二個線程進入后因為第一個線程退出前就更新了linitok為true
* 所以第二個線程進來后還是不能更新mysql就安全出去臨界區(qū)了。
*
*/
@Override
public Result get(final String key) {
if (!initOK) {
return new Result(EXCEPTION_ID_IDCACHE_INIT_FALSE, Status.EXCEPTION);
}
if (cache.containsKey(key)) {
SegmentBuffer buffer = cache.get(key);
if (!buffer.isInitOk()) {
synchronized (buffer) {
if (!buffer.isInitOk()) {
try {
updateSegmentFromDb(key, buffer.getCurrent());
logger.info("Init buffer. Update leafkey {} {} from db", key, buffer.getCurrent());
buffer.setInitOk(true);
} catch (Exception e) {
logger.warn("Init buffer {} exception", buffer.getCurrent(), e);
}
}
}
}
return getIdFromSegmentBuffer(cache.get(key));
}
return new Result(EXCEPTION_ID_KEY_NOT_EXISTS, Status.EXCEPTION);
}
/**
* 這個方法主要是用來更新并填充好逆趣,指定key對應(yīng)的SegmentBuffer
* StopWatch 是一個計時器蝶溶,作者考慮到這個方法的性能問題,因此加了一個監(jiān)控
* 先是判斷指定的segmentBuffer是否初始化完成宣渗,如果沒有初始化完成也就是說沒有向數(shù)據(jù)庫去申請id段身坐,那么就去取申請并填充進segmentBuffer
* 如果是已經(jīng)初始化完成了,第二個分支其實特定指的是第二次申請
*/
public void updateSegmentFromDb(String key, Segment segment) {
StopWatch sw = new Slf4JStopWatch();
SegmentBuffer buffer = segment.getBuffer();
LeafAlloc leafAlloc;
//第一次申請id段
if (!buffer.isInitOk()) {
leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
buffer.setStep(leafAlloc.getStep());
buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step為DB中的step
} else if (buffer.getUpdateTimestamp() == 0) {
//第二次申請id段落包,因為之前的第一次申請動作談不上更新部蛇,因此在第二次的時候?qū)⒏聲r間進行填充
leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
buffer.setUpdateTimestamp(System.currentTimeMillis());
buffer.setStep(leafAlloc.getStep());
buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step為DB中的step
} else {
//第N次申請id段動態(tài)優(yōu)化步長,我們看到有一個指定的時間以15分鐘為例咐蝇,如果兩次領(lǐng)取間隔少于15分鐘那么就將step拉大一倍涯鲁,但是不會超過系統(tǒng)默認(rèn)的10W的step
// 這樣做的好處其實也是降低mysql壓力
//如果兩次申請的超過30分鐘那么就將步長調(diào)整為原來的一半,但是不會小于最小步長
long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp();
int nextStep = buffer.getStep();
if (duration < SEGMENT_DURATION) {
if (nextStep * 2 > MAX_STEP) {
//do nothing
} else {
nextStep = nextStep * 2;
}
} else if (duration < SEGMENT_DURATION * 2) {
//do nothing with nextStep
} else {
nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;
}
logger.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep);
LeafAlloc temp = new LeafAlloc();
temp.setKey(key);
temp.setStep(nextStep);
leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp);
buffer.setUpdateTimestamp(System.currentTimeMillis());
buffer.setStep(nextStep);
buffer.setMinStep(leafAlloc.getStep());//leafAlloc的step為DB中的step
}
// must set value before set max
/**
* 此處很坑有序,這是第一版本留下的無效注釋
* https://github.com/Meituan-Dianping/Leaf/issues/16
* 可以不用強制要求的抹腿,因為是單線程更新,并且buffer還沒有就緒因此不存在優(yōu)先可見的問題
*/
long value = leafAlloc.getMaxId() - buffer.getStep();
segment.getValue().set(value);
segment.setMax(leafAlloc.getMaxId());
segment.setStep(buffer.getStep());
sw.stop("updateSegmentFromDb", key + " " + segment);
}
/**
* 核心處理方法
* 通過讀寫鎖提升并發(fā)旭寿,讀鎖主要負(fù)責(zé)id的自增警绩,但是如果只是自增那么靠automic操作就夠,因此還涉及到segment的切換盅称,因此此處使用了讀寫鎖進行分離
* 當(dāng)需要切換segment的時候讀鎖也會被掛起來肩祥,因為如果不掛起的話會出現(xiàn)臟讀。
* 方法的核心思想總結(jié)如下
* 通過while循環(huán)缩膝,死循環(huán)的去取數(shù)據(jù)混狠,先是拿到讀鎖,此處總結(jié)一下 JUC包里面的讀寫鎖的特性疾层,讀讀可并行将饺,讀寫不可并行,寫寫不可并行。
* 在這里從概念上先完成梳理
* 1予弧、備用buffer的更新是由單線程完成的刮吧,這里面是通過cas更新ThreadRunning實現(xiàn)的,因此備用buffer的更新是安全的
* 2掖蛤、id的自增是通過AutomicLong實現(xiàn)的因此也不存在自增時候的線程安全問題
* 3皇筛、主備buffer的切換是由讀寫鎖來進行控制的,讀鎖生效時候時候要么能夠自增成功則返回坠七,要么自增不成功,線程開始搶寫鎖旗笔,如果搶上彪置,那么新來的讀鎖請求就會被掛起,
* 直到寫鎖完成buffer的切換蝇恶,然后通過while循環(huán)自增后返回id
*/
public Result getIdFromSegmentBuffer(final SegmentBuffer buffer) {
while (true) {
buffer.rLock().lock();
try {
final Segment segment = buffer.getCurrent();
if (!buffer.isNextReady() && (segment.getIdle() < 0.9 * segment.getStep()) && buffer.getThreadRunning().compareAndSet(false, true)) {
service.execute(new Runnable() {
@Override
public void run() {
Segment next = buffer.getSegments()[buffer.nextPos()];
boolean updateOk = false;
try {
updateSegmentFromDb(buffer.getKey(), next);
updateOk = true;
logger.info("update segment {} from db {}", buffer.getKey(), next);
} catch (Exception e) {
logger.warn(buffer.getKey() + " updateSegmentFromDb exception", e);
} finally {
if (updateOk) {
buffer.wLock().lock();
buffer.setNextReady(true);
buffer.getThreadRunning().set(false);
buffer.wLock().unlock();
} else {
buffer.getThreadRunning().set(false);
}
}
}
});
}
long value = segment.getValue().getAndIncrement();
if (value < segment.getMax()) {
return new Result(value, Status.SUCCESS);
}
} finally {
buffer.rLock().unlock();
}
waitAndSleep(buffer);
buffer.wLock().lock();
try {
//這里進行這么判斷是因為可能有多個寫鎖排隊在這里拳魁,一個寫鎖更新成了后,那么后面的線程直接取就好撮弧,不需要走后續(xù)的修改操作了潘懊。
final Segment segment = buffer.getCurrent();
long value = segment.getValue().getAndIncrement();
if (value < segment.getMax()) {
return new Result(value, Status.SUCCESS);
}
//檢查備用buffer是否完成了準(zhǔn)備,如果準(zhǔn)備完成則進行切換贿衍,如果未準(zhǔn)備完成則拋出異常代表主buffer還有從buffer都沒有準(zhǔn)備好授舟,系統(tǒng)暫時不可用。
//產(chǎn)生的原因可能是刷新線程池阻塞贸辈,這可能性還是蠻小的释树,這也是為什么中間件作者在 update代碼段加入 stopwatch監(jiān)控的原因吧。
if (buffer.isNextReady()) {
buffer.switchPos();
buffer.setNextReady(false);
} else {
logger.error("Both two segments in {} are not ready!", buffer);
return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION);
}
} finally {
buffer.wLock().unlock();
}
}
}
/**
* 讓當(dāng)前線程進入死循環(huán)等待擎淤,為了降低無效的cpu輪訓(xùn)如果循環(huán)次數(shù)超過一萬后就休眠10ms
*/
private void waitAndSleep(SegmentBuffer buffer) {
int roll = 0;
while (buffer.getThreadRunning().get()) {
roll += 1;
if(roll > 10000) {
try {
TimeUnit.MILLISECONDS.sleep(10);
break;
} catch (InterruptedException e) {
logger.warn("Thread {} Interrupted",Thread.currentThread().getName());
break;
}
}
}
}
public List<LeafAlloc> getAllLeafAllocs() {
return dao.getAllLeafAllocs();
}
public Map<String, SegmentBuffer> getCache() {
return cache;
}
public IDAllocDao getDao() {
return dao;
}
public void setDao(IDAllocDao dao) {
this.dao = dao;
}
}
核心類的源碼分析都記錄在注釋之中了奢啥,下面貼一張圖方便理解,來自于美團團隊
雪花id生成器
? ? ? ?美團使用的是zookeeper實現(xiàn)的嘴拢,zookeeper在此中間件中扮演的角色總結(jié)如下桩盲,用來存儲每個分布式節(jié)點的ip,port信息席吴,主要是每天機器的時間戳赌结,這個信息用于保障每個節(jié)點生成的id不會出現(xiàn)回?fù)墁F(xiàn)象,出現(xiàn)回?fù)艿脑蚍治鋈缦?/p>
? ? ? ?41bit的計算方式是機器的當(dāng)前時間減去系統(tǒng)初始化的時間的時間戳,leaf給的是 :Thu Nov 04 2010 09:42:54 GMT+0800 (中國標(biāo)準(zhǔn)時間) 1288834974657L的差值填充41bit的空間孝冒,從id的組成我們可以知道10bit的工作機器id不同就能保障一定的程度id不同性姑曙,但是人如果41bit的位置出現(xiàn)時間回?fù)芎竽敲磫我粀orker生成的id就可能會出現(xiàn)重復(fù)的id。
源碼分析
這個類主要是處理zk的連接迈倍,以及數(shù)據(jù)存儲策略伤靠,包括如果分配workerid,以及workerid也會存儲在每個節(jié)點的本地化文件中,從代碼中我們可以知道宴合,每個zk的path是這樣的
/snowflake/leafname/forever/ip:port-0000000001 后面的數(shù)字就是workerid由zk分配的
public class SnowflakeZookeeperHolder {
private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeZookeeperHolder.class);
private String zk_AddressNode = null;//保存自身的key ip:port-000000001
private String listenAddress = null;//保存自身的key ip:port
private int workerID;
private static final String PREFIX_ZK_PATH = "/snowflake/" + PropertyFactory.getProperties().getProperty("leaf.name");
private static final String PROP_PATH = System.getProperty("java.io.tmpdir") + File.separator + PropertyFactory.getProperties().getProperty("leaf.name") + "/leafconf/{port}/workerID.properties";
private static final String PATH_FOREVER = PREFIX_ZK_PATH + "/forever";//保存所有數(shù)據(jù)持久的節(jié)點
private String ip;
private String port;
private String connectionString;
private long lastUpdateTime;
public SnowflakeZookeeperHolder(String ip, String port, String connectionString) {
this.ip = ip;
this.port = port;
this.listenAddress = ip + ":" + port;
this.connectionString = connectionString;
}
/**
* 啟動應(yīng)用使用zk的curator客戶端連接zk焕梅,判斷l(xiāng)eaf指定業(yè)務(wù)的根節(jié)點是否存在,leaf中使用的劃分邏輯如下
* /snowflake/leafname/forever/ip:port-0000000001
* /snowflake/leafname(不同業(yè)務(wù)可以使用不同名字)訂單卦洽,用戶/forever/ip:port(這里指的是提供id生產(chǎn)服務(wù)的機器)-0000000001(順序節(jié)點序號)
* 里面的內(nèi)容存的是endpoint內(nèi)容{"ip","xxx.xxx.xxx.xxx","port":"8080","timestamp":"timestamp"}
* 先掃描業(yè)務(wù)目錄贞言,如果業(yè)務(wù)目錄不存在那么說明第一次啟動這個業(yè)務(wù),因此當(dāng)前主機要把自己的信息保存進去阀蒂,默認(rèn)id為0该窗,這里面可能會存在潛在的并發(fā)問題?蚤霞?
*/
public boolean init() {
try {
CuratorFramework curator = createWithOptions(connectionString, new RetryUntilElapsed(1000, 4), 10000, 6000);
curator.start();
Stat stat = curator.checkExists().forPath(PATH_FOREVER);
if (stat == null) {
//不存在根節(jié)點,機器第一次啟動,創(chuàng)建/snowflake/ip:port-000000000,并上傳數(shù)據(jù)
zk_AddressNode = createNode(curator);
//worker id 默認(rèn)是0
//guozc 潛在并發(fā)問題酗失??兩個節(jié)點同時去創(chuàng)建節(jié)點都成功了昧绣,因為worker默認(rèn)是0可能會造成本地文件存儲的id為0规肴,極端情況下?
updateLocalWorkerID(workerID);
//定時上報本機時間給forever節(jié)點
//定時任務(wù)每三秒鐘上報一下本機信息夜畴,里面關(guān)鍵信息是每次上報的時間戳拖刃,防止數(shù)顯時鐘回?fù)? ScheduledUploadData(curator, zk_AddressNode);
return true;
} else {
//業(yè)務(wù)目錄存在那么就檢查是否有自己的節(jié)點
Map<String, Integer> nodeMap = Maps.newHashMap();//ip:port->00001
Map<String, String> realNode = Maps.newHashMap();//ip:port->(ipport-000001)
//存在根節(jié)點,先檢查是否有屬于自己的根節(jié)點
List<String> keys = curator.getChildren().forPath(PATH_FOREVER);
for (String key : keys) {
String[] nodeKey = key.split("-");
realNode.put(nodeKey[0], key);
nodeMap.put(nodeKey[0], Integer.parseInt(nodeKey[1]));
}
Integer workerid = nodeMap.get(listenAddress);
if (workerid != null) {
//有自己的節(jié)點,zk_AddressNode=ip:port
zk_AddressNode = PATH_FOREVER + "/" + realNode.get(listenAddress);
workerID = workerid;//啟動worder時使用會使用
if (!checkInitTimeStamp(curator, zk_AddressNode)) {
throw new CheckLastTimeException("init timestamp check error,forever node timestamp gt this node time");
}
//準(zhǔn)備創(chuàng)建臨時節(jié)點
doService(curator);
updateLocalWorkerID(workerID);
LOGGER.info("[Old NODE]find forever node have this endpoint ip-{} port-{} workid-{} childnode and start SUCCESS", ip, port, workerID);
} else {
//表示新啟動的節(jié)點,創(chuàng)建持久節(jié)點 ,不用check時間
String newNode = createNode(curator);
zk_AddressNode = newNode;
String[] nodeKey = newNode.split("-");
workerID = Integer.parseInt(nodeKey[1]);
doService(curator);
updateLocalWorkerID(workerID);
LOGGER.info("[New NODE]can not find node on forever node that endpoint ip-{} port-{} workid-{},create own node on forever node and start SUCCESS ", ip, port, workerID);
}
}
} catch (Exception e) {
LOGGER.error("Start node ERROR {}", e);
try {
Properties properties = new Properties();
properties.load(new FileInputStream(new File(PROP_PATH.replace("{port}", port + ""))));
workerID = Integer.valueOf(properties.getProperty("workerID"));
LOGGER.warn("START FAILED ,use local node file properties workerID-{}", workerID);
} catch (Exception e1) {
LOGGER.error("Read file error ", e1);
return false;
}
}
return true;
}
private void doService(CuratorFramework curator) {
ScheduledUploadData(curator, zk_AddressNode);// /snowflake_forever/ip:port-000000001
}
private void ScheduledUploadData(final CuratorFramework curator, final String zk_AddressNode) {
Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "schedule-upload-time");
thread.setDaemon(true);
return thread;
}
}).scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
updateNewData(curator, zk_AddressNode);
}
}, 1L, 3L, TimeUnit.SECONDS);//每3s上報數(shù)據(jù)
}
/**
* 檢查zookeeper中數(shù)據(jù)是否小于當(dāng)前機器的系統(tǒng)時間,因為每個機器在zk中都有一個自己的節(jié)點用于存儲endpoint數(shù)據(jù)
*/
private boolean checkInitTimeStamp(CuratorFramework curator, String zk_AddressNode) throws Exception {
byte[] bytes = curator.getData().forPath(zk_AddressNode);
Endpoint endPoint = deBuildData(new String(bytes));
//該節(jié)點的時間不能小于最后一次上報的時間
return !(endPoint.getTimestamp() > System.currentTimeMillis());
}
/**
* 創(chuàng)建持久順序節(jié)點 ,并把節(jié)點數(shù)據(jù)放入 value
*
* @param curator
* @return
* @throws Exception
*/
private String createNode(CuratorFramework curator) throws Exception {
try {
return curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(PATH_FOREVER + "/" + listenAddress + "-", buildData().getBytes());
} catch (Exception e) {
LOGGER.error("create node error msg {} ", e.getMessage());
throw e;
}
}
private void updateNewData(CuratorFramework curator, String path) {
try {
if (System.currentTimeMillis() < lastUpdateTime) {
return;
}
curator.setData().forPath(path, buildData().getBytes());
lastUpdateTime = System.currentTimeMillis();
} catch (Exception e) {
LOGGER.info("update init data error path is {} error is {}", path, e);
}
}
/**
* 構(gòu)建需要上傳的數(shù)據(jù)
*
* @return
*/
private String buildData() throws JsonProcessingException {
Endpoint endpoint = new Endpoint(ip, port, System.currentTimeMillis());
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(endpoint);
return json;
}
/**
* 將json字符串轉(zhuǎn)換為endpoint對象
*/
private Endpoint deBuildData(String json) throws IOException {
ObjectMapper mapper = new ObjectMapper();
Endpoint endpoint = mapper.readValue(json, Endpoint.class);
return endpoint;
}
/**
* 在節(jié)點文件系統(tǒng)上緩存一個workid值,zk失效,機器重啟時保證能夠正常啟動
*
* @param workerID
*/
private void updateLocalWorkerID(int workerID) {
File leafConfFile = new File(PROP_PATH.replace("{port}", port));
boolean exists = leafConfFile.exists();
LOGGER.info("file exists status is {}", exists);
if (exists) {
try {
FileUtils.writeStringToFile(leafConfFile, "workerID=" + workerID, false);
LOGGER.info("update file cache workerID is {}", workerID);
} catch (IOException e) {
LOGGER.error("update file cache error ", e);
}
} else {
//不存在文件,父目錄頁肯定不存在
try {
boolean mkdirs = leafConfFile.getParentFile().mkdirs();
LOGGER.info("init local file cache create parent dis status is {}, worker id is {}", mkdirs, workerID);
if (mkdirs) {
if (leafConfFile.createNewFile()) {
FileUtils.writeStringToFile(leafConfFile, "workerID=" + workerID, false);
LOGGER.info("local file cache workerID is {}", workerID);
}
} else {
LOGGER.warn("create parent dir error===");
}
} catch (IOException e) {
LOGGER.warn("craete workerID conf file error", e);
}
}
}
private CuratorFramework createWithOptions(String connectionString, RetryPolicy retryPolicy, int connectionTimeoutMs, int sessionTimeoutMs) {
return CuratorFrameworkFactory.builder().connectString(connectionString)
.retryPolicy(retryPolicy)
.connectionTimeoutMs(connectionTimeoutMs)
.sessionTimeoutMs(sessionTimeoutMs)
.build();
}
/**
* 上報數(shù)據(jù)結(jié)構(gòu)
*/
static class Endpoint {
private String ip;
private String port;
private long timestamp;
public Endpoint() {
}
public Endpoint(String ip, String port, long timestamp) {
this.ip = ip;
this.port = port;
this.timestamp = timestamp;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public String getPort() {
return port;
}
public void setPort(String port) {
this.port = port;
}
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
}
public String getZk_AddressNode() {
return zk_AddressNode;
}
public void setZk_AddressNode(String zk_AddressNode) {
this.zk_AddressNode = zk_AddressNode;
}
public String getListenAddress() {
return listenAddress;
}
public void setListenAddress(String listenAddress) {
this.listenAddress = listenAddress;
}
public int getWorkerID() {
return workerID;
}
public void setWorkerID(int workerID) {
this.workerID = workerID;
}
public static void main(String[] args) {
try {
System.out.println(Integer.parseInt("0000000008"));
} catch (Exception e) {
e.printStackTrace();
}
}
}
另外一個核心類是雪花的實現(xiàn)細(xì)節(jié)贪绘,包括了一個id是如何生成的兑牡,還有就是時間是如何更新到zk的,還有就是人如果當(dāng)前毫秒中id分配不夠了leaf是如何處理的
/**
* 使用位運算拼接一個long類型的id出來税灌,主要是利用時間做高41位的內(nèi)容发绢,中間是10bit的機器id,也基本夠用了垄琐,一個服務(wù)的id生成理論上不會超過1023個服務(wù)節(jié)點
* 最后的12bit用來做遞增
*/
public class SnowflakeIDGenImpl implements IDGen {
@Override
public boolean init() {
return true;
}
private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeIDGenImpl.class);
//開始時間戳
//個數(shù)字可以指定為任意小于當(dāng)前時間的數(shù)字边酒,這樣就能讓競爭對手無法知道我們的id信息了,這就解決了基于mysql的segement方案造成的容易被競爭對手監(jiān)控的問題了狸窘,因為有時間維度的參與墩朦,對手不知道我們每時每刻的id發(fā)放信息
private final long twepoch;
//wokerid占用的位數(shù)
private final long workerIdBits = 10L;
//worker最大的id1023
private final long maxWorkerId = ~(-1L << workerIdBits);//最大能夠分配的workerid =1023
//每毫秒的id數(shù)字最大值
private final long sequenceBits = 12L;
//workerid的偏移量
private final long workerIdShift = sequenceBits;
//時間戳位移數(shù)
private final long timestampLeftShift = sequenceBits + workerIdBits;
//每個毫秒生成id數(shù)的掩碼,用來進行與運算提高運算效率翻擒,他讓自己的高位是1氓涣,其他都是0那么在與的時候如果沒滿則是sequence 如果是0說明與的那個數(shù)字二進制后12位全是0了,也就是滿了陋气,因此會休息一個死循環(huán)的時間然后繼續(xù)生成id
private final long sequenceMask = ~(-1L << sequenceBits);
//工作節(jié)點的id
private long workerId;
//每個毫秒自增id數(shù)字
private long sequence = 0L;
//保存上次生成id時候的時間戳
private long lastTimestamp = -1L;
//new一個隨機函數(shù)對象劳吠,多線程公用,并發(fā)性問題交給了synchronized關(guān)鍵字巩趁,并且公用對象后降低了new的成本
private static final Random RANDOM = new Random();
public SnowflakeIDGenImpl(String zkAddress, int port) {
//Thu Nov 04 2010 09:42:54 GMT+0800 (中國標(biāo)準(zhǔn)時間)
this(zkAddress, port, 1288834974657L);
}
/**
* @param zkAddress zk地址
* @param port snowflake監(jiān)聽端口
* @param twepoch
*
* 初始化應(yīng)用痒玩,主要是SnowflakeZookeeperHolder 里面的初始化,包括節(jié)點的創(chuàng)建,或者數(shù)據(jù)的同步蠢古,還有主要是完成時間的檢查奴曙,方式工作節(jié)點始終回?fù)? * 并且將workerid進行賦值,方便生成id時候使用
*/
public SnowflakeIDGenImpl(String zkAddress, int port, long twepoch) {
this.twepoch = twepoch;
Preconditions.checkArgument(timeGen() > twepoch, "Snowflake not support twepoch gt currentTime");
final String ip = Utils.getIp();
SnowflakeZookeeperHolder holder = new SnowflakeZookeeperHolder(ip, String.valueOf(port), zkAddress);
LOGGER.info("twepoch:{} ,ip:{} ,zkAddress:{} port:{}", twepoch, ip, zkAddress, port);
boolean initFlag = holder.init();
if (initFlag) {
workerId = holder.getWorkerID();
LOGGER.info("START SUCCESS USE ZK WORKERID-{}", workerId);
} else {
Preconditions.checkArgument(initFlag, "Snowflake Id Gen is not init ok");
}
Preconditions.checkArgument(workerId >= 0 && workerId <= maxWorkerId, "workerID must gte 0 and lte 1023");
}
/**
* 核心方法這個就是獲得雪花id的方法草讶,因為是按照時間軸進行發(fā)布的洽糟,因此不存在不同的業(yè)務(wù)key的隔離,因為所有的業(yè)務(wù)的id都不會重復(fù)堕战,(就是這么的任性)
* 先取到系統(tǒng)時間戳坤溃,然后跟對象中的 lastTimestamp比較如果系統(tǒng)時間比對象時間回?fù)芰?毫秒那么久稍作休息wait一下,也就是等待兩倍的毫秒數(shù)嘱丢,因為左移動1二進制翻一倍薪介,
* 如果線程醒過來后還是有偏移量那么就返回錯誤。如果偏移量超過5毫秒屿讽,那么代表著偏移量太大,那么就返回錯誤吠裆,
* 如果對象中的 lastTimestamp 與當(dāng)前機器中系統(tǒng)時間一樣伐谈,這里面說明一下,這種情況下肯定是比較高的并發(fā)情況下的必然了试疙,因為每次發(fā)放id后對象時間都會被置為當(dāng)時取的系統(tǒng)時間
* 也就是一個毫秒中會發(fā)憷多個id诵棵,那么處理邏輯就是給 sequence不停的加一,這里面的與其實就是2的12次方-1祝旷,也就是整了個sequence的最大值履澳,這樣出現(xiàn)0代表 sequence + 1變成了
* 2的12次方-1了,那么也就是意味著并發(fā)真的很大怀跛,一毫秒中的id被打光了距贷,那么系統(tǒng)就調(diào)用 tilNextMillis 進行死循環(huán)的等待,因為這種等待是毫秒級的吻谋,所以使用了while循環(huán)
* 如果是新的毫秒是就生成一個隨機數(shù)忠蝗,作為sequence的新值,緊接著對lastTimestamp賦值漓拾,然后利用位運算生成一個long的id進行返回
*/
@Override
public synchronized Result get(String key) {
long timestamp = timeGen();
if (timestamp < lastTimestamp) {
long offset = lastTimestamp - timestamp;
if (offset <= 5) {
try {
wait(offset << 1);
timestamp = timeGen();
if (timestamp < lastTimestamp) {
return new Result(-1, Status.EXCEPTION);
}
} catch (InterruptedException e) {
LOGGER.error("wait interrupted");
return new Result(-2, Status.EXCEPTION);
}
} else {
return new Result(-3, Status.EXCEPTION);
}
}
if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask;
if (sequence == 0) {
//seq 為0的時候表示是下一毫秒時間開始對seq做隨機
sequence = RANDOM.nextInt(100);
timestamp = tilNextMillis(lastTimestamp);
}
} else {
//如果是新的ms開始
sequence = RANDOM.nextInt(100);
}
lastTimestamp = timestamp;
long id = ((timestamp - twepoch) << timestampLeftShift) | (workerId << workerIdShift) | sequence;
return new Result(id, Status.SUCCESS);
}
/**
* 通過死循環(huán)的形式確保時間進行了后移阁最,因為最多也就是停留一毫秒,所以使用死循環(huán)的形式代價更低
*/
protected long tilNextMillis(long lastTimestamp) {
long timestamp = timeGen();
while (timestamp <= lastTimestamp) {
timestamp = timeGen();
}
return timestamp;
}
protected long timeGen() {
return System.currentTimeMillis();
}
public long getWorkerId() {
return workerId;
}
public static void main(String[] args) {
Date date = new Date("Mon 6 Jan 1997 13:3:00");
long id = ((System.currentTimeMillis()- date.getTime()) << 22L) | (10 << 12L) | (1) & ~(-1L << 12L);
System.out.println(id);
System.out.println(10 << 12L);
System.out.println(Long.toBinaryString(id));
}
}