diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneAction.java index 4cf1f8f285..a673b538ef 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneAction.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneAction.java @@ -22,7 +22,7 @@ public interface IotRuleSceneAction { * 2. 非空的情况:设备触发 * @param config 配置 */ - void execute(@Nullable IotDeviceMessage message, IotRuleSceneDO.ActionConfig config); + void execute(@Nullable IotDeviceMessage message, IotRuleSceneDO.ActionConfig config) throws Exception; /** * 获得类型 diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java index 6733331cb4..d94922f5d0 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java @@ -26,10 +26,10 @@ public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction { @Resource private IotDataBridgeService dataBridgeService; @Resource - private List dataBridgeExecutes; + private List> dataBridgeExecutes; @Override - public void execute(IotDeviceMessage message, IotRuleSceneDO.ActionConfig config) { + public void execute(IotDeviceMessage message, IotRuleSceneDO.ActionConfig config) throws Exception { // 1.1 如果消息为空,直接返回 if (message == null) { return; @@ -47,7 +47,9 @@ public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction { } // 2. 执行数据桥接操作 - dataBridgeExecutes.forEach(execute -> execute.execute(message, dataBridge)); + for (IotDataBridgeExecute execute : dataBridgeExecutes) { + execute.execute(message, dataBridge); + } } @Override diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java index 52c4483ec7..d26c2dd436 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java @@ -1,5 +1,7 @@ package cn.iocoder.yudao.module.iot.service.rule.action.databridge; +import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; +import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -93,4 +95,20 @@ public abstract class AbstractCacheableDataBridgeExecute imple */ protected abstract void closeProducer(Producer producer) throws Exception; + @Override + @SuppressWarnings({"unchecked"}) + public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { + // 1.1 校验数据桥梁类型 + if (!getType().equals(dataBridge.getType())) { + return; + } + + // 1.2 执行对应的数据桥梁发送消息 + try { + execute0(message, (Config) dataBridge.getConfig()); + } catch (Exception e) { + log.error("[execute][桥梁配置 config({}) 对应的 message({}) 发送异常]", dataBridge.getConfig(), message, e); + } + } + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java index 3f842f1f50..ce3d0f1938 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java @@ -25,7 +25,7 @@ public interface IotDataBridgeExecute { * @param dataBridge 数据桥梁 */ @SuppressWarnings({"unchecked"}) - default void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { + default void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) throws Exception { // 1.1 校验数据桥梁类型 if (!getType().equals(dataBridge.getType())) { return; @@ -41,6 +41,6 @@ public interface IotDataBridgeExecute { * @param message 设备消息 * @param config 桥梁配置 */ - void execute0(IotDeviceMessage message, Config config); + void execute0(IotDeviceMessage message, Config config) throws Exception; } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java index 1efcfe9cab..b943eb31af 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java @@ -15,7 +15,6 @@ import java.time.LocalDateTime; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; /** * Kafka 的 {@link IotDataBridgeExecute} 实现类 @@ -35,20 +34,14 @@ public class IotKafkaMQDataBridgeExecute extends } @Override - public void execute0(IotDeviceMessage message, IotDataBridgeDO.KafkaMQConfig config) { - try { - // 1. 获取或创建 KafkaTemplate - KafkaTemplate kafkaTemplate = getProducer(config); + public void execute0(IotDeviceMessage message, IotDataBridgeDO.KafkaMQConfig config) throws Exception { + // 1. 获取或创建 KafkaTemplate + KafkaTemplate kafkaTemplate = getProducer(config); - // 2. 发送消息并等待结果 - kafkaTemplate.send(config.getTopic(), message.toString()) - .get(SEND_TIMEOUT.getSeconds(), TimeUnit.SECONDS); // 添加超时等待 - log.info("[executeKafka][message({}) 发送成功]", message); - } catch (TimeoutException e) { - log.error("[executeKafka][message({}) config({}) 发送超时]", message, config, e); - } catch (Exception e) { - log.error("[executeKafka][message({}) config({}) 发送异常]", message, config, e); - } + // 2. 发送消息并等待结果 + kafkaTemplate.send(config.getTopic(), message.toString()) + .get(SEND_TIMEOUT.getSeconds(), TimeUnit.SECONDS); // 添加超时等待 + log.info("[execute0][message({}) 发送成功]", message); } @Override @@ -109,10 +102,10 @@ public class IotKafkaMQDataBridgeExecute extends // 4. 执行两次测试,验证缓存 log.info("[main][第一次执行,应该会创建新的 producer]"); - action.execute0(message, config); + action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config)); log.info("[main][第二次执行,应该会复用缓存的 producer]"); - action.execute0(message, config); + action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config)); } } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java index 27ebccc399..f99e651795 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java @@ -29,23 +29,19 @@ public class IotRabbitMQDataBridgeExecute extends } @Override - public void execute0(IotDeviceMessage message, IotDataBridgeDO.RabbitMQConfig config) { - try { - // 1. 获取或创建 Channel - Channel channel = getProducer(config); + public void execute0(IotDeviceMessage message, IotDataBridgeDO.RabbitMQConfig config) throws Exception { + // 1. 获取或创建 Channel + Channel channel = getProducer(config); - // 2.1 声明交换机、队列和绑定关系 - channel.exchangeDeclare(config.getExchange(), "direct", true); - channel.queueDeclare(config.getQueue(), true, false, false, null); - channel.queueBind(config.getQueue(), config.getExchange(), config.getRoutingKey()); + // 2.1 声明交换机、队列和绑定关系 + channel.exchangeDeclare(config.getExchange(), "direct", true); + channel.queueDeclare(config.getQueue(), true, false, false, null); + channel.queueBind(config.getQueue(), config.getExchange(), config.getRoutingKey()); - // 2.2 发送消息 - channel.basicPublish(config.getExchange(), config.getRoutingKey(), null, - message.toString().getBytes(StandardCharsets.UTF_8)); - log.info("[executeRabbitMQ][message({}) config({}) 发送成功]", message, config); - } catch (Exception e) { - log.error("[executeRabbitMQ][message({}) config({}) 发送异常]", message, config, e); - } + // 2.2 发送消息 + channel.basicPublish(config.getExchange(), config.getRoutingKey(), null, + message.toString().getBytes(StandardCharsets.UTF_8)); + log.info("[executeRabbitMQ][message({}) config({}) 发送成功]", message, config); } @Override @@ -107,11 +103,12 @@ public class IotRabbitMQDataBridgeExecute extends .build(); // 4. 执行两次测试,验证缓存 - log.info("[main][第一次执行,应该会创建新的 channel]"); - action.execute0(message, config); + // 4. 执行两次测试,验证缓存 + log.info("[main][第一次执行,应该会创建新的 producer]"); + action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config)); - log.info("[main][第二次执行,应该会复用缓存的 channel]"); - action.execute0(message, config); + log.info("[main][第二次执行,应该会复用缓存的 producer]"); + action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config)); } } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java index f60528ef9f..a20334f4fd 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java @@ -38,19 +38,15 @@ public class IotRedisStreamMQDataBridgeExecute extends } @Override - public void execute0(IotDeviceMessage message, IotDataBridgeDO.RedisStreamMQConfig config) { - try { - // 1. 获取 RedisTemplate - RedisTemplate redisTemplate = getProducer(config); + public void execute0(IotDeviceMessage message, IotDataBridgeDO.RedisStreamMQConfig config) throws Exception { + // 1. 获取 RedisTemplate + RedisTemplate redisTemplate = getProducer(config); - // 2. 创建并发送 Stream 记录 - ObjectRecord record = StreamRecords.newRecord() - .ofObject(message).withStreamKey(config.getTopic()); - String recordId = String.valueOf(redisTemplate.opsForStream().add(record)); - log.info("[executeRedisStream][消息发送成功] messageId: {}, config: {}", recordId, config); - } catch (Exception e) { - log.error("[executeRedisStream][消息发送失败] message: {}, config: {}", message, config, e); - } + // 2. 创建并发送 Stream 记录 + ObjectRecord record = StreamRecords.newRecord() + .ofObject(message).withStreamKey(config.getTopic()); + String recordId = String.valueOf(redisTemplate.opsForStream().add(record)); + log.info("[executeRedisStream][消息发送成功] messageId: {}, config: {}", recordId, config); } @Override @@ -126,11 +122,11 @@ public class IotRedisStreamMQDataBridgeExecute extends .build(); // 4. 执行两次测试,验证缓存 - log.info("[main][第一次执行,应该会创建新的 RedisTemplate]"); - action.execute0(message, config); + log.info("[main][第一次执行,应该会创建新的 producer]"); + action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config)); - log.info("[main][第二次执行,应该会复用缓存的 RedisTemplate]"); - action.execute0(message, config); + log.info("[main][第二次执行,应该会复用缓存的 producer]"); + action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config)); } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java index a68a6525e4..df413c7503 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java @@ -29,27 +29,23 @@ public class IotRocketMQDataBridgeExecute extends } @Override - public void execute0(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) { - try { - // 1. 获取或创建 Producer - DefaultMQProducer producer = getProducer(config); + public void execute0(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) throws Exception { + // 1. 获取或创建 Producer + DefaultMQProducer producer = getProducer(config); - // 2.1 创建消息对象,指定Topic、Tag和消息体 - Message msg = new Message( - config.getTopic(), - config.getTags(), - message.toString().getBytes(RemotingHelper.DEFAULT_CHARSET) - ); - // 2.2 发送同步消息并处理结果 - SendResult sendResult = producer.send(msg); - // 2.3 处理发送结果 - if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) { - log.info("[executeRocketMQ][message({}) config({}) 发送成功,结果({})]", message, config, sendResult); - } else { - log.error("[executeRocketMQ][message({}) config({}) 发送失败,结果({})]", message, config, sendResult); - } - } catch (Exception e) { - log.error("[executeRocketMQ][message({}) config({}) 发送异常]", message, config, e); + // 2.1 创建消息对象,指定Topic、Tag和消息体 + Message msg = new Message( + config.getTopic(), + config.getTags(), + message.toString().getBytes(RemotingHelper.DEFAULT_CHARSET) + ); + // 2.2 发送同步消息并处理结果 + SendResult sendResult = producer.send(msg); + // 2.3 处理发送结果 + if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) { + log.info("[executeRocketMQ][message({}) config({}) 发送成功,结果({})]", message, config, sendResult); + } else { + log.error("[executeRocketMQ][message({}) config({}) 发送失败,结果({})]", message, config, sendResult); } } @@ -93,10 +89,10 @@ public class IotRocketMQDataBridgeExecute extends // 4. 执行两次测试,验证缓存 log.info("[main][第一次执行,应该会创建新的 producer]"); - action.execute0(message, config); + action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config)); log.info("[main][第二次执行,应该会复用缓存的 producer]"); - action.execute0(message, config); + action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config)); } }