Spring Data Redis Stream的使用

一、背景

Stream類型是 redis5之後新增的類型,在這篇文章中,我們實現使用Spring boot data redis來消費Redis Stream中的數據。實現獨立消費和消費組消費。

二、整合步驟

1、引入jar包

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.11.1</version>
  </dependency>
</dependencies>

主要是上方的這個包,其他的不相關的包此處省略導入。

2、配置RedisTemplate依賴

@Configuration
public class RedisConfig {
    
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(connectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        // 這個地方不可使用 json 序列化,如果使用的是ObjectRecord傳輸對象時,可能會有問題,會出現一個 java.lang.IllegalArgumentException: Value must not be null! 錯誤
        redisTemplate.setHashValueSerializer(RedisSerializer.string());
        return redisTemplate;
    }
}

注意:

此處需要注意 setHashValueSerializer 的序列化的方式,具體注意事項後期再說。

3、準備一個實體對象

這個實體對象是需要發送到Stream中的對象。

@Getter
@Setter
@ToString
public class Book {
    private String title;
    private String author;
    
    public static Book create() {
        com.github.javafaker.Book fakerBook = Faker.instance().book();
        Book book = new Book();
        book.setTitle(fakerBook.title());
        book.setAuthor(fakerBook.author());
        return book;
    }
}

每次調用create方法時,會自動產生一個Book的對象,對象模擬數據是使用javafaker來模擬生成的。

4、編寫一個常量類,配置Stream的名稱

/**
 * 常量
 *
 */
public class Cosntants {
    
    public static final String STREAM_KEY_001 = "stream-001";
    
}

5、編寫一個生產者,向Stream中生產數據

1、編寫一個生產者,向Stream中產生ObjectRecord類型的數據

/**
 * 消息生產者
 
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class StreamProducer {
    
    private final RedisTemplate<String, Object> redisTemplate;
    
    public void sendRecord(String streamKey) {
        Book book = Book.create();
        log.info("產生一本書的資訊:[{}]", book);
        
        ObjectRecord<String, Book> record = StreamRecords.newRecord()
                .in(streamKey)
                .ofObject(book)
                .withId(RecordId.autoGenerate());
        
        RecordId recordId = redisTemplate.opsForStream()
                .add(record);
        
        log.info("返回的record-id:[{}]", recordId);
    }
}

2、每隔5s就生產一個數據到Stream中

/**
 * 周期性的向流中產生消息
 */
@Component
@AllArgsConstructor
public class CycleGeneratorStreamMessageRunner implements ApplicationRunner {
    
    private final StreamProducer streamProducer;
    
    @Override
    public void run(ApplicationArguments args) {
        Executors.newSingleThreadScheduledExecutor()
                .scheduleAtFixedRate(() -> streamProducer.sendRecord(STREAM_KEY_001),
                        0, 5, TimeUnit.SECONDS);
    }
}

三、獨立消費

獨立消費指的是脫離消費組的直接消費Stream中的消息,是使用 xread方法讀取流中的數據,流中的數據在讀取後並不會被刪除,還是存在的。如果多個程式同時使用xread讀取,都是可以讀取到消息的。

1、實現從頭開始消費-xread實現

此處實現的是從Stream的第一個消息開始消費

package com.huan.study.redis.stream.consumer.xread;

import com.huan.study.redis.constan.Cosntants;
import com.huan.study.redis.entity.Book;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 脫離消費組-直接消費Stream中的數據,可以獲取到Stream中所有的消息
 */
@Component
@Slf4j
public class XreadNonBlockConsumer01 implements InitializingBean, DisposableBean {
    
    private ThreadPoolExecutor threadPoolExecutor;
    @Resource
    private RedisTemplate<String, Object> redisTemplate;
    
    private volatile boolean stop = false;
    
    @Override
    public void afterPropertiesSet() {
        
        // 初始化執行緒池
        threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), r -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("xread-nonblock-01");
            return thread;
        });
        
        StreamReadOptions streamReadOptions = StreamReadOptions.empty()
                // 如果沒有數據,則阻塞1s 阻塞時間需要小於`spring.redis.timeout`配置的時間
                .block(Duration.ofMillis(1000))
                // 一直阻塞直到獲取數據,可能會報超時異常
                // .block(Duration.ofMillis(0))
                // 1次獲取10個數據
                .count(10);
        
        StringBuilder readOffset = new StringBuilder("0-0");
        threadPoolExecutor.execute(() -> {
            while (!stop) {
                // 使用xread讀取數據時,需要記錄下最後一次讀取到offset,然後當作下次讀取的offset,否則讀取出來的數據會有問題
                List<ObjectRecord<String, Book>> objectRecords = redisTemplate.opsForStream()
                        .read(Book.class, streamReadOptions, StreamOffset.create(Cosntants.STREAM_KEY_001, ReadOffset.from(readOffset.toString())));
                if (CollectionUtils.isEmpty(objectRecords)) {
                    log.warn("沒有獲取到數據");
                    continue;
                }
                for (ObjectRecord<String, Book> objectRecord : objectRecords) {
                    log.info("獲取到的數據資訊 id:[{}] book:[{}]", objectRecord.getId(), objectRecord.getValue());
                    readOffset.setLength(0);
                    readOffset.append(objectRecord.getId());
                }
            }
        });
    }
    
    @Override
    public void destroy() throws Exception {
        stop = true;
        threadPoolExecutor.shutdown();
        threadPoolExecutor.awaitTermination(3, TimeUnit.SECONDS);
    }
}

注意:

下一次讀取數據時,offset 是上一次最後獲取到的id的值,否則可能會出現漏數據。

2、StreamMessageListenerContainer實現獨立消費

見下方的消費組消費的程式碼

四、消費組消費

1、實現StreamListener介面

實現這個介面的目的是為了,消費Stream中的數據。需要注意在註冊時使用的是streamMessageListenerContainer.receiveAutoAck()還是streamMessageListenerContainer.receive()方法,如果是第二個,則需要手動ack,手動ack的程式碼:redisTemplate.opsForStream().acknowledge("key","group","recordId");

/**
 * 通過監聽器非同步消費
 *
 * @author huan.fu 2021/11/10 - 下午5:51
 */
@Slf4j
@Getter
@Setter
public class AsyncConsumeStreamListener implements StreamListener<String, ObjectRecord<String, Book>> {
    /**
     * 消費者類型:獨立消費、消費組消費
     */
    private String consumerType;
    /**
     * 消費組
     */
    private String group;
    /**
     * 消費組中的某個消費者
     */
    private String consumerName;
    
    public AsyncConsumeStreamListener(String consumerType, String group, String consumerName) {
        this.consumerType = consumerType;
        this.group = group;
        this.consumerName = consumerName;
    }
    
    private RedisTemplate<String, Object> redisTemplate;
    
    @Override
    public void onMessage(ObjectRecord<String, Book> message) {
        String stream = message.getStream();
        RecordId id = message.getId();
        Book value = message.getValue();
        if (StringUtils.isBlank(group)) {
            log.info("[{}]: 接收到一個消息 stream:[{}],id:[{}],value:[{}]", consumerType, stream, id, value);
        } else {
            log.info("[{}] group:[{}] consumerName:[{}] 接收到一個消息 stream:[{}],id:[{}],value:[{}]", consumerType,
                    group, consumerName, stream, id, value);
        }
        
        // 當是消費組消費時,如果不是自動ack,則需要在這個地方手動ack
        // redisTemplate.opsForStream()
        //         .acknowledge("key","group","recordId");
    }
}

2、獲取消費或消費消息過程中錯誤的處理

/**
 * StreamPollTask 獲取消息或對應的listener消費消息過程中發生了異常
 *
 * @author huan.fu 2021/11/11 - 下午3:44
 */
@Slf4j
public class CustomErrorHandler implements ErrorHandler {
    @Override
    public void handleError(Throwable t) {
        log.error("發生了異常", t);
    }
}

3、消費組配置

/**
 * redis stream 消費組配置
 *
 * @author huan.fu 2021/11/11 - 下午12:22
 */
@Configuration
public class RedisStreamConfiguration {
    
    @Resource
    private RedisConnectionFactory redisConnectionFactory;
    
    /**
     * 可以同時支援 獨立消費 和 消費者組 消費
     * <p>
     * 可以支援動態的 增加和刪除 消費者
     * <p>
     * 消費組需要預先創建出來
     *
     * @return StreamMessageListenerContainer
     */
    @Bean(initMethod = "start", destroyMethod = "stop")
    public StreamMessageListenerContainer<String, ObjectRecord<String, Book>> streamMessageListenerContainer() {
        AtomicInteger index = new AtomicInteger(1);
        int processors = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), r -> {
            Thread thread = new Thread(r);
            thread.setName("async-stream-consumer-" + index.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        });
        
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Book>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        // 一次最多獲取多少條消息
                        .batchSize(10)
                        // 運行 Stream 的 poll task
                        .executor(executor)
                        // 可以理解為 Stream Key 的序列化方式
                        .keySerializer(RedisSerializer.string())
                        // 可以理解為 Stream 後方的欄位的 key 的序列化方式
                        .hashKeySerializer(RedisSerializer.string())
                        // 可以理解為 Stream 後方的欄位的 value 的序列化方式
                        .hashValueSerializer(RedisSerializer.string())
                        // Stream 中沒有消息時,阻塞多長時間,需要比 `spring.redis.timeout` 的時間小
                        .pollTimeout(Duration.ofSeconds(1))
                        // ObjectRecord 時,將 對象的 filed 和 value 轉換成一個 Map 比如:將Book對象轉換成map
                        .objectMapper(new ObjectHashMapper())
                        // 獲取消息的過程或獲取到消息給具體的消息者處理的過程中,發生了異常的處理
                        .errorHandler(new CustomErrorHandler())
                        // 將發送到Stream中的Record轉換成ObjectRecord,轉換成具體的類型是這個地方指定的類型
                        .targetType(Book.class)
                        .build();
        
        StreamMessageListenerContainer<String, ObjectRecord<String, Book>> streamMessageListenerContainer =
                StreamMessageListenerContainer.create(redisConnectionFactory, options);
        
        // 獨立消費
        String streamKey = Cosntants.STREAM_KEY_001;
        streamMessageListenerContainer.receive(StreamOffset.fromStart(streamKey),
                new AsyncConsumeStreamListener("獨立消費", null, null));
        
        // 消費組A,不自動ack
        // 從消費組中沒有分配給消費者的消息開始消費
        streamMessageListenerContainer.receive(Consumer.from("group-a", "consumer-a"),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("消費組消費", "group-a", "consumer-a"));
        // 從消費組中沒有分配給消費者的消息開始消費
        streamMessageListenerContainer.receive(Consumer.from("group-a", "consumer-b"),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("消費組消費A", "group-a", "consumer-b"));
        
        // 消費組B,自動ack
        streamMessageListenerContainer.receiveAutoAck(Consumer.from("group-b", "consumer-a"),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("消費組消費B", "group-b", "consumer-bb"));
        
        // 如果需要對某個消費者進行個性化配置在調用register方法的時候傳遞`StreamReadRequest`對象
        
        return streamMessageListenerContainer;
    }
}

注意:

提前建立好消費組

127.0.0.1:6379> xgroup create stream-001 group-a $
OK
127.0.0.1:6379> xgroup create stream-001 group-b $
OK

1、獨有消費配置

 streamMessageListenerContainer.receive(StreamOffset.fromStart(streamKey), new AsyncConsumeStreamListener("獨立消費", null, null));

不傳遞Consumer即可。

2、配置消費組-不自動ack消息

streamMessageListenerContainer.receive(Consumer.from("group-a", "consumer-b"),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("消費組消費A", "group-a", "consumer-b"));

1、需要注意ReadOffset的取值。

2、需要注意group需要提前創建好。

3、配置消費組-自動ack消息

streamMessageListenerContainer.receiveAutoAck()

五、序列化策略

Stream Property Serializer Description
key keySerializer used for Record#getStream()
field hashKeySerializer used for each map key in the payload
value hashValueSerializer used for each map value in the payload

六、ReadOffset策略

消費消息時的Read Offset 策略

ReadOffset策略

Read offset Standalone Consumer Group
Latest Read latest message(讀取最新的消息) Read latest message(讀取最新的消息)
Specific Message Id Use last seen message as the next MessageId
(讀取大於指定的消息id的消息)
Use last seen message as the next MessageId
(讀取大於指定的消息id的消息)
Last Consumed Use last seen message as the next MessageId
(讀取大於指定的消息id的消息)
Last consumed message as per consumer group
(讀取還未分配給消費組中的消費組的消息)

七、注意事項

1、讀取消息的超時時間

當我們使用 StreamReadOptions.empty().block(Duration.ofMillis(1000)) 配置阻塞時間時,這個配置的阻塞時間必須要比 spring.redis.timeout配置的時間短,否則可能會報超時異常。

2、ObjectRecord反序列化錯誤

如果我們在讀取消息時發生如下異常,那麼排查思路如下:

java.lang.IllegalArgumentException: Value must not be null!
	at org.springframework.util.Assert.notNull(Assert.java:201)
	at org.springframework.data.redis.connection.stream.Record.of(Record.java:81)
	at org.springframework.data.redis.connection.stream.MapRecord.toObjectRecord(MapRecord.java:147)
	at org.springframework.data.redis.core.StreamObjectMapper.toObjectRecord(StreamObjectMapper.java:138)
	at org.springframework.data.redis.core.StreamObjectMapper.toObjectRecords(StreamObjectMapper.java:164)
	at org.springframework.data.redis.core.StreamOperations.map(StreamOperations.java:594)
	at org.springframework.data.redis.core.StreamOperations.read(StreamOperations.java:413)
	at com.huan.study.redis.stream.consumer.xread.XreadNonBlockConsumer02.lambda$afterPropertiesSet$1(XreadNonBlockConsumer02.java:61)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

1、檢測 RedisTemplateHashValueSerializer的序列化方式,最好不要使用json可以使用RedisSerializer.string()

2、檢查redisTemplate.opsForStream()中配置的HashMapper,默認是ObjectHashMapper這個是把對象欄位和值序列化成byte[]格式。

提供一個可用的配置

# RedisTemplate的hash value 使用string類型的序列化方式
redisTemplate.setHashValueSerializer(RedisSerializer.string());
# 這個方法opsForStream()裡面使用默認的ObjectHashMapper
redisTemplate.opsForStream()

3、使用xread順序讀取數據漏數據

如果我們使用xread讀取數據發現有寫數據漏掉了,這個時候我們需要檢查第二次讀取時配置的StreamOffset是否合法,這個值需要是上一次讀取的最後一個值。

舉例說明:

1、SteamOffset傳遞的是 $ 表示讀取最新的一個數據。

2、處理上一步讀取到的數據,此時另外的生產者又向Stream中插入了幾個數據,這個時候讀取到的數據還沒有處理完。

3、再次讀取Stream中的數據,還是傳遞的$,那麼表示還是讀取最新的數據。那麼在上一步流入到Stream中的數據,這個消費者就讀取不到了,因為它讀取的是最新的數據。

4、StreamMessageListenerContainer的使用

1、可以動態的添加和刪除消費者

2、可以進行消費組消費

3、可以直接獨立消費

4、如果傳輸ObjectRecord的時候,需要注意一下序列化方式。參考上面的程式碼。

八、完整程式碼

//gitee.com/huan1993/spring-cloud-parent/tree/master/redis/redis-stream

九、參考文檔

1、//docs.spring.io/spring-data/redis/docs/2.5.5/reference/html/#redis.streams