From 4be18af236ac036741c6adba23fa58d06f17a73c Mon Sep 17 00:00:00 2001 From: puhui999 Date: Thu, 20 Feb 2025 18:21:52 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E3=80=91IoT:=20=E5=9F=BA=E4=BA=8E=20guava=20=E5=AF=B9=20produc?= =?UTF-8?q?er=20=E5=81=9A=20cache?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../execute/IotRocketMQDataBridgeExecute.java | 69 ++++++++++++++----- 1 file changed, 50 insertions(+), 19 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/execute/IotRocketMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/execute/IotRocketMQDataBridgeExecute.java index 3e02e6960a..11ca2339cf 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/execute/IotRocketMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/execute/IotRocketMQDataBridgeExecute.java @@ -3,6 +3,9 @@ package cn.iocoder.yudao.module.iot.service.rule.execute; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum; import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; @@ -11,7 +14,9 @@ import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.springframework.stereotype.Component; +import java.time.Duration; import java.time.LocalDateTime; +import java.util.concurrent.Executors; /** * RocketMQ 的 {@link IotDataBridgeExecute} 实现类 @@ -22,6 +27,36 @@ import java.time.LocalDateTime; @Slf4j public class IotRocketMQDataBridgeExecute implements IotDataBridgeExecute { + /** + * 针对 {@link IotDataBridgeDO.RocketMQConfig} 的 DefaultMQProducer 缓存 + */ + private final LoadingCache PRODUCER_CACHE = CacheBuilder.newBuilder() + // 只阻塞当前数据加载线程,其他线程返回旧值 + .refreshAfterWrite(Duration.ofMinutes(10)) + // 增加移除监听器,自动关闭 producer + .removalListener(notification -> { + DefaultMQProducer producer = (DefaultMQProducer) notification.getValue(); + if (producer != null) { + try { + producer.shutdown(); + log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已关闭]", notification.getKey()); + } catch (Exception e) { + log.error("[PRODUCER_CACHE][配置({}) 对应的 producer 关闭失败]", notification.getKey(), e); + } + } + }) + // 通过 asyncReloading 实现全异步加载,包括 refreshAfterWrite 被阻塞的加载线程 + .build(CacheLoader.asyncReloading(new CacheLoader() { + @Override + public DefaultMQProducer load(IotDataBridgeDO.RocketMQConfig config) throws Exception { + DefaultMQProducer producer = new DefaultMQProducer(config.getGroup()); + producer.setNamesrvAddr(config.getNameServer()); + producer.start(); + log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已创建并启动]", config); + return producer; + } + }, Executors.newCachedThreadPool())); + @Override public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { // 1.1 校验数据桥接的类型 == ROCKETMQ @@ -33,14 +68,9 @@ public class IotRocketMQDataBridgeExecute implements IotDataBridgeExecute { } private void executeRocketMQ(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) { - // 1.1 创建生产者实例,指定生产者组名 - DefaultMQProducer producer = new DefaultMQProducer(config.getGroup()); - // TODO @puhui999:可以考虑,基于 guava 做 cache,使用 config 作为 key,然后假个 listener 超时,销毁 producer try { - // 1.2 设置 NameServer 地址 - producer.setNamesrvAddr(config.getNameServer()); - // 1.3 启动生产者 - producer.start(); + // 1. 获取或创建 Producer + DefaultMQProducer producer = PRODUCER_CACHE.get(config); // 2.1 创建消息对象,指定Topic、Tag和消息体 Message msg = new Message( @@ -58,18 +88,22 @@ public class IotRocketMQDataBridgeExecute implements IotDataBridgeExecute { } } catch (Exception e) { log.error("[executeRocketMQ][message({}) config({}) 发送异常]", message, config, e); - } finally { - // 3. 关闭生产者 - producer.shutdown(); } } // TODO @芋艿:测试代码,后续清理 public static void main(String[] args) { - // 1. 创建 IotRocketMQDataBridgeExecute 实例 + // 1. 创建一个共享的实例 IotRocketMQDataBridgeExecute action = new IotRocketMQDataBridgeExecute(); - // 2. 创建测试消息 + // 2. 创建共享的配置 + IotDataBridgeDO.RocketMQConfig config = new IotDataBridgeDO.RocketMQConfig(); + config.setNameServer("127.0.0.1:9876"); + config.setGroup("test-group"); + config.setTopic("test-topic"); + config.setTags("test-tag"); + + // 3. 创建共享的消息 IotDeviceMessage message = IotDeviceMessage.builder() .requestId("TEST-001") .productKey("testProduct") @@ -82,14 +116,11 @@ public class IotRocketMQDataBridgeExecute implements IotDataBridgeExecute { .tenantId(1L) .build(); - // 3. 创建 RocketMQ 配置 - IotDataBridgeDO.RocketMQConfig config = new IotDataBridgeDO.RocketMQConfig(); - config.setNameServer("127.0.0.1:9876"); - config.setGroup("test-group"); - config.setTopic("test-topic"); - config.setTags("test-tag"); + // 4. 执行两次测试,验证缓存 + log.info("[main][第一次执行,应该会创建新的 producer]"); + action.executeRocketMQ(message, config); - // 4. 执行测试 + log.info("[main][第二次执行,应该会复用缓存的 producer]"); action.executeRocketMQ(message, config); }