Integrating Redis Stream with Spring Boot: From Basic Configuration to Consumer Groups in Practice

张开发
2026/4/21 15:19:59 · 15 min read


## 1. Why Redis Stream?

Redis Stream is a data structure introduced in Redis 5.0, designed specifically for message-queue scenarios. Compared with the traditional List or Pub/Sub approaches, Stream offers much stronger capabilities:

- **Message persistence**: unlike Pub/Sub, messages are not lost the moment they are delivered
- **Consumer groups**: multiple consumers can form a group and load-balance message processing
- **Replay**: historical messages can be consumed again
- **ACK mechanism**: ensures each message is processed correctly

In one of my projects I had to handle a large volume of device status reports: the data could not be lost, and processing had to run in parallel for throughput. A traditional RabbitMQ setup carried high deployment and maintenance costs, while Redis Stream solved the problem neatly.

## 2. Environment Setup and Basic Configuration

### 2.1 Dependencies

First, add Spring Data Redis and the connection pool to `pom.xml`:

```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>
```

### 2.2 Redis Connection

Configure the connection in `application.yml`:

```yaml
spring:
  redis:
    host: 127.0.0.1
    port: 6379
    database: 0
    timeout: 15000
    lettuce:
      pool:
        max-idle: 50
        min-idle: 10
        max-active: 300
        max-wait: -1
```

### 2.3 RedisTemplate

Create a configuration class to customize `RedisTemplate`:

```java
@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);
        // Key serialization
        template.setKeySerializer(new StringRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        // Value serialization
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }
}
```

Here I recommend Jackson serialization over `JdkSerializationRedisSerializer`: the latter produces unreadable binary data, which makes debugging painful.
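As a side note on that recommendation, the difference is easy to demonstrate without Spring at all: JDK serialization wraps values in an opaque binary envelope (it always begins with the magic bytes `AC ED`), while a JSON serializer stores plain text you can read directly in `redis-cli`. A minimal standalone sketch (class and method names are illustrative, not part of any library):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

// Standalone sketch (no Spring needed): why JDK-serialized values are hard
// to inspect in redis-cli. Class and method names are illustrative only.
public class SerializationDemo {

    // Serialize a value the way JDK serialization would,
    // and return its first two bytes as hex.
    static String jdkMagicBytes(Object value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        }
        byte[] bytes = buffer.toByteArray();
        return String.format("%02x %02x", bytes[0] & 0xFF, bytes[1] & 0xFF);
    }

    public static void main(String[] args) throws IOException {
        Map<String, String> message = new HashMap<>();
        message.put("content", "Hello Redis Stream");

        // JDK serialization always starts with the magic bytes AC ED:
        // a binary envelope, not text you can eyeball in redis-cli.
        System.out.println("JDK envelope starts with: " + jdkMagicBytes(message)); // ac ed

        // What a JSON serializer would store instead: plain, readable text.
        System.out.println("JSON form: {\"content\":\"Hello Redis Stream\"}");
    }
}
```

Seeing `ac ed` (or its base64 form `rO0...`) at the start of a Redis value is the telltale sign that a JDK serializer is in use somewhere.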
## 3. Producing and Consuming Messages

### 3.1 Sending a Simple Message

Let's start with the simplest case, a map of strings:

```java
@Autowired
private RedisTemplate<String, Object> redisTemplate;

public void sendSimpleMessage() {
    Map<String, String> message = new HashMap<>();
    message.put("content", "Hello Redis Stream");

    StringRecord record = StreamRecords.string(message)
            .withStreamKey("mystream")
            .withId(RecordId.autoGenerate());

    RecordId recordId = redisTemplate.opsForStream().add(record);
    log.info("Message sent, ID: {}", recordId.getValue());
}
```

### 3.2 Sending a Custom Object

In real projects we often need to send domain objects:

```java
@Data
public class OrderEvent {
    private String orderId;
    private BigDecimal amount;
    private LocalDateTime createTime;
}

public void sendObjectMessage() {
    OrderEvent event = new OrderEvent();
    event.setOrderId(UUID.randomUUID().toString());
    event.setAmount(new BigDecimal("99.99"));
    event.setCreateTime(LocalDateTime.now());

    ObjectRecord<String, OrderEvent> record = StreamRecords.newRecord()
            .in("order_stream")
            .ofObject(event)
            .withId(RecordId.autoGenerate());

    redisTemplate.opsForStream().add(record);
}
```

### 3.3 Simple Consumption

For simple scenarios that do not need a consumer group, you can read messages directly:

```java
public List<MapRecord<String, String, String>> readMessages() {
    return redisTemplate.opsForStream()
            .read(StreamOffset.fromStart("mystream"));
}
```
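One detail worth knowing about `RecordId.autoGenerate()`: Redis generates stream entry IDs of the form `<milliseconds>-<sequence>`, so the ID itself records when the server accepted the entry. A standalone sketch of pulling that timestamp back out (the helper names here are my own, not part of any API):

```java
// Standalone sketch: Redis Stream entry IDs have the form "<millis>-<sequence>",
// e.g. "1526919030474-55". The first part is the server's Unix time in
// milliseconds; the second disambiguates entries created in the same millisecond.
public class StreamIdDemo {

    // Illustrative helpers, not a library API.
    static long millisOf(String recordId) {
        return Long.parseLong(recordId.split("-")[0]);
    }

    static long sequenceOf(String recordId) {
        return Long.parseLong(recordId.split("-")[1]);
    }

    public static void main(String[] args) {
        String id = "1526919030474-55";
        System.out.println("millis=" + millisOf(id) + " seq=" + sequenceOf(id));
        // Recover the creation instant encoded in the ID:
        System.out.println(java.time.Instant.ofEpochMilli(millisOf(id)));
    }
}
```

This is handy when debugging: given any record ID from `XPENDING` or a log line, you can tell at a glance when the message entered the stream.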
## 4. Consumer Groups in Practice

### 4.1 Creating a Consumer Group

Consumer groups are the core feature of Redis Stream. Create one first:

```java
public void createConsumerGroup(String streamKey, String groupName) {
    try {
        redisTemplate.opsForStream().createGroup(streamKey, groupName);
    } catch (RedisSystemException e) {
        if (e.getCause() instanceof RedisBusyException) {
            log.info("Consumer group already exists: {}", groupName);
        } else {
            throw e;
        }
    }
}
```

One pitfall to watch for: if the group already exists, Redis throws a `RedisBusyException`, which you need to catch and handle.

### 4.2 Consuming with a Group

Configure the listener container for the consumer group:

```java
@Configuration
public class StreamConfig {

    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    @Bean
    public StreamMessageListenerContainer<String, MapRecord<String, String, String>> container() {
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        .pollTimeout(Duration.ofSeconds(1))
                        .batchSize(10)
                        .serializer(new StringRedisSerializer())
                        .build();
        return StreamMessageListenerContainer.create(redisConnectionFactory, options);
    }

    @Bean
    public Subscription subscription(StreamMessageListenerContainer<String, MapRecord<String, String, String>> container) {
        return container.receive(
                Consumer.from("mygroup", "consumer1"),
                StreamOffset.create("mystream", ReadOffset.lastConsumed()),
                new MessageListener());
    }
}
```

Implement the message listener:

```java
public class MessageListener implements StreamListener<String, MapRecord<String, String, String>> {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        try {
            // Business processing
            processMessage(message);
            // Acknowledge the message
            redisTemplate.opsForStream().acknowledge("mygroup", message);
        } catch (Exception e) {
            log.error("Message processing failed: {}", message, e);
        }
    }
}
```

### 4.3 Handling Un-ACKed Messages

In production, messages may go unacknowledged for all sorts of reasons. Check for them periodically and reprocess:

```java
@Scheduled(fixedRate = 60000)
public void handlePendingMessages() {
    PendingMessages pending = redisTemplate.opsForStream()
            .pending("mystream", "mygroup", Range.unbounded(), 100);

    pending.forEach(message -> {
        // Re-read the message by its ID
        MapRecord<String, String, String> record = redisTemplate.opsForStream()
                .range("mystream",
                        Range.closed(message.getIdAsString(), message.getIdAsString()))
                .get(0);
        // Try processing it again
        try {
            processMessage(record);
            redisTemplate.opsForStream().acknowledge("mygroup", record);
        } catch (Exception e) {
            log.error("Retry failed: {}", record, e);
        }
    });
}
```

## 5. Production Notes

### 5.1 Performance Tuning

- **Batch operations**: prefer batch reads and batch acknowledgements
- **pollTimeout**: tune it to the characteristics of your workload
- **Connection pool**: size `max-active` and related parameters to your concurrency
- **Serialization**: balance performance against readability

### 5.2 Troubleshooting Common Problems

- **Message backlog**: check consumer throughput; consider adding consumers
- **Duplicate consumption**: make your business logic idempotent
- **Connection leaks**: verify that connections are closed correctly
- **Serialization errors**: make sure producer and consumer use the same serializer

### 5.3 Monitoring

Key metrics worth watching:

- Message production rate
- Message consumption rate
- Number of un-ACKed (pending) messages
- Consumer-group lag

## 6. Example Project Layout

A typical project is structured like this:

```
src/main/java
├── config
│   ├── RedisConfig.java
│   └── StreamConfig.java
├── controller
│   └── MessageController.java
├── listener
│   └── OrderMessageListener.java
├── model
│   └── OrderEvent.java
└── service
    ├── MessageProducer.java
    └── StreamMonitor.java
```

In real projects I usually organize the code this way:

- Redis configuration lives in the `config` package
- Message listeners are split by business domain
- Producers and monitoring services are encapsulated separately
- Messages are passed as DTO objects

## 7. Advanced Features

### 7.1 Stream Trimming

Redis Stream supports trimming, which caps the size of a stream:

```java
public void trimStream(String streamKey, long maxLength) {
    redisTemplate.opsForStream().trim(streamKey, maxLength);
}
```

### 7.2 Querying Group Information

You can inspect the state of consumer groups and their consumers:

```java
public void printGroupInfo(String streamKey) {
    StreamInfo.XInfoGroups groups = redisTemplate.opsForStream().groups(streamKey);
    groups.forEach(group -> {
        log.info("Group: {}, Consumers: {}, Pending: {}",
                group.groupName(), group.consumerCount(), group.pendingCount());
    });
}
```

### 7.3 Consumer Load Balancing

Multiple consumers in the same group balance the load automatically:

```java
@Bean
public Subscription subscription2(StreamMessageListenerContainer<String, MapRecord<String, String, String>> container) {
    return container.receive(
            Consumer.from("mygroup", "consumer2"),
            StreamOffset.create("mystream", ReadOffset.lastConsumed()),
            new MessageListener());
}
```

## 8. Comparison with Kafka

When choosing a message broker, Redis Stream is often compared with Kafka:

- **Deployment complexity**: Redis is simpler
- **Feature richness**: Kafka is more specialized
- **Performance**: Redis does better in low-latency scenarios
- **Durability**: Kafka's persistence guarantees are stronger

In my experience, for small-to-medium projects, and especially for teams already running Redis, Redis Stream is an excellent lightweight choice. For workloads that demand strict message ordering, durable storage, and very high throughput, Kafka is still the better fit.
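The idempotency advice in section 5.2 deserves a sketch. Because a consumer group redelivers un-ACKed messages, the same record ID can reach your handler more than once; tracking processed IDs makes the business side effect run at most once. A minimal in-memory illustration (in production the seen-set would live in Redis or a database so it survives restarts; all names here are illustrative):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Standalone sketch of an idempotent consumer: skip the side effect when a
// record ID has already been processed. Names are illustrative, not a library API.
public class IdempotentConsumer {
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    int applied = 0;

    // Returns true only the first time a given record ID is handled.
    boolean handle(String recordId) {
        if (!processedIds.add(recordId)) {
            return false; // duplicate delivery: do nothing
        }
        applied++; // the real business side effect would go here
        return true;
    }

    public static void main(String[] args) {
        IdempotentConsumer consumer = new IdempotentConsumer();
        System.out.println(consumer.handle("1526919030474-55")); // true
        System.out.println(consumer.handle("1526919030474-55")); // false (redelivery)
        System.out.println("applied=" + consumer.applied);       // applied=1
    }
}
```

The same pattern slots into the `onMessage` handler from section 4.2: check-and-record the ID before `processMessage`, and ACK either way once the record is known to be handled.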
