From 361e50e5de3abd4551e05ba974ffc3c27d26c2c4 Mon Sep 17 00:00:00 2001 From: YunaiV Date: Wed, 30 Apr 2025 19:41:24 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9ARedis=20Stream=20=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=E6=B8=85=E7=90=86=20Job=EF=BC=8C=E9=81=BF=E5=85=8D?= =?UTF-8?q?=E5=8D=A0=E7=94=A8=E5=86=85=E5=AD=98=E8=BF=87=E5=A4=9A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...YudaoRedisMQConsumerAutoConfiguration.java | 12 ++++ .../job/RedisPendingMessageResendJob.java | 6 +- .../job/RedisStreamMessageCleanupJob.java | 72 +++++++++++++++++++ 3 files changed, 87 insertions(+), 3 deletions(-) create mode 100644 yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/job/RedisStreamMessageCleanupJob.java diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQConsumerAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQConsumerAutoConfiguration.java index d02e84b146..c9ab3e5415 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQConsumerAutoConfiguration.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQConsumerAutoConfiguration.java @@ -6,6 +6,7 @@ import cn.hutool.system.SystemUtil; import cn.iocoder.yudao.framework.common.enums.DocumentEnum; import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate; import cn.iocoder.yudao.framework.mq.redis.core.job.RedisPendingMessageResendJob; +import cn.iocoder.yudao.framework.mq.redis.core.job.RedisStreamMessageCleanupJob; import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener; import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener; import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration; @@ -73,6 +74,17 @@ public class YudaoRedisMQConsumerAutoConfiguration { return new RedisPendingMessageResendJob(listeners, redisTemplate, groupName, redissonClient); } + /** + * 创建 Redis Stream 消息清理任务 + */ + @Bean + @ConditionalOnBean(AbstractRedisStreamMessageListener.class) + public RedisStreamMessageCleanupJob redisStreamMessageCleanupJob(List> listeners, + RedisMQTemplate redisTemplate, + RedissonClient redissonClient) { + return new RedisStreamMessageCleanupJob(listeners, redisTemplate, redissonClient); + } + /** * 创建 Redis Stream 集群消费的容器 * diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/job/RedisPendingMessageResendJob.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/job/RedisPendingMessageResendJob.java index b84f17c152..cb4e3991f1 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/job/RedisPendingMessageResendJob.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/job/RedisPendingMessageResendJob.java @@ -23,13 +23,13 @@ import java.util.Objects; @AllArgsConstructor public class RedisPendingMessageResendJob { - private static final String LOCK_KEY = "redis:pending:msg:lock"; + private static final String LOCK_KEY = "redis:stream:pending-message-resend:lock"; /** * 消息超时时间,默认 5 分钟 * * 1. 超时的消息才会被重新投递 - * 2. 由于定时任务 1 分钟一次,消息超时后不会被立即重投,极端情况下消息5分钟过期后,再等 1 分钟才会被扫瞄到 + * 2. 由于定时任务 1 分钟一次,消息超时后不会被立即重投,极端情况下消息 5 分钟过期后,再等 1 分钟才会被扫瞄到 */ private static final int EXPIRE_TIME = 5 * 60; @@ -39,7 +39,7 @@ public class RedisPendingMessageResendJob { private final RedissonClient redissonClient; /** - * 一分钟执行一次,这里选择每分钟的35秒执行,是为了避免整点任务过多的问题 + * 一分钟执行一次,这里选择每分钟的 35 秒执行,是为了避免整点任务过多的问题 */ @Scheduled(cron = "35 * * * * ?") public void messageResend() { diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/job/RedisStreamMessageCleanupJob.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/job/RedisStreamMessageCleanupJob.java new file mode 100644 index 0000000000..19da84594e --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/job/RedisStreamMessageCleanupJob.java @@ -0,0 +1,72 @@ +package cn.iocoder.yudao.framework.mq.redis.core.job; + +import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate; +import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RLock; +import org.redisson.api.RedissonClient; +import org.springframework.data.redis.core.StreamOperations; +import org.springframework.scheduling.annotation.Scheduled; + +import java.util.List; + +/** + * Redis Stream 消息清理任务 + * 用于定期清理已消费的消息,防止内存占用过大 + * + * @see 记一次 redis stream 数据类型内存不释放问题 + * + * @author 芋道源码 + */ +@Slf4j +@AllArgsConstructor +public class RedisStreamMessageCleanupJob { + + private static final String LOCK_KEY = "redis:stream:message-cleanup:lock"; + + /** + * 保留的消息数量,默认保留最近 10000 条消息 + */ + private static final long MAX_COUNT = 10000; + + private final List> listeners; + private final RedisMQTemplate redisTemplate; + private final RedissonClient redissonClient; + + /** + * 每小时执行一次清理任务 + */ + @Scheduled(cron = "0 0 * * * ?") + public void cleanup() { + RLock lock = redissonClient.getLock(LOCK_KEY); + // 尝试加锁 + if (lock.tryLock()) { + try { + execute(); + } catch (Exception ex) { + log.error("[cleanup][执行异常]", ex); + } finally { + lock.unlock(); + } + } + } + + /** + * 执行清理逻辑 + */ + private void execute() { + StreamOperations ops = redisTemplate.getRedisTemplate().opsForStream(); + listeners.forEach(listener -> { + try { + // 使用 XTRIM 命令清理消息,只保留最近的 MAX_LEN 条消息 + Long trimCount = ops.trim(listener.getStreamKey(), MAX_COUNT, true); + if (trimCount != null && trimCount > 0) { + log.info("[execute][Stream({}) 清理消息数量({})]", listener.getStreamKey(), trimCount); + } + } catch (Exception ex) { + log.error("[execute][Stream({}) 清理异常]", listener.getStreamKey(), ex); + } + }); + } +} \ No newline at end of file