From 61ea09488ed624d1f15fc01b8ffa98aa717484b7 Mon Sep 17 00:00:00 2001 From: puhui999 Date: Mon, 3 Mar 2025 12:22:19 +0800 Subject: [PATCH 1/9] =?UTF-8?q?=E3=80=90=E4=BB=A3=E7=A0=81=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E3=80=91IoT:=20=E4=BC=98=E5=8C=96=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E6=A1=A5=E6=A2=81=E6=89=A7=E8=A1=8C=E5=99=A8=E6=8A=BD=E8=B1=A1?= =?UTF-8?q?=E7=B1=BB=E5=A2=9E=E5=8A=A0=E6=B3=9B=E5=9E=8B=EF=BC=8C=E5=87=8F?= =?UTF-8?q?=E5=B0=91=E5=AD=90=E7=B1=BB=E7=B1=BB=E5=9E=8B=E5=BC=BA=E8=BD=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../AbstractCacheableDataBridgeExecute.java | 50 +++++++++++++------ .../IotKafkaMQDataBridgeExecute.java | 24 ++++----- .../IotRabbitMQDataBridgeExecute.java | 41 +++++++-------- .../IotRedisStreamMQDataBridgeExecute.java | 32 +++++------- .../IotRocketMQDataBridgeExecute.java | 18 +++---- 5 files changed, 81 insertions(+), 84 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java index ebd0f87764..ec0c8bea97 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java @@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.iot.service.rule.action.databridge; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; import lombok.extern.slf4j.Slf4j; import java.time.Duration; @@ -15,22 +16,36 @@ import java.time.Duration; /** * 带缓存功能的数据桥梁执行器抽象类 * + * 该类提供了一个通用的缓存机制,用于管理各类数据桥接的生产者(Producer)实例。 + * + * 主要特点: + * - 基于Guava Cache实现高效的生产者实例缓存管理 + * - 自动处理生产者的生命周期(创建、获取、关闭) + * - 支持30分钟未访问自动过期清理机制 + * - 异常处理与日志记录,便于问题排查 + * + * 子类需要实现: + * - initProducer(Config) - 初始化特定类型的生产者实例 + * - closeProducer(Producer) - 关闭生产者实例并释放资源 + * + * @param 配置信息类型,用于初始化生产者 + * @param 生产者类型,负责将数据发送到目标系统 * @author HUIHUI */ @Slf4j -public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridgeExecute { +public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridgeExecute { - // TODO @huihui:AbstractCacheableDataBridgeExecute 这样,下面的 Object, Object 就有了类型;另外 IotDataBridgeDO.Config 可以替代一个 Object 哇, /** * Producer 缓存 */ - private final LoadingCache PRODUCER_CACHE = CacheBuilder.newBuilder() - .expireAfterAccess(Duration.ofMinutes(30)) - .removalListener(notification -> { - Object producer = notification.getValue(); + private final LoadingCache PRODUCER_CACHE = CacheBuilder.newBuilder() + .expireAfterAccess(Duration.ofMinutes(30)) // 30 分钟未访问就提前过期 + .removalListener((RemovalListener) notification -> { + Producer producer = notification.getValue(); if (producer == null) { return; } + try { closeProducer(producer); log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已关闭]", notification.getKey()); @@ -38,15 +53,18 @@ public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridg log.error("[PRODUCER_CACHE][配置({}) 对应的 producer 关闭失败]", notification.getKey(), e); } }) - .build(new CacheLoader() { - + .build(new CacheLoader() { @Override - public Object load(Object config) throws Exception { - Object producer = initProducer(config); - log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已创建并启动]", config); - return producer; + public Producer load(Config config) throws Exception { + try { + Producer producer = initProducer(config); + log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已创建并启动]", config); + return producer; + } catch (Exception e) { + log.error("[PRODUCER_CACHE][配置({}) 对应的 producer 创建启动失败]", config, e); + throw e; // 抛出异常,触发缓存加载失败机制 + } } - }); /** @@ -55,7 +73,7 @@ public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridg * @param config 配置信息 * @return 生产者对象 */ - protected Object getProducer(Object config) throws Exception { + protected Producer getProducer(Config config) throws Exception { return PRODUCER_CACHE.get(config); } @@ -66,13 +84,13 @@ public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridg * @return 生产者对象 * @throws Exception 如果初始化失败 */ - protected abstract Object initProducer(Object config) throws Exception; + protected abstract Producer initProducer(Config config) throws Exception; /** * 关闭生产者 * * @param producer 生产者对象 */ - protected abstract void closeProducer(Object producer); + protected abstract void closeProducer(Producer producer) throws Exception; } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java index e95d4ced95..33aa744a85 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java @@ -24,7 +24,8 @@ import java.util.concurrent.TimeoutException; */ @Component @Slf4j -public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute { +public class IotKafkaMQDataBridgeExecute extends + AbstractCacheableDataBridgeExecute> { private static final Duration SEND_TIMEOUT = Duration.ofMillis(10); @@ -38,11 +39,10 @@ public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExec executeKafka(message, (IotDataBridgeDO.KafkaMQConfig) dataBridge.getConfig()); } - @SuppressWarnings("unchecked") private void executeKafka(IotDeviceMessage message, IotDataBridgeDO.KafkaMQConfig config) { try { // 1. 获取或创建 KafkaTemplate - KafkaTemplate kafkaTemplate = (KafkaTemplate) getProducer(config); + KafkaTemplate kafkaTemplate = getProducer(config); // 2. 发送消息并等待结果 kafkaTemplate.send(config.getTopic(), message.toString()) @@ -56,24 +56,22 @@ public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExec } @Override - protected Object initProducer(Object config) { - IotDataBridgeDO.KafkaMQConfig kafkaConfig = (IotDataBridgeDO.KafkaMQConfig) config; - + protected KafkaTemplate initProducer(IotDataBridgeDO.KafkaMQConfig config) { // 1.1 构建生产者配置 Map props = new HashMap<>(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers()); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 1.2 如果配置了认证信息 - if (kafkaConfig.getUsername() != null && kafkaConfig.getPassword() != null) { + if (config.getUsername() != null && config.getPassword() != null) { props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "PLAIN"); props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" - + kafkaConfig.getUsername() + "\" password=\"" + kafkaConfig.getPassword() + "\";"); + + config.getUsername() + "\" password=\"" + config.getPassword() + "\";"); } // 1.3 如果启用 SSL - if (Boolean.TRUE.equals(kafkaConfig.getSsl())) { + if (Boolean.TRUE.equals(config.getSsl())) { props.put("security.protocol", "SSL"); } @@ -83,10 +81,8 @@ public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExec } @Override - protected void closeProducer(Object producer) { - if (producer instanceof KafkaTemplate) { - ((KafkaTemplate) producer).destroy(); - } + protected void closeProducer(KafkaTemplate producer) { + producer.destroy(); } // TODO @芋艿:测试代码,后续清理 diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java index 5611873155..0918a5546d 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java @@ -19,7 +19,8 @@ import java.time.LocalDateTime; */ @Component @Slf4j -public class IotRabbitMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute { +public class IotRabbitMQDataBridgeExecute extends + AbstractCacheableDataBridgeExecute { @Override public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { @@ -34,7 +35,7 @@ public class IotRabbitMQDataBridgeExecute extends AbstractCacheableDataBridgeExe private void executeRabbitMQ(IotDeviceMessage message, IotDataBridgeDO.RabbitMQConfig config) { try { // 1. 获取或创建 Channel - Channel channel = (Channel) getProducer(config); + Channel channel = getProducer(config); // 2.1 声明交换机、队列和绑定关系 channel.exchangeDeclare(config.getExchange(), "direct", true); @@ -52,16 +53,14 @@ public class IotRabbitMQDataBridgeExecute extends AbstractCacheableDataBridgeExe @Override @SuppressWarnings("resource") - protected Object initProducer(Object config) throws Exception { - IotDataBridgeDO.RabbitMQConfig rabbitConfig = (IotDataBridgeDO.RabbitMQConfig) config; - + protected Channel initProducer(IotDataBridgeDO.RabbitMQConfig config) throws Exception { // 1. 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); - factory.setHost(rabbitConfig.getHost()); - factory.setPort(rabbitConfig.getPort()); - factory.setVirtualHost(rabbitConfig.getVirtualHost()); - factory.setUsername(rabbitConfig.getUsername()); - factory.setPassword(rabbitConfig.getPassword()); + factory.setHost(config.getHost()); + factory.setPort(config.getPort()); + factory.setVirtualHost(config.getVirtualHost()); + factory.setUsername(config.getUsername()); + factory.setPassword(config.getPassword()); // 2. 创建连接 Connection connection = factory.newConnection(); @@ -71,20 +70,13 @@ public class IotRabbitMQDataBridgeExecute extends AbstractCacheableDataBridgeExe } @Override - protected void closeProducer(Object producer) { - if (producer instanceof Channel) { - try { - Channel channel = (Channel) producer; - if (channel.isOpen()) { - channel.close(); - } - Connection connection = channel.getConnection(); - if (connection.isOpen()) { - connection.close(); - } - } catch (Exception e) { - log.error("[closeProducer][关闭 RabbitMQ 连接异常]", e); - } + protected void closeProducer(Channel channel) throws Exception { + if (channel.isOpen()) { + channel.close(); + } + Connection connection = channel.getConnection(); + if (connection.isOpen()) { + connection.close(); } } @@ -124,4 +116,5 @@ public class IotRabbitMQDataBridgeExecute extends AbstractCacheableDataBridgeExe log.info("[main][第二次执行,应该会复用缓存的 channel]"); action.executeRabbitMQ(message, config); } + } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java index 552fc3425b..77b688606a 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java @@ -29,7 +29,8 @@ import java.time.LocalDateTime; */ @Component @Slf4j -public class IotRedisStreamMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute { +public class IotRedisStreamMQDataBridgeExecute extends + AbstractCacheableDataBridgeExecute> { @Override public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { @@ -46,7 +47,7 @@ public class IotRedisStreamMQDataBridgeExecute extends AbstractCacheableDataBrid private void executeRedisStream(IotDeviceMessage message, IotDataBridgeDO.RedisStreamMQConfig config) { try { // 1. 获取 RedisTemplate - RedisTemplate redisTemplate = (RedisTemplate) getProducer(config); + RedisTemplate redisTemplate = getProducer(config); // 2. 创建并发送 Stream 记录 ObjectRecord record = StreamRecords.newRecord() @@ -59,17 +60,15 @@ public class IotRedisStreamMQDataBridgeExecute extends AbstractCacheableDataBrid } @Override - protected Object initProducer(Object config) { - IotDataBridgeDO.RedisStreamMQConfig redisConfig = (IotDataBridgeDO.RedisStreamMQConfig) config; - + protected RedisTemplate initProducer(IotDataBridgeDO.RedisStreamMQConfig config) { // 1.1 创建 Redisson 配置 Config redissonConfig = new Config(); SingleServerConfig serverConfig = redissonConfig.useSingleServer() - .setAddress("redis://" + redisConfig.getHost() + ":" + redisConfig.getPort()) - .setDatabase(redisConfig.getDatabase()); + .setAddress("redis://" + config.getHost() + ":" + config.getPort()) + .setDatabase(config.getDatabase()); // 1.2 设置密码(如果有) - if (StrUtil.isNotBlank(redisConfig.getPassword())) { - serverConfig.setPassword(redisConfig.getPassword()); + if (StrUtil.isNotBlank(config.getPassword())) { + serverConfig.setPassword(config.getPassword()); } // TODO @huihui:看看能不能简化一些。按道理说,不用这么多的哈。 @@ -90,17 +89,10 @@ public class IotRedisStreamMQDataBridgeExecute extends AbstractCacheableDataBrid } @Override - protected void closeProducer(Object producer) { - // TODO @huihui:try catch 交给父类来做。子类不处理异常 - if (producer instanceof RedisTemplate) { - RedisConnectionFactory factory = ((RedisTemplate) producer).getConnectionFactory(); - try { - if (factory != null) { - ((RedissonConnectionFactory) factory).destroy(); - } - } catch (Exception e) { - log.error("[closeProducer][关闭 redisson 连接异常]", e); - } + protected void closeProducer(RedisTemplate producer) throws Exception { + RedisConnectionFactory factory = producer.getConnectionFactory(); + if (factory != null) { + ((RedissonConnectionFactory) factory).destroy(); } } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java index 3b53252539..061bbfc69f 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java @@ -20,7 +20,8 @@ import java.time.LocalDateTime; */ @Component @Slf4j -public class IotRocketMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute { +public class IotRocketMQDataBridgeExecute extends + AbstractCacheableDataBridgeExecute { @Override public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { @@ -35,7 +36,7 @@ public class IotRocketMQDataBridgeExecute extends AbstractCacheableDataBridgeExe private void executeRocketMQ(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) { try { // 1. 获取或创建 Producer - DefaultMQProducer producer = (DefaultMQProducer) getProducer(config); + DefaultMQProducer producer = getProducer(config); // 2.1 创建消息对象,指定Topic、Tag和消息体 Message msg = new Message( @@ -57,19 +58,16 @@ public class IotRocketMQDataBridgeExecute extends AbstractCacheableDataBridgeExe } @Override - protected Object initProducer(Object config) throws Exception { - IotDataBridgeDO.RocketMQConfig rocketMQConfig = (IotDataBridgeDO.RocketMQConfig) config; - DefaultMQProducer producer = new DefaultMQProducer(rocketMQConfig.getGroup()); - producer.setNamesrvAddr(rocketMQConfig.getNameServer()); + protected DefaultMQProducer initProducer(IotDataBridgeDO.RocketMQConfig config) throws Exception { + DefaultMQProducer producer = new DefaultMQProducer(config.getGroup()); + producer.setNamesrvAddr(config.getNameServer()); producer.start(); return producer; } @Override - protected void closeProducer(Object producer) { - if (producer instanceof DefaultMQProducer) { - ((DefaultMQProducer) producer).shutdown(); - } + protected void closeProducer(DefaultMQProducer producer) { + producer.shutdown(); } // TODO @芋艿:测试代码,后续清理 From 3b54deb989630f21b7313ac398b4e1f417172fe4 Mon Sep 17 00:00:00 2001 From: puhui999 Date: Mon, 3 Mar 2025 12:48:07 +0800 Subject: [PATCH 2/9] =?UTF-8?q?=E3=80=90=E4=BB=A3=E7=A0=81=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E3=80=91IoT:=20=E4=BC=98=E5=8C=96=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E6=A1=A5=E6=A2=81=E7=9A=84=E6=89=A7=E8=A1=8C=E5=99=A8=E5=87=8F?= =?UTF-8?q?=E5=B0=91=E5=AD=90=E7=B1=BB=E4=BB=A3=E7=A0=81=E5=86=97=E4=BD=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../AbstractCacheableDataBridgeExecute.java | 2 +- .../databridge/IotDataBridgeExecute.java | 28 +++++++++++++++++-- .../databridge/IotHttpDataBridgeExecute.java | 14 ++++------ .../IotKafkaMQDataBridgeExecute.java | 16 ++++------- .../IotRabbitMQDataBridgeExecute.java | 17 +++++------ .../IotRedisStreamMQDataBridgeExecute.java | 18 ++++-------- .../IotRocketMQDataBridgeExecute.java | 16 ++++------- 7 files changed, 56 insertions(+), 55 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java index ec0c8bea97..52c4483ec7 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java @@ -33,7 +33,7 @@ import java.time.Duration; * @author HUIHUI */ @Slf4j -public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridgeExecute { +public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridgeExecute { /** * Producer 缓存 diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java index a10f75103c..3f842f1f50 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java @@ -9,9 +9,14 @@ import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; * * @author HUIHUI */ -public interface IotDataBridgeExecute { +public interface IotDataBridgeExecute { - // TODO @huihui:要不搞个 getType?然后 execute0 由子类实现。这样,子类的 executeRedisStream ,其实就是 execute0 了。 + /** + * 获取数据桥梁类型 + * + * @return 数据桥梁类型 + */ + Integer getType(); /** * 执行数据桥梁操作 @@ -19,6 +24,23 @@ public interface IotDataBridgeExecute { * @param message 设备消息 * @param dataBridge 数据桥梁 */ - void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge); + @SuppressWarnings({"unchecked"}) + default void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { + // 1.1 校验数据桥梁类型 + if (!getType().equals(dataBridge.getType())) { + return; + } + + // 1.2 执行对应的数据桥梁发送消息 + execute0(message, (Config) dataBridge.getConfig()); + } + + /** + * 【真正】执行数据桥梁操作 + * + * @param message 设备消息 + * @param config 桥梁配置 + */ + void execute0(IotDeviceMessage message, Config config); } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java index 27b8bc6bba..ffe2c5b803 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java @@ -25,23 +25,19 @@ import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_ */ @Component @Slf4j -public class IotHttpDataBridgeExecute implements IotDataBridgeExecute { +public class IotHttpDataBridgeExecute implements IotDataBridgeExecute { @Resource private RestTemplate restTemplate; @Override - public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { - // 1.1 校验数据桥梁的类型 == HTTP - if (!IotDataBridgTypeEnum.HTTP.getType().equals(dataBridge.getType())) { - return; - } - // 1.2 执行 HTTP 请求 - executeHttp(message, (IotDataBridgeDO.HttpConfig) dataBridge.getConfig()); + public Integer getType() { + return IotDataBridgTypeEnum.HTTP.getType(); } + @Override @SuppressWarnings({"unchecked", "deprecation"}) - private void executeHttp(IotDeviceMessage message, IotDataBridgeDO.HttpConfig config) { + public void execute0(IotDeviceMessage message, IotDataBridgeDO.HttpConfig config) { String url = null; HttpMethod method = HttpMethod.valueOf(config.getMethod().toUpperCase()); HttpEntity requestEntity = null; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java index 33aa744a85..1efcfe9cab 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java @@ -30,16 +30,12 @@ public class IotKafkaMQDataBridgeExecute extends private static final Duration SEND_TIMEOUT = Duration.ofMillis(10); @Override - public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { - // 1. 校验数据桥梁的类型 == KAFKA - if (!IotDataBridgTypeEnum.KAFKA.getType().equals(dataBridge.getType())) { - return; - } - // 2. 执行 Kafka 发送消息 - executeKafka(message, (IotDataBridgeDO.KafkaMQConfig) dataBridge.getConfig()); + public Integer getType() { + return IotDataBridgTypeEnum.KAFKA.getType(); } - private void executeKafka(IotDeviceMessage message, IotDataBridgeDO.KafkaMQConfig config) { + @Override + public void execute0(IotDeviceMessage message, IotDataBridgeDO.KafkaMQConfig config) { try { // 1. 获取或创建 KafkaTemplate KafkaTemplate kafkaTemplate = getProducer(config); @@ -113,10 +109,10 @@ public class IotKafkaMQDataBridgeExecute extends // 4. 执行两次测试,验证缓存 log.info("[main][第一次执行,应该会创建新的 producer]"); - action.executeKafka(message, config); + action.execute0(message, config); log.info("[main][第二次执行,应该会复用缓存的 producer]"); - action.executeKafka(message, config); + action.execute0(message, config); } } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java index 0918a5546d..27ebccc399 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java @@ -22,17 +22,14 @@ import java.time.LocalDateTime; public class IotRabbitMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute { + @Override - public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { - // 1.1 校验数据桥梁的类型 == RABBITMQ - if (!IotDataBridgTypeEnum.RABBITMQ.getType().equals(dataBridge.getType())) { - return; - } - // 1.2 执行 RabbitMQ 发送消息 - executeRabbitMQ(message, (IotDataBridgeDO.RabbitMQConfig) dataBridge.getConfig()); + public Integer getType() { + return IotDataBridgTypeEnum.RABBITMQ.getType(); } - private void executeRabbitMQ(IotDeviceMessage message, IotDataBridgeDO.RabbitMQConfig config) { + @Override + public void execute0(IotDeviceMessage message, IotDataBridgeDO.RabbitMQConfig config) { try { // 1. 获取或创建 Channel Channel channel = getProducer(config); @@ -111,10 +108,10 @@ public class IotRabbitMQDataBridgeExecute extends // 4. 执行两次测试,验证缓存 log.info("[main][第一次执行,应该会创建新的 channel]"); - action.executeRabbitMQ(message, config); + action.execute0(message, config); log.info("[main][第二次执行,应该会复用缓存的 channel]"); - action.executeRabbitMQ(message, config); + action.execute0(message, config); } } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java index 77b688606a..f60528ef9f 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java @@ -33,18 +33,12 @@ public class IotRedisStreamMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute> { @Override - public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { - // 1.1 校验数据桥梁类型 - if (!IotDataBridgTypeEnum.REDIS_STREAM.getType().equals(dataBridge.getType())) { - return; - } - // 1.2 执行消息发送 - executeRedisStream(message, (IotDataBridgeDO.RedisStreamMQConfig) dataBridge.getConfig()); + public Integer getType() { + return IotDataBridgTypeEnum.REDIS_STREAM.getType(); } - @SuppressWarnings("unchecked") - // TODO @huihui:try catch 交给父类来做,子类不处理异常 - private void executeRedisStream(IotDeviceMessage message, IotDataBridgeDO.RedisStreamMQConfig config) { + @Override + public void execute0(IotDeviceMessage message, IotDataBridgeDO.RedisStreamMQConfig config) { try { // 1. 获取 RedisTemplate RedisTemplate redisTemplate = getProducer(config); @@ -133,10 +127,10 @@ public class IotRedisStreamMQDataBridgeExecute extends // 4. 执行两次测试,验证缓存 log.info("[main][第一次执行,应该会创建新的 RedisTemplate]"); - action.executeRedisStream(message, config); + action.execute0(message, config); log.info("[main][第二次执行,应该会复用缓存的 RedisTemplate]"); - action.executeRedisStream(message, config); + action.execute0(message, config); } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java index 061bbfc69f..a68a6525e4 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java @@ -24,16 +24,12 @@ public class IotRocketMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute { @Override - public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { - // 1.1 校验数据桥梁的类型 == ROCKETMQ - if (!IotDataBridgTypeEnum.ROCKETMQ.getType().equals(dataBridge.getType())) { - return; - } - // 1.2 执行 RocketMQ 发送消息 - executeRocketMQ(message, (IotDataBridgeDO.RocketMQConfig) dataBridge.getConfig()); + public Integer getType() { + return IotDataBridgTypeEnum.ROCKETMQ.getType(); } - private void executeRocketMQ(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) { + @Override + public void execute0(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) { try { // 1. 获取或创建 Producer DefaultMQProducer producer = getProducer(config); @@ -97,10 +93,10 @@ public class IotRocketMQDataBridgeExecute extends // 4. 执行两次测试,验证缓存 log.info("[main][第一次执行,应该会创建新的 producer]"); - action.executeRocketMQ(message, config); + action.execute0(message, config); log.info("[main][第二次执行,应该会复用缓存的 producer]"); - action.executeRocketMQ(message, config); + action.execute0(message, config); } } From ce5e64e0aa635be458334af1d27236b102c6db4b Mon Sep 17 00:00:00 2001 From: puhui999 Date: Mon, 3 Mar 2025 13:03:41 +0800 Subject: [PATCH 3/9] =?UTF-8?q?=E3=80=90=E4=BB=A3=E7=A0=81=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E3=80=91IoT:=20=E4=BC=98=E5=8C=96=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E6=A1=A5=E6=A2=81=E7=9A=84=E6=89=A7=E8=A1=8C=E5=99=A8=E5=87=8F?= =?UTF-8?q?=E5=B0=91=E5=AD=90=E7=B1=BB=E4=BB=A3=E7=A0=81=E5=86=97=E4=BD=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rule/action/IotRuleSceneAction.java | 2 +- .../action/IotRuleSceneDataBridgeAction.java | 8 ++-- .../AbstractCacheableDataBridgeExecute.java | 18 +++++++++ .../databridge/IotDataBridgeExecute.java | 4 +- .../IotKafkaMQDataBridgeExecute.java | 25 +++++------- .../IotRabbitMQDataBridgeExecute.java | 35 ++++++++-------- .../IotRedisStreamMQDataBridgeExecute.java | 28 ++++++------- .../IotRocketMQDataBridgeExecute.java | 40 +++++++++---------- 8 files changed, 81 insertions(+), 79 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneAction.java index 4cf1f8f285..a673b538ef 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneAction.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneAction.java @@ -22,7 +22,7 @@ public interface IotRuleSceneAction { * 2. 非空的情况:设备触发 * @param config 配置 */ - void execute(@Nullable IotDeviceMessage message, IotRuleSceneDO.ActionConfig config); + void execute(@Nullable IotDeviceMessage message, IotRuleSceneDO.ActionConfig config) throws Exception; /** * 获得类型 diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java index 6733331cb4..d94922f5d0 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java @@ -26,10 +26,10 @@ public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction { @Resource private IotDataBridgeService dataBridgeService; @Resource - private List dataBridgeExecutes; + private List> dataBridgeExecutes; @Override - public void execute(IotDeviceMessage message, IotRuleSceneDO.ActionConfig config) { + public void execute(IotDeviceMessage message, IotRuleSceneDO.ActionConfig config) throws Exception { // 1.1 如果消息为空,直接返回 if (message == null) { return; @@ -47,7 +47,9 @@ public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction { } // 2. 执行数据桥接操作 - dataBridgeExecutes.forEach(execute -> execute.execute(message, dataBridge)); + for (IotDataBridgeExecute execute : dataBridgeExecutes) { + execute.execute(message, dataBridge); + } } @Override diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java index 52c4483ec7..d26c2dd436 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java @@ -1,5 +1,7 @@ package cn.iocoder.yudao.module.iot.service.rule.action.databridge; +import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; +import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -93,4 +95,20 @@ public abstract class AbstractCacheableDataBridgeExecute imple */ protected abstract void closeProducer(Producer producer) throws Exception; + @Override + @SuppressWarnings({"unchecked"}) + public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { + // 1.1 校验数据桥梁类型 + if (!getType().equals(dataBridge.getType())) { + return; + } + + // 1.2 执行对应的数据桥梁发送消息 + try { + execute0(message, (Config) dataBridge.getConfig()); + } catch (Exception e) { + log.error("[execute][桥梁配置 config({}) 对应的 message({}) 发送异常]", dataBridge.getConfig(), message, e); + } + } + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java index 3f842f1f50..ce3d0f1938 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java @@ -25,7 +25,7 @@ public interface IotDataBridgeExecute { * @param dataBridge 数据桥梁 */ @SuppressWarnings({"unchecked"}) - default void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { + default void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) throws Exception { // 1.1 校验数据桥梁类型 if (!getType().equals(dataBridge.getType())) { return; @@ -41,6 +41,6 @@ public interface IotDataBridgeExecute { * @param message 设备消息 * @param config 桥梁配置 */ - void execute0(IotDeviceMessage message, Config config); + void execute0(IotDeviceMessage message, Config config) throws Exception; } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java index 1efcfe9cab..b943eb31af 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java @@ -15,7 +15,6 @@ import java.time.LocalDateTime; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; /** * Kafka 的 {@link IotDataBridgeExecute} 实现类 @@ -35,20 +34,14 @@ public class IotKafkaMQDataBridgeExecute extends } @Override - public void execute0(IotDeviceMessage message, IotDataBridgeDO.KafkaMQConfig config) { - try { - // 1. 获取或创建 KafkaTemplate - KafkaTemplate kafkaTemplate = getProducer(config); + public void execute0(IotDeviceMessage message, IotDataBridgeDO.KafkaMQConfig config) throws Exception { + // 1. 获取或创建 KafkaTemplate + KafkaTemplate kafkaTemplate = getProducer(config); - // 2. 发送消息并等待结果 - kafkaTemplate.send(config.getTopic(), message.toString()) - .get(SEND_TIMEOUT.getSeconds(), TimeUnit.SECONDS); // 添加超时等待 - log.info("[executeKafka][message({}) 发送成功]", message); - } catch (TimeoutException e) { - log.error("[executeKafka][message({}) config({}) 发送超时]", message, config, e); - } catch (Exception e) { - log.error("[executeKafka][message({}) config({}) 发送异常]", message, config, e); - } + // 2. 发送消息并等待结果 + kafkaTemplate.send(config.getTopic(), message.toString()) + .get(SEND_TIMEOUT.getSeconds(), TimeUnit.SECONDS); // 添加超时等待 + log.info("[execute0][message({}) 发送成功]", message); } @Override @@ -109,10 +102,10 @@ public class IotKafkaMQDataBridgeExecute extends // 4. 执行两次测试,验证缓存 log.info("[main][第一次执行,应该会创建新的 producer]"); - action.execute0(message, config); + action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config)); log.info("[main][第二次执行,应该会复用缓存的 producer]"); - action.execute0(message, config); + action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config)); } } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java index 27ebccc399..f99e651795 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java @@ -29,23 +29,19 @@ public class IotRabbitMQDataBridgeExecute extends } @Override - public void execute0(IotDeviceMessage message, IotDataBridgeDO.RabbitMQConfig config) { - try { - // 1. 获取或创建 Channel - Channel channel = getProducer(config); + public void execute0(IotDeviceMessage message, IotDataBridgeDO.RabbitMQConfig config) throws Exception { + // 1. 获取或创建 Channel + Channel channel = getProducer(config); - // 2.1 声明交换机、队列和绑定关系 - channel.exchangeDeclare(config.getExchange(), "direct", true); - channel.queueDeclare(config.getQueue(), true, false, false, null); - channel.queueBind(config.getQueue(), config.getExchange(), config.getRoutingKey()); + // 2.1 声明交换机、队列和绑定关系 + channel.exchangeDeclare(config.getExchange(), "direct", true); + channel.queueDeclare(config.getQueue(), true, false, false, null); + channel.queueBind(config.getQueue(), config.getExchange(), config.getRoutingKey()); - // 2.2 发送消息 - channel.basicPublish(config.getExchange(), config.getRoutingKey(), null, - message.toString().getBytes(StandardCharsets.UTF_8)); - log.info("[executeRabbitMQ][message({}) config({}) 发送成功]", message, config); - } catch (Exception e) { - log.error("[executeRabbitMQ][message({}) config({}) 发送异常]", message, config, e); - } + // 2.2 发送消息 + channel.basicPublish(config.getExchange(), config.getRoutingKey(), null, + message.toString().getBytes(StandardCharsets.UTF_8)); + log.info("[executeRabbitMQ][message({}) config({}) 发送成功]", message, config); } @Override @@ -107,11 +103,12 @@ public class IotRabbitMQDataBridgeExecute extends .build(); // 4. 执行两次测试,验证缓存 - log.info("[main][第一次执行,应该会创建新的 channel]"); - action.execute0(message, config); + // 4. 执行两次测试,验证缓存 + log.info("[main][第一次执行,应该会创建新的 producer]"); + action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config)); - log.info("[main][第二次执行,应该会复用缓存的 channel]"); - action.execute0(message, config); + log.info("[main][第二次执行,应该会复用缓存的 producer]"); + action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config)); } } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java index f60528ef9f..a20334f4fd 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java @@ -38,19 +38,15 @@ public class IotRedisStreamMQDataBridgeExecute extends } @Override - public void execute0(IotDeviceMessage message, IotDataBridgeDO.RedisStreamMQConfig config) { - try { - // 1. 获取 RedisTemplate - RedisTemplate redisTemplate = getProducer(config); + public void execute0(IotDeviceMessage message, IotDataBridgeDO.RedisStreamMQConfig config) throws Exception { + // 1. 获取 RedisTemplate + RedisTemplate redisTemplate = getProducer(config); - // 2. 创建并发送 Stream 记录 - ObjectRecord record = StreamRecords.newRecord() - .ofObject(message).withStreamKey(config.getTopic()); - String recordId = String.valueOf(redisTemplate.opsForStream().add(record)); - log.info("[executeRedisStream][消息发送成功] messageId: {}, config: {}", recordId, config); - } catch (Exception e) { - log.error("[executeRedisStream][消息发送失败] message: {}, config: {}", message, config, e); - } + // 2. 创建并发送 Stream 记录 + ObjectRecord record = StreamRecords.newRecord() + .ofObject(message).withStreamKey(config.getTopic()); + String recordId = String.valueOf(redisTemplate.opsForStream().add(record)); + log.info("[executeRedisStream][消息发送成功] messageId: {}, config: {}", recordId, config); } @Override @@ -126,11 +122,11 @@ public class IotRedisStreamMQDataBridgeExecute extends .build(); // 4. 执行两次测试,验证缓存 - log.info("[main][第一次执行,应该会创建新的 RedisTemplate]"); - action.execute0(message, config); + log.info("[main][第一次执行,应该会创建新的 producer]"); + action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config)); - log.info("[main][第二次执行,应该会复用缓存的 RedisTemplate]"); - action.execute0(message, config); + log.info("[main][第二次执行,应该会复用缓存的 producer]"); + action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config)); } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java index a68a6525e4..df413c7503 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java @@ -29,27 +29,23 @@ public class IotRocketMQDataBridgeExecute extends } @Override - public void execute0(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) { - try { - // 1. 获取或创建 Producer - DefaultMQProducer producer = getProducer(config); + public void execute0(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) throws Exception { + // 1. 获取或创建 Producer + DefaultMQProducer producer = getProducer(config); - // 2.1 创建消息对象,指定Topic、Tag和消息体 - Message msg = new Message( - config.getTopic(), - config.getTags(), - message.toString().getBytes(RemotingHelper.DEFAULT_CHARSET) - ); - // 2.2 发送同步消息并处理结果 - SendResult sendResult = producer.send(msg); - // 2.3 处理发送结果 - if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) { - log.info("[executeRocketMQ][message({}) config({}) 发送成功,结果({})]", message, config, sendResult); - } else { - log.error("[executeRocketMQ][message({}) config({}) 发送失败,结果({})]", message, config, sendResult); - } - } catch (Exception e) { - log.error("[executeRocketMQ][message({}) config({}) 发送异常]", message, config, e); + // 2.1 创建消息对象,指定Topic、Tag和消息体 + Message msg = new Message( + config.getTopic(), + config.getTags(), + message.toString().getBytes(RemotingHelper.DEFAULT_CHARSET) + ); + // 2.2 发送同步消息并处理结果 + SendResult sendResult = producer.send(msg); + // 2.3 处理发送结果 + if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) { + log.info("[executeRocketMQ][message({}) config({}) 发送成功,结果({})]", message, config, sendResult); + } else { + log.error("[executeRocketMQ][message({}) config({}) 发送失败,结果({})]", message, config, sendResult); } } @@ -93,10 +89,10 @@ public class IotRocketMQDataBridgeExecute extends // 4. 执行两次测试,验证缓存 log.info("[main][第一次执行,应该会创建新的 producer]"); - action.execute0(message, config); + action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config)); log.info("[main][第二次执行,应该会复用缓存的 producer]"); - action.execute0(message, config); + action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config)); } } From 7ab61b6d061801c5a24cd1d974ad382f174da1fe Mon Sep 17 00:00:00 2001 From: puhui999 Date: Mon, 3 Mar 2025 21:50:52 +0800 Subject: [PATCH 4/9] =?UTF-8?q?=E3=80=90=E4=BB=A3=E7=A0=81=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E3=80=91IoT:=20kafka=20=E6=95=B0=E6=8D=AE=E6=A1=A5?= =?UTF-8?q?=E6=A2=81=E6=B6=88=E6=81=AF=E7=AD=89=E5=BE=85=E7=9A=84=E6=9C=80?= =?UTF-8?q?=E5=A4=A7=E6=97=B6=E9=97=B4=E8=B0=83=E6=95=B4=E4=B8=BA=2010=20?= =?UTF-8?q?=E7=A7=92?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rule/action/databridge/IotKafkaMQDataBridgeExecute.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java index b943eb31af..9c125960b4 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java @@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit; public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute> { - private static final Duration SEND_TIMEOUT = Duration.ofMillis(10); + private static final Duration SEND_TIMEOUT = Duration.ofMillis(10000); // 10 秒超时时间 @Override public Integer getType() { From 831970233c63cd50f056b330f0e675102d2dd9c3 Mon Sep 17 00:00:00 2001 From: puhui999 Date: Sat, 8 Mar 2025 10:50:22 +0800 Subject: [PATCH 5/9] =?UTF-8?q?=E3=80=90=E4=BB=A3=E7=A0=81=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E3=80=91IoT:=20=E6=95=B0=E6=8D=AE=E6=A1=A5=E6=A2=81?= =?UTF-8?q?=E7=9A=84=E6=89=A7=E8=A1=8C=E5=99=A8=E6=A0=B9=E6=8D=AE=E5=BC=95?= =?UTF-8?q?=E5=85=A5=E7=9A=84=E6=B6=88=E6=81=AF=E9=98=9F=E5=88=97=E5=8A=A8?= =?UTF-8?q?=E6=80=81=E5=8A=A0=E8=BD=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../databridge/IotKafkaMQDataBridgeExecute.java | 2 ++ .../databridge/IotRabbitMQDataBridgeExecute.java | 2 ++ .../databridge/IotRocketMQDataBridgeExecute.java | 2 ++ yudao-server/pom.xml | 13 +++++++++++++ 4 files changed, 19 insertions(+) diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java index 9c125960b4..6bcf41ab68 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java @@ -6,6 +6,7 @@ import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; @@ -21,6 +22,7 @@ import java.util.concurrent.TimeUnit; * * @author HUIHUI */ +@ConditionalOnClass(KafkaTemplate.class) @Component @Slf4j public class IotKafkaMQDataBridgeExecute extends diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java index f99e651795..ebe9d8ddd0 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java @@ -7,6 +7,7 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; @@ -17,6 +18,7 @@ import java.time.LocalDateTime; * * @author HUIHUI */ +@ConditionalOnClass(Channel.class) @Component @Slf4j public class IotRabbitMQDataBridgeExecute extends diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java index df413c7503..655ad55ebe 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java @@ -9,6 +9,7 @@ import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.stereotype.Component; import java.time.LocalDateTime; @@ -18,6 +19,7 @@ import java.time.LocalDateTime; * * @author HUIHUI */ +@ConditionalOnClass(DefaultMQProducer.class) @Component @Slf4j public class IotRocketMQDataBridgeExecute extends diff --git a/yudao-server/pom.xml b/yudao-server/pom.xml index 492e31db56..2cf04e8eb9 100644 --- a/yudao-server/pom.xml +++ b/yudao-server/pom.xml @@ -114,6 +114,19 @@ yudao-module-iot-biz ${revision} + + + org.apache.rocketmq + rocketmq-spring-boot-starter + + + org.springframework.kafka + spring-kafka + + + org.springframework.boot + spring-boot-starter-amqp + From 415dd435f36d995309f2360289602b2d062311b5 Mon Sep 17 00:00:00 2001 From: puhui999 Date: Sat, 8 Mar 2025 11:25:03 +0800 Subject: [PATCH 6/9] =?UTF-8?q?=E3=80=90=E4=BB=A3=E7=A0=81=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E3=80=91IoT:=20=E6=95=B0=E6=8D=AE=E6=A1=A5=E6=A2=81?= =?UTF-8?q?=E7=9A=84=E6=89=A7=E8=A1=8C=E5=99=A8=E6=A0=B9=E6=8D=AE=E5=BC=95?= =?UTF-8?q?=E5=85=A5=E7=9A=84=E6=B6=88=E6=81=AF=E9=98=9F=E5=88=97=E5=8A=A8?= =?UTF-8?q?=E6=80=81=E5=8A=A0=E8=BD=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rule/action/databridge/IotKafkaMQDataBridgeExecute.java | 2 +- .../rule/action/databridge/IotRabbitMQDataBridgeExecute.java | 2 +- .../rule/action/databridge/IotRocketMQDataBridgeExecute.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java index 6bcf41ab68..6e3ef67a25 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit; * * @author HUIHUI */ -@ConditionalOnClass(KafkaTemplate.class) +@ConditionalOnClass(name = "org.springframework.kafka.core.KafkaTemplate") @Component @Slf4j public class IotKafkaMQDataBridgeExecute extends diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java index ebe9d8ddd0..5599144dad 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java @@ -18,7 +18,7 @@ import java.time.LocalDateTime; * * @author HUIHUI */ -@ConditionalOnClass(Channel.class) +@ConditionalOnClass(name = "com.rabbitmq.client.Channel") @Component @Slf4j public class IotRabbitMQDataBridgeExecute extends diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java index 655ad55ebe..542d190ed5 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java @@ -19,7 +19,7 @@ import java.time.LocalDateTime; * * @author HUIHUI */ -@ConditionalOnClass(DefaultMQProducer.class) +@ConditionalOnClass(name = "org.apache.rocketmq.client.producer.DefaultMQProducer") @Component @Slf4j public class IotRocketMQDataBridgeExecute extends From ff9267ad759a819ba97cdd19c5264cb47ad23043 Mon Sep 17 00:00:00 2001 From: puhui999 Date: Sun, 9 Mar 2025 12:53:54 +0800 Subject: [PATCH 7/9] =?UTF-8?q?=E3=80=90=E5=8A=9F=E8=83=BD=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=E3=80=91IoT:=20=E6=95=B0=E6=8D=AE=E6=A1=A5=E6=A2=81?= =?UTF-8?q?=20CRUD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../module/iot/enums/ErrorCodeConstants.java | 3 + .../admin/rule/IotDataBridgeController.java | 93 +++++++++++++++++++ .../vo/databridge/IotDataBridgePageReqVO.java | 39 ++++++++ .../vo/databridge/IotDataBridgeRespVO.java | 52 +++++++++++ .../vo/databridge/IotDataBridgeSaveReqVO.java | 37 ++++++++ .../admin/rule/vo/package-info.java | 1 + .../dal/mysql/rule/IotDataBridgeMapper.java | 22 ++++- .../service/rule/IotDataBridgeService.java | 48 ++++++++-- .../rule/IotDataBridgeServiceImpl.java | 67 ++++++++----- .../action/IotRuleSceneDataBridgeAction.java | 2 +- 10 files changed, 334 insertions(+), 30 deletions(-) create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/IotDataBridgeController.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/IotDataBridgePageReqVO.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/IotDataBridgeRespVO.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/IotDataBridgeSaveReqVO.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/package-info.java diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/ErrorCodeConstants.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/ErrorCodeConstants.java index 3d12c4b594..7b84c66fcf 100644 --- a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/ErrorCodeConstants.java +++ b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/ErrorCodeConstants.java @@ -70,4 +70,7 @@ public interface ErrorCodeConstants { // ========== MQTT 通信相关 1-050-009-000 ========== ErrorCode MQTT_TOPIC_ILLEGAL = new ErrorCode(1_050_009_000, "topic illegal"); + // ========== IoT 数据桥梁 1-050-010-000 ========== + ErrorCode DATA_BRIDGE_NOT_EXISTS = new ErrorCode(1_050_010_000, "IoT 数据桥梁不存在"); + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/IotDataBridgeController.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/IotDataBridgeController.java new file mode 100644 index 0000000000..2d2f8fb75b --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/IotDataBridgeController.java @@ -0,0 +1,93 @@ +package cn.iocoder.yudao.module.iot.controller.admin.rule; + +import cn.iocoder.yudao.framework.apilog.core.annotation.ApiAccessLog; +import cn.iocoder.yudao.framework.common.pojo.CommonResult; +import cn.iocoder.yudao.framework.common.pojo.PageParam; +import cn.iocoder.yudao.framework.common.pojo.PageResult; +import cn.iocoder.yudao.framework.common.util.object.BeanUtils; +import cn.iocoder.yudao.framework.excel.core.util.ExcelUtils; +import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.IotDataBridgePageReqVO; +import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.IotDataBridgeRespVO; +import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.IotDataBridgeSaveReqVO; +import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; +import cn.iocoder.yudao.module.iot.service.rule.IotDataBridgeService; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.tags.Tag; +import jakarta.annotation.Resource; +import jakarta.servlet.http.HttpServletResponse; +import jakarta.validation.Valid; +import org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.*; + +import java.io.IOException; +import java.util.List; + +import static cn.iocoder.yudao.framework.apilog.core.enums.OperateTypeEnum.EXPORT; +import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success; + +@Tag(name = "管理后台 - IoT 数据桥梁") +@RestController +@RequestMapping("/iot/data-bridge") +@Validated +public class IotDataBridgeController { + + @Resource + private IotDataBridgeService dataBridgeService; + + @PostMapping("/create") + @Operation(summary = "创建IoT 数据桥梁") + @PreAuthorize("@ss.hasPermission('iot:data-bridge:create')") + public CommonResult createDataBridge(@Valid @RequestBody IotDataBridgeSaveReqVO createReqVO) { + return success(dataBridgeService.createDataBridge(createReqVO)); + } + + @PutMapping("/update") + @Operation(summary = "更新IoT 数据桥梁") + @PreAuthorize("@ss.hasPermission('iot:data-bridge:update')") + public CommonResult updateDataBridge(@Valid @RequestBody IotDataBridgeSaveReqVO updateReqVO) { + dataBridgeService.updateDataBridge(updateReqVO); + return success(true); + } + + @DeleteMapping("/delete") + @Operation(summary = "删除IoT 数据桥梁") + @Parameter(name = "id", description = "编号", required = true) + @PreAuthorize("@ss.hasPermission('iot:data-bridge:delete')") + public CommonResult deleteDataBridge(@RequestParam("id") Long id) { + dataBridgeService.deleteDataBridge(id); + return success(true); + } + + @GetMapping("/get") + @Operation(summary = "获得IoT 数据桥梁") + @Parameter(name = "id", description = "编号", required = true, example = "1024") + @PreAuthorize("@ss.hasPermission('iot:data-bridge:query')") + public CommonResult getDataBridge(@RequestParam("id") Long id) { + IotDataBridgeDO dataBridge = dataBridgeService.getDataBridge(id); + return success(BeanUtils.toBean(dataBridge, IotDataBridgeRespVO.class)); + } + + @GetMapping("/page") + @Operation(summary = "获得IoT 数据桥梁分页") + @PreAuthorize("@ss.hasPermission('iot:data-bridge:query')") + public CommonResult> getDataBridgePage(@Valid IotDataBridgePageReqVO pageReqVO) { + PageResult pageResult = dataBridgeService.getDataBridgePage(pageReqVO); + return success(BeanUtils.toBean(pageResult, IotDataBridgeRespVO.class)); + } + + @GetMapping("/export-excel") + @Operation(summary = "导出IoT 数据桥梁 Excel") + @PreAuthorize("@ss.hasPermission('iot:data-bridge:export')") + @ApiAccessLog(operateType = EXPORT) + public void exportDataBridgeExcel(@Valid IotDataBridgePageReqVO pageReqVO, + HttpServletResponse response) throws IOException { + pageReqVO.setPageSize(PageParam.PAGE_SIZE_NONE); + List list = dataBridgeService.getDataBridgePage(pageReqVO).getList(); + // 导出 Excel + ExcelUtils.write(response, "IoT 数据桥梁.xls", "数据", IotDataBridgeRespVO.class, + BeanUtils.toBean(list, IotDataBridgeRespVO.class)); + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/IotDataBridgePageReqVO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/IotDataBridgePageReqVO.java new file mode 100644 index 0000000000..7da87be07a --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/IotDataBridgePageReqVO.java @@ -0,0 +1,39 @@ +package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge; + +import cn.iocoder.yudao.framework.common.pojo.PageParam; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.springframework.format.annotation.DateTimeFormat; + +import java.time.LocalDateTime; + +import static cn.iocoder.yudao.framework.common.util.date.DateUtils.FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND; + +@Schema(description = "管理后台 - IoT 数据桥梁分页 Request VO") +@Data +@EqualsAndHashCode(callSuper = true) +@ToString(callSuper = true) +public class IotDataBridgePageReqVO extends PageParam { + + @Schema(description = "桥梁名称", example = "赵六") + private String name; + + @Schema(description = "桥梁描述", example = "随便") + private String description; + + @Schema(description = "桥梁状态", example = "2") + private Integer status; + + @Schema(description = "桥梁方向") + private Integer direction; + + @Schema(description = "桥梁类型", example = "1") + private Integer type; + + @Schema(description = "创建时间") + @DateTimeFormat(pattern = FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND) + private LocalDateTime[] createTime; + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/IotDataBridgeRespVO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/IotDataBridgeRespVO.java new file mode 100644 index 0000000000..85247358c4 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/IotDataBridgeRespVO.java @@ -0,0 +1,52 @@ +package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge; + +import cn.iocoder.yudao.framework.excel.core.annotations.DictFormat; +import cn.iocoder.yudao.framework.excel.core.convert.DictConvert; +import com.alibaba.excel.annotation.ExcelIgnoreUnannotated; +import com.alibaba.excel.annotation.ExcelProperty; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +import java.time.LocalDateTime; + +@Schema(description = "管理后台 - IoT 数据桥梁 Response VO") +@Data +@ExcelIgnoreUnannotated +public class IotDataBridgeRespVO { + + @Schema(description = "桥梁编号", requiredMode = Schema.RequiredMode.REQUIRED, example = "18564") + @ExcelProperty("桥梁编号") + private Long id; + + @Schema(description = "桥梁名称", requiredMode = Schema.RequiredMode.REQUIRED, example = "赵六") + @ExcelProperty("桥梁名称") + private String name; + + @Schema(description = "桥梁描述", example = "随便") + @ExcelProperty("桥梁描述") + private String description; + + @Schema(description = "桥梁状态", requiredMode = Schema.RequiredMode.REQUIRED, example = "2") + @ExcelProperty(value = "桥梁状态", converter = DictConvert.class) + @DictFormat("common_status") // TODO 代码优化:建议设置到对应的 DictTypeConstants 枚举类中 + private Integer status; + + @Schema(description = "桥梁方向", requiredMode = Schema.RequiredMode.REQUIRED) + @ExcelProperty(value = "桥梁方向", converter = DictConvert.class) + @DictFormat("iot_data_bridg_direction_enum") // TODO 代码优化:建议设置到对应的 DictTypeConstants 枚举类中 + private Integer direction; + + @Schema(description = "桥梁类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "1") + @ExcelProperty(value = "桥梁类型", converter = DictConvert.class) + @DictFormat("iot_data_bridg_type_enum") // TODO 代码优化:建议设置到对应的 DictTypeConstants 枚举类中 + private Integer type; + + @Schema(description = "桥梁配置") + @ExcelProperty("桥梁配置") + private String config; + + @Schema(description = "创建时间", requiredMode = Schema.RequiredMode.REQUIRED) + @ExcelProperty("创建时间") + private LocalDateTime createTime; + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/IotDataBridgeSaveReqVO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/IotDataBridgeSaveReqVO.java new file mode 100644 index 0000000000..2fd8b36093 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/IotDataBridgeSaveReqVO.java @@ -0,0 +1,37 @@ +package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import lombok.Data; + +@Schema(description = "管理后台 - IoT 数据桥梁新增/修改 Request VO") +@Data +public class IotDataBridgeSaveReqVO { + + @Schema(description = "桥梁编号", requiredMode = Schema.RequiredMode.REQUIRED, example = "18564") + private Long id; + + @Schema(description = "桥梁名称", requiredMode = Schema.RequiredMode.REQUIRED, example = "赵六") + @NotEmpty(message = "桥梁名称不能为空") + private String name; + + @Schema(description = "桥梁描述", example = "随便") + private String description; + + @Schema(description = "桥梁状态", requiredMode = Schema.RequiredMode.REQUIRED, example = "2") + @NotNull(message = "桥梁状态不能为空") + private Integer status; + + @Schema(description = "桥梁方向", requiredMode = Schema.RequiredMode.REQUIRED) + @NotNull(message = "桥梁方向不能为空") + private Integer direction; + + @Schema(description = "桥梁类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "1") + @NotNull(message = "桥梁类型不能为空") + private Integer type; + + @Schema(description = "桥梁配置") + private String config; + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/package-info.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/package-info.java new file mode 100644 index 0000000000..a977d86e93 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/package-info.java @@ -0,0 +1 @@ +package cn.iocoder.yudao.module.iot.controller.admin.rule.vo; \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/rule/IotDataBridgeMapper.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/rule/IotDataBridgeMapper.java index 076b341f00..1609aec34a 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/rule/IotDataBridgeMapper.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/rule/IotDataBridgeMapper.java @@ -1,9 +1,29 @@ package cn.iocoder.yudao.module.iot.dal.mysql.rule; +import cn.iocoder.yudao.framework.common.pojo.PageResult; import cn.iocoder.yudao.framework.mybatis.core.mapper.BaseMapperX; +import cn.iocoder.yudao.framework.mybatis.core.query.LambdaQueryWrapperX; +import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.IotDataBridgePageReqVO; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; import org.apache.ibatis.annotations.Mapper; +/** + * IoT 数据桥梁 Mapper + * + * @author HUIHUI + */ @Mapper public interface IotDataBridgeMapper extends BaseMapperX { -} + + default PageResult selectPage(IotDataBridgePageReqVO reqVO) { + return selectPage(reqVO, new LambdaQueryWrapperX() + .likeIfPresent(IotDataBridgeDO::getName, reqVO.getName()) + .likeIfPresent(IotDataBridgeDO::getDescription, reqVO.getDescription()) + .eqIfPresent(IotDataBridgeDO::getStatus, reqVO.getStatus()) + .eqIfPresent(IotDataBridgeDO::getDirection, reqVO.getDirection()) + .eqIfPresent(IotDataBridgeDO::getType, reqVO.getType()) + .betweenIfPresent(IotDataBridgeDO::getCreateTime, reqVO.getCreateTime()) + .orderByDesc(IotDataBridgeDO::getId)); + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/IotDataBridgeService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/IotDataBridgeService.java index cbf0b36c23..f720a1d15d 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/IotDataBridgeService.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/IotDataBridgeService.java @@ -1,20 +1,54 @@ package cn.iocoder.yudao.module.iot.service.rule; +import cn.iocoder.yudao.framework.common.pojo.PageResult; +import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.IotDataBridgePageReqVO; +import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.IotDataBridgeSaveReqVO; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; +import jakarta.validation.Valid; /** - * IoT 数据桥梁的 Service 接口 + * IoT 数据桥梁 Service 接口 * - * @author 芋道源码 + * @author HUIHUI */ public interface IotDataBridgeService { /** - * 获得指定数据桥梁 + * 创建IoT 数据桥梁 * - * @param id 数据桥梁编号 - * @return 数据桥梁 + * @param createReqVO 创建信息 + * @return 编号 */ - IotDataBridgeDO getIotDataBridge(Long id); + Long createDataBridge(@Valid IotDataBridgeSaveReqVO createReqVO); -} + /** + * 更新IoT 数据桥梁 + * + * @param updateReqVO 更新信息 + */ + void updateDataBridge(@Valid IotDataBridgeSaveReqVO updateReqVO); + + /** + * 删除IoT 数据桥梁 + * + * @param id 编号 + */ + void deleteDataBridge(Long id); + + /** + * 获得IoT 数据桥梁 + * + * @param id 编号 + * @return IoT 数据桥梁 + */ + IotDataBridgeDO getDataBridge(Long id); + + /** + * 获得IoT 数据桥梁分页 + * + * @param pageReqVO 分页查询 + * @return IoT 数据桥梁分页 + */ + PageResult getDataBridgePage(IotDataBridgePageReqVO pageReqVO); + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/IotDataBridgeServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/IotDataBridgeServiceImpl.java index 7814fd9bbc..9e439fc996 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/IotDataBridgeServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/IotDataBridgeServiceImpl.java @@ -1,45 +1,70 @@ package cn.iocoder.yudao.module.iot.service.rule; -import cn.hutool.core.map.MapUtil; -import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.framework.common.pojo.PageResult; +import cn.iocoder.yudao.framework.common.util.object.BeanUtils; +import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.IotDataBridgePageReqVO; +import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.IotDataBridgeSaveReqVO; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; import cn.iocoder.yudao.module.iot.dal.mysql.rule.IotDataBridgeMapper; -import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum; import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.validation.annotation.Validated; -import java.util.Objects; +import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception; +import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DATA_BRIDGE_NOT_EXISTS; /** - * IoT 数据桥梁的 Service 实现类 + * IoT 数据桥梁 Service 实现类 * - * @author 芋道源码 + * @author HUIHUI */ @Service @Validated -@Slf4j public class IotDataBridgeServiceImpl implements IotDataBridgeService { @Resource private IotDataBridgeMapper dataBridgeMapper; - // TODO @芋艿:临时测试 @Override - public IotDataBridgeDO getIotDataBridge(Long id) { - if (Objects.equals(id, 1L)) { - IotDataBridgeDO.HttpConfig config = new IotDataBridgeDO.HttpConfig() - .setUrl("http://127.0.0.1:48080/test") -// .setMethod("POST") - .setMethod("GET") - .setQuery(MapUtil.of("aaa", "bbb")) - .setHeaders(MapUtil.of("ccc", "ddd")) - .setBody(JsonUtils.toJsonString(MapUtil.of("eee", "fff"))); - return IotDataBridgeDO.builder().id(1L).name("芋道").description("芋道源码").status(0).direction(1) - .type(IotDataBridgTypeEnum.HTTP.getType()).config(config).build(); + public Long createDataBridge(IotDataBridgeSaveReqVO createReqVO) { + // 插入 + IotDataBridgeDO dataBridge = BeanUtils.toBean(createReqVO, IotDataBridgeDO.class); + dataBridgeMapper.insert(dataBridge); + // 返回 + return dataBridge.getId(); + } + + @Override + public void updateDataBridge(IotDataBridgeSaveReqVO updateReqVO) { + // 校验存在 + validateDataBridgeExists(updateReqVO.getId()); + // 更新 + IotDataBridgeDO updateObj = BeanUtils.toBean(updateReqVO, IotDataBridgeDO.class); + dataBridgeMapper.updateById(updateObj); + } + + @Override + public void deleteDataBridge(Long id) { + // 校验存在 + validateDataBridgeExists(id); + // 删除 + dataBridgeMapper.deleteById(id); + } + + private void validateDataBridgeExists(Long id) { + if (dataBridgeMapper.selectById(id) == null) { + throw exception(DATA_BRIDGE_NOT_EXISTS); } + } + + @Override + public IotDataBridgeDO getDataBridge(Long id) { return dataBridgeMapper.selectById(id); } -} + @Override + public PageResult getDataBridgePage(IotDataBridgePageReqVO pageReqVO) { + return dataBridgeMapper.selectPage(pageReqVO); + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java index d94922f5d0..b38e181f93 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java @@ -36,7 +36,7 @@ public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction { } // 1.2 获得数据桥梁 Assert.notNull(config.getDataBridgeId(), "数据桥梁编号不能为空"); - IotDataBridgeDO dataBridge = dataBridgeService.getIotDataBridge(config.getDataBridgeId()); + IotDataBridgeDO dataBridge = dataBridgeService.getDataBridge(config.getDataBridgeId()); if (dataBridge == null || dataBridge.getConfig() == null) { log.error("[execute][message({}) config({}) 对应的数据桥梁不存在]", message, config); return; From b1d3b73b6d73d99808a24797831a5cab58213844 Mon Sep 17 00:00:00 2001 From: puhui999 Date: Sun, 9 Mar 2025 13:29:12 +0800 Subject: [PATCH 8/9] =?UTF-8?q?=E3=80=90=E4=BB=A3=E7=A0=81=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E3=80=91IoT:=20=E6=95=B0=E6=8D=AE=E6=A1=A5=E6=A2=81?= =?UTF-8?q?=20config=20=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../module/iot/enums/DictTypeConstants.java | 3 + ...m.java => IotDataBridgeDirectionEnum.java} | 4 +- ...peEnum.java => IotDataBridgeTypeEnum.java} | 25 +- .../vo/databridge/IotDataBridgeRespVO.java | 13 +- .../vo/databridge/IotDataBridgeSaveReqVO.java | 4 +- .../config/IotDataBridgeConfig.java | 32 +++ .../config/IotDataBridgeHttpConfig.java | 36 +++ .../config/IotDataBridgeKafkaMQConfig.java | 35 +++ .../config/IotDataBridgeMqttConfig.java | 34 +++ .../config/IotDataBridgeRabbitMQConfig.java | 46 ++++ .../IotDataBridgeRedisStreamMQConfig.java | 34 +++ .../config/IotDataBridgeRocketMQConfig.java | 39 ++++ .../dal/dataobject/rule/IotDataBridgeDO.java | 217 +----------------- .../databridge/IotHttpDataBridgeExecute.java | 10 +- .../IotKafkaMQDataBridgeExecute.java | 13 +- .../IotRabbitMQDataBridgeExecute.java | 13 +- .../IotRedisStreamMQDataBridgeExecute.java | 13 +- .../IotRocketMQDataBridgeExecute.java | 13 +- 18 files changed, 325 insertions(+), 259 deletions(-) rename yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/{IotDataBridgDirectionEnum.java => IotDataBridgeDirectionEnum.java} (78%) rename yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/{IotDataBridgTypeEnum.java => IotDataBridgeTypeEnum.java} (53%) create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeConfig.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeHttpConfig.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeKafkaMQConfig.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeMqttConfig.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeRabbitMQConfig.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeRedisStreamMQConfig.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeRocketMQConfig.java diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/DictTypeConstants.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/DictTypeConstants.java index 04df143bed..fc442d2b35 100644 --- a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/DictTypeConstants.java +++ b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/DictTypeConstants.java @@ -15,5 +15,8 @@ public class DictTypeConstants { public static final String VALIDATE_TYPE = "iot_validate_type"; public static final String DEVICE_STATE = "iot_device_state"; + public static final String IOT_DATA_BRIDGE_DIRECTION_ENUM = "iot_data_bridge_direction_enum"; + public static final String IOT_DATA_BRIDGE_TYPE_ENUM = "iot_data_bridge_type_enum"; + } diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgDirectionEnum.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgeDirectionEnum.java similarity index 78% rename from yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgDirectionEnum.java rename to yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgeDirectionEnum.java index ff4993d0ea..eb4b999163 100644 --- a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgDirectionEnum.java +++ b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgeDirectionEnum.java @@ -13,14 +13,14 @@ import java.util.Arrays; */ @RequiredArgsConstructor @Getter -public enum IotDataBridgDirectionEnum implements ArrayValuable { +public enum IotDataBridgeDirectionEnum implements ArrayValuable { INPUT(1), // 输入 OUTPUT(2); // 输出 private final Integer type; - public static final Integer[] ARRAYS = Arrays.stream(values()).map(IotDataBridgDirectionEnum::getType).toArray(Integer[]::new); + public static final Integer[] ARRAYS = Arrays.stream(values()).map(IotDataBridgeDirectionEnum::getType).toArray(Integer[]::new); @Override public Integer[] array() { diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgTypeEnum.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgeTypeEnum.java similarity index 53% rename from yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgTypeEnum.java rename to yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgeTypeEnum.java index 295f35cffd..25c7e8c1fe 100644 --- a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgTypeEnum.java +++ b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgeTypeEnum.java @@ -13,25 +13,26 @@ import java.util.Arrays; */ @RequiredArgsConstructor @Getter -public enum IotDataBridgTypeEnum implements ArrayValuable { +public enum IotDataBridgeTypeEnum implements ArrayValuable { - HTTP(1), - TCP(2), - WEBSOCKET(3), + HTTP(1, "HTTP"), + TCP(2, "TCP"), + WEBSOCKET(3, "WEBSOCKET"), - MQTT(10), + MQTT(10, "MQTT"), - DATABASE(20), - REDIS_STREAM(21), + DATABASE(20, "DATABASE"), + REDIS_STREAM(21, "REDIS_STREAM"), - ROCKETMQ(30), - RABBITMQ(31), - KAFKA(32) - ; + ROCKETMQ(30, "ROCKETMQ"), + RABBITMQ(31, "RABBITMQ"), + KAFKA(32, "KAFKA"); private final Integer type; - public static final Integer[] ARRAYS = Arrays.stream(values()).map(IotDataBridgTypeEnum::getType).toArray(Integer[]::new); + private final String name; + + public static final Integer[] ARRAYS = Arrays.stream(values()).map(IotDataBridgeTypeEnum::getType).toArray(Integer[]::new); @Override public Integer[] array() { diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/IotDataBridgeRespVO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/IotDataBridgeRespVO.java index 85247358c4..3e50dc4d5b 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/IotDataBridgeRespVO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/IotDataBridgeRespVO.java @@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge; import cn.iocoder.yudao.framework.excel.core.annotations.DictFormat; import cn.iocoder.yudao.framework.excel.core.convert.DictConvert; +import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeConfig; import com.alibaba.excel.annotation.ExcelIgnoreUnannotated; import com.alibaba.excel.annotation.ExcelProperty; import io.swagger.v3.oas.annotations.media.Schema; @@ -9,6 +10,10 @@ import lombok.Data; import java.time.LocalDateTime; +import static cn.iocoder.yudao.module.iot.enums.DictTypeConstants.IOT_DATA_BRIDGE_DIRECTION_ENUM; +import static cn.iocoder.yudao.module.iot.enums.DictTypeConstants.IOT_DATA_BRIDGE_TYPE_ENUM; +import static cn.iocoder.yudao.module.system.enums.DictTypeConstants.COMMON_STATUS; + @Schema(description = "管理后台 - IoT 数据桥梁 Response VO") @Data @ExcelIgnoreUnannotated @@ -28,22 +33,22 @@ public class IotDataBridgeRespVO { @Schema(description = "桥梁状态", requiredMode = Schema.RequiredMode.REQUIRED, example = "2") @ExcelProperty(value = "桥梁状态", converter = DictConvert.class) - @DictFormat("common_status") // TODO 代码优化:建议设置到对应的 DictTypeConstants 枚举类中 + @DictFormat(COMMON_STATUS) private Integer status; @Schema(description = "桥梁方向", requiredMode = Schema.RequiredMode.REQUIRED) @ExcelProperty(value = "桥梁方向", converter = DictConvert.class) - @DictFormat("iot_data_bridg_direction_enum") // TODO 代码优化:建议设置到对应的 DictTypeConstants 枚举类中 + @DictFormat(IOT_DATA_BRIDGE_DIRECTION_ENUM) private Integer direction; @Schema(description = "桥梁类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "1") @ExcelProperty(value = "桥梁类型", converter = DictConvert.class) - @DictFormat("iot_data_bridg_type_enum") // TODO 代码优化:建议设置到对应的 DictTypeConstants 枚举类中 + @DictFormat(IOT_DATA_BRIDGE_TYPE_ENUM) private Integer type; @Schema(description = "桥梁配置") @ExcelProperty("桥梁配置") - private String config; + private IotDataBridgeConfig config; @Schema(description = "创建时间", requiredMode = Schema.RequiredMode.REQUIRED) @ExcelProperty("创建时间") diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/IotDataBridgeSaveReqVO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/IotDataBridgeSaveReqVO.java index 2fd8b36093..96620a1ca4 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/IotDataBridgeSaveReqVO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/IotDataBridgeSaveReqVO.java @@ -1,5 +1,6 @@ package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge; +import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeConfig; import io.swagger.v3.oas.annotations.media.Schema; import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; @@ -32,6 +33,7 @@ public class IotDataBridgeSaveReqVO { private Integer type; @Schema(description = "桥梁配置") - private String config; + @NotNull(message = "桥梁配置不能为空") + private IotDataBridgeConfig config; } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeConfig.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeConfig.java new file mode 100644 index 0000000000..f37a51edb3 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeConfig.java @@ -0,0 +1,32 @@ +package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import lombok.Data; + +/** + * 抽象类 IotDataBridgeConfig + * + * 用于表示数据桥梁配置数据的通用类型,根据具体的 "type" 字段动态映射到对应的子类。 + * 提供多态支持,适用于不同类型的数据结构序列化和反序列化场景。 + * + * @author HUIHUI + */ +@Data +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible = true) +@JsonSubTypes({ + @JsonSubTypes.Type(value = IotDataBridgeHttpConfig.class, name = "HTTP"), + @JsonSubTypes.Type(value = IotDataBridgeKafkaMQConfig.class, name = "KAFKA"), + @JsonSubTypes.Type(value = IotDataBridgeMqttConfig.class, name = "MQTT"), + @JsonSubTypes.Type(value = IotDataBridgeRabbitMQConfig.class, name = "RABBITMQ"), + @JsonSubTypes.Type(value = IotDataBridgeRedisStreamMQConfig.class, name = "REDIS_STREAM"), + @JsonSubTypes.Type(value = IotDataBridgeRocketMQConfig.class, name = "ROCKETMQ"), +}) +public abstract class IotDataBridgeConfig { + + /** + * 配置类型 + */ + private String type; + +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeHttpConfig.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeHttpConfig.java new file mode 100644 index 0000000000..69cdc71f7f --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeHttpConfig.java @@ -0,0 +1,36 @@ +package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config; + +import lombok.Data; + +import java.util.Map; + +/** + * HTTP 配置 + * + * @author HUIHUI + */ +@Data +public class IotDataBridgeHttpConfig extends IotDataBridgeConfig { + + /** + * 请求 URL + */ + private String url; + /** + * 请求方法 + */ + private String method; + /** + * 请求头 + */ + private Map headers; + /** + * 请求参数 + */ + private Map query; + /** + * 请求体 + */ + private String body; + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeKafkaMQConfig.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeKafkaMQConfig.java new file mode 100644 index 0000000000..3acd646f33 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeKafkaMQConfig.java @@ -0,0 +1,35 @@ +package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config; + +import lombok.Data; + +/** + * Kafka 配置 + * + * @author HUIHUI + */ +@Data +public class IotDataBridgeKafkaMQConfig extends IotDataBridgeConfig { + + /** + * Kafka 服务器地址 + */ + private String bootstrapServers; + /** + * 用户名 + */ + private String username; + /** + * 密码 + */ + private String password; + /** + * 是否启用 SSL + */ + private Boolean ssl; + + /** + * 主题 + */ + private String topic; + +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeMqttConfig.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeMqttConfig.java new file mode 100644 index 0000000000..0bf7067bc1 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeMqttConfig.java @@ -0,0 +1,34 @@ +package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config; + +import lombok.Data; + +/** + * MQTT 配置 + * + * @author HUIHUI + */ +@Data +public class IotDataBridgeMqttConfig extends IotDataBridgeConfig { + + /** + * MQTT 服务器地址 + */ + private String url; + /** + * 用户名 + */ + private String username; + /** + * 密码 + */ + private String password; + /** + * 客户端编号 + */ + private String clientId; + /** + * 主题 + */ + private String topic; + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeRabbitMQConfig.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeRabbitMQConfig.java new file mode 100644 index 0000000000..29bf328979 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeRabbitMQConfig.java @@ -0,0 +1,46 @@ +package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config; + +import lombok.Data; + +/** + * RabbitMQ 配置 + * + * @author HUIHUI + */ +@Data +public class IotDataBridgeRabbitMQConfig extends IotDataBridgeConfig { + + /** + * RabbitMQ 服务器地址 + */ + private String host; + /** + * 端口 + */ + private Integer port; + /** + * 虚拟主机 + */ + private String virtualHost; + /** + * 用户名 + */ + private String username; + /** + * 密码 + */ + private String password; + + /** + * 交换机名称 + */ + private String exchange; + /** + * 路由键 + */ + private String routingKey; + /** + * 队列名称 + */ + private String queue; +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeRedisStreamMQConfig.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeRedisStreamMQConfig.java new file mode 100644 index 0000000000..db7b2b2bcb --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeRedisStreamMQConfig.java @@ -0,0 +1,34 @@ +package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config; + +import lombok.Data; + +/** + * Redis Stream MQ 配置 + * + * @author HUIHUI + */ +@Data +public class IotDataBridgeRedisStreamMQConfig extends IotDataBridgeConfig { + + /** + * Redis 服务器地址 + */ + private String host; + /** + * 端口 + */ + private Integer port; + /** + * 密码 + */ + private String password; + /** + * 数据库索引 + */ + private Integer database; + + /** + * 主题 + */ + private String topic; +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeRocketMQConfig.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeRocketMQConfig.java new file mode 100644 index 0000000000..e911461e4d --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeRocketMQConfig.java @@ -0,0 +1,39 @@ +package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config; + +import lombok.Data; + +/** + * RocketMQ 配置 + * + * @author HUIHUI + */ +@Data +public class IotDataBridgeRocketMQConfig extends IotDataBridgeConfig { + + /** + * RocketMQ 名称服务器地址 + */ + private String nameServer; + /** + * 访问密钥 + */ + private String accessKey; + /** + * 秘密钥匙 + */ + private String secretKey; + + /** + * 生产者组 + */ + private String group; + /** + * 主题 + */ + private String topic; + /** + * 标签 + */ + private String tags; + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java index bb0623a1ff..ef0a60ecab 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java @@ -1,16 +1,16 @@ package cn.iocoder.yudao.module.iot.dal.dataobject.rule; import cn.iocoder.yudao.framework.mybatis.core.dataobject.BaseDO; +import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeConfig; +import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeDirectionEnum; +import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum; import com.baomidou.mybatisplus.annotation.KeySequence; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.extension.handlers.JacksonTypeHandler; -import com.fasterxml.jackson.annotation.JsonTypeInfo; import lombok.*; -import java.util.Map; - /** * IoT 数据桥梁 DO * @@ -48,14 +48,14 @@ public class IotDataBridgeDO extends BaseDO { /** * 桥梁方向 * - * 枚举 {@link cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgDirectionEnum} + * 枚举 {@link IotDataBridgeDirectionEnum} */ private Integer direction; /** * 桥梁类型 * - * 枚举 {@link cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum} + * 枚举 {@link IotDataBridgeTypeEnum} */ private Integer type; @@ -63,211 +63,6 @@ public class IotDataBridgeDO extends BaseDO { * 桥梁配置 */ @TableField(typeHandler = JacksonTypeHandler.class) - private Config config; - - /** - * 文件客户端的配置 - * 不同实现的客户端,需要不同的配置,通过子类来定义 - * - * @author 芋道源码 - */ - @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS) - // @JsonTypeInfo 注解的作用,Jackson 多态 - // 1. 序列化到时数据库时,增加 @class 属性。 - // 2. 反序列化到内存对象时,通过 @class 属性,可以创建出正确的类型 - public interface Config { - } - - /** - * HTTP 配置 - */ - @Data - public static class HttpConfig implements Config { - - /** - * 请求 URL - */ - private String url; - /** - * 请求方法 - */ - private String method; - /** - * 请求头 - */ - private Map headers; - /** - * 请求参数 - */ - private Map query; - /** - * 请求体 - */ - private String body; - - } - - /** - * MQTT 配置 - */ - @Data - public static class MqttConfig implements Config { - - /** - * MQTT 服务器地址 - */ - private String url; - /** - * 用户名 - */ - private String username; - /** - * 密码 - */ - private String password; - /** - * 客户端编号 - */ - private String clientId; - /** - * 主题 - */ - private String topic; - - } - - /** - * RocketMQ 配置 - */ - @Data - public static class RocketMQConfig implements Config { - - /** - * RocketMQ 名称服务器地址 - */ - private String nameServer; - /** - * 访问密钥 - */ - private String accessKey; - /** - * 秘密钥匙 - */ - private String secretKey; - - /** - * 生产者组 - */ - private String group; - /** - * 主题 - */ - private String topic; - /** - * 标签 - */ - private String tags; - - } - - /** - * Kafka 配置 - */ - @Data - public static class KafkaMQConfig implements Config { - - /** - * Kafka 服务器地址 - */ - private String bootstrapServers; - /** - * 用户名 - */ - private String username; - /** - * 密码 - */ - private String password; - /** - * 是否启用 SSL - */ - private Boolean ssl; - - /** - * 主题 - */ - private String topic; - - } - - /** - * RabbitMQ 配置 - */ - @Data - public static class RabbitMQConfig implements Config { - - /** - * RabbitMQ 服务器地址 - */ - private String host; - /** - * 端口 - */ - private Integer port; - /** - * 虚拟主机 - */ - private String virtualHost; - /** - * 用户名 - */ - private String username; - /** - * 密码 - */ - private String password; - - /** - * 交换机名称 - */ - private String exchange; - /** - * 路由键 - */ - private String routingKey; - /** - * 队列名称 - */ - private String queue; - } - - /** - * Redis Stream MQ 配置 - */ - @Data - public static class RedisStreamMQConfig implements Config { - - /** - * Redis 服务器地址 - */ - private String host; - /** - * 端口 - */ - private Integer port; - /** - * 密码 - */ - private String password; - /** - * 数据库索引 - */ - private Integer database; - - /** - * 主题 - */ - private String topic; - } + private IotDataBridgeConfig config; } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java index ffe2c5b803..22b72e055e 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java @@ -3,8 +3,8 @@ package cn.iocoder.yudao.module.iot.service.rule.action.databridge; import cn.hutool.core.collection.CollUtil; import cn.iocoder.yudao.framework.common.util.http.HttpUtils; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; -import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; -import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum; +import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeHttpConfig; +import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum; import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; @@ -25,19 +25,19 @@ import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_ */ @Component @Slf4j -public class IotHttpDataBridgeExecute implements IotDataBridgeExecute { +public class IotHttpDataBridgeExecute implements IotDataBridgeExecute { @Resource private RestTemplate restTemplate; @Override public Integer getType() { - return IotDataBridgTypeEnum.HTTP.getType(); + return IotDataBridgeTypeEnum.HTTP.getType(); } @Override @SuppressWarnings({"unchecked", "deprecation"}) - public void execute0(IotDeviceMessage message, IotDataBridgeDO.HttpConfig config) { + public void execute0(IotDeviceMessage message, IotDataBridgeHttpConfig config) { String url = null; HttpMethod method = HttpMethod.valueOf(config.getMethod().toUpperCase()); HttpEntity requestEntity = null; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java index 6e3ef67a25..3b7f99bf42 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java @@ -1,7 +1,8 @@ package cn.iocoder.yudao.module.iot.service.rule.action.databridge; +import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeKafkaMQConfig; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; -import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum; +import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum; import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerConfig; @@ -26,17 +27,17 @@ import java.util.concurrent.TimeUnit; @Component @Slf4j public class IotKafkaMQDataBridgeExecute extends - AbstractCacheableDataBridgeExecute> { + AbstractCacheableDataBridgeExecute> { private static final Duration SEND_TIMEOUT = Duration.ofMillis(10000); // 10 秒超时时间 @Override public Integer getType() { - return IotDataBridgTypeEnum.KAFKA.getType(); + return IotDataBridgeTypeEnum.KAFKA.getType(); } @Override - public void execute0(IotDeviceMessage message, IotDataBridgeDO.KafkaMQConfig config) throws Exception { + public void execute0(IotDeviceMessage message, IotDataBridgeKafkaMQConfig config) throws Exception { // 1. 获取或创建 KafkaTemplate KafkaTemplate kafkaTemplate = getProducer(config); @@ -47,7 +48,7 @@ public class IotKafkaMQDataBridgeExecute extends } @Override - protected KafkaTemplate initProducer(IotDataBridgeDO.KafkaMQConfig config) { + protected KafkaTemplate initProducer(IotDataBridgeKafkaMQConfig config) { // 1.1 构建生产者配置 Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers()); @@ -82,7 +83,7 @@ public class IotKafkaMQDataBridgeExecute extends IotKafkaMQDataBridgeExecute action = new IotKafkaMQDataBridgeExecute(); // 2. 创建共享的配置 - IotDataBridgeDO.KafkaMQConfig config = new IotDataBridgeDO.KafkaMQConfig(); + IotDataBridgeKafkaMQConfig config = new IotDataBridgeKafkaMQConfig(); config.setBootstrapServers("127.0.0.1:9092"); config.setTopic("test-topic"); config.setSsl(false); diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java index 5599144dad..54485c091d 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java @@ -1,7 +1,8 @@ package cn.iocoder.yudao.module.iot.service.rule.action.databridge; +import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeRabbitMQConfig; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; -import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum; +import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum; import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; @@ -22,16 +23,16 @@ import java.time.LocalDateTime; @Component @Slf4j public class IotRabbitMQDataBridgeExecute extends - AbstractCacheableDataBridgeExecute { + AbstractCacheableDataBridgeExecute { @Override public Integer getType() { - return IotDataBridgTypeEnum.RABBITMQ.getType(); + return IotDataBridgeTypeEnum.RABBITMQ.getType(); } @Override - public void execute0(IotDeviceMessage message, IotDataBridgeDO.RabbitMQConfig config) throws Exception { + public void execute0(IotDeviceMessage message, IotDataBridgeRabbitMQConfig config) throws Exception { // 1. 获取或创建 Channel Channel channel = getProducer(config); @@ -48,7 +49,7 @@ public class IotRabbitMQDataBridgeExecute extends @Override @SuppressWarnings("resource") - protected Channel initProducer(IotDataBridgeDO.RabbitMQConfig config) throws Exception { + protected Channel initProducer(IotDataBridgeRabbitMQConfig config) throws Exception { // 1. 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(config.getHost()); @@ -81,7 +82,7 @@ public class IotRabbitMQDataBridgeExecute extends IotRabbitMQDataBridgeExecute action = new IotRabbitMQDataBridgeExecute(); // 2. 创建共享的配置 - IotDataBridgeDO.RabbitMQConfig config = new IotDataBridgeDO.RabbitMQConfig(); + IotDataBridgeRabbitMQConfig config = new IotDataBridgeRabbitMQConfig(); config.setHost("localhost"); config.setPort(5672); config.setVirtualHost("/"); diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java index a20334f4fd..5616c7e648 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java @@ -2,8 +2,9 @@ package cn.iocoder.yudao.module.iot.service.rule.action.databridge; import cn.hutool.core.util.ReflectUtil; import cn.hutool.core.util.StrUtil; +import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeRedisStreamMQConfig; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; -import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum; +import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum; import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; @@ -30,15 +31,15 @@ import java.time.LocalDateTime; @Component @Slf4j public class IotRedisStreamMQDataBridgeExecute extends - AbstractCacheableDataBridgeExecute> { + AbstractCacheableDataBridgeExecute> { @Override public Integer getType() { - return IotDataBridgTypeEnum.REDIS_STREAM.getType(); + return IotDataBridgeTypeEnum.REDIS_STREAM.getType(); } @Override - public void execute0(IotDeviceMessage message, IotDataBridgeDO.RedisStreamMQConfig config) throws Exception { + public void execute0(IotDeviceMessage message, IotDataBridgeRedisStreamMQConfig config) throws Exception { // 1. 获取 RedisTemplate RedisTemplate redisTemplate = getProducer(config); @@ -50,7 +51,7 @@ public class IotRedisStreamMQDataBridgeExecute extends } @Override - protected RedisTemplate initProducer(IotDataBridgeDO.RedisStreamMQConfig config) { + protected RedisTemplate initProducer(IotDataBridgeRedisStreamMQConfig config) { // 1.1 创建 Redisson 配置 Config redissonConfig = new Config(); SingleServerConfig serverConfig = redissonConfig.useSingleServer() @@ -101,7 +102,7 @@ public class IotRedisStreamMQDataBridgeExecute extends IotRedisStreamMQDataBridgeExecute action = new IotRedisStreamMQDataBridgeExecute(); // 2. 创建共享的配置 - IotDataBridgeDO.RedisStreamMQConfig config = new IotDataBridgeDO.RedisStreamMQConfig(); + IotDataBridgeRedisStreamMQConfig config = new IotDataBridgeRedisStreamMQConfig(); config.setHost("127.0.0.1"); config.setPort(6379); config.setDatabase(0); diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java index 542d190ed5..541bd181e0 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java @@ -1,7 +1,8 @@ package cn.iocoder.yudao.module.iot.service.rule.action.databridge; +import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeRocketMQConfig; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; -import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum; +import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum; import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.DefaultMQProducer; @@ -23,15 +24,15 @@ import java.time.LocalDateTime; @Component @Slf4j public class IotRocketMQDataBridgeExecute extends - AbstractCacheableDataBridgeExecute { + AbstractCacheableDataBridgeExecute { @Override public Integer getType() { - return IotDataBridgTypeEnum.ROCKETMQ.getType(); + return IotDataBridgeTypeEnum.ROCKETMQ.getType(); } @Override - public void execute0(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) throws Exception { + public void execute0(IotDeviceMessage message, IotDataBridgeRocketMQConfig config) throws Exception { // 1. 获取或创建 Producer DefaultMQProducer producer = getProducer(config); @@ -52,7 +53,7 @@ public class IotRocketMQDataBridgeExecute extends } @Override - protected DefaultMQProducer initProducer(IotDataBridgeDO.RocketMQConfig config) throws Exception { + protected DefaultMQProducer initProducer(IotDataBridgeRocketMQConfig config) throws Exception { DefaultMQProducer producer = new DefaultMQProducer(config.getGroup()); producer.setNamesrvAddr(config.getNameServer()); producer.start(); @@ -70,7 +71,7 @@ public class IotRocketMQDataBridgeExecute extends IotRocketMQDataBridgeExecute action = new IotRocketMQDataBridgeExecute(); // 2. 创建共享的配置 - IotDataBridgeDO.RocketMQConfig config = new IotDataBridgeDO.RocketMQConfig(); + IotDataBridgeRocketMQConfig config = new IotDataBridgeRocketMQConfig(); config.setNameServer("127.0.0.1:9876"); config.setGroup("test-group"); config.setTopic("test-topic"); From cdf316e7788a232d6d37db2b43aa7d87d360b5bf Mon Sep 17 00:00:00 2001 From: puhui999 Date: Sun, 9 Mar 2025 18:04:46 +0800 Subject: [PATCH 9/9] =?UTF-8?q?=E3=80=90=E4=BB=A3=E7=A0=81=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E3=80=91IoT:=20=E6=95=B0=E6=8D=AE=E6=A1=A5=E6=A2=81?= =?UTF-8?q?=20config=20=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cn/iocoder/yudao/module/iot/enums/DictTypeConstants.java | 1 - .../yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/DictTypeConstants.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/DictTypeConstants.java index fc442d2b35..1fc47c0c00 100644 --- a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/DictTypeConstants.java +++ b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/DictTypeConstants.java @@ -18,5 +18,4 @@ public class DictTypeConstants { public static final String IOT_DATA_BRIDGE_DIRECTION_ENUM = "iot_data_bridge_direction_enum"; public static final String IOT_DATA_BRIDGE_TYPE_ENUM = "iot_data_bridge_type_enum"; - } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java index ef0a60ecab..05493b916f 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java @@ -16,7 +16,7 @@ import lombok.*; * * @author 芋道源码 */ -@TableName("iot_data_bridge") +@TableName(value = "iot_data_bridge", autoResultMap = true) @KeySequence("iot_data_bridge_seq") // 用于 Oracle、PostgreSQL、Kingbase、DB2、H2 数据库的主键自增。如果是 MySQL 等数据库,可不写。 @Data @EqualsAndHashCode(callSuper = true)