spring-boot基于zookeeper實(shí)現(xiàn)分布式鎖

Curator一套zookeeper客戶端框架恩伺,解決了很多Zookeeper客戶端非常底層的細(xì)節(jié)開發(fā)工作颈将,本文主要介紹使用curator框架來實(shí)現(xiàn)zookeeper的分布式鎖實(shí)現(xiàn)方案翠霍。

使用curator來實(shí)現(xiàn)zookeeper分布式鎖有多種方案牲剃,本文主要使用 InterProcessMutex 來實(shí)現(xiàn)全局共享鎖纹冤。

代碼已經(jīng)上傳至github:https://github.com/xsg1995/spring-boot-curator

引入依賴

pom.xml文件如下所示:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xsg</groupId>
    <artifactId>spring-boot-curator</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>spring-boot-curator</name>
    <url>http://maven.apache.org</url>
    
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
    </parent>

    
    <dependencies>
        <!--spring-boot-starter-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!--spring-boot-starter-test-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!--spring-boot-configuration-processor-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>

        <!--curator-recipes-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

        <!--curator-framework-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>

        <!--curator-test 用來模擬 zookeeper 環(huán)境-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-test</artifactId>
            <version>2.12.0</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
    
</project>

zookeeper的配置信息

在 application.yml 中配置 zookeeper 的配置:

zookeeper:
  #每次重試時(shí)間間隔话速,單位毫秒
  baseSleepTimeMs: 1000
  #重試次數(shù)
  maxRetries: 3
  #zookeeper服務(wù)連接id與端口
  connectString: 127.0.0.1:2181
  #會(huì)話超時(shí)時(shí)間讶踪,單位毫秒
  sessionTimeoutMs: 5000
  #連接創(chuàng)建超時(shí)時(shí)間,單位毫秒
  connection-timeout-ms: 5000

后臺(tái)定義一個(gè) ZookeeperProperties 類注入 zookeeper 的配置泊交,實(shí)現(xiàn)如下:

/**
 * 注入 zookeeper 的配置信息
 */
@Component
@ConfigurationProperties(prefix = "zookeeper")
public class ZookeeperProperties {
    private int baseSleepTimeMs;
    private int maxRetries;
    private String connectString;
    private int sessionTimeoutMs;
    private int connectionTimeoutMs;
    ...
}

配置 CuratorFramework

CuratorFramework 類封裝了對zookeeper底層的操作乳讥,配置如下:

/**
 * curator 配置
 */
@Configuration
public class ZookeeperConfig {

    /**
     * 獲取 CuratorFramework
     * 使用 curator 操作 zookeeper
     * @return
     */
    @Bean
    public CuratorFramework curatorFramework(ZookeeperProperties zookeeperProperties) {
        //配置zookeeper連接的重試策略
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(zookeeperProperties.getBaseSleepTimeMs(), zookeeperProperties.getMaxRetries());
        
        //構(gòu)建 CuratorFramework
        CuratorFramework curatorFramework =
                CuratorFrameworkFactory.builder()
                    .connectString(zookeeperProperties.getConnectString())
                    .sessionTimeoutMs(zookeeperProperties.getSessionTimeoutMs())
                    .connectionTimeoutMs(zookeeperProperties.getConnectionTimeoutMs())
                    .retryPolicy(retryPolicy)
                    .build();
        //連接 zookeeper
        curatorFramework.start();
        return curatorFramework;
    }

}

編寫加鎖柱查、釋放鎖具體實(shí)現(xiàn)邏輯

使用模板模式,將加鎖云石、釋放鎖的通用代碼給抽取出來唉工,通過接口回調(diào)方式回調(diào)具體的業(yè)務(wù)實(shí)現(xiàn)邏輯,實(shí)現(xiàn)如下:

/**
 * 可重入共享鎖組件
 */
@Component
public class ShardReentrantLockComponent {

    @Autowired
    private CuratorFramework curatorFramework;

    /**
     * 該方法為模板方法汹忠,獲得鎖后回調(diào) BaseLockHandler 中的 handler 方法
     * @return
     */
    public <T> T acquireLock(BaseLockHandler<T> baseLockHandler) {
        //獲取要加鎖的路徑
        String path = baseLockHandler.getPath();
        //獲取超時(shí)時(shí)間
        int timeOut = baseLockHandler.getTimeOut();
        //時(shí)間單位
        TimeUnit timeUnit = baseLockHandler.getTimeUnit();
        //通過 InterProcessMutex 該類來獲取可重入共性鎖
        InterProcessMutex lock = new InterProcessMutex(this.curatorFramework, path);
        //用于標(biāo)識(shí)是否獲取了鎖
        boolean acquire = false;
        try {
            try {
                //成功獲得鎖后返回 true
                acquire = lock.acquire(timeOut, timeUnit);
            } catch (Exception e) {
                e.printStackTrace();
            }
            if(acquire) {
                //獲得鎖后回調(diào)具體的業(yè)務(wù)邏輯
                return baseLockHandler.handler();
            } else {
                //沒有獲得鎖返回 null
                return null;
            }
        } finally {
            try {
                if(acquire) {
                    //釋放鎖
                    lock.release();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

}

BaseLockHandler 抽象類回調(diào)具體的業(yè)務(wù)邏輯淋硝,實(shí)現(xiàn)如下:

/**
 * 業(yè)務(wù)邏輯抽象類
 * @param <T>
 */
public abstract class BaseLockHandler<T> {
    //獲得鎖的默認(rèn)超時(shí)時(shí)間,默認(rèn)為 200ms
    private static final int DEFAULT_TIME_OUT = 200;
    //加鎖的資源路徑
    private String path;

    public BaseLockHandler(String path) {
        this.path = path;
    }

    /**
     * 具體的業(yè)務(wù)實(shí)現(xiàn)邏輯宽菜,重寫該方法
     * @return
     */
    public abstract T handler();

    /**
     * 返回加鎖的路徑
     * @return
     */
    public String getPath() {
        return this.path;
    }

    /**
     * 返回加鎖的超時(shí)時(shí)間
     * @return
     */
    public int getTimeOut() {
        return DEFAULT_TIME_OUT;
    }

    /**
     * 時(shí)間單位
     * @return
     */
    public TimeUnit getTimeUnit() {
        return TimeUnit.MILLISECONDS;
    }
}

測試

編寫測試用例類進(jìn)行測試:

/**
 * LockComponent的測試類
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class LockComponentTest {

    /**
     * 表示開啟多個(gè)線程并行執(zhí)行
     */
    private static final int THREAD_COUNT = 100;

    @Autowired
    private ShardReentrantLockComponent lockComponent;

    /**
     * 用來模擬 zookeeper 服務(wù)
     */
    private TestingServer server;

    /**
     * 用來實(shí)現(xiàn)具體邏輯谣膳,對該計(jì)數(shù)器加1
     */
    private int count;

    @Before
    public void before() throws Exception {
        //模擬一個(gè)zookeeper節(jié)點(diǎn),端口號(hào)為 2181
       server = new TestingServer(2181);
    }

    @After
    public void after() {
        if(server != null) {
            //關(guān)閉資源
            CloseableUtils.closeQuietly(server);
        }
    }

    /**
     * 不加鎖實(shí)現(xiàn)多個(gè)線程同時(shí)對 count 執(zhí)行 ++ 操作
     * 會(huì)出現(xiàn)數(shù)據(jù)不一致現(xiàn)象
     * @throws Exception
     */
    @Test
    public void noAcquireLockTest() throws Exception {
        //初始化一個(gè)擁有 100 個(gè)線程的線程池
        ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
        //使用 CountDownLatch 實(shí)現(xiàn)線程的協(xié)調(diào)
        CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
        for(int i = 0; i < THREAD_COUNT; i++) {
            final int index = i;
            //提交線程
            executorService.submit(() -> {
                //name 表示該線程的名稱
                String name = "client" + (index + 1);
                //執(zhí)行 count++
                count++;
                try {
                    //睡眠 50ms ,使測試結(jié)果更明顯
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //打印各個(gè)線程執(zhí)行結(jié)果
                System.out.println(name + "    執(zhí)行業(yè)務(wù)方法铅乡,對 count 執(zhí)行 ++ 操作后 count 的值為 : " + count);
                //調(diào)用countDown方法继谚,表示該線程執(zhí)行完畢
                countDownLatch.countDown();
            });
        }
        //使該方法阻塞住,不然看不到效果
        countDownLatch.await();
    }

    /**
     * 使用 zookeeper 加鎖實(shí)現(xiàn)多個(gè)線程同時(shí)對 count 執(zhí)行 ++ 操作
     * @throws Exception
     */
    @Test
    public void acquireLockTest() throws Exception {
        //要加鎖節(jié)點(diǎn)的路徑
        String path = "/path/test";
        //初始化一個(gè)擁有 100 個(gè)線程的線程池
        ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
        //使用 CountDownLatch 實(shí)現(xiàn)線程的協(xié)調(diào)
        CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);

        for(int i = 0; i < THREAD_COUNT; i++) {
            final int index = i;
            //提交線程
            executorService.submit(() -> {
                //name 表示該線程的名稱
                String name = "client" + (index + 1);
                //result 獲取執(zhí)行完業(yè)務(wù)邏輯后返回值
                String result = null;
                while (result == null) {
                    //result 為 null 表示沒有的鎖阵幸,會(huì)繼續(xù)執(zhí)行while循環(huán)去競爭獲取鎖
                    result = lockComponent.acquireLock(new BaseLockHandler<String>(path) {

                        //執(zhí)行具體的業(yè)務(wù)邏輯
                        @Override
                        public String handler() {
                            //執(zhí)行 count++
                            count++;
                            try {
                                //睡眠 50ms ,使測試結(jié)果更明顯
                                Thread.sleep(50);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            //打印各個(gè)線程執(zhí)行結(jié)果
                            System.out.println(name + "    執(zhí)行業(yè)務(wù)方法花履,對count++ : " + count);

                            //執(zhí)行成功后不要返回null,如果返回null挚赊,會(huì)繼續(xù)執(zhí)行while去競爭獲取鎖
                            return this.getPath();
                        }
                    });
                }
                //調(diào)用countDown方法诡壁,表示該線程執(zhí)行完畢
                countDownLatch.countDown();
            });
        }
        //使該方法阻塞住,不然看不到效果
        countDownLatch.await();
    }

}

測試的業(yè)務(wù)邏輯是對 count 局部變量執(zhí)行 ++ 操作咬腕,這里編寫了兩個(gè)方法:

  • noAcquireLockTest 該方法實(shí)現(xiàn)是通過開啟 100 個(gè)線程欢峰,以不加鎖的方法,并行的對 count 執(zhí)行 ++ 操作涨共;
  • acquireLockTest 該方法通過獲取 zookeeper 的加鎖機(jī)制,開啟 100 個(gè)線程宠漩,并行的對 count 執(zhí)行 ++ 操作举反;

不加鎖的執(zhí)行方式結(jié)果如下所示:


不加鎖的執(zhí)行方式結(jié)果

可以看到,數(shù)據(jù)已經(jīng)出現(xiàn)不一致現(xiàn)象扒吁。

使用 zookeeper 加鎖執(zhí)行結(jié)果如下所示:


使用 zookeeper 加鎖執(zhí)行結(jié)果

從右邊計(jì)數(shù)器 count 的值可以看出火鼻,加鎖操作是成功的。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末雕崩,一起剝皮案震驚了整個(gè)濱河市魁索,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌盼铁,老刑警劉巖粗蔚,帶你破解...
    沈念sama閱讀 212,599評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異饶火,居然都是意外死亡鹏控,警方通過查閱死者的電腦和手機(jī)致扯,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,629評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來当辐,“玉大人抖僵,你說我怎么就攤上這事≡稻荆” “怎么了耍群?”我有些...
    開封第一講書人閱讀 158,084評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長找筝。 經(jīng)常有香客問我蹈垢,道長,這世上最難降的妖魔是什么呻征? 我笑而不...
    開封第一講書人閱讀 56,708評(píng)論 1 284
  • 正文 為了忘掉前任耘婚,我火速辦了婚禮,結(jié)果婚禮上陆赋,老公的妹妹穿的比我還像新娘沐祷。我一直安慰自己,他們只是感情好攒岛,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,813評(píng)論 6 386
  • 文/花漫 我一把揭開白布赖临。 她就那樣靜靜地躺著,像睡著了一般灾锯。 火紅的嫁衣襯著肌膚如雪兢榨。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 50,021評(píng)論 1 291
  • 那天顺饮,我揣著相機(jī)與錄音吵聪,去河邊找鬼。 笑死兼雄,一個(gè)胖子當(dāng)著我的面吹牛吟逝,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播赦肋,決...
    沈念sama閱讀 39,120評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼块攒,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了佃乘?” 一聲冷哼從身側(cè)響起囱井,我...
    開封第一講書人閱讀 37,866評(píng)論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎趣避,沒想到半個(gè)月后庞呕,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,308評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡鹅巍,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,633評(píng)論 2 327
  • 正文 我和宋清朗相戀三年千扶,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了料祠。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,768評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡澎羞,死狀恐怖髓绽,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情妆绞,我是刑警寧澤顺呕,帶...
    沈念sama閱讀 34,461評(píng)論 4 333
  • 正文 年R本政府宣布,位于F島的核電站括饶,受9級(jí)特大地震影響株茶,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜图焰,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,094評(píng)論 3 317
  • 文/蒙蒙 一启盛、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧技羔,春花似錦僵闯、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,850評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至拙绊,卻和暖如春向图,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背标沪。 一陣腳步聲響...
    開封第一講書人閱讀 32,082評(píng)論 1 267
  • 我被黑心中介騙來泰國打工榄攀, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人金句。 一個(gè)月前我還...
    沈念sama閱讀 46,571評(píng)論 2 362
  • 正文 我出身青樓航攒,卻偏偏與公主長得像,于是被迫代替她去往敵國和親趴梢。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,666評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容