From 1b15fb384547411fe7d0d7c180e8553ff75d2a02 Mon Sep 17 00:00:00 2001 From: puhui999 Date: Mon, 24 Feb 2025 10:46:38 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E3=80=90=E4=BB=A3=E7=A0=81=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E3=80=91IoT:=20=E4=BC=98=E5=8C=96=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E6=A1=A5=E6=8E=A5=E7=BC=93=E5=AD=98=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../AbstractCacheableDataBridgeExecute.java | 70 +++++++++++++++++++ .../IotRocketMQDataBridgeExecute.java | 59 +++++----------- 2 files changed, 88 insertions(+), 41 deletions(-) create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java new file mode 100644 index 0000000000..264bce553e --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java @@ -0,0 +1,70 @@ +package cn.iocoder.yudao.module.iot.service.rule.action.databridge; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import lombok.extern.slf4j.Slf4j; + +import java.time.Duration; + +/** + * 带缓存功能的数据桥接执行器抽象类 + * + * @author HUIHUI + */ +@Slf4j +public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridgeExecute { + + /** + * Producer 缓存 + */ + private final LoadingCache PRODUCER_CACHE = CacheBuilder.newBuilder() + .expireAfterAccess(Duration.ofMinutes(30)) + .removalListener(notification -> { + Object producer = notification.getValue(); + if (producer == null) { + return; + } + try { + closeProducer(producer); + log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已关闭]", notification.getKey()); + } catch (Exception e) { + log.error("[PRODUCER_CACHE][配置({}) 对应的 producer 关闭失败]", notification.getKey(), e); + } + }) + .build(new CacheLoader() { + @Override + public Object load(Object config) throws Exception { + Object producer = initProducer(config); + log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已创建并启动]", config); + return producer; + } + }); + + /** + * 获取生产者 + * + * @param config 配置信息 + * @return 生产者对象 + */ + protected Object getProducer(Object config) throws Exception { + return PRODUCER_CACHE.get(config); + } + + /** + * 初始化生产者 + * + * @param config 配置信息 + * @return 生产者对象 + * @throws Exception 如果初始化失败 + */ + protected abstract Object initProducer(Object config) throws Exception; + + /** + * 关闭生产者 + * + * @param producer 生产者对象 + */ + protected abstract void closeProducer(Object producer); + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java index 626208e187..af701cd903 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java @@ -3,9 +3,6 @@ package cn.iocoder.yudao.module.iot.service.rule.action.databridge; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum; import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; @@ -14,9 +11,7 @@ import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.springframework.stereotype.Component; -import java.time.Duration; import java.time.LocalDateTime; -import java.util.concurrent.Executors; /** * RocketMQ 的 {@link IotDataBridgeExecute} 实现类 @@ -25,41 +20,7 @@ import java.util.concurrent.Executors; */ @Component @Slf4j -public class IotRocketMQDataBridgeExecute implements IotDataBridgeExecute { - - /** - * 针对 {@link IotDataBridgeDO.RocketMQConfig} 的 DefaultMQProducer 缓存 - */ - // TODO @puhui999:因为 kafka 之类也存在这个情况,是不是得搞个抽象类。提供一个 initProducer,和 closeProducer 方法 - private final LoadingCache PRODUCER_CACHE = CacheBuilder.newBuilder() - .refreshAfterWrite(Duration.ofMinutes(10)) // TODO puhui999:应该是 read 30 分钟哈 - // 增加移除监听器,自动关闭 producer - .removalListener(notification -> { - DefaultMQProducer producer = (DefaultMQProducer) notification.getValue(); - // TODO puhui999:if return,更简短哈 - if (producer != null) { - try { - producer.shutdown(); - log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已关闭]", notification.getKey()); - } catch (Exception e) { - log.error("[PRODUCER_CACHE][配置({}) 对应的 producer 关闭失败]", notification.getKey(), e); - } - } - }) - // TODO @puhui999:就同步哈,不用异步处理。 - // 通过 asyncReloading 实现全异步加载,包括 refreshAfterWrite 被阻塞的加载线程 - .build(CacheLoader.asyncReloading(new CacheLoader() { - - @Override - public DefaultMQProducer load(IotDataBridgeDO.RocketMQConfig config) throws Exception { - DefaultMQProducer producer = new DefaultMQProducer(config.getGroup()); - producer.setNamesrvAddr(config.getNameServer()); - producer.start(); - log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已创建并启动]", config); - return producer; - } - - }, Executors.newCachedThreadPool())); +public class IotRocketMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute { @Override public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { @@ -74,7 +35,7 @@ public class IotRocketMQDataBridgeExecute implements IotDataBridgeExecute { private void executeRocketMQ(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) { try { // 1. 获取或创建 Producer - DefaultMQProducer producer = PRODUCER_CACHE.get(config); + DefaultMQProducer producer = (DefaultMQProducer) getProducer(config); // 2.1 创建消息对象,指定Topic、Tag和消息体 Message msg = new Message( @@ -95,6 +56,22 @@ public class IotRocketMQDataBridgeExecute implements IotDataBridgeExecute { } } + @Override + protected Object initProducer(Object config) throws Exception { + IotDataBridgeDO.RocketMQConfig rocketMQConfig = (IotDataBridgeDO.RocketMQConfig) config; + DefaultMQProducer producer = new DefaultMQProducer(rocketMQConfig.getGroup()); + producer.setNamesrvAddr(rocketMQConfig.getNameServer()); + producer.start(); + return producer; + } + + @Override + protected void closeProducer(Object producer) { + if (producer instanceof DefaultMQProducer) { + ((DefaultMQProducer) producer).shutdown(); + } + } + // TODO @芋艿:测试代码,后续清理 public static void main(String[] args) { // 1. 创建一个共享的实例 From 69a27b1ee2024510bd4b8c3bb28a014927cdbbcc Mon Sep 17 00:00:00 2001 From: puhui999 Date: Fri, 28 Feb 2025 14:46:02 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E3=80=90=E5=8A=9F=E8=83=BD=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=E3=80=91IoT:=20=E6=96=B0=E5=A2=9E=20Kafka=20=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E6=A1=A5=E6=A2=81=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- yudao-dependencies/pom.xml | 8 +- yudao-module-iot/yudao-module-iot-biz/pom.xml | 5 + .../dal/dataobject/rule/IotDataBridgeDO.java | 34 +++++ .../AbstractCacheableDataBridgeExecute.java | 2 +- .../databridge/IotDataBridgeExecute.java | 2 +- .../databridge/IotHttpDataBridgeExecute.java | 2 +- .../IotKafkaMQDataBridgeExecute.java | 124 ++++++++++++++++++ .../IotRocketMQDataBridgeExecute.java | 2 +- 8 files changed, 173 insertions(+), 6 deletions(-) create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java diff --git a/yudao-dependencies/pom.xml b/yudao-dependencies/pom.xml index 23859c7834..7bdba64627 100644 --- a/yudao-dependencies/pom.xml +++ b/yudao-dependencies/pom.xml @@ -35,6 +35,7 @@ 3.3.3 2.3.2 + 3.3.3 2.2.7 @@ -285,13 +286,16 @@ yudao-spring-boot-starter-mq ${revision} - org.apache.rocketmq rocketmq-spring-boot-starter ${rocketmq-spring.version} - + + org.springframework.kafka + spring-kafka + ${kafka-spring.version} + cn.iocoder.boot diff --git a/yudao-module-iot/yudao-module-iot-biz/pom.xml b/yudao-module-iot/yudao-module-iot-biz/pom.xml index ea1dde86fe..1c07e49408 100644 --- a/yudao-module-iot/yudao-module-iot-biz/pom.xml +++ b/yudao-module-iot/yudao-module-iot-biz/pom.xml @@ -81,6 +81,11 @@ rocketmq-spring-boot-starter true + + org.springframework.kafka + spring-kafka + true + org.pf4j diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java index 213b0cda10..220edef718 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java @@ -170,4 +170,38 @@ public class IotDataBridgeDO extends BaseDO { } + /** + * Kafka 配置 + */ + @Data + public static class KafkaMQConfig implements Config { + + /** + * Kafka 服务器地址 + */ + private String bootstrapServers; + /** + * 用户名 + */ + private String username; + /** + * 密码 + */ + private String password; + /** + * 是否启用 SSL + */ + private Boolean ssl; + + /** + * 生产者组 ID + */ + private String groupId; + /** + * 主题 + */ + private String topic; + + } + } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java index 264bce553e..e8fbb0ccb3 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java @@ -8,7 +8,7 @@ import lombok.extern.slf4j.Slf4j; import java.time.Duration; /** - * 带缓存功能的数据桥接执行器抽象类 + * 带缓存功能的数据桥梁执行器抽象类 * * @author HUIHUI */ diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java index cd00f4f3e7..1617c3b091 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java @@ -12,7 +12,7 @@ import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; public interface IotDataBridgeExecute { /** - * 执行数据桥接操作 + * 执行数据桥梁操作 * * @param message 设备消息 * @param dataBridge 数据桥梁 diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java index 76f1b793f6..27b8bc6bba 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java @@ -32,7 +32,7 @@ public class IotHttpDataBridgeExecute implements IotDataBridgeExecute { @Override public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { - // 1.1 校验数据桥接的类型 == HTTP + // 1.1 校验数据桥梁的类型 == HTTP if (!IotDataBridgTypeEnum.HTTP.getType().equals(dataBridge.getType())) { return; } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java new file mode 100644 index 0000000000..8c0ef2b038 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java @@ -0,0 +1,124 @@ +package cn.iocoder.yudao.module.iot.service.rule.action.databridge; + +import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; +import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum; +import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Kafka 的 {@link IotDataBridgeExecute} 实现类 + * + * @author HUIHUI + */ +@Component +@Slf4j +public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute { + + @Override + public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { + // 1.1 校验数据桥梁的类型 == KAFKA + if (!IotDataBridgTypeEnum.KAFKA.getType().equals(dataBridge.getType())) { + return; + } + // 1.2 执行 Kafka 发送消息 + executeKafka(message, (IotDataBridgeDO.KafkaMQConfig) dataBridge.getConfig()); + } + + private void executeKafka(IotDeviceMessage message, IotDataBridgeDO.KafkaMQConfig config) { + try { + // 1. 获取或创建 KafkaTemplate + KafkaTemplate kafkaTemplate = (KafkaTemplate) getProducer(config); + + // 2. 发送消息并等待结果 + kafkaTemplate.send(config.getTopic(), message.toString()) + .get(10, TimeUnit.SECONDS); // 添加超时等待 + log.info("[executeKafka][message({}) 发送成功]", message); + } catch (TimeoutException e) { + log.error("[executeKafka][message({}) config({}) 发送超时]", message, config, e); + } catch (Exception e) { + log.error("[executeKafka][message({}) config({}) 发送异常]", message, config, e); + } + } + + @Override + protected Object initProducer(Object config) { + IotDataBridgeDO.KafkaMQConfig kafkaConfig = (IotDataBridgeDO.KafkaMQConfig) config; + + // 1.1 构建生产者配置 + Map props = new HashMap<>(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + // 1.2 如果配置了认证信息 + if (kafkaConfig.getUsername() != null && kafkaConfig.getPassword() != null) { + props.put("security.protocol", "SASL_PLAINTEXT"); + props.put("sasl.mechanism", "PLAIN"); + props.put("sasl.jaas.config", + "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + + kafkaConfig.getUsername() + "\" password=\"" + kafkaConfig.getPassword() + "\";"); + } + + // 1.3 如果启用 SSL + if (Boolean.TRUE.equals(kafkaConfig.getSsl())) { + props.put("security.protocol", "SSL"); + } + + // 2. 创建 KafkaTemplate + DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(props); + return new KafkaTemplate<>(producerFactory); + } + + @Override + protected void closeProducer(Object producer) { + if (producer instanceof KafkaTemplate) { + ((KafkaTemplate) producer).destroy(); + } + } + + // TODO @芋艿:测试代码,后续清理 + public static void main(String[] args) { + // 1. 创建一个共享的实例 + IotKafkaMQDataBridgeExecute action = new IotKafkaMQDataBridgeExecute(); + + // 2. 创建共享的配置 + IotDataBridgeDO.KafkaMQConfig config = new IotDataBridgeDO.KafkaMQConfig(); + config.setBootstrapServers("127.0.0.1:9092"); + config.setTopic("test-topic"); + config.setSsl(false); + config.setUsername(null); + config.setPassword(null); + + // 3. 创建共享的消息 + IotDeviceMessage message = IotDeviceMessage.builder() + .requestId("TEST-001") + .productKey("testProduct") + .deviceName("testDevice") + .deviceKey("testDeviceKey") + .type("property") + .identifier("temperature") + .data("{\"value\": 60}") + .reportTime(LocalDateTime.now()) + .tenantId(1L) + .build(); + + // 4. 执行两次测试,验证缓存 + log.info("[main][第一次执行,应该会创建新的 producer]"); + action.executeKafka(message, config); + + log.info("[main][第二次执行,应该会复用缓存的 producer]"); + action.executeKafka(message, config); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java index af701cd903..3b53252539 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java @@ -24,7 +24,7 @@ public class IotRocketMQDataBridgeExecute extends AbstractCacheableDataBridgeExe @Override public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { - // 1.1 校验数据桥接的类型 == ROCKETMQ + // 1.1 校验数据桥梁的类型 == ROCKETMQ if (!IotDataBridgTypeEnum.ROCKETMQ.getType().equals(dataBridge.getType())) { return; } From 7b449b81e7be2671446a94a74c9c539f6c9be2a8 Mon Sep 17 00:00:00 2001 From: puhui999 Date: Fri, 28 Feb 2025 18:03:34 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E3=80=90=E5=8A=9F=E8=83=BD=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=E3=80=91IoT:=20=E6=96=B0=E5=A2=9E=20RabbitMQ=20?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E6=A1=A5=E6=A2=81=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- yudao-dependencies/pom.xml | 7 + yudao-module-iot/yudao-module-iot-biz/pom.xml | 5 + .../dal/dataobject/rule/IotDataBridgeDO.java | 41 ++++++ .../IotRabbitMQDataBridgeExecute.java | 126 ++++++++++++++++++ 4 files changed, 179 insertions(+) create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java diff --git a/yudao-dependencies/pom.xml b/yudao-dependencies/pom.xml index 7bdba64627..fa54eef606 100644 --- a/yudao-dependencies/pom.xml +++ b/yudao-dependencies/pom.xml @@ -36,6 +36,7 @@ 2.3.2 3.3.3 + 3.4.3 2.2.7 @@ -296,6 +297,12 @@ spring-kafka ${kafka-spring.version} + + org.springframework.boot + spring-boot-starter-amqp + ${rabbitmq-spring.version} + + cn.iocoder.boot diff --git a/yudao-module-iot/yudao-module-iot-biz/pom.xml b/yudao-module-iot/yudao-module-iot-biz/pom.xml index 1c07e49408..1b897c5d7f 100644 --- a/yudao-module-iot/yudao-module-iot-biz/pom.xml +++ b/yudao-module-iot/yudao-module-iot-biz/pom.xml @@ -86,6 +86,11 @@ spring-kafka true + + org.springframework.boot + spring-boot-starter-amqp + true + org.pf4j diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java index 220edef718..e95cb695ee 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java @@ -204,4 +204,45 @@ public class IotDataBridgeDO extends BaseDO { } + /** + * RabbitMQ 配置 + */ + @Data + public static class RabbitMQConfig implements Config { + + /** + * RabbitMQ 服务器地址 + */ + private String host; + /** + * 端口 + */ + private Integer port; + /** + * 虚拟主机 + */ + private String virtualHost; + /** + * 用户名 + */ + private String username; + /** + * 密码 + */ + private String password; + + /** + * 交换机名称 + */ + private String exchange; + /** + * 路由键 + */ + private String routingKey; + /** + * 队列名称 + */ + private String queue; + } + } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java new file mode 100644 index 0000000000..2c257051f6 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java @@ -0,0 +1,126 @@ +package cn.iocoder.yudao.module.iot.service.rule.action.databridge; + +import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; +import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum; +import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.nio.charset.StandardCharsets; +import java.time.LocalDateTime; + +/** + * RabbitMQ 的 {@link IotDataBridgeExecute} 实现类 + * + * @author HUIHUI + */ +@Component +@Slf4j +public class IotRabbitMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute { + + @Override + public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { + // 1.1 校验数据桥梁的类型 == RABBITMQ + if (!IotDataBridgTypeEnum.RABBITMQ.getType().equals(dataBridge.getType())) { + return; + } + // 1.2 执行 RabbitMQ 发送消息 + executeRabbitMQ(message, (IotDataBridgeDO.RabbitMQConfig) dataBridge.getConfig()); + } + + private void executeRabbitMQ(IotDeviceMessage message, IotDataBridgeDO.RabbitMQConfig config) { + try { + // 1. 获取或创建 Channel + Channel channel = (Channel) getProducer(config); + + // 2.1 声明交换机、队列和绑定关系 + channel.exchangeDeclare(config.getExchange(), "direct", true); + channel.queueDeclare(config.getQueue(), true, false, false, null); + channel.queueBind(config.getQueue(), config.getExchange(), config.getRoutingKey()); + + // 2.2 发送消息 + channel.basicPublish(config.getExchange(), config.getRoutingKey(), null, + message.toString().getBytes(StandardCharsets.UTF_8)); + log.info("[executeRabbitMQ][message({}) config({}) 发送成功]", message, config); + } catch (Exception e) { + log.error("[executeRabbitMQ][message({}) config({}) 发送异常]", message, config, e); + } + } + + @Override + protected Object initProducer(Object config) throws Exception { + IotDataBridgeDO.RabbitMQConfig rabbitConfig = (IotDataBridgeDO.RabbitMQConfig) config; + + // 1. 创建连接工厂 + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(rabbitConfig.getHost()); + factory.setPort(rabbitConfig.getPort()); + factory.setVirtualHost(rabbitConfig.getVirtualHost()); + factory.setUsername(rabbitConfig.getUsername()); + factory.setPassword(rabbitConfig.getPassword()); + + // 2. 创建连接 + Connection connection = factory.newConnection(); + + // 3. 创建信道 + return connection.createChannel(); + } + + @Override + protected void closeProducer(Object producer) { + if (producer instanceof Channel) { + try { + Channel channel = (Channel) producer; + if (channel.isOpen()) { + channel.close(); + } + Connection connection = channel.getConnection(); + if (connection.isOpen()) { + connection.close(); + } + } catch (Exception e) { + log.error("[closeProducer][关闭 RabbitMQ 连接异常]", e); + } + } + } + + // TODO @芋道源码:测试代码,后续清理 + public static void main(String[] args) { + // 1. 创建一个共享的实例 + IotRabbitMQDataBridgeExecute action = new IotRabbitMQDataBridgeExecute(); + + // 2. 创建共享的配置 + IotDataBridgeDO.RabbitMQConfig config = new IotDataBridgeDO.RabbitMQConfig(); + config.setHost("localhost"); + config.setPort(5672); + config.setVirtualHost("/"); + config.setUsername("admin"); + config.setPassword("123456"); + config.setExchange("test-exchange"); + config.setRoutingKey("test-key"); + config.setQueue("test-queue"); + + // 3. 创建共享的消息 + IotDeviceMessage message = IotDeviceMessage.builder() + .requestId("TEST-001") + .productKey("testProduct") + .deviceName("testDevice") + .deviceKey("testDeviceKey") + .type("property") + .identifier("temperature") + .data("{\"value\": 60}") + .reportTime(LocalDateTime.now()) + .tenantId(1L) + .build(); + + // 4. 执行两次测试,验证缓存 + log.info("[main][第一次执行,应该会创建新的 channel]"); + action.executeRabbitMQ(message, config); + + log.info("[main][第二次执行,应该会复用缓存的 channel]"); + action.executeRabbitMQ(message, config); + } +} From cb16539b663b2e510f8e67aa9f37b78378c68599 Mon Sep 17 00:00:00 2001 From: puhui999 Date: Sat, 1 Mar 2025 17:43:46 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E3=80=90=E5=8A=9F=E8=83=BD=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=E3=80=91IoT:=20=E6=96=B0=E5=A2=9E=20Redis=20Stream=20?= =?UTF-8?q?MQ=20=E6=95=B0=E6=8D=AE=E6=A1=A5=E6=A2=81=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/enums/rule/IotDataBridgTypeEnum.java | 2 +- .../dal/dataobject/rule/IotDataBridgeDO.java | 33 +++- .../AbstractCacheableDataBridgeExecute.java | 10 ++ .../databridge/IotDataBridgeExecute.java | 10 -- .../IotKafkaMQDataBridgeExecute.java | 1 + .../IotRabbitMQDataBridgeExecute.java | 2 +- .../IotRedisStreamMQDataBridgeExecute.java | 147 ++++++++++++++++++ 7 files changed, 189 insertions(+), 16 deletions(-) create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgTypeEnum.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgTypeEnum.java index cdec8d7979..295f35cffd 100644 --- a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgTypeEnum.java +++ b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgTypeEnum.java @@ -22,7 +22,7 @@ public enum IotDataBridgTypeEnum implements ArrayValuable { MQTT(10), DATABASE(20), - REDIS(21), + REDIS_STREAM(21), ROCKETMQ(30), RABBITMQ(31), diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java index e95cb695ee..bb0623a1ff 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java @@ -193,10 +193,6 @@ public class IotDataBridgeDO extends BaseDO { */ private Boolean ssl; - /** - * 生产者组 ID - */ - private String groupId; /** * 主题 */ @@ -245,4 +241,33 @@ public class IotDataBridgeDO extends BaseDO { private String queue; } + /** + * Redis Stream MQ 配置 + */ + @Data + public static class RedisStreamMQConfig implements Config { + + /** + * Redis 服务器地址 + */ + private String host; + /** + * 端口 + */ + private Integer port; + /** + * 密码 + */ + private String password; + /** + * 数据库索引 + */ + private Integer database; + + /** + * 主题 + */ + private String topic; + } + } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java index e8fbb0ccb3..96b1edd330 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java @@ -7,6 +7,16 @@ import lombok.extern.slf4j.Slf4j; import java.time.Duration; +// TODO @芋艿:因为下面的,都是有状态的,所以通过 guava 缓存连接,然后通过 RemovalNotification 实现关闭。例如说,一次新建有效期是 10 分钟; +// TODO @芋艿:mq-redis +// TODO @芋艿:mq-数据库 +// TODO @芋艿:kafka +// TODO @芋艿:rocketmq +// TODO @芋艿:rabbitmq +// TODO @芋艿:mqtt +// TODO @芋艿:tcp +// TODO @芋艿:websocket + /** * 带缓存功能的数据桥梁执行器抽象类 * diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java index 1617c3b091..a2593f3d9b 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java @@ -19,14 +19,4 @@ public interface IotDataBridgeExecute { */ void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge); - // TODO @芋艿:因为下面的,都是有状态的,所以通过 guava 缓存连接,然后通过 RemovalNotification 实现关闭。例如说,一次新建有效期是 10 分钟; - // TODO @芋艿:mq-redis - // TODO @芋艿:mq-数据库 - // TODO @芋艿:kafka - // TODO @芋艿:rocketmq - // TODO @芋艿:rabbitmq - // TODO @芋艿:mqtt - // TODO @芋艿:tcp - // TODO @芋艿:websocket - } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java index 8c0ef2b038..ec560122a8 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java @@ -35,6 +35,7 @@ public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExec executeKafka(message, (IotDataBridgeDO.KafkaMQConfig) dataBridge.getConfig()); } + @SuppressWarnings("unchecked") private void executeKafka(IotDeviceMessage message, IotDataBridgeDO.KafkaMQConfig config) { try { // 1. 获取或创建 KafkaTemplate diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java index 2c257051f6..78c1343c2d 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java @@ -87,7 +87,7 @@ public class IotRabbitMQDataBridgeExecute extends AbstractCacheableDataBridgeExe } } - // TODO @芋道源码:测试代码,后续清理 + // TODO @芋艿:测试代码,后续清理 public static void main(String[] args) { // 1. 创建一个共享的实例 IotRabbitMQDataBridgeExecute action = new IotRabbitMQDataBridgeExecute(); diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java new file mode 100644 index 0000000000..4eed88aec6 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java @@ -0,0 +1,147 @@ +package cn.iocoder.yudao.module.iot.service.rule.action.databridge; + +import cn.hutool.core.util.ReflectUtil; +import cn.hutool.core.util.StrUtil; +import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; +import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum; +import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import lombok.extern.slf4j.Slf4j; +import org.redisson.Redisson; +import org.redisson.api.RedissonClient; +import org.redisson.config.Config; +import org.redisson.config.SingleServerConfig; +import org.redisson.spring.data.connection.RedissonConnectionFactory; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.connection.stream.StreamRecords; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.serializer.RedisSerializer; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; + +/** + * Redis Stream MQ 的 {@link IotDataBridgeExecute} 实现类 + * + * @author HUIHUI + */ +@Component +@Slf4j +public class IotRedisStreamMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute { + + @Override + public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { + // 1.1 校验数据桥梁类型 + if (!IotDataBridgTypeEnum.REDIS_STREAM.getType().equals(dataBridge.getType())) { + return; + } + // 1.2 执行消息发送 + executeRedisStream(message, (IotDataBridgeDO.RedisStreamMQConfig) dataBridge.getConfig()); + } + + @SuppressWarnings("unchecked") + private void executeRedisStream(IotDeviceMessage message, IotDataBridgeDO.RedisStreamMQConfig config) { + try { + // 1. 获取 RedisTemplate + RedisTemplate redisTemplate = (RedisTemplate) getProducer(config); + + // 2. 创建并发送 Stream 记录 + ObjectRecord record = StreamRecords.newRecord() + .ofObject(message).withStreamKey(config.getTopic()); + String recordId = String.valueOf(redisTemplate.opsForStream().add(record)); + log.info("[executeRedisStream][消息发送成功] messageId: {}, config: {}", recordId, config); + } catch (Exception e) { + log.error("[executeRedisStream][消息发送失败] message: {}, config: {}", message, config, e); + } + } + + @Override + protected Object initProducer(Object config) { + IotDataBridgeDO.RedisStreamMQConfig redisConfig = (IotDataBridgeDO.RedisStreamMQConfig) config; + + // 1.1 创建 Redisson 配置 + Config redissonConfig = new Config(); + SingleServerConfig serverConfig = redissonConfig.useSingleServer() + .setAddress("redis://" + redisConfig.getHost() + ":" + redisConfig.getPort()) + .setDatabase(redisConfig.getDatabase()); + // 1.2 设置密码(如果有) + if (StrUtil.isNotBlank(redisConfig.getPassword())) { + serverConfig.setPassword(redisConfig.getPassword()); + } + + // 2.1 创建 RedissonClient + RedissonClient redisson = Redisson.create(redissonConfig); + // 2.2 创建并配置 RedisTemplate + RedisTemplate template = new RedisTemplate<>(); + // 设置 RedisConnection 工厂。😈 它就是实现多种 Java Redis 客户端接入的秘密工厂。感兴趣的胖友,可以自己去撸下。 + template.setConnectionFactory(new RedissonConnectionFactory(redisson)); + // 使用 String 序列化方式,序列化 KEY 。 + template.setKeySerializer(RedisSerializer.string()); + template.setHashKeySerializer(RedisSerializer.string()); + // 使用 JSON 序列化方式(库是 Jackson ),序列化 VALUE 。 + template.setValueSerializer(buildRedisSerializer()); + template.setHashValueSerializer(buildRedisSerializer()); + template.afterPropertiesSet();// 初始化 + return template; + } + + @Override + protected void closeProducer(Object producer) { + if (producer instanceof RedisTemplate) { + RedisConnectionFactory factory = ((RedisTemplate) producer).getConnectionFactory(); + try { + if (factory != null) { + ((RedissonConnectionFactory) factory).destroy(); + } + } catch (Exception e) { + log.error("[closeProducer][关闭 redisson 连接异常]", e); + } + } + } + + + public static RedisSerializer buildRedisSerializer() { + RedisSerializer json = RedisSerializer.json(); + // 解决 LocalDateTime 的序列化 + ObjectMapper objectMapper = (ObjectMapper) ReflectUtil.getFieldValue(json, "mapper"); + objectMapper.registerModules(new JavaTimeModule()); + return json; + } + + // TODO @芋艿:测试代码,后续清理 + public static void main(String[] args) { + // 1. 创建一个共享的实例 + IotRedisStreamMQDataBridgeExecute action = new IotRedisStreamMQDataBridgeExecute(); + + // 2. 创建共享的配置 + IotDataBridgeDO.RedisStreamMQConfig config = new IotDataBridgeDO.RedisStreamMQConfig(); + config.setHost("127.0.0.1"); + config.setPort(6379); + config.setDatabase(0); + config.setPassword("123456"); + config.setTopic("test-stream"); + + // 3. 创建共享的消息 + IotDeviceMessage message = IotDeviceMessage.builder() + .requestId("TEST-001") + .productKey("testProduct") + .deviceName("testDevice") + .deviceKey("testDeviceKey") + .type("property") + .identifier("temperature") + .data("{\"value\": 60}") + .reportTime(LocalDateTime.now()) + .tenantId(1L) + .build(); + + // 4. 执行两次测试,验证缓存 + log.info("[main][第一次执行,应该会创建新的 RedisTemplate]"); + action.executeRedisStream(message, config); + + log.info("[main][第二次执行,应该会复用缓存的 RedisTemplate]"); + action.executeRedisStream(message, config); + } + +} \ No newline at end of file