1. Redis通信協(xié)議
??????Jedis Client是Redis官網(wǎng)推薦的一個(gè)面向java客戶端酌壕,庫文件實(shí)現(xiàn)了對redis各類API進(jìn)行封裝調(diào)用。redis通信協(xié)議是Redis客戶端與Redis Server之間交流的語言汪诉,它規(guī)定了請求和返回值的格式。redis-cli與server端使用一種專門為redis設(shè)計(jì)的協(xié)議RESP(Redis Serialization Protocol)交互棚潦,Resp本身沒有指定TCP儡率,但redis上下文只使用TCP連接。
??????RESP規(guī)定:
- 用 \r\n 做間隔
- 對于簡單的字符串胖喳,以+開頭
set hello world
+OK\r\n
- 對于錯(cuò)誤消息泡躯,以-開頭,例如:
sethx // 該命令不存在
-ERR unknown command 'sethx'
- 對于整數(shù)丽焊,以:開頭较剃,例如:
dbsize
:100\r\n
- 對于大字符串,以$開頭技健,接著跟上字符串長度的數(shù)字:最長為512MB写穴。例如:
get name
$6\r\nfoobar\r\n 代表一個(gè)長6的字符串, foobar
$0\r\n\r\n 長度為0 的空字符串
$-1\r\n Null
- 對于數(shù)組雌贱,以*開頭啊送,接上數(shù)組元素的個(gè)數(shù)。
*0\r\n 一個(gè)空的數(shù)組
mset name1 foo name2 bar
mget name1 name2
*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n 一個(gè)有兩個(gè)元素的數(shù)組 foo bar
- 我們知道redis-cli只能看到最終的執(zhí)行結(jié)果欣孤,因?yàn)閞edis-cli本身就是按照RESP進(jìn)行結(jié)果解析的馋没,所以看不到中間結(jié)果。
- 通過RESP降传,執(zhí)行一個(gè)命令篷朵,客戶端與服務(wù)端的交互步驟如下:
輸入命令->將命令編碼成字節(jié)流->通過TCP發(fā)送到服務(wù)端->服務(wù)端解析字節(jié)流->服務(wù)端執(zhí)行命令->
->將結(jié)果編碼成字節(jié)流->通過TCP鏈接發(fā)送給客戶端->解析字節(jié)流->得到執(zhí)行結(jié)果
比如執(zhí)行set hello world,根據(jù)resp協(xié)議,需要客戶端解析為下面格式字節(jié)流發(fā)送給服務(wù)端
*3\r\n
$3\r\nset\r\n
$5\r\nhello\r\n
$5\r\nworld\r\n
2. jedis通信原理
??????試想婆排,如果讓我們自己根據(jù)上面提到的協(xié)議用java去實(shí)現(xiàn)一個(gè)客戶端與redis服務(wù)端實(shí)現(xiàn)通信声旺,該怎么做呢?
public class TSocketClient {
// 定義socket
private Socket socket;
public TSocketClient() {
try {
socket = new Socket("192.168.58.99", 6379);
} catch (IOException e) {
e.printStackTrace();
}
}
public String set(final String key, final String value) throws IOException {
StringBuilder sb = new StringBuilder();
sb.append("*3").append("\r\n");
sb.append("$3").append("\r\n");
sb.append("set").append("\r\n");
sb.append("$").append(key.getBytes().length).append("\r\n");
sb.append(key).append("\r\n");
sb.append("$").append(value.getBytes().length).append("\r\n");
sb.append(value).append("\r\n");
socket.getOutputStream().write(sb.toString().getBytes());
byte[] b = new byte[2048];
socket.getInputStream().read(b);
return new String(b);
}
public String get(final String key) throws IOException {
StringBuilder sb = new StringBuilder();
sb.append("*2").append("\r\n"); // *表示數(shù)組 后面數(shù)字表示數(shù)組長度
sb.append("$3").append("\r\n");
sb.append("get").append("\r\n");
sb.append("$").append(key.getBytes().length).append("\r\n"); // 美元符號表示字符串段只,后面的數(shù)字表示長度
sb.append(key).append("\r\n");
socket.getOutputStream().write(sb.toString().getBytes());
byte[] b = new byte[2048];
socket.getInputStream().read(b);
return new String(b);
}
public static void main(String[] args) throws IOException {
TSocketClient client = new TSocketClient();
client.set("hello", "ziyan");
}
}
???? 上面代碼通過實(shí)現(xiàn)resp協(xié)議實(shí)現(xiàn)了與redis服務(wù)端的通信腮猖,其實(shí)jedis客戶端本質(zhì)上也是通過建立socket按照resp協(xié)議與redis通信,下面來分析jedis具體的代碼:
???第一部分:jedis對象的創(chuàng)建:Jedis jedis = new Jedis(); 主要是創(chuàng)建連接Redis服務(wù)器的客戶端翼悴,在Jedis基類BinaryJedis中主要有Connection對象缚够,創(chuàng)建jedis對象的時(shí)候尚未連接到redis服務(wù)器,在Connection類中,主要設(shè)置了鏈接Redis所使用socket的參數(shù)以及操作socket所使用的工具鹦赎。
//創(chuàng)建Redis客戶端
Jedis jedis = new Jedis();
//調(diào)用set 命令,返回狀態(tài)標(biāo)記
String code=jedis.set("s", "s");
System.out.println("code="+code);
//調(diào)用get命令
String s =jedis.get("s");
System.out.println("s="+s);
//Jedis客戶端鏈接,使用原始socket進(jìn)行鏈接
public class Connection implements Closeable
{
private static final byte[][] EMPTY_ARGS = new byte[0][];
//默認(rèn)主機(jī)
private String host = Protocol.DEFAULT_HOST;
//默認(rèn)端口
private int port = Protocol.DEFAULT_PORT;
//原始socket
private Socket socket;
//輸入輸出流
private RedisOutputStream outputStream;
private RedisInputStream inputStream;
private int connectionTimeout = Protocol.DEFAULT_TIMEOUT;
private int soTimeout = Protocol.DEFAULT_TIMEOUT;
private boolean broken = false;
public Connection() {
}
}
???從Connection的成員變量中可以看出谍椅,jedis使用了jdk的io socket來處理網(wǎng)絡(luò)通信。
??第二部分:在調(diào)用 String code=jedis.set("s", "s"); 命令的時(shí)候,才是真正創(chuàng)建鏈接的過程古话。Client(BinaryClient).set(byte[], byte[]) 方法參數(shù)就是把由String 字符串轉(zhuǎn)換成字節(jié)數(shù)值雏吭,并調(diào)用Connection的sendCommand方法來發(fā)送Redis命令。
//每次發(fā)送命令前都判斷是否鏈接,如果鏈接端口并且鏈接不上陪踩,則拋出異常
protected Connection sendCommand(final ProtocolCommand cmd, final byte[]... args) {
try {
connect();//每次發(fā)送Redis命令都會調(diào)用Connect()方法來鏈接Redis遠(yuǎn)程服務(wù)器
Protocol.sendCommand(outputStream, cmd, args); //操作socket 的輸出流來發(fā)送命令
return this;
} catch (JedisConnectionException ex) {
/*
* When client send request which formed by invalid protocol, Redis send back error message
* before close connection. We try to read it to provide reason of failure.
*/
try {
String errorMessage = Protocol.readErrorLineIfPossible(inputStream);
if (errorMessage != null && errorMessage.length() > 0) {
ex = new JedisConnectionException(errorMessage, ex.getCause());
}
} catch (Exception e) {
/*
* Catch any IOException or JedisConnectionException occurred from InputStream#read and just
* ignore. This approach is safe because reading error message is optional and connection
* will eventually be closed.
*/
}
// Any other exceptions related to connection?
broken = true;
throw ex;
}
}
每次調(diào)用sendCommand發(fā)送命令時(shí)候杖们,都會調(diào)用Connnect()方法嘗試鏈接遠(yuǎn)程端口悉抵。
//在發(fā)送命令之前連接redis服務(wù)器
public void connect() {
if (!isConnected()) {
try {
//創(chuàng)建新socket
socket = new Socket();
//設(shè)置socket參數(shù)
socket.setReuseAddress(true);
socket.setKeepAlive(true); // Will monitor the TCP connection is
// valid
socket.setTcpNoDelay(true); // Socket buffer Whetherclosed, to
// ensure timely delivery of data
socket.setSoLinger(true, 0); // Control calls close () method,
// the underlying socket is closed
// immediately
// <-@wjw_add
//設(shè)置鏈接超時(shí)時(shí)間
socket.connect(new InetSocketAddress(host, port), connectionTimeout);
//設(shè)置讀取超時(shí)時(shí)間
socket.setSoTimeout(soTimeout);
//獲取socket原始輸入輸出流
outputStream = new RedisOutputStream(socket.getOutputStream());
inputStream = new RedisInputStream(socket.getInputStream());
} catch (IOException ex) {
broken = true;
throw new JedisConnectionException(ex);
}
}
}
每次鏈接到遠(yuǎn)程Redis服務(wù)器后,第一個(gè)命令就是發(fā)送密鑰命令摘完,這是一個(gè)BinaryClient的重寫方法姥饰,方法里還是調(diào)用父類Connection的connect方法。
@Override
public void connect() {
if (!isConnected()) {
super.connect();
if (password != null) {
auth(password);
getStatusCodeReply();
}
if (db > 0) {
select(Long.valueOf(db).intValue());
getStatusCodeReply();
}
}
}
在每次發(fā)送一個(gè)命令后,都會去獲取返回碼孝治。
public String set(final String key, String value) {
checkIsInMultiOrPipeline();
client.set(key, value);
return client.getStatusCodeReply();
}
在取狀態(tài)碼時(shí)列粪,每次都去刷新通道。讀取數(shù)據(jù)流最終通過SocketInputStream 類來讀取谈飒。
public String getStatusCodeReply() {
flush();
pipelinedCommands--;
final byte[] resp = (byte[]) readProtocolWithCheckingBroken();
if (null == resp) {
return null;
} else {
return SafeEncoder.encode(resp);
}
}
3. jedis的使用
???? jedis的使用方法很簡單:
// 創(chuàng)建Redis客戶端
Jedis jedis = new Jedis("192.168.58.99", 6379);
// 調(diào)用set 命令,返回狀態(tài)標(biāo)記
String code=jedis.set("s", "xxx");
System.out.println("code="+code);
// 調(diào)用get命令
String s =jedis.get("s");
System.out.println("s="+s);
????上面代碼中只用到了包含ip和端口兩個(gè)參數(shù)的構(gòu)造函數(shù)岂座,更常用的是包含四個(gè)參數(shù)的構(gòu)造函數(shù):
public Jedis(final String host, final int port, final int connectionTimeout, final int soTimeout)
// connectionTimeout 表示客戶端連接超時(shí)
// soTimeout 表示客戶端讀寫超時(shí)
3.1 為什么要用jedis連接池?
?????雖然基于內(nèi)存的Redis數(shù)據(jù)庫有著超高的性能杭措,但是底層的網(wǎng)絡(luò)通信卻占用了一次數(shù)據(jù)請求的大量時(shí)間费什,因?yàn)槊看螖?shù)據(jù)交互都需要先建立連接,假設(shè)一次數(shù)據(jù)交互總共用時(shí)30ms手素,超高性能的Redis數(shù)據(jù)庫處理數(shù)據(jù)所花的時(shí)間可能不到1ms鸳址,也即是說前期的連接占用了29ms,上面介紹的jedis直連方式刑桑,也就是每次new jedis都會新建tcp連接氯质,使用后再斷開連接,這對于頻繁訪問redis的場景顯然不是高效的使用方式祠斧。連接池則可以實(shí)現(xiàn)在客戶端建立多個(gè)鏈接并且不釋放闻察,當(dāng)需要使用連接的時(shí)候通過一定的算法獲取已經(jīng)建立的連接,使用完了以后則還給連接池琢锋,這就免去了數(shù)據(jù)庫連接所占用的時(shí)間辕漂。因此,通常會使用連接池的方式對Jedis連接進(jìn)行管理吴超,所有jedis對象會預(yù)先放在池子中(JedisPool)钉嘹,每次要連接redis,只需要在池子中借鲸阻,用完了再歸還給池子跋涣。
??????客戶端連接Redis使用的是TCP協(xié)議,直連的方式每次需要建立TCP連接鸟悴,而連接池的方式是可以預(yù)先初始化好jedis連接陈辱,每次只需要從jedis連接池借用即可,借用和歸還操作是在本地進(jìn)行的细诸,只有少量的并發(fā)同步開銷沛贪,遠(yuǎn)遠(yuǎn)小于新建tcp連接的開銷。此外,連接池的方式可以有效保護(hù)和控制資源的使用利赋,而直連的方式無法限制jedis對象的個(gè)數(shù)水评,并且可能存在連接泄漏的情況。
??????Jedis提供了JedisPool這個(gè)類作為Jedis的連接池媚送,同時(shí)使用Apache的通用對象池工具common-pool作為資源的管理工具中燥。JedisPoolConfig繼承了GenericObjectPoolConfig,提供了很多參數(shù)配置:
- maxActive: 控制一個(gè)pool可分配多少個(gè)jedis實(shí)例,如果pool已經(jīng)分配了maxActive個(gè)jedis實(shí)例,此時(shí)pool的狀態(tài)為exhausted季希。
- maxIdle: 控制一個(gè)pool最多有多少個(gè)狀態(tài)為idle的jedis實(shí)例褪那。(如果超出了這個(gè)閾值,會close掉超出的連接)
- whenExhaustedAction: 表示當(dāng)pool中的jedis實(shí)例被allocated完時(shí)式塌,pool要采取的操作:默認(rèn)有三種:
??????WHEN_EXHAUSTED_FAIL : 表示無jedis實(shí)例時(shí),直接拋出NoSuchElementException友浸,
??????WHEN_EXHAUSTED_BLOCK 表示阻塞住峰尝,達(dá)到maxWait時(shí)拋出,JedisConnectionException
??????WHEN_EXHAUSTED_GROW 表示新建一個(gè)jedis實(shí)例收恢,設(shè)置的maxActive無用武学。 - maxwait: 表示borrow一個(gè)jedis實(shí)例時(shí),最大的等待時(shí)間伦意,如果超出等待時(shí)間火窒,直接拋出jedisConnectionException。
- testOnBorrow:在borrow一個(gè)jedis實(shí)例時(shí)驮肉,是否提前進(jìn)行validate操作熏矿;如果為true,則使用redisPING PONG命令測試redis連接是否有效,保證得到的jedis實(shí)例均是可用的离钝;
- testOnReturn:在return給pool時(shí)票编,是否提前進(jìn)行validate操作;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCommands;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class TestJedis {
public static final Logger logger = LoggerFactory.getLogger(TestJedis.class);
// Jedispool
JedisCommands jedisCommands;
JedisPool jedisPool;
// common-pool 連接池配置卵渴,
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
String ip = "192.168.58.99";
int port = 6379;
int timeout = 2000;
public TestJedis() {
// 初始化jedis
// 設(shè)置配置
jedisPoolConfig.setMaxTotal(1024);
jedisPoolConfig.setMaxIdle(100);
jedisPoolConfig.setMaxWaitMillis(100);
jedisPoolConfig.setTestOnBorrow(false);
jedisPoolConfig.setTestOnReturn(true);
// 初始化JedisPool
jedisPool = new JedisPool(jedisPoolConfig, ip, port, timeout);
//
Jedis jedis = jedisPool.getResource();
jedisCommands = jedis;
}
public void setValue(String key, String value) {
this.jedisCommands.set(key, value);
}
public String getValue(String key) {
return this.jedisCommands.get(key);
}
public static void main(String[] args) {
TestJedis testJedis = new TestJedis();
testJedis.setValue("testJedisKey", "testJedisValue");
logger.info("get value from redis:{}",testJedis.getValue("testJedisKey"));
}
}
對于jedis對象池的原理可參考我的上一篇文章jedis對象池
3.2 jedis pipeline
?????我們知道redis提供了mget慧域、mset方法,但沒有提供mdel方法浪读,如果要實(shí)現(xiàn)這個(gè)功能昔榴,可以借助Pipeline來模擬批量刪除,Jedis支持Pipeline特性碘橘,可以通過jedis實(shí)現(xiàn)互订。
public void mdel(List<String> keys) {
Jedis jedis = new Jedis("192.168.58.99");
Pipeline pipeline = jedis.pipelined();
for (String key : keys) {
pipeline.del(key); // 此時(shí)命令并非真正執(zhí)行
}
// 真正執(zhí)行命令
pipeline.sync(); // 除了pipline.sync(),還可以使用pipeline.syncAndReturnAll()將pipeline的命令進(jìn)行返回。
}