diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQConsumeMessageHook.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQConsumeMessageHook.java index d9d7334e06..15edae5f4a 100644 --- a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQConsumeMessageHook.java +++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQConsumeMessageHook.java @@ -19,6 +19,7 @@ import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_ * * @author 芋道源码 */ +@Deprecated public class TenantRocketMQConsumeMessageHook implements ConsumeMessageHook { @Override diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQInitializer.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQInitializer.java index 7f12ac5205..fbc2cc0fda 100644 --- a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQInitializer.java +++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQInitializer.java @@ -1,7 +1,6 @@ package cn.iocoder.yudao.framework.tenant.core.mq.rocketmq; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.spring.core.RocketMQTemplate; @@ -20,7 +19,7 @@ public class TenantRocketMQInitializer implements BeanPostProcessor { public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { if (bean instanceof DefaultRocketMQListenerContainer) { DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean; - initTenantConsumer(container.getConsumer()); + initTenantConsumer(container); } else if (bean instanceof RocketMQTemplate) { RocketMQTemplate template = (RocketMQTemplate) bean; initTenantProducer(template.getProducer()); @@ -39,15 +38,22 @@ public class TenantRocketMQInitializer implements BeanPostProcessor { producerImpl.registerSendMessageHook(new TenantRocketMQSendMessageHook()); } - private void initTenantConsumer(DefaultMQPushConsumer consumer) { + private void initTenantConsumer(DefaultRocketMQListenerContainer container) { + DefaultMQPushConsumer consumer = container.getConsumer(); if (consumer == null) { return; } - DefaultMQPushConsumerImpl consumerImpl = consumer.getDefaultMQPushConsumerImpl(); - if (consumerImpl == null) { - return; + + switch (container.getConsumeMode()) { + case ORDERLY: + consumer.setMessageListener(new TenantRocketMQListenerOrderly(container)); + break; + case CONCURRENTLY: + consumer.setMessageListener(new TenantRocketMQListenerConcurrently(container)); + break; + default: + throw new IllegalArgumentException("Property 'consumeMode' was wrong."); } - consumerImpl.registerConsumeMessageHook(new TenantRocketMQConsumeMessageHook()); } } \ No newline at end of file diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQListenerConcurrently.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQListenerConcurrently.java new file mode 100644 index 0000000000..88be0052d3 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQListenerConcurrently.java @@ -0,0 +1,60 @@ +package cn.iocoder.yudao.framework.tenant.core.mq.rocketmq; + +import cn.hutool.core.util.StrUtil; +import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer; + +import java.util.List; + +import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID; + +/** + * RocketMQ 消息队列的多租户 {@link MessageListenerConcurrently} 实现类,支持批量消费消息的租户上下文设置 + * 默认实现 {@link DefaultRocketMQListenerContainer.DefaultMessageListenerConcurrently} + * + * @author changhao.ni + */ +@Slf4j +public class TenantRocketMQListenerConcurrently implements MessageListenerConcurrently { + + private final DefaultRocketMQListenerContainer container; + + TenantRocketMQListenerConcurrently(DefaultRocketMQListenerContainer container) { + this.container = container; + } + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + for (MessageExt messageExt : msgs) { + log.debug("received msg: {}", messageExt); + try { + long now = System.currentTimeMillis(); + + // 设置租户编号 + String tenantId = messageExt.getUserProperty(HEADER_TENANT_ID); + if (StrUtil.isNotEmpty(tenantId)) { + TenantContextHolder.setTenantId(Long.parseLong(tenantId)); + } + + container.handleMessage(messageExt); + + long costTime = System.currentTimeMillis() - now; + log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime); + } catch (Exception e) { + log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e); + context.setDelayLevelWhenNextConsume(container.getDelayLevelWhenNextConsume()); + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } finally { + // 清理租户编号 + TenantContextHolder.clear(); + } + } + + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } +} diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQListenerOrderly.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQListenerOrderly.java new file mode 100644 index 0000000000..7f3c9c728a --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQListenerOrderly.java @@ -0,0 +1,60 @@ +package cn.iocoder.yudao.framework.tenant.core.mq.rocketmq; + +import cn.hutool.core.util.StrUtil; +import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer; + +import java.util.List; + +import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID; + +/** + * RocketMQ 消息队列的多租户 {@link MessageListenerOrderly} 实现类,支持批量消费消息的租户上下文设置 + * 默认实现 {@link DefaultRocketMQListenerContainer.DefaultMessageListenerOrderly} + * + * @author changhao.ni + */ +@Slf4j +public class TenantRocketMQListenerOrderly implements MessageListenerOrderly { + + private final DefaultRocketMQListenerContainer container; + + TenantRocketMQListenerOrderly(DefaultRocketMQListenerContainer container) { + this.container = container; + } + + @Override + public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { + for (MessageExt messageExt : msgs) { + log.debug("received msg: {}", messageExt); + try { + long now = System.currentTimeMillis(); + + // 设置租户编号 + String tenantId = messageExt.getUserProperty(HEADER_TENANT_ID); + if (StrUtil.isNotEmpty(tenantId)) { + TenantContextHolder.setTenantId(Long.parseLong(tenantId)); + } + + container.handleMessage(messageExt); + + long costTime = System.currentTimeMillis() - now; + log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime); + } catch (Exception e) { + log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e); + context.setSuspendCurrentQueueTimeMillis(container.getSuspendCurrentQueueTimeMillis()); + return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; + } finally { + // 清理租户编号 + TenantContextHolder.clear(); + } + } + + return ConsumeOrderlyStatus.SUCCESS; + } +}