Redis的使用(二)
一、redis簡單應用
其實在寫這個redis專題時我想了很久,我覺得redis沒什麼好說的,因為現在是個人都會用redis,但是我在寫netty專題時發現,netty裡面很多東西和概念有很多跟redis的很多應用和底層很相似和可以借鑒的地方,所以後來想想,還是寫個專題來簡單聊聊。按照我以前的習慣在寫應用前我是要寫一篇中間件的安裝,但redis的安裝這次不寫了,因為安裝過於簡單,但是看這專題的朋友記得,我後面所寫的所有內容是基於redis6版本的基礎上進行寫的。如果看過官網的朋友可以知道,redis6和以往版本最大的區別在於他引入了多執行緒IO,對於6以前的單執行緒redis來說,性能瓶頸主要在於網路的 IO 消耗, 所以新版本優化主要有兩個方向:
- 提高網路 IO 性能,典型的實現像使用 DPDK 來替代內核網路棧的方式
- 使用多執行緒充分利用多核,典型的實現像 Memcached
官網://spring.io/projects/spring-data-redis
具體底層實現我會在後面篇幅會寫,這裡就不過多說明,下面就將springboot項目集成redis作一個簡單的過程演示。
引入pom文件
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
修改application.yml文件
spring: redis: # redis資料庫索引(默認為0),我們使用索引為3的資料庫,避免和其他資料庫衝突 database: 0 host: 192.168.0.23 port: 6379 password: 123456 # redis連接超時時間(單位毫秒) timeout: 0 # redis連接池配置 jedis: pool: # 最大可用連接數(默認為8,負數表示無限) max-active: -1 # 最大空閑連接數(默認為8,負數表示無限) max-idle: 2000 # 最小空閑連接數(默認為0,該值只有為正數才有用) min-idle: 1 # 從連接池中獲取連接最大等待時間(默認為-1,單位為毫秒,負數表示無限) max-wait: -1 # 配置空閑連接回收間隔時間,min-idle才會生效,否則不生效 time-between-eviction-runs: 5000
RedisTemplate
@SpringBootTest class SpringRedisApplicationTests { // 注入 RedisTemplate @Autowired private RedisTemplate redisTemplate; // String 類型 @Test void testString () { redisTemplate.opsForValue().set("name", "ljx"); Object name = redisTemplate.opsForValue().get("name"); System.out.println(name); } // Hash 類型 @Test public void testHash () { redisTemplate.opsForHash().put("user1", "name", "clarence"); redisTemplate.opsForHash().put("user1", "age", "25"); Map map = redisTemplate.opsForHash().entries("user1"); System.out.println(map); } // List 類型 @Test public void testList () { redisTemplate.opsForList().leftPushAll("names", "xiaobai", "xiaohei", "xiaolan"); List<String> names = redisTemplate.opsForList().range("names", 0, 3); System.out.println(names); } // Set 類型 @Test public void testSet () { redisTemplate.opsForSet().add("set", "a", "b", "c"); Set<String> set = redisTemplate.opsForSet().members("set"); System.out.println(set); } // SortedSet 類型 @Test public void testSortedSet () { redisTemplate.opsForZSet().add("class", "xiaobai", 90); Set aClass = redisTemplate.opsForZSet().rangeByScore("class", 90, 100); System.out.println(aClass); Set<ZSetOperations.TypedTuple<String>> set = new HashSet<> (); set.add(new DefaultTypedTuple<> ("xiaohei", 88.0)); set.add(new DefaultTypedTuple<>("xiaohui", 94.0)); set.add(new DefaultTypedTuple<>("xiaolan", 84.0)); set.add(new DefaultTypedTuple<>("xiaolv", 82.0)); set.add(new DefaultTypedTuple<>("xiaohong", 99.0)); redisTemplate.opsForZSet().add("class", set); Set aClass1 = redisTemplate.opsForZSet().range("class", 0, 6); System.out.println(aClass1); } }
二、序列化
1、默認是 JdkSerializationRedisSerializer
- RedisTemplate 可以接收任意 Object 作為值寫入 Redis,不過在寫入前會把 Object 序列化為位元組形式,默認是採用 JDK 序列化,上面的 demo 運行後得到的結果如下:
- 缺點:可讀性差;記憶體佔用較大
2.1、添加配置文件,使用 String 序列化、Json 序列化
Redis 配置文件
@Configuration public class RedisConfig { @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) { // 創建 RedisTemplate 對象 RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>(); // 設置連接工廠 redisTemplate.setConnectionFactory(connectionFactory); // 設置 Key 的序列化 - String 序列化 RedisSerializer.string() => StringRedisSerializer.UTF_8 redisTemplate.setKeySerializer( RedisSerializer.string()); redisTemplate.setHashKeySerializer(RedisSerializer.string()); // 設置 Value 的序列化 - JSON 序列化 RedisSerializer.json() => GenericJackson2JsonRedisSerializer redisTemplate.setValueSerializer(RedisSerializer.json()); redisTemplate.setHashValueSerializer(RedisSerializer.json()); // 返回 return redisTemplate; } }
引入 Jackson 依賴
<!--Jackson依賴--> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.11.4</version> </dependency>
添加 User 實體類
@Data @NoArgsConstructor @AllArgsConstructor public class User { private String name; private Integer age; }
案例
@SpringBootTest public class RedisDemoApplicationTest2 { // 注入 RedisTemplate @Autowired private RedisTemplate<String, Object> redisTemplate; @Test void testString() { redisTemplate.opsForValue().set("name", "小白"); Object name = redisTemplate.opsForValue().get("name"); System.out.println(name); } @Test void testSaveUser() { redisTemplate.opsForValue().set("user", new User("小白", 23)); User user = (User) redisTemplate.opsForValue().get("user"); System.out.println(user); } }
運行結果

StringRedisTemplate
- 從上述 Demo 的運行結果可以看到,為了在反序列化時知道對象的類型,Json 序列化會將類的 class 類型寫入 json 結果中存入 Redis,會帶來額外的記憶體開銷
- 為了節省記憶體空間,我們並不會使用 json 序列化器來處理 value,而是統一使用 String 序列化器,要求只能存儲 String 類型的 key 和 value。當需要存儲 Java 對象時,手動完成對象的序列化和反序列化
- spring 提供了一個 StringRedisTemplate 類,其 key 和 value 的序列化方式默認為 String 方式
引入 fastjson 依賴
<!--fastjson依賴--> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.62</version> </dependency>
案例
@SpringBootTest public class RedisDemoApplicationTest2 { // 注入 RedisTemplate @Autowired private RedisTemplate<String, Object> redisTemplate; @Test void testString() { redisTemplate.opsForValue().set("name", "小白"); Object name = redisTemplate.opsForValue().get("name"); System.out.println(name); } @Test void testSaveUser() { redisTemplate.opsForValue().set("user", new User("小白", 23)); User user = (User) redisTemplate.opsForValue().get("user"); System.out.println(user); } }
三、redis的請求通訊協議
上面簡單的演示了下redis的操作,接下來首先詳細了解一下Redis Serialization Protocol(Redis序列化協議),這個是Redis提供的一種,客戶端和Redis服務端通訊傳輸的編碼協議,服務端收到後,會基於這個約定編碼進行解碼。首先打開Wireshark工具,對VMnet8這個網路進行抓包(沒有這工具可以自己下個),先在連接工具加一個假數據
-
打開Wireshark工具,對VMnet8這個網路進行抓包
-
增加過濾條件
ip.dst_host==ip and tcp.port in {6379}
使用RDM工具連接到Redis Server進行key-value操作,比如執行 set name ljx通過Wireshark工具監控數據包內容,可以通過上圖看到實際發出的數據包是:*3\r\n$3\r\nSET\r\n$4\r\nname\r\n$3\r\nljx
其中開頭的*3這個數字中代表參數個數,我是set name ljx,所以表示三個參數;接著就是$3表示屬性長度,$表示包含了3個字元。客戶端和伺服器發送的命令或數據一律以 \r\n (CRLF回車+換行)結尾。了解了這格式的意思接下來自己實現一個java客戶端就非常容易了。
3.1、客戶端實現
在手寫之前先看下Jedis源碼是怎麼實現的手寫客戶端程式碼,在Jedis中就有的,先看一下Jedis內部的實現源碼:
protected Connection sendCommand(Protocol.Command cmd, byte[]... args) { try { this.connect();//建立Socket連接 Protocol.sendCommand(this.outputStream, cmd, args);//封裝報文並將報文寫入流中 ++this.pipelinedCommands; return this; } catch (JedisConnectionException var6) { JedisConnectionException ex = var6; try { String errorMessage = Protocol.readErrorLineIfPossible(this.inputStream); if (errorMessage != null && errorMessage.length() > 0) { ex = new JedisConnectionException(errorMessage, ex.getCause()); } } catch (Exception var5) { } this.broken = true; throw ex; } }
這段源碼並不難找,使用Jedis的set方法,然後一直跟進去就可以。最終方法的位置是redis.clients.jedis.Connection.sebdCommand()。
從這個方法的內部實現就可以看出來其實就是通過Socket建立tcp連接,然後將命令和數據轉換成RESP協議規範的報文格式,最後通過Socket將數據傳入過去。知道這些對於自己寫一個Jedis客戶端是不是就有思路啦。基於對源碼的借鑒,簡易的Jedis實現如下:
public class CustomJedis { public static void main(String[] args) throws IOException { //建立socket連接 Socket socket = new Socket(); InetSocketAddress socketAddress = new InetSocketAddress("106.12.75.86", 6379); socket.connect(socketAddress, 10000); //獲取scoket輸出流,將報文轉換成byte[]傳入流中 OutputStream outputStream = socket.getOutputStream(); outputStream.write(command()); //獲取返回的輸出流,並列印輸出數據 InputStream inputStream = socket.getInputStream(); byte[] buffer = new byte[1024]; inputStream.read(buffer); System.out.println("返回執行結果:" + new String(buffer)); } //組裝報文資訊 private static byte[] command() { return "*3\r\n$3\r\nSET\r\n$9\r\nuser:name\r\n$6\r\nitcrud\r\n".getBytes(); } }
但是這裡需要注意,上面的實現方式是直接建立socket連接,Redis很多時候是設置密碼認證的,如果這樣的話上面的程式碼就需要改動啦。
改動後如下:
public class CustomJedisProd { public static void main(String[] args) throws IOException { Socket socket = new Socket(); InetSocketAddress socketAddress = new InetSocketAddress("106.12.75.86", 6379); socket.connect(socketAddress, 10000); OutputStream outputStream = socket.getOutputStream(); //驗證密碼 outputStream.write(auth()); InputStream inputStream = socket.getInputStream(); byte[] buffer = new byte[1024]; inputStream.read(buffer); System.out.println("返回執行結果:" + new String(buffer)); //發送數據 outputStream.write(command()); inputStream.read(buffer); System.out.println("返回執行結果:" + new String(buffer)); inputStream.close(); outputStream.close(); } //驗證 private static byte[] auth(){ return "*2\r\n$4\r\nAUTH\r\n$6\r\n123456\r\n".getBytes(); } //組裝報文資訊 private static byte[] command() { return "*3\r\n$3\r\nSET\r\n$9\r\nuser:name\r\n$6\r\nitcrud\r\n".getBytes(); } }
運行結果
這樣一個最簡單版本就實現了,但是這裡面的編碼是寫死的,每次報問要自己組裝太麻煩,下面來進一步優化下:
定義常量池
public class CommandConstant { public static final String START="*"; public static final String LENGTH="$"; public static final String LINE="\r\n"; public enum CommandEnum{ SET, GET } }
CustomClientSocket用來建立網路通訊連接,並且發送數據指定到RedisServer。
public class CustomerRedisClientSocket { private Socket socket; private InputStream inputStream; private OutputStream outputStream; public CustomerRedisClientSocket(String ip,int port,String password){ try { socket=new Socket(ip,port); inputStream=socket.getInputStream(); outputStream=socket.getOutputStream(); outputStream.write ( password.getBytes ()); } catch (IOException e) { e.printStackTrace(); } } public void send(String cmd){ try { outputStream.write(cmd.getBytes()); } catch (IOException e) { e.printStackTrace(); } } public String read(){ byte[] bytes=new byte[1024]; int count=0; try { count=inputStream.read(bytes); } catch (IOException e) { e.printStackTrace(); } return new String(bytes,0,count); } }
封裝客戶端
public class CustomerRedisClient { private CustomerRedisClientSocket customerRedisClientSocket; public CustomerRedisClient(String host,int port,String password) { customerRedisClientSocket=new CustomerRedisClientSocket(host,port,password ("AUTH",password)); } public String password(String key,String value){ convertToCommand(null,key.getBytes(),value.getBytes()); return convertToCommand(null,key.getBytes(),value.getBytes()); } public String set(String key,String value){ customerRedisClientSocket.send(convertToCommand(CommandConstant.CommandEnum.SET,key.getBytes(),value.getBytes())); return customerRedisClientSocket.read(); //在等待返回結果的時候,是阻塞的 } public String get(String key){ customerRedisClientSocket.send(convertToCommand(CommandConstant.CommandEnum.GET,key.getBytes())); return customerRedisClientSocket.read(); } public static String convertToCommand(CommandConstant.CommandEnum commandEnum,byte[]... bytes){ StringBuilder stringBuilder=new StringBuilder(); if (commandEnum==null){ stringBuilder.append(CommandConstant.START).append(bytes.length).append(CommandConstant.LINE); }else{ stringBuilder.append(CommandConstant.START).append(bytes.length+1).append(CommandConstant.LINE); stringBuilder.append(CommandConstant.LENGTH).append(commandEnum.toString().length()).append(CommandConstant.LINE); stringBuilder.append(commandEnum.toString()).append(CommandConstant.LINE); } for (byte[] by:bytes){ stringBuilder.append(CommandConstant.LENGTH).append(by.length).append(CommandConstant.LINE); stringBuilder.append(new String(by)).append(CommandConstant.LINE); } return stringBuilder.toString(); } }
測試方法
public class MainClient { public static void main(String[] args) { CustomerRedisClient customerRedisClient=new CustomerRedisClient("124.71.33.75",6379,"ghy20200707redis"); System.out.println(customerRedisClient.set("name","ljx")); System.out.println(customerRedisClient.get("ljx")); } }
結果
所有事物理解了本質後,實現起來其實一點都不難,通過上面兩次優化,就實現了一個自己版本的客戶端,但是實際開發過程中,客戶端我們不用自己開發,官方推薦了以下三種客戶端
四、Reids的java客戶端
4.1、客戶端對比

Jedis api 在線網址://tool.oschina.net/uploads/apidocs/redis/clients/jedis/Jedis.html
redisson 官網地址://redisson.org/
redisson git項目地址://github.com/redisson/redisson
lettuce 官網地址://lettuce.io/
lettuce git項目地址://github.com/lettuce-io/lettuce-core
首先,在spring boot2之後,對redis連接的支援,默認就採用了lettuce。這就一定程度說明了lettuce 和Jedis的優劣。
4.2、各種客戶端對比
4.2.1、概念
- Jedis:是老牌的Redis的Java實現客戶端,提供了比較全面的Redis命令的支援,
- Redisson:實現了分散式和可擴展的Java數據結構。
- Lettuce:高級Redis客戶端,用於執行緒安全同步,非同步和響應使用,支援集群,Sentinel,管道和編碼器。
4.2.2、優點
- Jedis:比較全面的提供了Redis的操作特性
- Redisson:促使使用者對Redis的關注分離,提供很多分散式相關操作服務,例如,分散式鎖,分散式集合,可通過Redis支援延遲隊列
- Lettuce:基於Netty框架的事件驅動的通訊層,其方法調用是非同步的。Lettuce的API是執行緒安全的,所以可以操作單個Lettuce連接來完成各種操作
4.2.3、可伸縮
- Jedis:使用阻塞的I/O,且其方法調用都是同步的,程式流需要等到sockets處理完I/O才能執行,不支援非同步。Jedis客戶端實例不是執行緒安全的,所以需要通過連接池來使用Jedis。
- Redisson:基於Netty框架的事件驅動的通訊層,其方法調用是非同步的。Redisson的API是執行緒安全的,所以可以操作單個Redisson連接來完成各種操作
- Lettuce:基於Netty框架的事件驅動的通訊層,其方法調用是非同步的。Lettuce的API是執行緒安全的,所以可以操作單個Lettuce連接來完成各種操作
- lettuce能夠支援redis4,需要java8及以上。
- lettuce是基於netty實現的與redis進行同步和非同步的通訊。
4.2.4、lettuce和jedis比較
- jedis使直接連接redis server,如果在多執行緒環境下是非執行緒安全的,這個時候只有使用連接池,為每個jedis實例增加物理連接 ;
- lettuce的連接是基於Netty的,連接實例(StatefulRedisConnection)可以在多個執行緒間並發訪問,StatefulRedisConnection是執行緒安全的,所以一個連接實例可以滿足多執行緒環境下的並發訪問,當然這也是可伸縮的設計,一個連接實例不夠的情況也可以按需增加連接實例。
- Redisson實現了分散式和可擴展的Java數據結構,和Jedis相比,功能較為簡單,不支援字元串操作,不支援排序、事務、管道、分區等Redis特性。Redisson的宗旨是促進使用者對Redis的關注分離,從而讓使用者能夠將精力更集中地放在處理業務邏輯上。
<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.7.0</version> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter</artifactId> <version>5.7.0</version> <scope>test</scope> </dependency>
public class JedisTest { private Jedis jedis; @BeforeEach void setUp(){ // 1、建立連接 jedis = new Jedis("ip", 6379); // 2、設置密碼 jedis.auth("123456"); // 3、選擇庫 jedis.select(0); } @Test public void testString(){ // 存入數據 String result = jedis.set("name", "張三"); System.out.println("result = " + result); // 獲取數據 String name = jedis.get("name"); System.out.println(name); } @Test public void testHash(){ // 插入 hash 數據 jedis.hset("user:1", "name", "lisi"); jedis.hset("user:1", "age", "21"); // 獲取 Map<String, String> map = jedis.hgetAll("user:1"); System.out.println(map); } @AfterEach void closeJedis(){ if(jedis != null){ jedis.close(); } } }
4.2.6、集成Lettuce
引入pom
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency>
配置yml
#Redis配置 spring: redis: database: 6 #Redis索引0~15,默認為0 host: 127.0.0.1 port: 6379 password: #密碼(默認為空) lettuce: # 這裡標明使用lettuce配置 pool: max-active: 8 #連接池最大連接數(使用負值表示沒有限制) max-wait: -1ms #連接池最大阻塞等待時間(使用負值表示沒有限制) max-idle: 5 #連接池中的最大空閑連接 min-idle: 0 #連接池中的最小空閑連接 timeout: 10000ms #連接超時時間(毫秒)
添加Redisson的配置參數讀取類RedisConfig
@Configuration @EnableCaching public class RedisConfig extends CachingConfigurerSupport { /** * RedisTemplate配置 * @param connectionFactory * @return */ @Bean public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory connectionFactory) { // 配置redisTemplate RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(connectionFactory); redisTemplate.setKeySerializer(new StringRedisSerializer ());//key序列化 redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer ());//value序列化 redisTemplate.afterPropertiesSet(); return redisTemplate; } }
編寫測試類RedisTest
@SpringBootTest class SpringRedisApplicationTests { // 注入 RedisTemplate @Autowired private RedisTemplate redisTemplate; // String 類型 @Test void testString () { redisTemplate.opsForValue().set("name", "ljx"); Object name = redisTemplate.opsForValue().get("name"); System.out.println(name); } // Hash 類型 @Test public void testHash () { redisTemplate.opsForHash().put("user1", "name", "clarence"); redisTemplate.opsForHash().put("user1", "age", "25"); Map map = redisTemplate.opsForHash().entries("user1"); System.out.println(map); } // List 類型 @Test public void testList () { redisTemplate.opsForList().leftPushAll("names", "xiaobai", "xiaohei", "xiaolan"); List<String> names = redisTemplate.opsForList().range("names", 0, 3); System.out.println(names); } // Set 類型 @Test public void testSet () { redisTemplate.opsForSet().add("set", "a", "b", "c"); Set<String> set = redisTemplate.opsForSet().members("set"); System.out.println(set); } // SortedSet 類型 @Test public void testSortedSet () { redisTemplate.opsForZSet().add("class", "xiaobai", 90); Set aClass = redisTemplate.opsForZSet().rangeByScore("class", 90, 100); System.out.println(aClass); Set<ZSetOperations.TypedTuple<String>> set = new HashSet<> (); set.add(new DefaultTypedTuple<> ("xiaohei", 88.0)); set.add(new DefaultTypedTuple<>("xiaohui", 94.0)); set.add(new DefaultTypedTuple<>("xiaolan", 84.0)); set.add(new DefaultTypedTuple<>("xiaolv", 82.0)); set.add(new DefaultTypedTuple<>("xiaohong", 99.0)); redisTemplate.opsForZSet().add("class", set); Set aClass1 = redisTemplate.opsForZSet().range("class", 0, 6); System.out.println(aClass1); } }
4.2.7、集成Redisson
引入pom
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.16.0</version> </dependency>
yml文件我就不寫了,和上面一樣,下面簡單寫下測試類
@Configuration public class RedissonConfig { @Autowired private RedisProperties redisProperties; @Bean public RedissonClient redissonClient() { Config config = new Config(); String redisUrl = String.format("redis://%s:%s", redisProperties.getHost() + "", redisProperties.getPort() + ""); config.useSingleServer().setAddress(redisUrl).setPassword(redisProperties.getPassword()); config.useSingleServer().setDatabase(3); return Redisson.create(config); } }
@RestController @RequestMapping("/redisson") public class RedissonController { @Autowired private StringRedisTemplate stringRedisTemplate; @GetMapping("/save") public String save(){ stringRedisTemplate.opsForValue().set("key","redisson"); return "save ok"; } @GetMapping("/get") public String get(){ return stringRedisTemplate.opsForValue().get("key"); } }