Pre Merge pull request !983 from changhao.ni/master-jdk17
This commit is contained in:
commit
33b1c53eac
|
@ -19,6 +19,7 @@ import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_
|
|||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Deprecated
|
||||
public class TenantRocketMQConsumeMessageHook implements ConsumeMessageHook {
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package cn.iocoder.yudao.framework.tenant.core.mq.rocketmq;
|
||||
|
||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
|
||||
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
|
||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
|
@ -20,7 +19,7 @@ public class TenantRocketMQInitializer implements BeanPostProcessor {
|
|||
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
|
||||
if (bean instanceof DefaultRocketMQListenerContainer) {
|
||||
DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
|
||||
initTenantConsumer(container.getConsumer());
|
||||
initTenantConsumer(container);
|
||||
} else if (bean instanceof RocketMQTemplate) {
|
||||
RocketMQTemplate template = (RocketMQTemplate) bean;
|
||||
initTenantProducer(template.getProducer());
|
||||
|
@ -39,15 +38,22 @@ public class TenantRocketMQInitializer implements BeanPostProcessor {
|
|||
producerImpl.registerSendMessageHook(new TenantRocketMQSendMessageHook());
|
||||
}
|
||||
|
||||
private void initTenantConsumer(DefaultMQPushConsumer consumer) {
|
||||
private void initTenantConsumer(DefaultRocketMQListenerContainer container) {
|
||||
DefaultMQPushConsumer consumer = container.getConsumer();
|
||||
if (consumer == null) {
|
||||
return;
|
||||
}
|
||||
DefaultMQPushConsumerImpl consumerImpl = consumer.getDefaultMQPushConsumerImpl();
|
||||
if (consumerImpl == null) {
|
||||
return;
|
||||
|
||||
switch (container.getConsumeMode()) {
|
||||
case ORDERLY:
|
||||
consumer.setMessageListener(new TenantRocketMQListenerOrderly(container));
|
||||
break;
|
||||
case CONCURRENTLY:
|
||||
consumer.setMessageListener(new TenantRocketMQListenerConcurrently(container));
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
|
||||
}
|
||||
consumerImpl.registerConsumeMessageHook(new TenantRocketMQConsumeMessageHook());
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,60 @@
|
|||
package cn.iocoder.yudao.framework.tenant.core.mq.rocketmq;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
|
||||
|
||||
/**
|
||||
* RocketMQ 消息队列的多租户 {@link MessageListenerConcurrently} 实现类,支持批量消费消息的租户上下文设置
|
||||
* 默认实现 {@link DefaultRocketMQListenerContainer.DefaultMessageListenerConcurrently}
|
||||
*
|
||||
* @author changhao.ni
|
||||
*/
|
||||
@Slf4j
|
||||
public class TenantRocketMQListenerConcurrently implements MessageListenerConcurrently {
|
||||
|
||||
private final DefaultRocketMQListenerContainer container;
|
||||
|
||||
TenantRocketMQListenerConcurrently(DefaultRocketMQListenerContainer container) {
|
||||
this.container = container;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
|
||||
for (MessageExt messageExt : msgs) {
|
||||
log.debug("received msg: {}", messageExt);
|
||||
try {
|
||||
long now = System.currentTimeMillis();
|
||||
|
||||
// 设置租户编号
|
||||
String tenantId = messageExt.getUserProperty(HEADER_TENANT_ID);
|
||||
if (StrUtil.isNotEmpty(tenantId)) {
|
||||
TenantContextHolder.setTenantId(Long.parseLong(tenantId));
|
||||
}
|
||||
|
||||
container.handleMessage(messageExt);
|
||||
|
||||
long costTime = System.currentTimeMillis() - now;
|
||||
log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
|
||||
} catch (Exception e) {
|
||||
log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
|
||||
context.setDelayLevelWhenNextConsume(container.getDelayLevelWhenNextConsume());
|
||||
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
|
||||
} finally {
|
||||
// 清理租户编号
|
||||
TenantContextHolder.clear();
|
||||
}
|
||||
}
|
||||
|
||||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,60 @@
|
|||
package cn.iocoder.yudao.framework.tenant.core.mq.rocketmq;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
|
||||
|
||||
/**
|
||||
* RocketMQ 消息队列的多租户 {@link MessageListenerOrderly} 实现类,支持批量消费消息的租户上下文设置
|
||||
* 默认实现 {@link DefaultRocketMQListenerContainer.DefaultMessageListenerOrderly}
|
||||
*
|
||||
* @author changhao.ni
|
||||
*/
|
||||
@Slf4j
|
||||
public class TenantRocketMQListenerOrderly implements MessageListenerOrderly {
|
||||
|
||||
private final DefaultRocketMQListenerContainer container;
|
||||
|
||||
TenantRocketMQListenerOrderly(DefaultRocketMQListenerContainer container) {
|
||||
this.container = container;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
|
||||
for (MessageExt messageExt : msgs) {
|
||||
log.debug("received msg: {}", messageExt);
|
||||
try {
|
||||
long now = System.currentTimeMillis();
|
||||
|
||||
// 设置租户编号
|
||||
String tenantId = messageExt.getUserProperty(HEADER_TENANT_ID);
|
||||
if (StrUtil.isNotEmpty(tenantId)) {
|
||||
TenantContextHolder.setTenantId(Long.parseLong(tenantId));
|
||||
}
|
||||
|
||||
container.handleMessage(messageExt);
|
||||
|
||||
long costTime = System.currentTimeMillis() - now;
|
||||
log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
|
||||
} catch (Exception e) {
|
||||
log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
|
||||
context.setSuspendCurrentQueueTimeMillis(container.getSuspendCurrentQueueTimeMillis());
|
||||
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
|
||||
} finally {
|
||||
// 清理租户编号
|
||||
TenantContextHolder.clear();
|
||||
}
|
||||
}
|
||||
|
||||
return ConsumeOrderlyStatus.SUCCESS;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue