From f76843573ee25546b829b5a1209bba9b2ffcb12f Mon Sep 17 00:00:00 2001 From: YunaiV Date: Sat, 22 Feb 2025 18:31:09 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90=E4=BB=A3=E7=A0=81=E8=AF=84=E5=AE=A1?= =?UTF-8?q?=E3=80=91IoT=EF=BC=9A=E6=95=B0=E6=8D=AE=E6=A1=A5=E6=A2=81?= =?UTF-8?q?=E7=9A=84=E6=89=A7=E8=A1=8C=E5=99=A8=E6=8A=BD=E7=A6=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rule/action/IotRuleSceneDataBridgeAction.java | 7 ++----- .../databridge}/IotDataBridgeExecute.java | 2 +- .../databridge}/IotHttpDataBridgeExecute.java | 2 +- .../databridge}/IotRocketMQDataBridgeExecute.java | 12 ++++++++---- 4 files changed, 12 insertions(+), 11 deletions(-) rename yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/{execute => action/databridge}/IotDataBridgeExecute.java (92%) rename yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/{execute => action/databridge}/IotHttpDataBridgeExecute.java (98%) rename yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/{execute => action/databridge}/IotRocketMQDataBridgeExecute.java (92%) diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java index f87cd60011..6733331cb4 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java @@ -7,7 +7,7 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotRuleSceneDO; import cn.iocoder.yudao.module.iot.enums.rule.IotRuleSceneActionTypeEnum; import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.service.rule.IotDataBridgeService; -import cn.iocoder.yudao.module.iot.service.rule.execute.IotDataBridgeExecute; +import cn.iocoder.yudao.module.iot.service.rule.action.databridge.IotDataBridgeExecute; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -19,9 +19,6 @@ import java.util.List; * * @author 芋道源码 */ -// TODO @芋艿:【优化】因为 bridge 会比较多,所以可以考虑在 rule 下,新建一个 bridge 的 package,然后定义一个 bridgehandler,它有: -// 1. input 方法、output 方法 -// 2. build 方法,用于有状态的连接,例如说 mq、tcp、websocket @Component @Slf4j public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction { @@ -49,7 +46,7 @@ public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction { return; } - // 2.1 执行数据桥接操作 + // 2. 执行数据桥接操作 dataBridgeExecutes.forEach(execute -> execute.execute(message, dataBridge)); } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/execute/IotDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java similarity index 92% rename from yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/execute/IotDataBridgeExecute.java rename to yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java index 8979d21150..cd00f4f3e7 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/execute/IotDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java @@ -1,4 +1,4 @@ -package cn.iocoder.yudao.module.iot.service.rule.execute; +package cn.iocoder.yudao.module.iot.service.rule.action.databridge; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/execute/IotHttpDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java similarity index 98% rename from yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/execute/IotHttpDataBridgeExecute.java rename to yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java index 861fadd6c0..76f1b793f6 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/execute/IotHttpDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java @@ -1,4 +1,4 @@ -package cn.iocoder.yudao.module.iot.service.rule.execute; +package cn.iocoder.yudao.module.iot.service.rule.action.databridge; import cn.hutool.core.collection.CollUtil; import cn.iocoder.yudao.framework.common.util.http.HttpUtils; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/execute/IotRocketMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java similarity index 92% rename from yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/execute/IotRocketMQDataBridgeExecute.java rename to yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java index 11ca2339cf..626208e187 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/execute/IotRocketMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java @@ -1,4 +1,4 @@ -package cn.iocoder.yudao.module.iot.service.rule.execute; +package cn.iocoder.yudao.module.iot.service.rule.action.databridge; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum; @@ -30,12 +30,13 @@ public class IotRocketMQDataBridgeExecute implements IotDataBridgeExecute { /** * 针对 {@link IotDataBridgeDO.RocketMQConfig} 的 DefaultMQProducer 缓存 */ + // TODO @puhui999:因为 kafka 之类也存在这个情况,是不是得搞个抽象类。提供一个 initProducer,和 closeProducer 方法 private final LoadingCache PRODUCER_CACHE = CacheBuilder.newBuilder() - // 只阻塞当前数据加载线程,其他线程返回旧值 - .refreshAfterWrite(Duration.ofMinutes(10)) + .refreshAfterWrite(Duration.ofMinutes(10)) // TODO puhui999:应该是 read 30 分钟哈 // 增加移除监听器,自动关闭 producer .removalListener(notification -> { DefaultMQProducer producer = (DefaultMQProducer) notification.getValue(); + // TODO puhui999:if return,更简短哈 if (producer != null) { try { producer.shutdown(); @@ -45,8 +46,10 @@ public class IotRocketMQDataBridgeExecute implements IotDataBridgeExecute { } } }) + // TODO @puhui999:就同步哈,不用异步处理。 // 通过 asyncReloading 实现全异步加载,包括 refreshAfterWrite 被阻塞的加载线程 .build(CacheLoader.asyncReloading(new CacheLoader() { + @Override public DefaultMQProducer load(IotDataBridgeDO.RocketMQConfig config) throws Exception { DefaultMQProducer producer = new DefaultMQProducer(config.getGroup()); @@ -55,6 +58,7 @@ public class IotRocketMQDataBridgeExecute implements IotDataBridgeExecute { log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已创建并启动]", config); return producer; } + }, Executors.newCachedThreadPool())); @Override @@ -120,7 +124,7 @@ public class IotRocketMQDataBridgeExecute implements IotDataBridgeExecute { log.info("[main][第一次执行,应该会创建新的 producer]"); action.executeRocketMQ(message, config); - log.info("[main][第二次执行,应该会复用缓存的 producer]"); + log.info("[main][第二次执行,应该会复用缓存的 producer]"); action.executeRocketMQ(message, config); }