From 3b54deb989630f21b7313ac398b4e1f417172fe4 Mon Sep 17 00:00:00 2001 From: puhui999 Date: Mon, 3 Mar 2025 12:48:07 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E3=80=91IoT:=20=E4=BC=98=E5=8C=96=E6=95=B0=E6=8D=AE=E6=A1=A5?= =?UTF-8?q?=E6=A2=81=E7=9A=84=E6=89=A7=E8=A1=8C=E5=99=A8=E5=87=8F=E5=B0=91?= =?UTF-8?q?=E5=AD=90=E7=B1=BB=E4=BB=A3=E7=A0=81=E5=86=97=E4=BD=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../AbstractCacheableDataBridgeExecute.java | 2 +- .../databridge/IotDataBridgeExecute.java | 28 +++++++++++++++++-- .../databridge/IotHttpDataBridgeExecute.java | 14 ++++------ .../IotKafkaMQDataBridgeExecute.java | 16 ++++------- .../IotRabbitMQDataBridgeExecute.java | 17 +++++------ .../IotRedisStreamMQDataBridgeExecute.java | 18 ++++-------- .../IotRocketMQDataBridgeExecute.java | 16 ++++------- 7 files changed, 56 insertions(+), 55 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java index ec0c8bea97..52c4483ec7 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java @@ -33,7 +33,7 @@ import java.time.Duration; * @author HUIHUI */ @Slf4j -public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridgeExecute { +public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridgeExecute { /** * Producer 缓存 diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java index a10f75103c..3f842f1f50 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java @@ -9,9 +9,14 @@ import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; * * @author HUIHUI */ -public interface IotDataBridgeExecute { +public interface IotDataBridgeExecute { - // TODO @huihui:要不搞个 getType?然后 execute0 由子类实现。这样,子类的 executeRedisStream ,其实就是 execute0 了。 + /** + * 获取数据桥梁类型 + * + * @return 数据桥梁类型 + */ + Integer getType(); /** * 执行数据桥梁操作 @@ -19,6 +24,23 @@ public interface IotDataBridgeExecute { * @param message 设备消息 * @param dataBridge 数据桥梁 */ - void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge); + @SuppressWarnings({"unchecked"}) + default void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { + // 1.1 校验数据桥梁类型 + if (!getType().equals(dataBridge.getType())) { + return; + } + + // 1.2 执行对应的数据桥梁发送消息 + execute0(message, (Config) dataBridge.getConfig()); + } + + /** + * 【真正】执行数据桥梁操作 + * + * @param message 设备消息 + * @param config 桥梁配置 + */ + void execute0(IotDeviceMessage message, Config config); } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java index 27b8bc6bba..ffe2c5b803 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java @@ -25,23 +25,19 @@ import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_ */ @Component @Slf4j -public class IotHttpDataBridgeExecute implements IotDataBridgeExecute { +public class IotHttpDataBridgeExecute implements IotDataBridgeExecute { @Resource private RestTemplate restTemplate; @Override - public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { - // 1.1 校验数据桥梁的类型 == HTTP - if (!IotDataBridgTypeEnum.HTTP.getType().equals(dataBridge.getType())) { - return; - } - // 1.2 执行 HTTP 请求 - executeHttp(message, (IotDataBridgeDO.HttpConfig) dataBridge.getConfig()); + public Integer getType() { + return IotDataBridgTypeEnum.HTTP.getType(); } + @Override @SuppressWarnings({"unchecked", "deprecation"}) - private void executeHttp(IotDeviceMessage message, IotDataBridgeDO.HttpConfig config) { + public void execute0(IotDeviceMessage message, IotDataBridgeDO.HttpConfig config) { String url = null; HttpMethod method = HttpMethod.valueOf(config.getMethod().toUpperCase()); HttpEntity requestEntity = null; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java index 33aa744a85..1efcfe9cab 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java @@ -30,16 +30,12 @@ public class IotKafkaMQDataBridgeExecute extends private static final Duration SEND_TIMEOUT = Duration.ofMillis(10); @Override - public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { - // 1. 校验数据桥梁的类型 == KAFKA - if (!IotDataBridgTypeEnum.KAFKA.getType().equals(dataBridge.getType())) { - return; - } - // 2. 执行 Kafka 发送消息 - executeKafka(message, (IotDataBridgeDO.KafkaMQConfig) dataBridge.getConfig()); + public Integer getType() { + return IotDataBridgTypeEnum.KAFKA.getType(); } - private void executeKafka(IotDeviceMessage message, IotDataBridgeDO.KafkaMQConfig config) { + @Override + public void execute0(IotDeviceMessage message, IotDataBridgeDO.KafkaMQConfig config) { try { // 1. 获取或创建 KafkaTemplate KafkaTemplate kafkaTemplate = getProducer(config); @@ -113,10 +109,10 @@ public class IotKafkaMQDataBridgeExecute extends // 4. 执行两次测试,验证缓存 log.info("[main][第一次执行,应该会创建新的 producer]"); - action.executeKafka(message, config); + action.execute0(message, config); log.info("[main][第二次执行,应该会复用缓存的 producer]"); - action.executeKafka(message, config); + action.execute0(message, config); } } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java index 0918a5546d..27ebccc399 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java @@ -22,17 +22,14 @@ import java.time.LocalDateTime; public class IotRabbitMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute { + @Override - public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { - // 1.1 校验数据桥梁的类型 == RABBITMQ - if (!IotDataBridgTypeEnum.RABBITMQ.getType().equals(dataBridge.getType())) { - return; - } - // 1.2 执行 RabbitMQ 发送消息 - executeRabbitMQ(message, (IotDataBridgeDO.RabbitMQConfig) dataBridge.getConfig()); + public Integer getType() { + return IotDataBridgTypeEnum.RABBITMQ.getType(); } - private void executeRabbitMQ(IotDeviceMessage message, IotDataBridgeDO.RabbitMQConfig config) { + @Override + public void execute0(IotDeviceMessage message, IotDataBridgeDO.RabbitMQConfig config) { try { // 1. 获取或创建 Channel Channel channel = getProducer(config); @@ -111,10 +108,10 @@ public class IotRabbitMQDataBridgeExecute extends // 4. 执行两次测试,验证缓存 log.info("[main][第一次执行,应该会创建新的 channel]"); - action.executeRabbitMQ(message, config); + action.execute0(message, config); log.info("[main][第二次执行,应该会复用缓存的 channel]"); - action.executeRabbitMQ(message, config); + action.execute0(message, config); } } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java index 77b688606a..f60528ef9f 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java @@ -33,18 +33,12 @@ public class IotRedisStreamMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute> { @Override - public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { - // 1.1 校验数据桥梁类型 - if (!IotDataBridgTypeEnum.REDIS_STREAM.getType().equals(dataBridge.getType())) { - return; - } - // 1.2 执行消息发送 - executeRedisStream(message, (IotDataBridgeDO.RedisStreamMQConfig) dataBridge.getConfig()); + public Integer getType() { + return IotDataBridgTypeEnum.REDIS_STREAM.getType(); } - @SuppressWarnings("unchecked") - // TODO @huihui:try catch 交给父类来做,子类不处理异常 - private void executeRedisStream(IotDeviceMessage message, IotDataBridgeDO.RedisStreamMQConfig config) { + @Override + public void execute0(IotDeviceMessage message, IotDataBridgeDO.RedisStreamMQConfig config) { try { // 1. 获取 RedisTemplate RedisTemplate redisTemplate = getProducer(config); @@ -133,10 +127,10 @@ public class IotRedisStreamMQDataBridgeExecute extends // 4. 执行两次测试,验证缓存 log.info("[main][第一次执行,应该会创建新的 RedisTemplate]"); - action.executeRedisStream(message, config); + action.execute0(message, config); log.info("[main][第二次执行,应该会复用缓存的 RedisTemplate]"); - action.executeRedisStream(message, config); + action.execute0(message, config); } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java index 061bbfc69f..a68a6525e4 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java @@ -24,16 +24,12 @@ public class IotRocketMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute { @Override - public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { - // 1.1 校验数据桥梁的类型 == ROCKETMQ - if (!IotDataBridgTypeEnum.ROCKETMQ.getType().equals(dataBridge.getType())) { - return; - } - // 1.2 执行 RocketMQ 发送消息 - executeRocketMQ(message, (IotDataBridgeDO.RocketMQConfig) dataBridge.getConfig()); + public Integer getType() { + return IotDataBridgTypeEnum.ROCKETMQ.getType(); } - private void executeRocketMQ(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) { + @Override + public void execute0(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) { try { // 1. 获取或创建 Producer DefaultMQProducer producer = getProducer(config); @@ -97,10 +93,10 @@ public class IotRocketMQDataBridgeExecute extends // 4. 执行两次测试,验证缓存 log.info("[main][第一次执行,应该会创建新的 producer]"); - action.executeRocketMQ(message, config); + action.execute0(message, config); log.info("[main][第二次执行,应该会复用缓存的 producer]"); - action.executeRocketMQ(message, config); + action.execute0(message, config); } }