深入剖析Redis客戶端Jedis的特性和原理

一、開篇

Redis作為目前通用的快取選型,因其高性能而倍受歡迎。Redis的2.x版本僅支援單機模式,從3.0版本開始引入集群模式。

Redis的Java生態的客戶端當中包含Jedis、Redisson、Lettuce,不同的客戶端具備不同的能力是使用方式,本文主要分析Jedis客戶端。

Jedis客戶端同時支援單機模式、分片模式、集群模式的訪問模式,通過構建Jedis類對象實現單機模式下的數據訪問,通過構建ShardedJedis類對象實現分片模式的數據訪問,通過構建JedisCluster類對象實現集群模式下的數據訪問。

Jedis客戶端支援單命令和Pipeline方式訪問Redis集群,通過Pipeline的方式能夠提高集群訪問的效率。

本文的整體分析基於Jedis的3.5.0版本進行分析,相關源碼均參考此版本。

二、Jedis訪問模式對比

Jedis客戶端操作Redis主要分為三種模式,分表是單機模式、分片模式、集群模式。

  • 單機模式主要是創建Jedis對象來操作單節點的Redis,只適用於訪問單個Redis節點。

  • 分片模式(ShardedJedis)主要是通過創建ShardedJedisPool對象來訪問分片模式的多個Redis節點,是Redis沒有集群功能之前客戶端實現的一個數據分散式方案,本質上是客戶端通過一致性哈希來實現數據分散式存儲。

  • 集群模式(JedisCluster)主要是通過創建JedisCluster對象來訪問集群模式下的多個Redis節點,是Redis3.0引入集群模式後客戶端實現的集群訪問訪問,本質上是通過引入槽(slot)概念以及通過CRC16哈希槽演算法來實現數據分散式存儲。

單機模式不涉及任何分片的思想,所以我們著重分析分片模式和集群模式的理念。

2.1 分片模式

  • 分片模式本質屬於基於客戶端的分片,在客戶端實現如何根據一個key找到Redis集群中對應的節點的方案。

  • Jedis的客戶端分片模式採用一致性Hash來實現,一致性Hash演算法的好處是當Redis節點進行增減時只會影響新增或刪除節點前後的小部分數據,相對於取模等演算法來說對數據的影響範圍較小。

  • Redis在大部分場景下作為快取進行使用,所以不用考慮數據丟失致使快取穿透造成的影響,在Redis節點增減時可以不用考慮部分數據無法命中的問題。

分片模式的整體應用如下圖所示,核心在於客戶端的一致性Hash策略。

(引用自:www.cnblogs.com)

2.2 集群模式

集群模式本質屬於伺服器分片技術,由Redis集群本身提供分片功能,從Redis 3.0版本開始正式提供。

集群的原理是:一個 Redis 集群包含16384 個哈希槽(Hash slot), Redis保存的每個鍵都屬於這16384個哈希槽的其中一個, 集群使用公式CRC16(key)%16384 來計算鍵 key 屬於哪個槽, 其中 CRC16(key) 語句用於計算鍵key的CRC16校驗和 。

集群中的每個節點負責處理一部分哈希槽。舉個例子, 一個集群可以有三個哈希槽, 其中:

  • 節點 A 負責處理 0 號至 5500 號哈希槽。

  • 節點 B 負責處理 5501 號至 11000 號哈希槽。

  • 節點 C 負責處理 11001 號至 16383 號哈希槽。

Redis在集群模式下對於key的讀寫過程首先將對應的key值進行CRC16計算得到對應的哈希值,將哈希值對槽位總數取模映射到對應的槽位,最終映射到對應的節點進行讀寫。以命令set(“key”, “value”)為例子,它會使用CRC16演算法對key進行計算得到哈希值28989,然後對16384進行取模得到12605,最後找到12605對應的Redis節點,最終跳轉到該節點執行set命令。

集群模式的整體應用如下圖所示,核心在於集群哈希槽的設計以及重定向命令。

(引用自:www.jianshu.com)

三、Jedis的基礎用法

// Jedis單機模式的訪問
public void main(String[] args) {
    // 創建Jedis對象
    jedis = new Jedis("localhost", 6379);
    // 執行hmget操作
    jedis.hmget("foobar", "foo");
    // 關閉Jedis對象
    jedis.close();
}
 
// Jedis分片模式的訪問
public void main(String[] args) {
    HostAndPort redis1 = HostAndPortUtil.getRedisServers().get(0);
    HostAndPort redis2 = HostAndPortUtil.getRedisServers().get(1);
    List<JedisShardInfo> shards = new ArrayList<JedisShardInfo>(2);
    JedisShardInfo shard1 = new JedisShardInfo(redis1);
    JedisShardInfo shard2 = new JedisShardInfo(redis2);
    // 創建ShardedJedis對象
    ShardedJedis shardedJedis = new ShardedJedis(shards);
    // 通過ShardedJedis對象執行set操作
    shardedJedis.set("a", "bar");
}
 
// Jedis集群模式的訪問
public void main(String[] args) {
    // 構建redis的集群池
    Set<HostAndPort> nodes = new HashSet<>();
    nodes.add(new HostAndPort("127.0.0.1", 7001));
    nodes.add(new HostAndPort("127.0.0.1", 7002));
    nodes.add(new HostAndPort("127.0.0.1", 7003));
 
    // 創建JedisCluster
    JedisCluster cluster = new JedisCluster(nodes);
 
    // 執行JedisCluster對象中的方法
    cluster.set("cluster-test", "my jedis cluster test");
    String result = cluster.get("cluster-test");
}

Jedis通過創建Jedis的類對象來實現單機模式下的數據訪問,通過構建JedisCluster類對象來實現集群模式下的數據訪問。

要理解Jedis的訪問Redis的整個過程,可以通過先理解單機模式下的訪問流程,在這個基礎上再分析集群模式的訪問流程會比較合適。

四、Jedis單機模式的訪問

Jedis訪問單機模式Redis的整體流程圖如下所示,從圖中可以看出核心的流程包含Jedis對象的創建以及通過Jedis對象實現Redis的訪問。

熟悉Jedis訪問單機Redis的過程,本身就是需要了解Jedis的創建過程以及執行Redis命令的過程。

  • Jedis的創建過程核心在於創建Jedis對象以及Jedis內部變數Client對象。

  • Jedis訪問Redis的過程在於通過Jedis內部的Client對象訪問Redis。

4.1 創建過程

Jedis本身的類關係圖如下圖所示,從圖中我們能夠看到Jedis繼承自BinaryJedis類。

在BinaryJedis類中存在和Redis對接的Client類對象,Jedis通過父類的BinaryJedis的Client對象實現Redis的讀寫。

Jedis類在創建過程中通過父類BinaryJedis創建了Client對象,而了解Client對象是進一步理解訪問過程的關鍵。

public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommands,
    AdvancedJedisCommands, ScriptingCommands, BasicCommands, ClusterCommands, SentinelCommands, ModuleCommands {
 
  protected JedisPoolAbstract dataSource = null;
 
  public Jedis(final String host, final int port) {
    // 創建父類BinaryJedis對象
    super(host, port);
  }
}
 
public class BinaryJedis implements BasicCommands, BinaryJedisCommands, MultiKeyBinaryCommands,
    AdvancedBinaryJedisCommands, BinaryScriptingCommands, Closeable {
 
  // 訪問redis的Client對象
  protected Client client = null;
 
  public BinaryJedis(final String host, final int port) {
    // 創建Client對象訪問redis
    client = new Client(host, port);
  }
}

Client類的類關係圖如下圖所示,Client對象繼承自BinaryClient和Connection類。在BinaryClient類中存在Redis訪問密碼等相關參數,在Connection類在存在訪問Redis的socket對象以及對應的輸入輸出流。本質上Connection是和Redis進行通訊的核心類。

Client類在創建過程中初始化核心父類Connection對象,而Connection是負責和Redis直接進行通訊。

public class Client extends BinaryClient implements Commands {
  public Client(final String host, final int port) {
    super(host, port);
  }
}
 
public class BinaryClient extends Connection {
  // 存儲和Redis連接的相關資訊
  private boolean isInMulti;
  private String user;
  private String password;
  private int db;
  private boolean isInWatch;
 
  public BinaryClient(final String host, final int port) {
    super(host, port);
  }
}
 
public class Connection implements Closeable {
  // 管理和Redis連接的socket資訊及對應的輸入輸出流
  private JedisSocketFactory jedisSocketFactory;
  private Socket socket;
  private RedisOutputStream outputStream;
  private RedisInputStream inputStream;
  private int infiniteSoTimeout = 0;
  private boolean broken = false;
 
  public Connection(final String host, final int port, final boolean ssl,
      SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
      HostnameVerifier hostnameVerifier) {
    // 構建DefaultJedisSocketFactory來創建和Redis連接的Socket對象
    this(new DefaultJedisSocketFactory(host, port, Protocol.DEFAULT_TIMEOUT,
        Protocol.DEFAULT_TIMEOUT, ssl, sslSocketFactory, sslParameters, hostnameVerifier));
  }
}

4.2 訪問過程

以Jedis執行set命令為例,整個過程如下:

  • Jedis的set操作是通過Client的set操作來實現的。

  • Client的set操作是通過父類Connection的sendCommand來實現。

public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommands,
    AdvancedJedisCommands, ScriptingCommands, BasicCommands, ClusterCommands, SentinelCommands, ModuleCommands {
  @Override
  public String set(final String key, final String value) {
    checkIsInMultiOrPipeline();
    // client執行set操作
    client.set(key, value);
    return client.getStatusCodeReply();
  }
}
 
public class Client extends BinaryClient implements Commands {
  @Override
  public void set(final String key, final String value) {
    // 執行set命令
    set(SafeEncoder.encode(key), SafeEncoder.encode(value));
  }
}
 
public class BinaryClient extends Connection {
  public void set(final byte[] key, final byte[] value) {
    // 發送set指令
    sendCommand(SET, key, value);
  }
}
 
public class Connection implements Closeable {
  public void sendCommand(final ProtocolCommand cmd, final byte[]... args) {
    try {
      // socket連接redis
      connect();
      // 按照redis的協議發送命令
      Protocol.sendCommand(outputStream, cmd, args);
    } catch (JedisConnectionException ex) {
    }
  }
}

五、Jedis分片模式的訪問

基於前面已經介紹的Redis分片模式的一致性Hash的原理來理解Jedis的分片模式的訪問。

關於Redis分片模式的概念:Redis在3.0版本之前沒有集群模式的概念,這導致單節點能夠存儲的數據有限,通過Redis的客戶端如Jedis在客戶端通過一致性Hash演算法來實現數據的分片存儲。

本質上Redis的分片模式跟Redis本身沒有任何關係,只是通過客戶端來解決單節點數據有限存儲的問題。

ShardedJedis訪問Redis的核心在於構建對象的時候初始化一致性Hash對象,構建一致性Hash經典的Hash值和node的映射關係。構建完映射關係後執行set等操作就是Hash值到node的定址過程,定址完成後直接進行單節點的操作。

5.1 創建過程

ShardedJedis的創建過程在於父類的Sharded中關於一致性Hash相關的初始化過程,核心在於構建一致性的虛擬節點以及虛擬節點和Redis節點的映射關係。

源碼中最核心的部分程式碼在於根據根據權重映射成未160個虛擬節點,通過虛擬節點來定位到具體的Redis節點。

public class Sharded<R, S extends ShardInfo<R>> {
 
  public static final int DEFAULT_WEIGHT = 1;
  // 保存虛擬節點和redis的node節點的映射關係
  private TreeMap<Long, S> nodes;
  // hash演算法
  private final Hashing algo;
  // 保存redis節點和訪問該節點的Jedis的連接資訊
  private final Map<ShardInfo<R>, R> resources = new LinkedHashMap<>();
 
  public Sharded(List<S> shards, Hashing algo) {
    this.algo = algo;
    initialize(shards);
  }
 
  private void initialize(List<S> shards) {
    nodes = new TreeMap<>();
    // 遍歷每個redis的節點並設置hash值到節點的映射關係
    for (int i = 0; i != shards.size(); ++i) {
      final S shardInfo = shards.get(i);
      // 根據權重映射成未160個虛擬節點
      int N =  160 * shardInfo.getWeight();
      if (shardInfo.getName() == null) for (int n = 0; n < N; n++) {
        // 構建hash值和節點映射關係
        nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), shardInfo);
      }
      else for (int n = 0; n < N; n++) {
        nodes.put(this.algo.hash(shardInfo.getName() + "*" + n), shardInfo);
      }
      // 保存每個節點的訪問對象
      resources.put(shardInfo, shardInfo.createResource());
    }
  }
}

5.2 訪問過程

ShardedJedis的訪問過程就是一致性Hash的計算過程,核心的邏輯就是:通過Hash演算法對訪問的key進行Hash計算生成Hash值,根據Hash值獲取對應Redis節點,根據對應的Redis節點獲取對應的訪問對象Jedis。

獲取訪問對象Jedis之後就可以直接進行命令操作。

public class Sharded<R, S extends ShardInfo<R>> {
 
  public static final int DEFAULT_WEIGHT = 1;
  private TreeMap<Long, S> nodes;
  private final Hashing algo;
  // 保存redis節點和訪問該節點的Jedis的連接資訊
  private final Map<ShardInfo<R>, R> resources = new LinkedHashMap<>();
 
  public R getShard(String key) {
    // 根據redis節點找到對應的訪問對象Jedis
    return resources.get(getShardInfo(key));
  }
 
  public S getShardInfo(String key) {
    return getShardInfo(SafeEncoder.encode(getKeyTag(key)));
  }
 
  public S getShardInfo(byte[] key) {
    // 針對訪問的key生成對應的hash值
    // 根據hash值找到對應的redis節點
    SortedMap<Long, S> tail = nodes.tailMap(algo.hash(key));
    if (tail.isEmpty()) {
      return nodes.get(nodes.firstKey());
    }
    return tail.get(tail.firstKey());
  }
}

六、Jedis集群模式的訪問

基於前面介紹的Redis的集群原理來理解Jedis的集群模式的訪問。

Jedis能夠實現key和哈希槽的定位的核心機制在於哈希槽和Redis節點的映射,而這個發現過程基於Redis的cluster slot命令。

關於Redis集群操作的命令:Redis通過cluster slots會返回Redis集群的整體狀況。返回每一個Redis節點的資訊包含:

  • 哈希槽起始編號

  • 哈希槽結束編號

  • 哈希槽對應master節點,節點使用IP/Port表示

  • master節點的第一個副本

  • master節點的第二個副本

127.0.0.1:30001> cluster slots
1) 1) (integer) 0 // 開始槽位
   2) (integer) 5460 // 結束槽位
   3) 1) "127.0.0.1" // master節點的host
      2) (integer) 30001 // master節點的port
      3) "09dbe9720cda62f7865eabc5fd8857c5d2678366" // 節點的編碼
   4) 1) "127.0.0.1" // slave節點的host
      2) (integer) 30004 // slave節點的port
      3) "821d8ca00d7ccf931ed3ffc7e3db0599d2271abf" // 節點的編碼
2) 1) (integer) 5461
   2) (integer) 10922
   3) 1) "127.0.0.1"
      2) (integer) 30002
      3) "c9d93d9f2c0c524ff34cc11838c2003d8c29e013"
   4) 1) "127.0.0.1"
      2) (integer) 30005
      3) "faadb3eb99009de4ab72ad6b6ed87634c7ee410f"
3) 1) (integer) 10923
   2) (integer) 16383
   3) 1) "127.0.0.1"
      2) (integer) 30003
      3) "044ec91f325b7595e76dbcb18cc688b6a5b434a1"
   4) 1) "127.0.0.1"
      2) (integer) 30006
      3) "58e6e48d41228013e5d9c1c37c5060693925e97e"

Jedis訪問集群模式Redis的整體流程圖如下所示,從圖中可以看出核心的流程包含JedisCluster對象的創建以及通過JedisCluster對象實現Redis的訪問。

JedisCluster對象的創建核心在於創建JedisClusterInfoCache對象並通過集群發現來建立slot和集群節點的映射關係。

JedisCluster對Redis集群的訪問在於獲取key所在的Redis節點並通過Jedis對象進行訪問。

6.1 創建過程

JedisCluster的類關係如下圖所示,在圖中可以看到核心變數JedisSlotBasedConnectionHandler對象。

JedisCluster的父類BinaryJedisCluster創建了JedisSlotBasedConnectionHandler對象,該對象負責和Redis的集群進行通訊。

public class JedisCluster extends BinaryJedisCluster implements JedisClusterCommands,
    MultiKeyJedisClusterCommands, JedisClusterScriptingCommands {
  public JedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout, int soTimeout,
      int maxAttempts, String password, String clientName, final GenericObjectPoolConfig poolConfig,
      boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
      HostnameVerifier hostnameVerifier, JedisClusterHostAndPortMap hostAndPortMap) {
 
    // 訪問父類BinaryJedisCluster
    super(jedisClusterNode, connectionTimeout, soTimeout, maxAttempts, password, clientName, poolConfig,
        ssl, sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMap);
  }
}
 
public class BinaryJedisCluster implements BinaryJedisClusterCommands,
    MultiKeyBinaryJedisClusterCommands, JedisClusterBinaryScriptingCommands, Closeable {
  public BinaryJedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout, int soTimeout,
      int maxAttempts, String user, String password, String clientName, GenericObjectPoolConfig poolConfig,
      boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
      HostnameVerifier hostnameVerifier, JedisClusterHostAndPortMap hostAndPortMap) {
 
    // 創建JedisSlotBasedConnectionHandler對象
    this.connectionHandler = new JedisSlotBasedConnectionHandler(jedisClusterNode, poolConfig,
        connectionTimeout, soTimeout, user, password, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMap);
 
    this.maxAttempts = maxAttempts;
  }
}

JedisSlotBasedConnectionHandler的核心在於創建並初始化JedisClusterInfoCache對象,該對象快取了Redis集群的資訊。

JedisClusterInfoCache對象的初始化過程通過initializeSlotsCache來完成,主要目的用於實現集群節點和槽位發現。

public class JedisSlotBasedConnectionHandler extends JedisClusterConnectionHandler {
  public JedisSlotBasedConnectionHandler(Set<HostAndPort> nodes, GenericObjectPoolConfig poolConfig,
      int connectionTimeout, int soTimeout, String user, String password, String clientName,
      boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
      HostnameVerifier hostnameVerifier, JedisClusterHostAndPortMap portMap) {
 
    super(nodes, poolConfig, connectionTimeout, soTimeout, user, password, clientName,
        ssl, sslSocketFactory, sslParameters, hostnameVerifier, portMap);
  }
}
 
public abstract class JedisClusterConnectionHandler implements Closeable {
  public JedisClusterConnectionHandler(Set<HostAndPort> nodes, final GenericObjectPoolConfig poolConfig,
      int connectionTimeout, int soTimeout, int infiniteSoTimeout, String user, String password, String clientName,
      boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
      HostnameVerifier hostnameVerifier, JedisClusterHostAndPortMap portMap) {
 
    // 創建JedisClusterInfoCache對象
    this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, infiniteSoTimeout,
        user, password, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, portMap);
 
    // 初始化jedis的Slot資訊
    initializeSlotsCache(nodes, connectionTimeout, soTimeout, infiniteSoTimeout,
        user, password, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier);
  }
 
 
  private void initializeSlotsCache(Set<HostAndPort> startNodes,
      int connectionTimeout, int soTimeout, int infiniteSoTimeout, String user, String password, String clientName,
      boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier) {
    for (HostAndPort hostAndPort : startNodes) {
 
      try (Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout,
          soTimeout, infiniteSoTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier)) {
 
        // 通過discoverClusterNodesAndSlots進行集群發現
        cache.discoverClusterNodesAndSlots(jedis);
        return;
      } catch (JedisConnectionException e) {
      }
    }
  }
}

JedisClusterInfoCache的nodes用來保存Redis集群的節點資訊,slots用來保存槽位和集群節點的資訊。

nodes和slots維持的對象都是JedisPool對象,該對象維持了和Redis的連接資訊。集群的發現過程由discoverClusterNodesAndSlots來實現,本質是執行Redis的集群發現命令cluster slots實現的。

public class JedisClusterInfoCache {
  // 負責保存redis集群的節點資訊
  private final Map<String, JedisPool> nodes = new HashMap<>();
  // 負責保存redis的槽位和redis節點的映射關係
  private final Map<Integer, JedisPool> slots = new HashMap<>();
 
  // 負責集群的發現邏輯
  public void discoverClusterNodesAndSlots(Jedis jedis) {
    w.lock();
 
    try {
      reset();
      List<Object> slots = jedis.clusterSlots();
 
      for (Object slotInfoObj : slots) {
        List<Object> slotInfo = (List<Object>) slotInfoObj;
 
        if (slotInfo.size() <= MASTER_NODE_INDEX) {
          continue;
        }
        // 獲取redis節點對應的槽位資訊
        List<Integer> slotNums = getAssignedSlotArray(slotInfo);
 
        // hostInfos
        int size = slotInfo.size();
        for (int i = MASTER_NODE_INDEX; i < size; i++) {
          List<Object> hostInfos = (List<Object>) slotInfo.get(i);
          if (hostInfos.isEmpty()) {
            continue;
          }
 
          HostAndPort targetNode = generateHostAndPort(hostInfos);
          // 負責保存redis節點資訊
          setupNodeIfNotExist(targetNode);
          if (i == MASTER_NODE_INDEX) {
            // 負責保存槽位和redis節點的映射關係
            assignSlotsToNode(slotNums, targetNode);
          }
        }
      }
    } finally {
      w.unlock();
    }
  }
 
  public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) {
    w.lock();
    try {
      JedisPool targetPool = setupNodeIfNotExist(targetNode);
      // 保存槽位和對應的JedisPool對象
      for (Integer slot : targetSlots) {
        slots.put(slot, targetPool);
      }
    } finally {
      w.unlock();
    }
  }
 
  public JedisPool setupNodeIfNotExist(HostAndPort node) {
    w.lock();
    try {
      // 生產redis節點對應的nodeKey
      String nodeKey = getNodeKey(node);
      JedisPool existingPool = nodes.get(nodeKey);
      if (existingPool != null) return existingPool;
      // 生產redis節點對應的JedisPool
      JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort(),
          connectionTimeout, soTimeout, infiniteSoTimeout, user, password, 0, clientName,
          ssl, sslSocketFactory, sslParameters, hostnameVerifier);
      // 保存redis節點的key和對應的JedisPool對象
      nodes.put(nodeKey, nodePool);
      return nodePool;
    } finally {
      w.unlock();
    }
  }
}

JedisPool的類關係如下圖所示,其中內部internalPool是通過apache common pool來實現的池化。

JedisPool內部的internalPool通過JedisFactory的makeObject來創建Jedis對象。

每個Redis節點都會對應一個JedisPool對象,通過JedisPool來管理Jedis的申請釋放復用等。

public class JedisPool extends JedisPoolAbstract {
 
  public JedisPool() {
    this(Protocol.DEFAULT_HOST, Protocol.DEFAULT_PORT);
  }
}
 
public class JedisPoolAbstract extends Pool<Jedis> {
 
  public JedisPoolAbstract() {
    super();
  }
}
 
public abstract class Pool<T> implements Closeable {
  protected GenericObjectPool<T> internalPool;
 
  public void initPool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) {
    if (this.internalPool != null) {
      try {
        closeInternalPool();
      } catch (Exception e) {
      }
    }
    this.internalPool = new GenericObjectPool<>(factory, poolConfig);
  }
}
 
class JedisFactory implements PooledObjectFactory<Jedis> {
   
  @Override
  public PooledObject<Jedis> makeObject() throws Exception {
    // 創建Jedis對象
    final HostAndPort hp = this.hostAndPort.get();
    final Jedis jedis = new Jedis(hp.getHost(), hp.getPort(), connectionTimeout, soTimeout,
        infiniteSoTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier);
 
    try {
      // Jedis對象連接
      jedis.connect();
      if (user != null) {
        jedis.auth(user, password);
      } else if (password != null) {
        jedis.auth(password);
      }
      if (database != 0) {
        jedis.select(database);
      }
      if (clientName != null) {
        jedis.clientSetname(clientName);
      }
    } catch (JedisException je) {
      jedis.close();
      throw je;
    }
    // 將Jedis對象包裝成DefaultPooledObject進行返回
    return new DefaultPooledObject<>(jedis);
  }
}

6.2 訪問過程

JedisCluster訪問Redis的過程通過JedisClusterCommand來實現重試機制,最終通過Jedis對象來實現訪問。從實現的角度來說JedisCluster是在Jedis之上封裝了一層,進行集群節點定位以及重試機制等。

以set命令為例,整個訪問通過JedisClusterCommand實現如下:

  • 計算key所在的Redis節點。

  • 獲取Redis節點對應的Jedis對象。

  • 通過Jedis對象進行set操作。

public class JedisCluster extends BinaryJedisCluster implements JedisClusterCommands,
    MultiKeyJedisClusterCommands, JedisClusterScriptingCommands {
 
  @Override
  public String set(final String key, final String value, final SetParams params) {
    return new JedisClusterCommand<String>(connectionHandler, maxAttempts) {
      @Override
      public String execute(Jedis connection) {
        return connection.set(key, value, params);
      }
    }.run(key);
  }
}

JedisClusterCommand的run方法核心主要定位Redis的key所在的Redis節點,然後獲取與該節點對應的Jedis對象進行訪問。

在Jedis對象訪問異常後,JedisClusterCommand會進行重試操作並按照一定策略執行renewSlotCache方法進行重集群節點重發現動作。

public abstract class JedisClusterCommand<T> {
  public T run(String key) {
    // 針對key進行槽位的計算
    return runWithRetries(JedisClusterCRC16.getSlot(key), this.maxAttempts, false, null);
  }
   
  private T runWithRetries(final int slot, int attempts, boolean tryRandomNode, JedisRedirectionException redirect) {
 
    Jedis connection = null;
    try {
 
      if (redirect != null) {
        connection = this.connectionHandler.getConnectionFromNode(redirect.getTargetNode());
        if (redirect instanceof JedisAskDataException) {
          connection.asking();
        }
      } else {
        if (tryRandomNode) {
          connection = connectionHandler.getConnection();
        } else {
          // 根據slot去獲取Jedis對象
          connection = connectionHandler.getConnectionFromSlot(slot);
        }
      }
      // 執行真正的Redis的命令
      return execute(connection);
    } catch (JedisNoReachableClusterNodeException jnrcne) {
      throw jnrcne;
    } catch (JedisConnectionException jce) {
 
      releaseConnection(connection);
      connection = null;
 
      if (attempts <= 1) {
        // 保證最後兩次機會去重新刷新槽位和節點的對應的資訊
        this.connectionHandler.renewSlotCache();
      }
      // 按照重試次數進行重試操作
      return runWithRetries(slot, attempts - 1, tryRandomNode, redirect);
    } catch (JedisRedirectionException jre) {
      // 針對返回Move命令立即觸發重新刷新槽位和節點的對應資訊
      if (jre instanceof JedisMovedDataException) {
        // it rebuilds cluster's slot cache recommended by Redis cluster specification
        this.connectionHandler.renewSlotCache(connection);
      }
 
      releaseConnection(connection);
      connection = null;
 
      return runWithRetries(slot, attempts - 1, false, jre);
    } finally {
      releaseConnection(connection);
    }
  }
}

JedisSlotBasedConnectionHandler的cache對象維持了slot和node的映射關係,通過getConnectionFromSlot方法來獲取該slot對應的Jedis對象。

public class JedisSlotBasedConnectionHandler extends JedisClusterConnectionHandler {
 
  protected final JedisClusterInfoCache cache;
 
  @Override
  public Jedis getConnectionFromSlot(int slot) {
    // 獲取槽位對應的JedisPool對象
    JedisPool connectionPool = cache.getSlotPool(slot);
    if (connectionPool != null) {
      // 從JedisPool對象中獲取Jedis對象
      return connectionPool.getResource();
    } else {
      // 獲取失敗就重新刷新槽位資訊
      renewSlotCache();
      connectionPool = cache.getSlotPool(slot);
      if (connectionPool != null) {
        return connectionPool.getResource();
      } else {
        //no choice, fallback to new connection to random node
        return getConnection();
      }
    }
  }
}

七、Jedis的Pipeline實現

Pipeline的技術核心思想是將多個命令發送到伺服器而不用等待回復,最後在一個步驟中讀取該答覆。這種模式的好處在於節省了請求響應這種模式的網路開銷。

Redis的普通命令如set和Pipeline批量操作的核心的差別在於set命令的操作會直接發送請求到Redis並同步等待結果返回,而Pipeline的操作會發送請求但不立即同步等待結果返回,具體的實現可以從Jedis的源碼一探究竟。

原生的Pipeline在集群模式下相關的key必須Hash到同一個節點才能生效,原因在於Pipeline下的Client對象只能其中的一個節點建立了連接。

在集群模式下歸屬於不同節點的key能夠使用Pipeline就需要針對每個key保存對應的節點的client對象,在最後執行獲取數據的時候一併獲取。本質上可以認為在單節點的Pipeline的基礎上封裝成一個集群式的Pipeline。

7.1 Pipeline用法分析

Pipeline訪問單節點的Redis的時候,通過Jedis對象的Pipeline方法返回Pipeline對象,其他的命令操作通過該Pipeline對象進行訪問。

Pipeline從使用角度來分析,會批量發送多個命令並最後統一使用syncAndReturnAll來一次性返回結果。

public void pipeline() {
    jedis = new Jedis(hnp.getHost(), hnp.getPort(), 500);
    Pipeline p = jedis.pipelined();
    // 批量發送命令到redis
    p.set("foo", "bar");
    p.get("foo");
    // 同步等待響應結果
    List<Object> results = p.syncAndReturnAll();
 
    assertEquals(2, results.size());
    assertEquals("OK", results.get(0));
    assertEquals("bar", results.get(1));
 }
 
 
public abstract class PipelineBase extends Queable implements BinaryRedisPipeline, RedisPipeline {
 
  @Override
  public Response<String> set(final String key, final String value) {
    // 發送命令
    getClient(key).set(key, value);
    // pipeline的getResponse只是把待響應的請求聚合到pipelinedResponses對象當中
    return getResponse(BuilderFactory.STRING);
  }
}
 
 
public class Queable {
 
  private Queue<Response<?>> pipelinedResponses = new LinkedList<>();
  protected <T> Response<T> getResponse(Builder<T> builder) {
    Response<T> lr = new Response<>(builder);
    // 統一保存到響應隊列當中
    pipelinedResponses.add(lr);
    return lr;
  }
}
 
 
public class Pipeline extends MultiKeyPipelineBase implements Closeable {
 
  public List<Object> syncAndReturnAll() {
    if (getPipelinedResponseLength() > 0) {
      // 根據批量發送命令的個數即需要批量返回命令的個數,通過client對象進行批量讀取
      List<Object> unformatted = client.getMany(getPipelinedResponseLength());
      List<Object> formatted = new ArrayList<>();
      for (Object o : unformatted) {
        try {
          // 格式化每個返回的結果並最終保存在列表中進行返回
          formatted.add(generateResponse(o).get());
        } catch (JedisDataException e) {
          formatted.add(e);
        }
      }
      return formatted;
    } else {
      return java.util.Collections.<Object> emptyList();
    }
  }
}

普通set命令發送請求給Redis後立即通過getStatusCodeReply來獲取響應結果,所以這是一種請求響應的模式。

getStatusCodeReply在獲取響應結果的時候會通過flush()命令強制發送報文到Redis服務端然後通過讀取響應結果。

public class BinaryJedis implements BasicCommands, BinaryJedisCommands, MultiKeyBinaryCommands,
    AdvancedBinaryJedisCommands, BinaryScriptingCommands, Closeable {
 
  @Override
  public String set(final byte[] key, final byte[] value) {
    checkIsInMultiOrPipeline();
    // 發送命令
    client.set(key, value);
    // 等待請求響應
    return client.getStatusCodeReply();
  }
}
 
 
public class Connection implements Closeable {
  public String getStatusCodeReply() {
    // 通過flush立即發送請求
    flush();
    // 處理響應請求
    final byte[] resp = (byte[]) readProtocolWithCheckingBroken();
    if (null == resp) {
      return null;
    } else {
      return SafeEncoder.encode(resp);
    }
  }
}
 
 
public class Connection implements Closeable {
  protected void flush() {
    try {
      // 針對輸出流進行flush操作保證報文的發出
      outputStream.flush();
    } catch (IOException ex) {
      broken = true;
      throw new JedisConnectionException(ex);
    }
  }
}

八、結束語

Jedis作為Redis官方首選的Java客戶端開發包,支援絕大部分的Redis的命令,也是日常中使用較多的Redis客戶端。

了解了Jedis的實現原理,除了能夠支援Redis的日常操作外,還能更好的應對Redis的額外操作諸如擴容時的技術選型。

通過介紹Jedis針對單機模式、分配模式、集群模式三種場景訪問方式,讓大家有個從宏觀到微觀的理解過程,掌握Jedis的核心思想並更好的應用到實踐當中。

作者:vivo互聯網伺服器團隊-Wang Zhi