【代码评审】IoT:数据桥梁的执行器抽离

This commit is contained in:
YunaiV 2025-02-22 18:31:09 +08:00
parent 672247dbe4
commit f76843573e
4 changed files with 12 additions and 11 deletions

View File

@ -7,7 +7,7 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotRuleSceneDO;
import cn.iocoder.yudao.module.iot.enums.rule.IotRuleSceneActionTypeEnum; import cn.iocoder.yudao.module.iot.enums.rule.IotRuleSceneActionTypeEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.service.rule.IotDataBridgeService; import cn.iocoder.yudao.module.iot.service.rule.IotDataBridgeService;
import cn.iocoder.yudao.module.iot.service.rule.execute.IotDataBridgeExecute; import cn.iocoder.yudao.module.iot.service.rule.action.databridge.IotDataBridgeExecute;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -19,9 +19,6 @@ import java.util.List;
* *
* @author 芋道源码 * @author 芋道源码
*/ */
// TODO @芋艿优化因为 bridge 会比较多所以可以考虑在 rule 新建一个 bridge package然后定义一个 bridgehandler它有
// 1. input 方法output 方法
// 2. build 方法用于有状态的连接例如说 mqtcpwebsocket
@Component @Component
@Slf4j @Slf4j
public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction { public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction {
@ -49,7 +46,7 @@ public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction {
return; return;
} }
// 2.1 执行数据桥接操作 // 2. 执行数据桥接操作
dataBridgeExecutes.forEach(execute -> execute.execute(message, dataBridge)); dataBridgeExecutes.forEach(execute -> execute.execute(message, dataBridge));
} }

View File

@ -1,4 +1,4 @@
package cn.iocoder.yudao.module.iot.service.rule.execute; package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;

View File

@ -1,4 +1,4 @@
package cn.iocoder.yudao.module.iot.service.rule.execute; package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.framework.common.util.http.HttpUtils; import cn.iocoder.yudao.framework.common.util.http.HttpUtils;

View File

@ -1,4 +1,4 @@
package cn.iocoder.yudao.module.iot.service.rule.execute; package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum; import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum;
@ -30,12 +30,13 @@ public class IotRocketMQDataBridgeExecute implements IotDataBridgeExecute {
/** /**
* 针对 {@link IotDataBridgeDO.RocketMQConfig} DefaultMQProducer 缓存 * 针对 {@link IotDataBridgeDO.RocketMQConfig} DefaultMQProducer 缓存
*/ */
// TODO @puhui999因为 kafka 之类也存在这个情况是不是得搞个抽象类提供一个 initProducer closeProducer 方法
private final LoadingCache<IotDataBridgeDO.RocketMQConfig, DefaultMQProducer> PRODUCER_CACHE = CacheBuilder.newBuilder() private final LoadingCache<IotDataBridgeDO.RocketMQConfig, DefaultMQProducer> PRODUCER_CACHE = CacheBuilder.newBuilder()
// 只阻塞当前数据加载线程其他线程返回旧值 .refreshAfterWrite(Duration.ofMinutes(10)) // TODO puhui999应该是 read 30 分钟哈
.refreshAfterWrite(Duration.ofMinutes(10))
// 增加移除监听器自动关闭 producer // 增加移除监听器自动关闭 producer
.removalListener(notification -> { .removalListener(notification -> {
DefaultMQProducer producer = (DefaultMQProducer) notification.getValue(); DefaultMQProducer producer = (DefaultMQProducer) notification.getValue();
// TODO puhui999if return更简短哈
if (producer != null) { if (producer != null) {
try { try {
producer.shutdown(); producer.shutdown();
@ -45,8 +46,10 @@ public class IotRocketMQDataBridgeExecute implements IotDataBridgeExecute {
} }
} }
}) })
// TODO @puhui999就同步哈不用异步处理
// 通过 asyncReloading 实现全异步加载包括 refreshAfterWrite 被阻塞的加载线程 // 通过 asyncReloading 实现全异步加载包括 refreshAfterWrite 被阻塞的加载线程
.build(CacheLoader.asyncReloading(new CacheLoader<IotDataBridgeDO.RocketMQConfig, DefaultMQProducer>() { .build(CacheLoader.asyncReloading(new CacheLoader<IotDataBridgeDO.RocketMQConfig, DefaultMQProducer>() {
@Override @Override
public DefaultMQProducer load(IotDataBridgeDO.RocketMQConfig config) throws Exception { public DefaultMQProducer load(IotDataBridgeDO.RocketMQConfig config) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(config.getGroup()); DefaultMQProducer producer = new DefaultMQProducer(config.getGroup());
@ -55,6 +58,7 @@ public class IotRocketMQDataBridgeExecute implements IotDataBridgeExecute {
log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已创建并启动]", config); log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已创建并启动]", config);
return producer; return producer;
} }
}, Executors.newCachedThreadPool())); }, Executors.newCachedThreadPool()));
@Override @Override
@ -120,7 +124,7 @@ public class IotRocketMQDataBridgeExecute implements IotDataBridgeExecute {
log.info("[main][第一次执行,应该会创建新的 producer]"); log.info("[main][第一次执行,应该会创建新的 producer]");
action.executeRocketMQ(message, config); action.executeRocketMQ(message, config);
log.info("[main][第二次执行,应该会复用缓存的 producer]"); log.info("[main][第二次执行,应该会复用缓存的 producer]");
action.executeRocketMQ(message, config); action.executeRocketMQ(message, config);
} }