Redis 客戶端 Jedis 的特性和原理

Redis 作為目前通用的緩存選型录煤,因其高性能而倍受歡迎搏予。Redis 的 2.x 版本僅支持單機模式借宵,從 3.0 版本開始引入集群模式橘荠。

Redis 的 Java 生態(tài)的客戶端當中包含 Jedis屿附、Redisson、Lettuce砾医,不同的客戶端具備不同的能力是使用方式拿撩,本文主要分析 Jedis 客戶端衣厘。

Jedis 客戶端同時支持單機模式如蚜、分片模式、集群模式的訪問模式影暴,通過構(gòu)建 Jedis 類對象實現(xiàn)單機模式下的數(shù)據(jù)訪問错邦,通過構(gòu)建 ShardedJedis 類對象實現(xiàn)分片模式的數(shù)據(jù)訪問,通過構(gòu)建 JedisCluster 類對象實現(xiàn)集群模式下的數(shù)據(jù)訪問型宙。

Jedis 客戶端支持單命令和 Pipeline 方式訪問 Redis 集群撬呢,通過 Pipeline 的方式能夠提高集群訪問的效率。

本文的整體分析基于 Jedis 的 3.5.0 版本進行分析妆兑,相關(guān)源碼均參考此版本魂拦。

一、Jedis 訪問模式對比

Jedis 客戶端操作 Redis 主要分為三種模式搁嗓,分表是單機模式芯勘、分片模式、集群模式腺逛。

  • 單機模式主要是創(chuàng)建 Jedis 對象來操作單節(jié)點的 Redis荷愕,只適用于訪問單個 Redis 節(jié)點。
  • 分片模式(ShardedJedis)主要是通過創(chuàng)建 ShardedJedisPool 對象來訪問分片模式的多個 Redis 節(jié)點棍矛,是 Redis 沒有集群功能之前客戶端實現(xiàn)的一個數(shù)據(jù)分布式方案安疗,本質(zhì)上是客戶端通過一致性哈希來實現(xiàn)數(shù)據(jù)分布式存儲。
  • 集群模式(JedisCluster)主要是通過創(chuàng)建 JedisCluster 對象來訪問集群模式下的多個 Redis 節(jié)點够委,是 Redis3.0 引入集群模式后客戶端實現(xiàn)的集群訪問訪問荐类,本質(zhì)上是通過引入槽(slot)概念以及通過 CRC16 哈希槽算法來實現(xiàn)數(shù)據(jù)分布式存儲。

單機模式不涉及任何分片的思想茁帽,所以我們著重分析分片模式和集群模式的理念玉罐。

1.1 分片模式

  • 分片模式本質(zhì)屬于基于客戶端的分片,在客戶端實現(xiàn)如何根據(jù)一個 key 找到 Redis 集群中對應的節(jié)點的方案脐雪。
  • Jedis 的客戶端分片模式采用一致性 Hash 來實現(xiàn)厌小,一致性 Hash 算法的好處是當 Redis 節(jié)點進行增減時只會影響新增或刪除節(jié)點前后的小部分數(shù)據(jù),相對于取模等算法來說對數(shù)據(jù)的影響范圍較小战秋。
  • Redis 在大部分場景下作為緩存進行使用璧亚,所以不用考慮數(shù)據(jù)丟失致使緩存穿透造成的影響,在 Redis 節(jié)點增減時可以不用考慮部分數(shù)據(jù)無法命中的問題。

分片模式的整體應用如下圖所示癣蟋,核心在于客戶端的一致性 Hash 策略透硝。


1.2 集群模式

集群模式本質(zhì)屬于服務器分片技術(shù),由 Redis 集群本身提供分片功能疯搅,從 Redis 3.0 版本開始正式提供濒生。

集群的原理是:一個 Redis 集群包含 16384 個哈希槽(Hash slot), Redis 保存的每個鍵都屬于這 16384 個哈希槽的其中一個幔欧, 集群使用公式 CRC16(key)%16384 來計算鍵 key 屬于哪個槽罪治, 其中 CRC16(key) 語句用于計算鍵 key 的 CRC16 校驗和 。

集群中的每個節(jié)點負責處理一部分哈希槽礁蔗。舉個例子觉义, 一個集群可以有三個哈希槽, 其中:

  • 節(jié)點 A 負責處理 0 號至 5500 號哈希槽浴井。
  • 節(jié)點 B 負責處理 5501 號至 11000 號哈希槽晒骇。
  • 節(jié)點 C 負責處理 11001 號至 16383 號哈希槽。

Redis 在集群模式下對于 key 的讀寫過程首先將對應的 key 值進行 CRC16 計算得到對應的哈希值磺浙,將哈希值對槽位總數(shù)取模映射到對應的槽位洪囤,最終映射到對應的節(jié)點進行讀寫。以命令 set("key", "value")為例子撕氧,它會使用 CRC16 算法對 key 進行計算得到哈希值 28989瘤缩,然后對 16384 進行取模得到 12605,最后找到 12605 對應的 Redis 節(jié)點呵曹,最終跳轉(zhuǎn)到該節(jié)點執(zhí)行 set 命令款咖。

集群模式的整體應用如下圖所示,核心在于集群哈希槽的設(shè)計以及重定向命令奄喂。

二铐殃、Jedis 的基礎(chǔ)用法

// Jedis單機模式的訪問
public void main(String[] args) {
    // 創(chuàng)建Jedis對象
    jedis = new Jedis("localhost", 6379);
    // 執(zhí)行hmget操作
    jedis.hmget("foobar", "foo");
    // 關(guān)閉Jedis對象
    jedis.close();
}
 
// Jedis分片模式的訪問
public void main(String[] args) {
    HostAndPort redis1 = HostAndPortUtil.getRedisServers().get(0);
    HostAndPort redis2 = HostAndPortUtil.getRedisServers().get(1);
    List<JedisShardInfo> shards = new ArrayList<JedisShardInfo>(2);
    JedisShardInfo shard1 = new JedisShardInfo(redis1);
    JedisShardInfo shard2 = new JedisShardInfo(redis2);
    // 創(chuàng)建ShardedJedis對象
    ShardedJedis shardedJedis = new ShardedJedis(shards);
    // 通過ShardedJedis對象執(zhí)行set操作
    shardedJedis.set("a", "bar");
}
 
// Jedis集群模式的訪問
public void main(String[] args) {
    // 構(gòu)建redis的集群池
    Set<HostAndPort> nodes = new HashSet<>();
    nodes.add(new HostAndPort("127.0.0.1", 7001));
    nodes.add(new HostAndPort("127.0.0.1", 7002));
    nodes.add(new HostAndPort("127.0.0.1", 7003));
 
    // 創(chuàng)建JedisCluster
    JedisCluster cluster = new JedisCluster(nodes);
 
    // 執(zhí)行JedisCluster對象中的方法
    cluster.set("cluster-test", "my jedis cluster test");
    String result = cluster.get("cluster-test");
}

Jedis 通過創(chuàng)建 Jedis 的類對象來實現(xiàn)單機模式下的數(shù)據(jù)訪問,通過構(gòu)建 JedisCluster 類對象來實現(xiàn)集群模式下的數(shù)據(jù)訪問跨新。

要理解 Jedis 的訪問 Redis 的整個過程富腊,可以通過先理解單機模式下的訪問流程,在這個基礎(chǔ)上再分析集群模式的訪問流程會比較合適域帐。

三赘被、Jedis 單機模式的訪問

Jedis 訪問單機模式 Redis 的整體流程圖如下所示,從圖中可以看出核心的流程包含 Jedis 對象的創(chuàng)建以及通過 Jedis 對象實現(xiàn) Redis 的訪問肖揣。

熟悉 Jedis 訪問單機 Redis 的過程民假,本身就是需要了解 Jedis 的創(chuàng)建過程以及執(zhí)行 Redis 命令的過程。

  • Jedis 的創(chuàng)建過程核心在于創(chuàng)建 Jedis 對象以及 Jedis 內(nèi)部變量 Client 對象龙优。
  • Jedis 訪問 Redis 的過程在于通過 Jedis 內(nèi)部的 Client 對象訪問 Redis羊异。


3.1 創(chuàng)建過程

Jedis 本身的類關(guān)系圖如下圖所示,從圖中我們能夠看到 Jedis 繼承自 BinaryJedis 類。

在 BinaryJedis 類中存在和 Redis 對接的 Client 類對象野舶,Jedis 通過父類的 BinaryJedis 的 Client 對象實現(xiàn) Redis 的讀寫易迹。



Jedis 類在創(chuàng)建過程中通過父類 BinaryJedis 創(chuàng)建了 Client 對象,而了解 Client 對象是進一步理解訪問過程的關(guān)鍵平道。

public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommands,
    AdvancedJedisCommands, ScriptingCommands, BasicCommands, ClusterCommands, SentinelCommands, ModuleCommands {
 
  protected JedisPoolAbstract dataSource = null;
 
  public Jedis(final String host, final int port) {
    // 創(chuàng)建父類BinaryJedis對象
    super(host, port);
  }
}
 
public class BinaryJedis implements BasicCommands, BinaryJedisCommands, MultiKeyBinaryCommands,
    AdvancedBinaryJedisCommands, BinaryScriptingCommands, Closeable {
 
  // 訪問redis的Client對象
  protected Client client = null;
 
  public BinaryJedis(final String host, final int port) {
    // 創(chuàng)建Client對象訪問redis
    client = new Client(host, port);
  }
}

Client 類的類關(guān)系圖如下圖所示睹欲,Client 對象繼承自 BinaryClient 和 Connection 類。在 BinaryClient 類中存在 Redis 訪問密碼等相關(guān)參數(shù)一屋,在 Connection 類在存在訪問 Redis 的 socket 對象以及對應的輸入輸出流窘疮。本質(zhì)上 Connection 是和 Redis 進行通信的核心類。



Client 類在創(chuàng)建過程中初始化核心父類 Connection 對象陆淀,而 Connection 是負責和 Redis 直接進行通信考余。

public class Client extends BinaryClient implements Commands {
  public Client(final String host, final int port) {
    super(host, port);
  }
}
 
public class BinaryClient extends Connection {
  // 存儲和Redis連接的相關(guān)信息
  private boolean isInMulti;
  private String user;
  private String password;
  private int db;
  private boolean isInWatch;
 
  public BinaryClient(final String host, final int port) {
    super(host, port);
  }
}
 
public class Connection implements Closeable {
  // 管理和Redis連接的socket信息及對應的輸入輸出流
  private JedisSocketFactory jedisSocketFactory;
  private Socket socket;
  private RedisOutputStream outputStream;
  private RedisInputStream inputStream;
  private int infiniteSoTimeout = 0;
  private boolean broken = false;
 
  public Connection(final String host, final int port, final boolean ssl,
      SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
      HostnameVerifier hostnameVerifier) {
    // 構(gòu)建DefaultJedisSocketFactory來創(chuàng)建和Redis連接的Socket對象
    this(new DefaultJedisSocketFactory(host, port, Protocol.DEFAULT_TIMEOUT,
        Protocol.DEFAULT_TIMEOUT, ssl, sslSocketFactory, sslParameters, hostnameVerifier));
  }
}

3.2 訪問過程

以 Jedis 執(zhí)行 set 命令為例先嬉,整個過程如下:

  • Jedis 的 set 操作是通過 Client 的 set 操作來實現(xiàn)的轧苫。
  • Client 的 set 操作是通過父類 Connection 的 sendCommand 來實現(xiàn)。
public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommands,
    AdvancedJedisCommands, ScriptingCommands, BasicCommands, ClusterCommands, SentinelCommands, ModuleCommands {
  @Override
  public String set(final String key, final String value) {
    checkIsInMultiOrPipeline();
    // client執(zhí)行set操作
    client.set(key, value);
    return client.getStatusCodeReply();
  }
}
 
public class Client extends BinaryClient implements Commands {
  @Override
  public void set(final String key, final String value) {
    // 執(zhí)行set命令
    set(SafeEncoder.encode(key), SafeEncoder.encode(value));
  }
}
 
public class BinaryClient extends Connection {
  public void set(final byte[] key, final byte[] value) {
    // 發(fā)送set指令
    sendCommand(SET, key, value);
  }
}
 
public class Connection implements Closeable {
  public void sendCommand(final ProtocolCommand cmd, final byte[]... args) {
    try {
      // socket連接redis
      connect();
      // 按照redis的協(xié)議發(fā)送命令
      Protocol.sendCommand(outputStream, cmd, args);
    } catch (JedisConnectionException ex) {
    }
  }
}

四疫蔓、Jedis 分片模式的訪問

基于前面已經(jīng)介紹的 Redis 分片模式的一致性 Hash 的原理來理解 Jedis 的分片模式的訪問含懊。

關(guān)于 Redis 分片模式的概念:Redis 在 3.0 版本之前沒有集群模式的概念,這導致單節(jié)點能夠存儲的數(shù)據(jù)有限衅胀,通過 Redis 的客戶端如 Jedis 在客戶端通過一致性 Hash 算法來實現(xiàn)數(shù)據(jù)的分片存儲岔乔。

本質(zhì)上 Redis 的分片模式跟 Redis 本身沒有任何關(guān)系,只是通過客戶端來解決單節(jié)點數(shù)據(jù)有限存儲的問題滚躯。

ShardedJedis 訪問 Redis 的核心在于構(gòu)建對象的時候初始化一致性 Hash 對象雏门,構(gòu)建一致性 Hash 經(jīng)典的 Hash 值和 node 的映射關(guān)系。構(gòu)建完映射關(guān)系后執(zhí)行 set 等操作就是 Hash 值到 node 的尋址過程掸掏,尋址完成后直接進行單節(jié)點的操作茁影。

4.1 創(chuàng)建過程

ShardedJedis 的創(chuàng)建過程在于父類的 Sharded 中關(guān)于一致性 Hash 相關(guān)的初始化過程,核心在于構(gòu)建一致性的虛擬節(jié)點以及虛擬節(jié)點和 Redis 節(jié)點的映射關(guān)系丧凤。

源碼中最核心的部分代碼在于根據(jù)根據(jù)權(quán)重映射成未 160 個虛擬節(jié)點募闲,通過虛擬節(jié)點來定位到具體的 Redis 節(jié)點。

public class Sharded<R, S extends ShardInfo<R>> {
 
  public static final int DEFAULT_WEIGHT = 1;
  // 保存虛擬節(jié)點和redis的node節(jié)點的映射關(guān)系
  private TreeMap<Long, S> nodes;
  // hash算法
  private final Hashing algo;
  // 保存redis節(jié)點和訪問該節(jié)點的Jedis的連接信息
  private final Map<ShardInfo<R>, R> resources = new LinkedHashMap<>();
 
  public Sharded(List<S> shards, Hashing algo) {
    this.algo = algo;
    initialize(shards);
  }
 
  private void initialize(List<S> shards) {
    nodes = new TreeMap<>();
    // 遍歷每個redis的節(jié)點并設(shè)置hash值到節(jié)點的映射關(guān)系
    for (int i = 0; i != shards.size(); ++i) {
      final S shardInfo = shards.get(i);
      // 根據(jù)權(quán)重映射成未160個虛擬節(jié)點
      int N =  160 * shardInfo.getWeight();
      if (shardInfo.getName() == null) for (int n = 0; n < N; n++) {
        // 構(gòu)建hash值和節(jié)點映射關(guān)系
        nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), shardInfo);
      }
      else for (int n = 0; n < N; n++) {
        nodes.put(this.algo.hash(shardInfo.getName() + "*" + n), shardInfo);
      }
      // 保存每個節(jié)點的訪問對象
      resources.put(shardInfo, shardInfo.createResource());
    }
  }
}

4.2 訪問過程

ShardedJedis 的訪問過程就是一致性 Hash 的計算過程愿待,核心的邏輯就是:通過 Hash 算法對訪問的 key 進行 Hash 計算生成 Hash 值浩螺,根據(jù) Hash 值獲取對應 Redis 節(jié)點,根據(jù)對應的 Redis 節(jié)點獲取對應的訪問對象 Jedis仍侥。

獲取訪問對象 Jedis 之后就可以直接進行命令操作要出。

public class Sharded<R, S extends ShardInfo<R>> {
 
  public static final int DEFAULT_WEIGHT = 1;
  private TreeMap<Long, S> nodes;
  private final Hashing algo;
  // 保存redis節(jié)點和訪問該節(jié)點的Jedis的連接信息
  private final Map<ShardInfo<R>, R> resources = new LinkedHashMap<>();
 
  public R getShard(String key) {
    // 根據(jù)redis節(jié)點找到對應的訪問對象Jedis
    return resources.get(getShardInfo(key));
  }
 
  public S getShardInfo(String key) {
    return getShardInfo(SafeEncoder.encode(getKeyTag(key)));
  }
 
  public S getShardInfo(byte[] key) {
    // 針對訪問的key生成對應的hash值
    // 根據(jù)hash值找到對應的redis節(jié)點
    SortedMap<Long, S> tail = nodes.tailMap(algo.hash(key));
    if (tail.isEmpty()) {
      return nodes.get(nodes.firstKey());
    }
    return tail.get(tail.firstKey());
  }
}

五、Jedis 集群模式的訪問

基于前面介紹的 Redis 的集群原理來理解 Jedis 的集群模式的訪問农渊。

Jedis 能夠?qū)崿F(xiàn) key 和哈希槽的定位的核心機制在于哈希槽和 Redis 節(jié)點的映射患蹂,而這個發(fā)現(xiàn)過程基于 Redis 的 cluster slot 命令。

關(guān)于 Redis 集群操作的命令:Redis 通過 cluster slots 會返回 Redis 集群的整體狀況。返回每一個 Redis 節(jié)點的信息包含:

  • 哈希槽起始編號
  • 哈希槽結(jié)束編號
  • 哈希槽對應 master 節(jié)點况脆,節(jié)點使用 IP/Port 表示
  • master 節(jié)點的第一個副本
  • master 節(jié)點的第二個副本
127.0.0.1:30001> cluster slots
1) 1) (integer) 0 // 開始槽位
   2) (integer) 5460 // 結(jié)束槽位
   3) 1) "127.0.0.1" // master節(jié)點的host
      2) (integer) 30001 // master節(jié)點的port
      3) "09dbe9720cda62f7865eabc5fd8857c5d2678366" // 節(jié)點的編碼
   4) 1) "127.0.0.1" // slave節(jié)點的host
      2) (integer) 30004 // slave節(jié)點的port
      3) "821d8ca00d7ccf931ed3ffc7e3db0599d2271abf" // 節(jié)點的編碼
2) 1) (integer) 5461
   2) (integer) 10922
   3) 1) "127.0.0.1"
      2) (integer) 30002
      3) "c9d93d9f2c0c524ff34cc11838c2003d8c29e013"
   4) 1) "127.0.0.1"
      2) (integer) 30005
      3) "faadb3eb99009de4ab72ad6b6ed87634c7ee410f"
3) 1) (integer) 10923
   2) (integer) 16383
   3) 1) "127.0.0.1"
      2) (integer) 30003
      3) "044ec91f325b7595e76dbcb18cc688b6a5b434a1"
   4) 1) "127.0.0.1"
      2) (integer) 30006
      3) "58e6e48d41228013e5d9c1c37c5060693925e97e"

Jedis 訪問集群模式 Redis 的整體流程圖如下所示饭宾,從圖中可以看出核心的流程包含 JedisCluster 對象的創(chuàng)建以及通過 JedisCluster 對象實現(xiàn) Redis 的訪問。

JedisCluster 對象的創(chuàng)建核心在于創(chuàng)建 JedisClusterInfoCache 對象并通過集群發(fā)現(xiàn)來建立 slot 和集群節(jié)點的映射關(guān)系格了。

JedisCluster 對 Redis 集群的訪問在于獲取 key 所在的 Redis 節(jié)點并通過 Jedis 對象進行訪問看铆。

5.1 創(chuàng)建過程

JedisCluster 的類關(guān)系如下圖所示,在圖中可以看到核心變量 JedisSlotBasedConnectionHandler 對象盛末。


JedisCluster 的父類 BinaryJedisCluster 創(chuàng)建了 JedisSlotBasedConnectionHandler 對象弹惦,該對象負責和 Redis 的集群進行通信。

public class JedisCluster extends BinaryJedisCluster implements JedisClusterCommands,
    MultiKeyJedisClusterCommands, JedisClusterScriptingCommands {
  public JedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout, int soTimeout,
      int maxAttempts, String password, String clientName, final GenericObjectPoolConfig poolConfig,
      boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
      HostnameVerifier hostnameVerifier, JedisClusterHostAndPortMap hostAndPortMap) {
 
    // 訪問父類BinaryJedisCluster
    super(jedisClusterNode, connectionTimeout, soTimeout, maxAttempts, password, clientName, poolConfig,
        ssl, sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMap);
  }
}
 
public class BinaryJedisCluster implements BinaryJedisClusterCommands,
    MultiKeyBinaryJedisClusterCommands, JedisClusterBinaryScriptingCommands, Closeable {
  public BinaryJedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout, int soTimeout,
      int maxAttempts, String user, String password, String clientName, GenericObjectPoolConfig poolConfig,
      boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
      HostnameVerifier hostnameVerifier, JedisClusterHostAndPortMap hostAndPortMap) {
 
    // 創(chuàng)建JedisSlotBasedConnectionHandler對象
    this.connectionHandler = new JedisSlotBasedConnectionHandler(jedisClusterNode, poolConfig,
        connectionTimeout, soTimeout, user, password, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMap);
 
    this.maxAttempts = maxAttempts;
  }
}

JedisSlotBasedConnectionHandler 的核心在于創(chuàng)建并初始化 JedisClusterInfoCache 對象悄但,該對象緩存了 Redis 集群的信息棠隐。

JedisClusterInfoCache 對象的初始化過程通過 initializeSlotsCache 來完成,主要目的用于實現(xiàn)集群節(jié)點和槽位發(fā)現(xiàn)檐嚣。

public class JedisSlotBasedConnectionHandler extends JedisClusterConnectionHandler {
  public JedisSlotBasedConnectionHandler(Set<HostAndPort> nodes, GenericObjectPoolConfig poolConfig,
      int connectionTimeout, int soTimeout, String user, String password, String clientName,
      boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
      HostnameVerifier hostnameVerifier, JedisClusterHostAndPortMap portMap) {
 
    super(nodes, poolConfig, connectionTimeout, soTimeout, user, password, clientName,
        ssl, sslSocketFactory, sslParameters, hostnameVerifier, portMap);
  }
}
 
public abstract class JedisClusterConnectionHandler implements Closeable {
  public JedisClusterConnectionHandler(Set<HostAndPort> nodes, final GenericObjectPoolConfig poolConfig,
      int connectionTimeout, int soTimeout, int infiniteSoTimeout, String user, String password, String clientName,
      boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
      HostnameVerifier hostnameVerifier, JedisClusterHostAndPortMap portMap) {
 
    // 創(chuàng)建JedisClusterInfoCache對象
    this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, infiniteSoTimeout,
        user, password, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, portMap);
 
    // 初始化jedis的Slot信息
    initializeSlotsCache(nodes, connectionTimeout, soTimeout, infiniteSoTimeout,
        user, password, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier);
  }
 
 
  private void initializeSlotsCache(Set<HostAndPort> startNodes,
      int connectionTimeout, int soTimeout, int infiniteSoTimeout, String user, String password, String clientName,
      boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier) {
    for (HostAndPort hostAndPort : startNodes) {
 
      try (Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout,
          soTimeout, infiniteSoTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier)) {
 
        // 通過discoverClusterNodesAndSlots進行集群發(fā)現(xiàn)
        cache.discoverClusterNodesAndSlots(jedis);
        return;
      } catch (JedisConnectionException e) {
      }
    }
  }
}

JedisClusterInfoCache 的 nodes 用來保存 Redis 集群的節(jié)點信息助泽,slots 用來保存槽位和集群節(jié)點的信息。

nodes 和 slots 維持的對象都是 JedisPool 對象嚎京,該對象維持了和 Redis 的連接信息嗡贺。集群的發(fā)現(xiàn)過程由 discoverClusterNodesAndSlots 來實現(xiàn),本質(zhì)是執(zhí)行 Redis 的集群發(fā)現(xiàn)命令 cluster slots 實現(xiàn)的鞍帝。

public class JedisClusterInfoCache {
  // 負責保存redis集群的節(jié)點信息
  private final Map<String, JedisPool> nodes = new HashMap<>();
  // 負責保存redis的槽位和redis節(jié)點的映射關(guān)系
  private final Map<Integer, JedisPool> slots = new HashMap<>();
 
  // 負責集群的發(fā)現(xiàn)邏輯
  public void discoverClusterNodesAndSlots(Jedis jedis) {
    w.lock();
 
    try {
      reset();
      List<Object> slots = jedis.clusterSlots();
 
      for (Object slotInfoObj : slots) {
        List<Object> slotInfo = (List<Object>) slotInfoObj;
 
        if (slotInfo.size() <= MASTER_NODE_INDEX) {
          continue;
        }
        // 獲取redis節(jié)點對應的槽位信息
        List<Integer> slotNums = getAssignedSlotArray(slotInfo);
 
        // hostInfos
        int size = slotInfo.size();
        for (int i = MASTER_NODE_INDEX; i < size; i++) {
          List<Object> hostInfos = (List<Object>) slotInfo.get(i);
          if (hostInfos.isEmpty()) {
            continue;
          }
 
          HostAndPort targetNode = generateHostAndPort(hostInfos);
          // 負責保存redis節(jié)點信息
          setupNodeIfNotExist(targetNode);
          if (i == MASTER_NODE_INDEX) {
            // 負責保存槽位和redis節(jié)點的映射關(guān)系
            assignSlotsToNode(slotNums, targetNode);
          }
        }
      }
    } finally {
      w.unlock();
    }
  }
 
  public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) {
    w.lock();
    try {
      JedisPool targetPool = setupNodeIfNotExist(targetNode);
      // 保存槽位和對應的JedisPool對象
      for (Integer slot : targetSlots) {
        slots.put(slot, targetPool);
      }
    } finally {
      w.unlock();
    }
  }
 
  public JedisPool setupNodeIfNotExist(HostAndPort node) {
    w.lock();
    try {
      // 生產(chǎn)redis節(jié)點對應的nodeKey
      String nodeKey = getNodeKey(node);
      JedisPool existingPool = nodes.get(nodeKey);
      if (existingPool != null) return existingPool;
      // 生產(chǎn)redis節(jié)點對應的JedisPool
      JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort(),
          connectionTimeout, soTimeout, infiniteSoTimeout, user, password, 0, clientName,
          ssl, sslSocketFactory, sslParameters, hostnameVerifier);
      // 保存redis節(jié)點的key和對應的JedisPool對象
      nodes.put(nodeKey, nodePool);
      return nodePool;
    } finally {
      w.unlock();
    }
  }
}

JedisPool 的類關(guān)系如下圖所示诫睬,其中內(nèi)部 internalPool 是通過 apache common pool 來實現(xiàn)的池化。



JedisPool 內(nèi)部的 internalPool 通過 JedisFactory 的 makeObject 來創(chuàng)建 Jedis 對象帕涌。

每個 Redis 節(jié)點都會對應一個 JedisPool 對象摄凡,通過 JedisPool 來管理 Jedis 的申請釋放復用等。

public class JedisPool extends JedisPoolAbstract {
 
  public JedisPool() {
    this(Protocol.DEFAULT_HOST, Protocol.DEFAULT_PORT);
  }
}
 
public class JedisPoolAbstract extends Pool<Jedis> {
 
  public JedisPoolAbstract() {
    super();
  }
}
 
public abstract class Pool<T> implements Closeable {
  protected GenericObjectPool<T> internalPool;
 
  public void initPool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) {
    if (this.internalPool != null) {
      try {
        closeInternalPool();
      } catch (Exception e) {
      }
    }
    this.internalPool = new GenericObjectPool<>(factory, poolConfig);
  }
}
 
class JedisFactory implements PooledObjectFactory<Jedis> {
   
  @Override
  public PooledObject<Jedis> makeObject() throws Exception {
    // 創(chuàng)建Jedis對象
    final HostAndPort hp = this.hostAndPort.get();
    final Jedis jedis = new Jedis(hp.getHost(), hp.getPort(), connectionTimeout, soTimeout,
        infiniteSoTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier);
 
    try {
      // Jedis對象連接
      jedis.connect();
      if (user != null) {
        jedis.auth(user, password);
      } else if (password != null) {
        jedis.auth(password);
      }
      if (database != 0) {
        jedis.select(database);
      }
      if (clientName != null) {
        jedis.clientSetname(clientName);
      }
    } catch (JedisException je) {
      jedis.close();
      throw je;
    }
    // 將Jedis對象包裝成DefaultPooledObject進行返回
    return new DefaultPooledObject<>(jedis);
  }
}

5.1 訪問過程

JedisCluster 訪問 Redis 的過程通過 JedisClusterCommand 來實現(xiàn)重試機制蚓曼,最終通過 Jedis 對象來實現(xiàn)訪問亲澡。從實現(xiàn)的角度來說 JedisCluster 是在 Jedis 之上封裝了一層,進行集群節(jié)點定位以及重試機制等辟躏。

以 set 命令為例谷扣,整個訪問通過 JedisClusterCommand 實現(xiàn)如下:

  • 計算 key 所在的 Redis 節(jié)點。
  • 獲取 Redis 節(jié)點對應的 Jedis 對象捎琐。
  • 通過 Jedis 對象進行 set 操作会涎。
public class JedisCluster extends BinaryJedisCluster implements JedisClusterCommands,
    MultiKeyJedisClusterCommands, JedisClusterScriptingCommands {
 
  @Override
  public String set(final String key, final String value, final SetParams params) {
    return new JedisClusterCommand<String>(connectionHandler, maxAttempts) {
      @Override
      public String execute(Jedis connection) {
        return connection.set(key, value, params);
      }
    }.run(key);
  }
}

JedisClusterCommand 的 run 方法核心主要定位 Redis 的 key 所在的 Redis 節(jié)點,然后獲取與該節(jié)點對應的 Jedis 對象進行訪問瑞凑。

在 Jedis 對象訪問異常后末秃,JedisClusterCommand 會進行重試操作并按照一定策略執(zhí)行 renewSlotCache 方法進行重集群節(jié)點重發(fā)現(xiàn)動作。

public abstract class JedisClusterCommand<T> {
  public T run(String key) {
    // 針對key進行槽位的計算
    return runWithRetries(JedisClusterCRC16.getSlot(key), this.maxAttempts, false, null);
  }
   
  private T runWithRetries(final int slot, int attempts, boolean tryRandomNode, JedisRedirectionException redirect) {
 
    Jedis connection = null;
    try {
 
      if (redirect != null) {
        connection = this.connectionHandler.getConnectionFromNode(redirect.getTargetNode());
        if (redirect instanceof JedisAskDataException) {
          connection.asking();
        }
      } else {
        if (tryRandomNode) {
          connection = connectionHandler.getConnection();
        } else {
          // 根據(jù)slot去獲取Jedis對象
          connection = connectionHandler.getConnectionFromSlot(slot);
        }
      }
      // 執(zhí)行真正的Redis的命令
      return execute(connection);
    } catch (JedisNoReachableClusterNodeException jnrcne) {
      throw jnrcne;
    } catch (JedisConnectionException jce) {
 
      releaseConnection(connection);
      connection = null;
 
      if (attempts <= 1) {
        // 保證最后兩次機會去重新刷新槽位和節(jié)點的對應的信息
        this.connectionHandler.renewSlotCache();
      }
      // 按照重試次數(shù)進行重試操作
      return runWithRetries(slot, attempts - 1, tryRandomNode, redirect);
    } catch (JedisRedirectionException jre) {
      // 針對返回Move命令立即觸發(fā)重新刷新槽位和節(jié)點的對應信息
      if (jre instanceof JedisMovedDataException) {
        // it rebuilds cluster's slot cache recommended by Redis cluster specification
        this.connectionHandler.renewSlotCache(connection);
      }
 
      releaseConnection(connection);
      connection = null;
 
      return runWithRetries(slot, attempts - 1, false, jre);
    } finally {
      releaseConnection(connection);
    }
  }
}

JedisSlotBasedConnectionHandler 的 cache 對象維持了 slot 和 node 的映射關(guān)系籽御,通過 getConnectionFromSlot 方法來獲取該 slot 對應的 Jedis 對象练慕。

public class JedisSlotBasedConnectionHandler extends JedisClusterConnectionHandler {
 
  protected final JedisClusterInfoCache cache;
 
  @Override
  public Jedis getConnectionFromSlot(int slot) {
    // 獲取槽位對應的JedisPool對象
    JedisPool connectionPool = cache.getSlotPool(slot);
    if (connectionPool != null) {
      // 從JedisPool對象中獲取Jedis對象
      return connectionPool.getResource();
    } else {
      // 獲取失敗就重新刷新槽位信息
      renewSlotCache();
      connectionPool = cache.getSlotPool(slot);
      if (connectionPool != null) {
        return connectionPool.getResource();
      } else {
        //no choice, fallback to new connection to random node
        return getConnection();
      }
    }
  }
}

六惰匙、 Jedis 的 Pipeline 實現(xiàn)

Pipeline 的技術(shù)核心思想是將多個命令發(fā)送到服務器而不用等待回復,最后在一個步驟中讀取該答復铃将。這種模式的好處在于節(jié)省了請求響應這種模式的網(wǎng)絡(luò)開銷项鬼。

Redis 的普通命令如 set 和 Pipeline 批量操作的核心的差別在于 set 命令的操作會直接發(fā)送請求到 Redis 并同步等待結(jié)果返回,而 Pipeline 的操作會發(fā)送請求但不立即同步等待結(jié)果返回劲阎,具體的實現(xiàn)可以從 Jedis 的源碼一探究竟绘盟。

原生的 Pipeline 在集群模式下相關(guān)的 key 必須 Hash 到同一個節(jié)點才能生效,原因在于 Pipeline 下的 Client 對象只能其中的一個節(jié)點建立了連接悯仙。

在集群模式下歸屬于不同節(jié)點的 key 能夠使用 Pipeline 就需要針對每個 key 保存對應的節(jié)點的 client 對象龄毡,在最后執(zhí)行獲取數(shù)據(jù)的時候一并獲取。本質(zhì)上可以認為在單節(jié)點的 Pipeline 的基礎(chǔ)上封裝成一個集群式的 Pipeline锡垄。

6.1 Pipeline 用法分析

Pipeline 訪問單節(jié)點的 Redis 的時候沦零,通過 Jedis 對象的 Pipeline 方法返回 Pipeline 對象,其他的命令操作通過該 Pipeline 對象進行訪問货岭。

Pipeline 從使用角度來分析路操,會批量發(fā)送多個命令并最后統(tǒng)一使用 syncAndReturnAll 來一次性返回結(jié)果。

public void pipeline() {
    jedis = new Jedis(hnp.getHost(), hnp.getPort(), 500);
    Pipeline p = jedis.pipelined();
    // 批量發(fā)送命令到redis
    p.set("foo", "bar");
    p.get("foo");
    // 同步等待響應結(jié)果
    List<Object> results = p.syncAndReturnAll();
 
    assertEquals(2, results.size());
    assertEquals("OK", results.get(0));
    assertEquals("bar", results.get(1));
 }
 
 
public abstract class PipelineBase extends Queable implements BinaryRedisPipeline, RedisPipeline {
 
  @Override
  public Response<String> set(final String key, final String value) {
    // 發(fā)送命令
    getClient(key).set(key, value);
    // pipeline的getResponse只是把待響應的請求聚合到pipelinedResponses對象當中
    return getResponse(BuilderFactory.STRING);
  }
}
 
 
public class Queable {
 
  private Queue<Response<?>> pipelinedResponses = new LinkedList<>();
  protected <T> Response<T> getResponse(Builder<T> builder) {
    Response<T> lr = new Response<>(builder);
    // 統(tǒng)一保存到響應隊列當中
    pipelinedResponses.add(lr);
    return lr;
  }
}
 
 
public class Pipeline extends MultiKeyPipelineBase implements Closeable {
 
  public List<Object> syncAndReturnAll() {
    if (getPipelinedResponseLength() > 0) {
      // 根據(jù)批量發(fā)送命令的個數(shù)即需要批量返回命令的個數(shù)茴她,通過client對象進行批量讀取
      List<Object> unformatted = client.getMany(getPipelinedResponseLength());
      List<Object> formatted = new ArrayList<>();
      for (Object o : unformatted) {
        try {
          // 格式化每個返回的結(jié)果并最終保存在列表中進行返回
          formatted.add(generateResponse(o).get());
        } catch (JedisDataException e) {
          formatted.add(e);
        }
      }
      return formatted;
    } else {
      return java.util.Collections.<Object> emptyList();
    }
  }
}

普通 set 命令發(fā)送請求給 Redis 后立即通過 getStatusCodeReply 來獲取響應結(jié)果寻拂,所以這是一種請求響應的模式。

getStatusCodeReply 在獲取響應結(jié)果的時候會通過 flush()命令強制發(fā)送報文到 Redis 服務端然后通過讀取響應結(jié)果丈牢。

public class BinaryJedis implements BasicCommands, BinaryJedisCommands, MultiKeyBinaryCommands,
    AdvancedBinaryJedisCommands, BinaryScriptingCommands, Closeable {
 
  @Override
  public String set(final byte[] key, final byte[] value) {
    checkIsInMultiOrPipeline();
    // 發(fā)送命令
    client.set(key, value);
    // 等待請求響應
    return client.getStatusCodeReply();
  }
}
 
 
public class Connection implements Closeable {
  public String getStatusCodeReply() {
    // 通過flush立即發(fā)送請求
    flush();
    // 處理響應請求
    final byte[] resp = (byte[]) readProtocolWithCheckingBroken();
    if (null == resp) {
      return null;
    } else {
      return SafeEncoder.encode(resp);
    }
  }
}
 
 
public class Connection implements Closeable {
  protected void flush() {
    try {
      // 針對輸出流進行flush操作保證報文的發(fā)出
      outputStream.flush();
    } catch (IOException ex) {
      broken = true;
      throw new JedisConnectionException(ex);
    }
  }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市瞄沙,隨后出現(xiàn)的幾起案子己沛,更是在濱河造成了極大的恐慌,老刑警劉巖距境,帶你破解...
    沈念sama閱讀 218,858評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件申尼,死亡現(xiàn)場離奇詭異,居然都是意外死亡垫桂,警方通過查閱死者的電腦和手機师幕,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,372評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來诬滩,“玉大人霹粥,你說我怎么就攤上這事√勰瘢” “怎么了后控?”我有些...
    開封第一講書人閱讀 165,282評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長空镜。 經(jīng)常有香客問我浩淘,道長捌朴,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,842評論 1 295
  • 正文 為了忘掉前任张抄,我火速辦了婚禮砂蔽,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘署惯。我一直安慰自己察皇,他們只是感情好,可當我...
    茶點故事閱讀 67,857評論 6 392
  • 文/花漫 我一把揭開白布泽台。 她就那樣靜靜地躺著什荣,像睡著了一般。 火紅的嫁衣襯著肌膚如雪怀酷。 梳的紋絲不亂的頭發(fā)上稻爬,一...
    開封第一講書人閱讀 51,679評論 1 305
  • 那天,我揣著相機與錄音蜕依,去河邊找鬼桅锄。 笑死,一個胖子當著我的面吹牛样眠,可吹牛的內(nèi)容都是我干的友瘤。 我是一名探鬼主播,決...
    沈念sama閱讀 40,406評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼檐束,長吁一口氣:“原來是場噩夢啊……” “哼辫秧!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起被丧,我...
    開封第一講書人閱讀 39,311評論 0 276
  • 序言:老撾萬榮一對情侶失蹤盟戏,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后甥桂,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體柿究,經(jīng)...
    沈念sama閱讀 45,767評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年黄选,在試婚紗的時候發(fā)現(xiàn)自己被綠了蝇摸。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,090評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡办陷,死狀恐怖貌夕,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情懂诗,我是刑警寧澤蜂嗽,帶...
    沈念sama閱讀 35,785評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站殃恒,受9級特大地震影響植旧,放射性物質(zhì)發(fā)生泄漏辱揭。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,420評論 3 331
  • 文/蒙蒙 一病附、第九天 我趴在偏房一處隱蔽的房頂上張望问窃。 院中可真熱鬧,春花似錦完沪、人聲如沸域庇。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,988評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽听皿。三九已至,卻和暖如春宽档,著一層夾襖步出監(jiān)牢的瞬間尉姨,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,101評論 1 271
  • 我被黑心中介騙來泰國打工吗冤, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留又厉,地道東北人。 一個月前我還...
    沈念sama閱讀 48,298評論 3 372
  • 正文 我出身青樓椎瘟,卻偏偏與公主長得像覆致,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子肺蔚,可洞房花燭夜當晚...
    茶點故事閱讀 45,033評論 2 355

推薦閱讀更多精彩內(nèi)容