之前總結(jié)過一篇利用Redis的事務特性和Watch實現(xiàn)CAS樂觀鎖的Case,除了用事務和Watch實現(xiàn)鎖還有更簡單的實現(xiàn)——基于Redis的悲觀鎖主要是依靠SETNX命令來實現(xiàn)。
SETNX:
  SETNX是Redis提供的一種類Set的命令,不同的是這個命令只會在鍵不存在的情況下為鍵設置值,官方對SETNX的解釋如下:
Paste_Image.png
當然只有這一個命令還是不夠的萝勤,我們還需要結(jié)合事務進行鎖的釋放,當然歸根結(jié)底最重要的性質(zhì)還是Redis的單線程的性質(zhì)甫匹。
在了解了SETNX之后称近,我們需要用Jedis實現(xiàn)一下簡單的分布式鎖纯赎,代碼如下:
package com.zhiming.redis.lock;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Transaction;
public class RedisSampleLock {
private static final String redisHost = "10.0.3.67";
private static final int port = 6381;
private static JedisPoolConfig config;
private static JedisPool pool;
private static ExecutorService service;
private static int ThLeng=10;
private static CountDownLatch latch;
private static AtomicInteger Countor = new AtomicInteger(0);
private static int count = 0;
private static String LockName = "mylock_test10";
static{
//利用Redis連接池块攒,保證多個線程利用多個連接励稳,充分模擬并發(fā)性
config = new JedisPoolConfig();
config.setMaxIdle(10);
config.setMaxWaitMillis(1000);
config.setMaxTotal(30);
pool = new JedisPool(config, redisHost, port);
//利用ExecutorService 管理線程
service = Executors.newFixedThreadPool(ThLeng);
//CountDownLatch保證主線程在全部線程結(jié)束之后退出
latch = new CountDownLatch(ThLeng);
}
/**
* 獲取鎖
* tips:生成一個UUID佃乘,作為Key的標識,不斷輪詢lockName驹尼,直到set成功趣避,表示成功獲取鎖。
* 其他的線程在set此lockName時被阻塞直到超時新翎。
* @param pool
* @param lockName
* @param timeouts
* @return 鎖標識
*/
public static String getLock(JedisPool pool,String lockName,long timeouts){
Jedis client = pool.getResource();
try{
String value = UUID.randomUUID().toString();
long timeWait = System.currentTimeMillis() + timeouts*1000;
while(System.currentTimeMillis()<timeWait){
if(client.setnx(lockName, value) == 1){
System.out.println("lock geted");
return value;
}
try {
Thread.currentThread().sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("get lock timeouts");
}finally{
//pool.returnBrokenResource(client);
pool.returnResource(client);
}
return null;
}
/**
* 釋放鎖
* tips:對lockName做watch程帕,開啟一個事務,刪除以LockName為key的鎖地啰,刪除后骆捧,此鎖對于其他線程為可爭搶的。
*
* @param pool
* @param lockName
* @param value
*/
public static void relaseLock(JedisPool pool,String lockName,String value){
Jedis client = pool.getResource();
try{
while(true){
client.watch(lockName);
if (client.get(lockName).equals(value)){
Transaction tx = client.multi();
tx.del(lockName);
tx.exec();
return;
}
client.unwatch();
}
}finally{
//pool.returnBrokenResource(client);
pool.returnResource(client);
}
}
public static void main(String args[]){
for(int i=0;i<ThLeng;i++){
String tName = "thread-"+i;
Thread t = new Thread(new SubAddThread(pool,tName));
System.out.println(tName+"inited...");
service.submit(t);
}
service.shutdown();
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Countor.get());
System.out.println(count);
}
public static class SubAddThread implements Runnable{
private String name;
private JedisPool pool;
public SubAddThread(JedisPool pool,String uname){
this.pool = pool;
this.name = uname;
}
public void run() {
for(int i=0;i<100;i++){
System.out.println(name+" starting...");
String valuse = getLock(pool,LockName,50);
System.out.println(name+" get Lock "+valuse);
count++;
relaseLock(pool,LockName,valuse);
Countor.incrementAndGet();
System.out.println(name+" "+count);
}
latch.countDown();
System.out.println(name+" complated");
}
}
}
主要是利用SetNx的特性實現(xiàn)髓绽,上面的實現(xiàn)還是有很多問題,但是說明了Redis實現(xiàn)分布式鎖的思想妆绞。經(jīng)過測試顺呕,上面的代碼保證了count++的原子性,最后輸出結(jié)果和AtomicInteger的實例Counter輸出的一致括饶。