jedis是一個著名的key-value存儲系統(tǒng)盛垦,而作為其官方推薦的java版客戶端jedis也非常強大和穩(wěn)定,支持事務冗懦、管道及有jedis自身實現(xiàn)的分布式景殷。
在這里對jedis關(guān)于事務、管道和分布式的調(diào)用方式做一個簡單的介紹和對比:
一杖狼、普通同步方式
最簡單和基礎(chǔ)的調(diào)用方式炼蛤,
@Test
public?void?test1Normal()?{
Jedis?jedis?=?new?Jedis("localhost");
long?start?=?System.currentTimeMillis();
for?(int?i?=?0;?i?<?100000;?i++)?{
String?result?=?jedis.set("n"?+?i,?"n"?+?i);
}
long?end?=?System.currentTimeMillis();
System.out.println("Simple?SET:?"?+?((end?-?start)/1000.0)?+?"?seconds");
jedis.disconnect();
}
很簡單吧,每次set之后都可以返回結(jié)果蝶涩,標記是否成功理朋。
二、事務方式(Transactions)
redis的事務很簡單绿聘,他主要目的是保障嗽上,一個client發(fā)起的事務中的命令可以連續(xù)的執(zhí)行,而中間不會插入其他client的命令熄攘。
看下面例子:
@Test
public?void?test2Trans()?{
Jedis?jedis?=?new?Jedis("localhost");
long?start?=?System.currentTimeMillis();
Transaction?tx?=?jedis.multi();
for?(int?i?=?0;?i?<?100000;?i++)?{
tx.set("t"?+?i,?"t"?+?i);
}
List?results?=?tx.exec();
long?end?=?System.currentTimeMillis();
System.out.println("Transaction?SET:?"?+?((end?-?start)/1000.0)?+?"?seconds");
jedis.disconnect();
}
我們調(diào)用jedis.watch(…)方法來監(jiān)控key兽愤,如果調(diào)用后key值發(fā)生變化,則整個事務會執(zhí)行失敗挪圾。另外浅萧,事務中某個操作失敗,并不會回滾其他操作哲思。這一點需要注意惯殊。還有,我們可以使用discard()方法來取消事務也殖。
三土思、管道(Pipelining)
有時务热,我們需要采用異步方式,一次發(fā)送多個指令己儒,不同步等待其返回結(jié)果崎岂。這樣可以取得非常好的執(zhí)行效率。這就是管道闪湾,調(diào)用方法如下:
@Test
public?void?test3Pipelined()?{
Jedis?jedis?=?new?Jedis("localhost");
Pipeline?pipeline?=?jedis.pipelined();
long?start?=?System.currentTimeMillis();
for?(int?i?=?0;?i?<?100000;?i++)?{
pipeline.set("p"?+?i,?"p"?+?i);
}
List?results?=?pipeline.syncAndReturnAll();
long?end?=?System.currentTimeMillis();
System.out.println("Pipelined?SET:?"?+?((end?-?start)/1000.0)?+?"?seconds");
jedis.disconnect();
}
四冲甘、管道中調(diào)用事務
就Jedis提供的方法而言,是可以做到在管道中使用事務途样,其代碼如下:
@Test
public?void?test4combPipelineTrans()?{
jedis?=?new?Jedis("localhost");
long?start?=?System.currentTimeMillis();
Pipeline?pipeline?=?jedis.pipelined();
pipeline.multi();
for?(int?i?=?0;?i?<?100000;?i++)?{
pipeline.set(""?+?i,?""?+?i);
}
pipeline.exec();
List?results?=?pipeline.syncAndReturnAll();
long?end?=?System.currentTimeMillis();
System.out.println("Pipelined?transaction:?"?+?((end?-?start)/1000.0)?+?"?seconds");
jedis.disconnect();
}
但是經(jīng)測試(見本文后續(xù)部分)江醇,發(fā)現(xiàn)其效率和單獨使用事務差不多,甚至還略微差點何暇。
五陶夜、分布式直連同步調(diào)用
@Test
public?void?test5shardNormal()?{
List?shards?=?Arrays.asList(
new?JedisShardInfo("localhost",6379),
new?JedisShardInfo("localhost",6380));
ShardedJedis?sharding?=?new?ShardedJedis(shards);
long?start?=?System.currentTimeMillis();
for?(int?i?=?0;?i?<?100000;?i++)?{
String?result?=?sharding.set("sn"?+?i,?"n"?+?i);
}
long?end?=?System.currentTimeMillis();
System.out.println("Simple@Sharing?SET:?"?+?((end?-?start)/1000.0)?+?"?seconds");
sharding.disconnect();
}
這個是分布式直接連接,并且是同步調(diào)用裆站,每步執(zhí)行都返回執(zhí)行結(jié)果条辟。類似地,還有異步管道調(diào)用宏胯。
六羽嫡、分布式直連異步調(diào)用
@Test
public?void?test6shardpipelined()?{
List?shards?=?Arrays.asList(
new?JedisShardInfo("localhost",6379),
new?JedisShardInfo("localhost",6380));
ShardedJedis?sharding?=?new?ShardedJedis(shards);
ShardedJedisPipeline?pipeline?=?sharding.pipelined();
long?start?=?System.currentTimeMillis();
for?(int?i?=?0;?i?<?100000;?i++)?{
pipeline.set("sp"?+?i,?"p"?+?i);
}
List?results?=?pipeline.syncAndReturnAll();
long?end?=?System.currentTimeMillis();
System.out.println("Pipelined@Sharing?SET:?"?+?((end?-?start)/1000.0)?+?"?seconds");
sharding.disconnect();
}
七、分布式連接池同步調(diào)用
如果肩袍,你的分布式調(diào)用代碼是運行在線程中杭棵,那么上面兩個直連調(diào)用方式就不合適了,因為直連方式是非線程安全的氛赐,這個時候颜屠,你就必須選擇連接池調(diào)用。ShardedJedis是通過一致性哈希來實現(xiàn)分布式緩存的鹰祸,通過一定的策略把不同的key分配到不同的redis server上甫窟,達到橫向擴展的目的。
ShardedJedis的使用方法除了配置時有點區(qū)別蛙婴,其他和Jedis基本類似粗井,有一點要注意的是ShardedJedis不支持多命令操作,像mget街图、mset浇衬、brpop等可以在redis命令后一次性操作多個key的命令,具體包括哪些餐济,大家可以看Jedis下的MultiKeyCommands這個類耘擂,這里面就包含了所有的多命令操作。
在初始化ShardedJedisPool時絮姆,我們還可以傳入ShardedJedis采用的hash算法醉冤,支持MURMUR_HASH和md5兩種算法秩霍,默認是使用MURMUR_HASH(可以查看redis.clients.util.Hashing類查看相關(guān)的信息)
@Test
public?void?test7shardSimplePool()?{
List?shards?=?Arrays.asList(
new?JedisShardInfo("localhost",6379),
new?JedisShardInfo("localhost",6380));
ShardedJedisPool?pool?=?new?ShardedJedisPool(new?JedisPoolConfig(),?shards);
ShardedJedis?one?=?pool.getResource();
long?start?=?System.currentTimeMillis();
for?(int?i?=?0;?i?<?100000;?i++)?{
String?result?=?one.set("spn"?+?i,?"n"?+?i);
}
long?end?=?System.currentTimeMillis();
pool.returnResource(one);
System.out.println("Simple@Pool?SET:?"?+?((end?-?start)/1000.0)?+?"?seconds");
pool.destroy();
}
jedis獲取后一定要關(guān)閉,這和我們使用數(shù)據(jù)庫連接池是一樣的蚁阳,放在finally塊中保證jedis的關(guān)閉.
ps:如果大家使用的jdk是1.7版本或者以上的話铃绒,可以使用1.7加入的try-with-resources語句。
另外還可以傳入keyTagPattern來指定我們key的分布策略螺捐,所有能夠匹配keyTagPattern的key(通過正則匹配)將放在同一個redis里颠悬,默認的是直接使用key來進行判定。Redis自帶了一個Sharded.keyTagPattern定血,如下
Pattern DEFAULT_KEY_TAG_PATTERN = Pattern.compile("\\{(.+?)\\}");
我們可以用下面的代碼來實際測試下
ShardedJedis jedis =jedisPool.getResource();
jedis.set("cnblog", "cnblog");
jedis.set("redis", "redis");
jedis.set("test", "test");
jedis.set("123456", "1234567");
Client client1= jedis.getShard("cnblog").getClient();
Client client2= jedis.getShard("redis").getClient();
Client client3= jedis.getShard("test").getClient();
Client client4= jedis.getShard("123456").getClient();////打印key在哪個server中System.out.println("cnblog in server:" + client1.getHost() + " and port is:" +client1.getPort());
System.out.println("redis? in server:" + client2.getHost() + " and port is:" +client2.getPort());
System.out.println("test? in server:" + client3.getHost() + " and port is:" +client3.getPort());
System.out.println("123456 in server:" + client4.getHost() + " and port is:" + client4.getPort());
八赔癌、分布式連接池異步調(diào)用
@Test
public?void?test8shardPipelinedPool()?{
List?shards?=?Arrays.asList(
new?JedisShardInfo("localhost",6379),
new?JedisShardInfo("localhost",6380));
ShardedJedisPool?pool?=?new?ShardedJedisPool(new?JedisPoolConfig(),?shards);
ShardedJedis?one?=?pool.getResource();
ShardedJedisPipeline?pipeline?=?one.pipelined();
long?start?=?System.currentTimeMillis();
for?(int?i?=?0;?i?<?100000;?i++)?{
pipeline.set("sppn"?+?i,?"n"?+?i);
}
List?results?=?pipeline.syncAndReturnAll();
long?end?=?System.currentTimeMillis();
pool.returnResource(one);
System.out.println("Pipelined@Pool?SET:?"?+?((end?-?start)/1000.0)?+?"?seconds");
pool.destroy();
}
九、需要注意的地方
事務和管道都是異步模式澜沟。在事務和管道中不能同步查詢結(jié)果灾票。比如下面兩個調(diào)用,都是不允許的:
Transaction?tx?=?jedis.multi();
for?(int?i?=?0;?i?<?100000;?i++)?{
tx.set("t"?+?i,?"t"?+?i);
}
System.out.println(tx.get("t1000").get());??//不允許
List?results?=?tx.exec();
…
…
Pipeline?pipeline?=?jedis.pipelined();
long?start?=?System.currentTimeMillis();
for?(int?i?=?0;?i?<?100000;?i++)?{
pipeline.set("p"?+?i,?"p"?+?i);
}
System.out.println(pipeline.get("p1000").get());?//不允許
List?results?=?pipeline.syncAndReturnAll();
事務和管道都是異步的倔喂,個人感覺铝条,在管道中再進行事務調(diào)用靖苇,沒有必要席噩,不如直接進行事務模式。
分布式中贤壁,連接池的性能比直連的性能略好(見后續(xù)測試部分)悼枢。
分布式調(diào)用中不支持事務。
因為事務是在服務器端實現(xiàn)脾拆,而在分布式中馒索,每批次的調(diào)用對象都可能訪問不同的機器,所以名船,沒法進行事務绰上。
十、測試
運行上面的代碼渠驼,進行測試蜈块,其結(jié)果如下:
Simple?SET:?5.227?seconds
Transaction?SET:?0.5?seconds
Pipelined?SET:?0.353?seconds
Pipelined?transaction:?0.509?seconds
Simple@Sharing?SET:?5.289?seconds
Pipelined@Sharing?SET:?0.348?seconds
Simple@Pool?SET:?5.039?seconds
Pipelined@Pool?SET:?0.401?seconds
另外,經(jīng)測試分布式中用到的機器越多迷扇,調(diào)用會越慢百揭。上面是2片,下面是5片:
Simple@Sharing?SET:?5.494?seconds
Pipelined@Sharing?SET:?0.51?seconds
Simple@Pool?SET:?5.223?seconds
Pipelined@Pool?SET:?0.518?seconds
下面是10片:
Simple@Sharing?SET:?5.9?seconds
Pipelined@Sharing?SET:?0.794?seconds
Simple@Pool?SET:?5.624?seconds
Pipelined@Pool?SET:?0.762?seconds
下面是100片:
Simple@Sharing?SET:?14.055?seconds
Pipelined@Sharing?SET:?8.185?seconds
Simple@Pool?SET:?13.29?seconds
Pipelined@Pool?SET:?7.767?seconds
分布式中蜓席,連接池方式調(diào)用不但線程安全外器一,根據(jù)上面的測試數(shù)據(jù),也可以看出連接池比直連的效率更好厨内。
十一祈秕、完整的測試代碼
package?com.example.nosqlclient;
import?java.util.Arrays;
import?java.util.List;
import?org.junit.AfterClass;
import?org.junit.BeforeClass;
import?org.junit.Test;
import?redis.clients.jedis.Jedis;
import?redis.clients.jedis.JedisPoolConfig;
import?redis.clients.jedis.JedisShardInfo;
import?redis.clients.jedis.Pipeline;
import?redis.clients.jedis.ShardedJedis;
import?redis.clients.jedis.ShardedJedisPipeline;
import?redis.clients.jedis.ShardedJedisPool;
import?redis.clients.jedis.Transaction;
import?org.junit.FixMethodOrder;
import?org.junit.runners.MethodSorters;
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public?class?TestJedis?{
private?static?Jedis?jedis;
private?static?ShardedJedis?sharding;
private?static?ShardedJedisPool?pool;
@BeforeClass
public?static?void?setUpBeforeClass()?throws?Exception?{
List?shards?=?Arrays.asList(
new?JedisShardInfo("localhost",6379),
new?JedisShardInfo("localhost",6379));?//使用相同的ip:port,僅作測試
jedis?=?new?Jedis("localhost");
sharding?=?new?ShardedJedis(shards);
pool?=?new?ShardedJedisPool(new?JedisPoolConfig(),?shards);
}
@AfterClass
public?static?void?tearDownAfterClass()?throws?Exception?{
jedis.disconnect();
sharding.disconnect();
pool.destroy();
}
@Test
public?void?test1Normal()?{
long?start?=?System.currentTimeMillis();
for?(int?i?=?0;?i?<?100000;?i++)?{
String?result?=?jedis.set("n"?+?i,?"n"?+?i);
}
long?end?=?System.currentTimeMillis();
System.out.println("Simple?SET:?"?+?((end?-?start)/1000.0)?+?"?seconds");
}
@Test
public?void?test2Trans()?{
long?start?=?System.currentTimeMillis();
Transaction?tx?=?jedis.multi();
for?(int?i?=?0;?i?<?100000;?i++)?{
tx.set("t"?+?i,?"t"?+?i);
}
//System.out.println(tx.get("t1000").get());
List?results?=?tx.exec();
long?end?=?System.currentTimeMillis();
System.out.println("Transaction?SET:?"?+?((end?-?start)/1000.0)?+?"?seconds");
}
@Test
public?void?test3Pipelined()?{
Pipeline?pipeline?=?jedis.pipelined();
long?start?=?System.currentTimeMillis();
for?(int?i?=?0;?i?<?100000;?i++)?{
pipeline.set("p"?+?i,?"p"?+?i);
}
//System.out.println(pipeline.get("p1000").get());
List?results?=?pipeline.syncAndReturnAll();
long?end?=?System.currentTimeMillis();
System.out.println("Pipelined?SET:?"?+?((end?-?start)/1000.0)?+?"?seconds");
}
@Test
public?void?test4combPipelineTrans()?{
long?start?=?System.currentTimeMillis();
Pipeline?pipeline?=?jedis.pipelined();
pipeline.multi();
for?(int?i?=?0;?i?<?100000;?i++)?{
pipeline.set(""?+?i,?""?+?i);
}
pipeline.exec();
List?results?=?pipeline.syncAndReturnAll();
long?end?=?System.currentTimeMillis();
System.out.println("Pipelined?transaction:?"?+?((end?-?start)/1000.0)?+?"?seconds");
}
@Test
public?void?test5shardNormal()?{
long?start?=?System.currentTimeMillis();
for?(int?i?=?0;?i?<?100000;?i++)?{
String?result?=?sharding.set("sn"?+?i,?"n"?+?i);
}
long?end?=?System.currentTimeMillis();
System.out.println("Simple@Sharing?SET:?"?+?((end?-?start)/1000.0)?+?"?seconds");
}
@Test
public?void?test6shardpipelined()?{
ShardedJedisPipeline?pipeline?=?sharding.pipelined();
long?start?=?System.currentTimeMillis();
for?(int?i?=?0;?i?<?100000;?i++)?{
pipeline.set("sp"?+?i,?"p"?+?i);
}
List?results?=?pipeline.syncAndReturnAll();
long?end?=?System.currentTimeMillis();
System.out.println("Pipelined@Sharing?SET:?"?+?((end?-?start)/1000.0)?+?"?seconds");
}
@Test
public?void?test7shardSimplePool()?{
ShardedJedis?one?=?pool.getResource();
long?start?=?System.currentTimeMillis();
for?(int?i?=?0;?i?<?100000;?i++)?{
String?result?=?one.set("spn"?+?i,?"n"?+?i);
}
long?end?=?System.currentTimeMillis();
pool.returnResource(one);
System.out.println("Simple@Pool?SET:?"?+?((end?-?start)/1000.0)?+?"?seconds");
}
@Test
public?void?test8shardPipelinedPool()?{
ShardedJedis?one?=?pool.getResource();
ShardedJedisPipeline?pipeline?=?one.pipelined();
long?start?=?System.currentTimeMillis();
for?(int?i?=?0;?i?<?100000;?i++)?{
pipeline.set("sppn"?+?i,?"n"?+?i);
}
List?results?=?pipeline.syncAndReturnAll();
long?end?=?System.currentTimeMillis();
pool.returnResource(one);
System.out.println("Pipelined@Pool?SET:?"?+?((end?-?start)/1000.0)?+?"?seconds");
}
}