JedisCluster使用pipeline操作Redis Cluster最詳細從0到1實現過程

公眾號文章鏈接://mp.weixin.qq.com/s/6fMsG009RukLW954UUndbw

前言

2020年4月30日,Redis 6.0.0正式發布,標誌著redis從此告別單執行緒。在此之前,在大數據生產環境中使用的是一個30個節點的Codis集群,SparkStreaming以此作為快取,QPS高峰大概在2000w/s。

因為Codis不再更新迭代,於是在Redis 6.0.6版本發布的時候搭建了Redis Cluster,新的應用將不再使用Codis。之前連接Codis使用的Java客戶端是Jedis,通過Pipeline方式批次執行命令,以此來提高效率。而Redis Cluster的客戶端JedisCluster沒有提供Pipeline方式,只能單條執行命令,於是開始考慮其他的Java客戶端。

這裡備選了兩個客戶端:lettuceRedisson

pipeline原理

這裡先說一下Jedis的pipeline的原理。通過pipeline對redis的所有操作命令,都會先放到一個List中,當pipeline直接執行或者通過jedis.close()調用sync()的時候,所有的命令都會一次性地發送到客戶端,並且每個操作命令返回一個response,通過get來獲取操作結果。

lettuce

lettuce提供了async非同步方式來實現pipeline的功能,來測試一下是否可按批次處理命令。

測試程式碼:

public static void main(String[] args) throws Exception {
        RedisURI uri = RedisURI.builder()
                .withHost("47.102.xxx.xxx")
                .withPassword("Redis6.0.6".toCharArray())
                .withPort(10001)
                .build();
        RedisClusterClient client = RedisClusterClient.create(uri);
        StatefulRedisClusterConnection<String, String> connect = client.connect();
        RedisAdvancedClusterAsyncCommands<String, String> async = connect.async();
        // 斷點1
        async.set("key1", "v1");
        Thread.sleep(1000 * 3);
        // 斷點2
        async.set("key2", "v2");
        // 斷點3
        async.flushCommands();
        Thread.sleep(1000 * 3);
        connect.close();
        client.shutdown();
}

在程式中設置三個斷點。如果是pipeline的話,只有執行完斷點3,兩條set命令才會執行。
運行結果:
運行結果
結果表明還未到flushCommands(),第一個set命令已經執行。到這你可能就會以為lettuce其實還是逐條命令執行,只是開啟了非同步請求模式。其實不然,在lettuce非同步操作中,默認開啟了命令自動刷新功能,所以給你的假象還是逐條執行,在此需要禁用自動刷新來開啟pipeline功能。

在set()之前加上一行程式碼:

async.setAutoFlushCommands(false);

運行結果:
運行結果

Redisson

redisson提供了batch來實現pipeline的功能。

測試程式碼:

 Config config = new Config();
 config.useClusterServers()
       .addNodeAddress("redis://47.102.219.86:10001")
       .setPassword("[email protected]");
 RedissonClient redisson = Redisson.create(config);
 RBatch batch = redisson.createBatch();
 String key = "test";
 for (int i = 1; i < 3; i++) {
      batch.getMap(key + i).putAsync(String.valueOf(i), String.valueOf(i));
	}
 // 打上斷點
 batch.execute();
 redisson.shutdown();

這裡我們在execute()處打上斷點,debug運行程式。
運行結果:
執行結果
結果表明Redisson會將命令放在一個batch中,當執行execute()時,會將命令一次性發送到redis執行。雖然Redisson實現了pipeline的功能,但是我最後還是放棄了它。原因很簡單,它的方法不像jedis和lettuce一樣簡單明了,和redis的操作命令相差太多,導致使用起來比較繁瑣。

Jedis Cluster Pipeline

原因

開頭也提到了,Jedis對Redis Cluster提供了JedisCluster客戶端,但是沒有Pipeline模式,那麼JedisCluster為什麼不支援Pipeline?

在redis中一共有16384個Slot,每個節點負責一部分Slot,當對Key進行操作時,redis會通過CRC16計算出key對應的Slot,將Key映射到Slot所在節點上執行操作。

因為不同Key映射的節點不同,所以JedisCluster需要持有Redis Cluster每個節點的連接才能執行操作,而Pipeline是面向於一個redis連接的執行模式,所以JedisCluster無法支援Pipeline。

那麼我們自己有沒有辦法利用JedisCluster去封裝一個具有Pipeline模式的客戶端?

思路

剛剛提到,JedisCluster會持有Redis Cluster所有節點的連接。那麼,如果我們可以獲取到所有節點的連接,對每個節點的連接都開啟Pipeline。首先計算出每個Key所在的Slot,再找到Slot對應節點,就可以將Key放到對應節點連接的Pipeline上,這樣不就實現了集群版的Pipeline了么!

我們要做的工作就是找到對應關係,將每個Key分配到對應的節點連接中。

秉著不重複造輪子的觀點,我們先看看JedisCluster是如何執行命令的?

JedisCluster

先寫樣例,並在get()處打斷點。
JedisCluster
run()

CRC16

進入run(),可以看到JedisClusterCRC16提供了getSlot()方法,可以計算出Key所在的Slot
run()

run()裡面調用了runWithRetries(),這是核心方法之一,Step into

// 據方法調用參數刪除了部分程式碼
private T runWithRetries(final int slot, int attempts, boolean tryRandomNode, JedisRedirectionException redirect) {
    Jedis connection = null;
    try {
    	// false
    	if (tryRandomNode) {
      		connection = connectionHandler.getConnection();
    	} else {
	    	// 重點:從方法名看,是根據slot來獲取jedis連接!!
      		connection = connectionHandler.getConnectionFromSlot(slot);
    	}
      	return execute(connection);
    } catch (JedisNoReachableClusterNodeException jnrcne) {
      throw jnrcne;
    } catch (JedisConnectionException jce) {
      // 釋放連接
      releaseConnection(connection);
      connection = null;
      if (attempts <= 1) {
      	// 刷新slots
        this.connectionHandler.renewSlotCache();
      }
      return runWithRetries(slot, attempts - 1, tryRandomNode, redirect);
    } 
  }

從runWithRetries()可以看到,JedisCluster通過調用getConnectionFromSlot(slot)來獲取jedis連接,這裡實現了Slot和Jedis的關係

那麼connectionHandler為什麼可以提供redis連接?

connectionHandler

查看connectionHandler變數資訊

connectionHandler
可以看到它有一個JedisClusterInfoCache類型的成員變數cache,cache有兩個HashMap類型的成員變數nodes和slots,nodes保存節點和JedisPool的映射關係,slots保存16384個slot和JedisPool的映射關係,這裡slot和節點實現了映射關係

接著看一下getConnectionFromSlot()

getConnectionFromSlot()
getSlotPool()
可以看出,cache調用getSlotPool(),從成員變數slots中通過slot取到了相應節點的JedisPool。

簡單的畫一下流程圖:

流程圖

至此,所有輪子都已經具備,開始造車。
Pipeline

實現Pipeline

我們只要獲取到connectionHandler變數,就可以使用它的成員變數cache來獲取Jedis。

connectionHandler是JedisCluster的成員變數,在其父類BinaryJedisCluster中找到了此變數。
BinaryJedisCluster
cache是connectionHandler的成員變數,在其父類JedisClusterConnectionHandler找到了此變數。
JedisClusterConnectionHandler
connectionHandler和cache都是protected變數,外部類無法直接訪問,所以需要定義子類訪問變數。

自定義ConnectionHandler

目的:使用cache保存的Cluster資訊,用其來獲取JedisPool。

public class JedisSlotConnectionHandlerImp extends JedisSlotBasedConnectionHandler implements Serializable {
    public JedisSlotConnectionHandlerImp(Set<HostAndPort> nodes, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password) {
        super(nodes, poolConfig, connectionTimeout, soTimeout, password);
    }

    // 自定義通過slot獲取JedisPool的方法
    // 為了保證後面一個JedisPool只取一個Jedis
    public JedisPool getJedisPoolFromSlot(int slot) {
        JedisPool jedisPool = cache.getSlotPool(slot);
        if (jedisPool != null) {
            return jedisPool;
        } else {
            renewSlotCache();
            jedisPool = cache.getSlotPool(slot);
            if (jedisPool != null) {
                return jedisPool;
            } else {
                throw new JedisNoReachableClusterNodeException("No reachable node in cluster for slot " + slot);
            }
        }
    }
}

自定義ClusterPipeline

目的:使用connectionHandler來建立key、slot以及JedisPool之間關係映射

public class JedisClusterPipeline extends JedisCluster implements Serializable {
    // 覆蓋父類中的connectionHandler
    protected JedisSlotConnectionHandlerImp connectionHandler;
    public JedisClusterPipeline(HashSet node, int connectionTimeout, int soTimeout, int maxAttempts, String password, GenericObjectPoolConfig poolConfig) {
        super(node, connectionTimeout, soTimeout, maxAttempts, password, poolConfig);
        connectionHandler = new JedisSlotConnectionHandlerImp(node, poolConfig, connectionTimeout, soTimeout, password);
    }
	// 通過key轉換成slot,再獲取JedisPool
    public JedisPool getJedisPoolFromSlot(String key) {
        return connectionHandler.getJedisPoolFromSlot(JedisClusterCRC16.getSlot(key));
    }
}

使用

使用自定義的JedisClusterPipeline,需要自己實現set、get、hget等方法來覆蓋父類JedisCluster對應的方法。最初的目的是應用於Spark將維度資訊存入Redis Cluster,當時是用scala面向RDD的partition實現了集群版的hmset()方法。

這裡臨時用Java實現一下Pipeline的set()方法。

實現set()

public class JedisClusterPipelineCommand {
    /**
     * 自定義的pipeline模式set方法
     * @param key 存放的key
     * @param value 存放的value
     * @param clusterPipeline 用來獲取JedisPool
     * @param pipelines 建立JedisPool和pipeline映射,保證一個JedisPool只開啟一個pipeline
     * @param jedisMap 建立pipeline和Jedis映射,用來釋放Jedis
     * @param nums 記錄每個pipeline放入key的條數
     * @param threshold pipeline進行sync的閾值
     */
    public static void setByPipeline(String key, String value, JedisClusterPipeline clusterPipeline, ConcurrentHashMap<JedisPool, Pipeline> pipelines, ConcurrentHashMap<Pipeline, Jedis> jedisMap,  ConcurrentHashMap<Pipeline, Integer> nums, int threshold) {
        JedisPool jedisPool = clusterPipeline.getJedisPoolFromSlot(key);
        // 查看對應節點是否已經開啟了pipeline
        Pipeline pipeline = pipelines.get(jedisPool);
        if (pipeline == null) {
            Jedis jedis = jedisPool.getResource();
            pipeline = jedis.pipelined();
            // 構建映射關係,保證每個節點只有一個jedis來開啟pipeline
            jedisMap.put(pipeline, jedis);
            pipelines.put(jedisPool, pipeline);
            nums.put(pipeline, 0);
        }else {
            int num = nums.get(pipeline);
            nums.put(pipeline, num + 1);
            if (num % threshold == 0) {
                pipeline.sync();
            }
        }
        pipeline.set(key, value);
    }

    /**
     * 釋放jedis並強制pipeline sync
     */
    public static void releaseConnection(ConcurrentHashMap<Pipeline, Jedis> jedisMap) {
        for (Jedis jedis : jedisMap.values()) {
            jedis.close();
        }
    }
}

執行類

    public static void main(String[] args) throws Exception {
        JedisPoolConfig config = new JedisPoolConfig();
        HashSet jedisClusterNodes = new java.util.HashSet<HostAndPort>();
        jedisClusterNodes.add(new HostAndPort("47.102.xxx.xx", 10001));
        JedisClusterPipeline jedisClusterPipeline = new JedisClusterPipeline(jedisClusterNodes, 1000, 1000, 10, "Redis6", config);
        ConcurrentHashMap<JedisPool, Pipeline> pipelines = new ConcurrentHashMap<>();
        ConcurrentHashMap<Pipeline, Jedis> jedisMap = new ConcurrentHashMap<>();
        ConcurrentHashMap<Pipeline, Integer> nums = new ConcurrentHashMap<>();
        for (int i = 0; i < 1000; i++) {
            JedisClusterPipelineCommand.setByPipeline("k" + i, "v" + i, jedisClusterPipeline, pipelines, jedisMap, nums, 100 );
        }
        JedisClusterPipelineCommand.releaseConnection(jedisMap);
    }

執行結果
執行結果

性能測試

本機環境1000條數據

  • pipeline模式:2.32s

  • JedisCluster:68.6s

Spark on Yarn 128w條 Hash

  • 1Core 1G Pipeline:18s

本機環境測試結果受限於網路和主機配置,僅供比較參考。

結語

最後選擇自己實現pipeline,首先是因為比較了解pipeline的原理,說白了就是用習慣了。其次是在本機測試letttuce時,出現了一些意料之外的問題,目前還在探索中。下一步的工作就是慢慢的將Pipeline其他的方法實現,逐步優化,用於生產。


寫的都是日常工作中的親身實踐,處於自己的角度從0寫到1,保證能夠真正讓大家看懂。

文章會在公眾號 [入門到放棄之路] 首發,期待你的關注。

公眾號