diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java index ebd0f87764..ec0c8bea97 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java @@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.iot.service.rule.action.databridge; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; import lombok.extern.slf4j.Slf4j; import java.time.Duration; @@ -15,22 +16,36 @@ import java.time.Duration; /** * 带缓存功能的数据桥梁执行器抽象类 * + * 该类提供了一个通用的缓存机制,用于管理各类数据桥接的生产者(Producer)实例。 + * + * 主要特点: + * - 基于Guava Cache实现高效的生产者实例缓存管理 + * - 自动处理生产者的生命周期(创建、获取、关闭) + * - 支持30分钟未访问自动过期清理机制 + * - 异常处理与日志记录,便于问题排查 + * + * 子类需要实现: + * - initProducer(Config) - 初始化特定类型的生产者实例 + * - closeProducer(Producer) - 关闭生产者实例并释放资源 + * + * @param 配置信息类型,用于初始化生产者 + * @param 生产者类型,负责将数据发送到目标系统 * @author HUIHUI */ @Slf4j -public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridgeExecute { +public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridgeExecute { - // TODO @huihui:AbstractCacheableDataBridgeExecute 这样,下面的 Object, Object 就有了类型;另外 IotDataBridgeDO.Config 可以替代一个 Object 哇, /** * Producer 缓存 */ - private final LoadingCache PRODUCER_CACHE = CacheBuilder.newBuilder() - .expireAfterAccess(Duration.ofMinutes(30)) - .removalListener(notification -> { - Object producer = notification.getValue(); + private final LoadingCache PRODUCER_CACHE = CacheBuilder.newBuilder() + .expireAfterAccess(Duration.ofMinutes(30)) // 30 分钟未访问就提前过期 + .removalListener((RemovalListener) notification -> { + Producer producer = notification.getValue(); if (producer == null) { return; } + try { closeProducer(producer); log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已关闭]", notification.getKey()); @@ -38,15 +53,18 @@ public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridg log.error("[PRODUCER_CACHE][配置({}) 对应的 producer 关闭失败]", notification.getKey(), e); } }) - .build(new CacheLoader() { - + .build(new CacheLoader() { @Override - public Object load(Object config) throws Exception { - Object producer = initProducer(config); - log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已创建并启动]", config); - return producer; + public Producer load(Config config) throws Exception { + try { + Producer producer = initProducer(config); + log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已创建并启动]", config); + return producer; + } catch (Exception e) { + log.error("[PRODUCER_CACHE][配置({}) 对应的 producer 创建启动失败]", config, e); + throw e; // 抛出异常,触发缓存加载失败机制 + } } - }); /** @@ -55,7 +73,7 @@ public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridg * @param config 配置信息 * @return 生产者对象 */ - protected Object getProducer(Object config) throws Exception { + protected Producer getProducer(Config config) throws Exception { return PRODUCER_CACHE.get(config); } @@ -66,13 +84,13 @@ public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridg * @return 生产者对象 * @throws Exception 如果初始化失败 */ - protected abstract Object initProducer(Object config) throws Exception; + protected abstract Producer initProducer(Config config) throws Exception; /** * 关闭生产者 * * @param producer 生产者对象 */ - protected abstract void closeProducer(Object producer); + protected abstract void closeProducer(Producer producer) throws Exception; } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java index e95d4ced95..33aa744a85 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java @@ -24,7 +24,8 @@ import java.util.concurrent.TimeoutException; */ @Component @Slf4j -public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute { +public class IotKafkaMQDataBridgeExecute extends + AbstractCacheableDataBridgeExecute> { private static final Duration SEND_TIMEOUT = Duration.ofMillis(10); @@ -38,11 +39,10 @@ public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExec executeKafka(message, (IotDataBridgeDO.KafkaMQConfig) dataBridge.getConfig()); } - @SuppressWarnings("unchecked") private void executeKafka(IotDeviceMessage message, IotDataBridgeDO.KafkaMQConfig config) { try { // 1. 获取或创建 KafkaTemplate - KafkaTemplate kafkaTemplate = (KafkaTemplate) getProducer(config); + KafkaTemplate kafkaTemplate = getProducer(config); // 2. 发送消息并等待结果 kafkaTemplate.send(config.getTopic(), message.toString()) @@ -56,24 +56,22 @@ public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExec } @Override - protected Object initProducer(Object config) { - IotDataBridgeDO.KafkaMQConfig kafkaConfig = (IotDataBridgeDO.KafkaMQConfig) config; - + protected KafkaTemplate initProducer(IotDataBridgeDO.KafkaMQConfig config) { // 1.1 构建生产者配置 Map props = new HashMap<>(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers()); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 1.2 如果配置了认证信息 - if (kafkaConfig.getUsername() != null && kafkaConfig.getPassword() != null) { + if (config.getUsername() != null && config.getPassword() != null) { props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "PLAIN"); props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" - + kafkaConfig.getUsername() + "\" password=\"" + kafkaConfig.getPassword() + "\";"); + + config.getUsername() + "\" password=\"" + config.getPassword() + "\";"); } // 1.3 如果启用 SSL - if (Boolean.TRUE.equals(kafkaConfig.getSsl())) { + if (Boolean.TRUE.equals(config.getSsl())) { props.put("security.protocol", "SSL"); } @@ -83,10 +81,8 @@ public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExec } @Override - protected void closeProducer(Object producer) { - if (producer instanceof KafkaTemplate) { - ((KafkaTemplate) producer).destroy(); - } + protected void closeProducer(KafkaTemplate producer) { + producer.destroy(); } // TODO @芋艿:测试代码,后续清理 diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java index 5611873155..0918a5546d 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java @@ -19,7 +19,8 @@ import java.time.LocalDateTime; */ @Component @Slf4j -public class IotRabbitMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute { +public class IotRabbitMQDataBridgeExecute extends + AbstractCacheableDataBridgeExecute { @Override public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { @@ -34,7 +35,7 @@ public class IotRabbitMQDataBridgeExecute extends AbstractCacheableDataBridgeExe private void executeRabbitMQ(IotDeviceMessage message, IotDataBridgeDO.RabbitMQConfig config) { try { // 1. 获取或创建 Channel - Channel channel = (Channel) getProducer(config); + Channel channel = getProducer(config); // 2.1 声明交换机、队列和绑定关系 channel.exchangeDeclare(config.getExchange(), "direct", true); @@ -52,16 +53,14 @@ public class IotRabbitMQDataBridgeExecute extends AbstractCacheableDataBridgeExe @Override @SuppressWarnings("resource") - protected Object initProducer(Object config) throws Exception { - IotDataBridgeDO.RabbitMQConfig rabbitConfig = (IotDataBridgeDO.RabbitMQConfig) config; - + protected Channel initProducer(IotDataBridgeDO.RabbitMQConfig config) throws Exception { // 1. 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); - factory.setHost(rabbitConfig.getHost()); - factory.setPort(rabbitConfig.getPort()); - factory.setVirtualHost(rabbitConfig.getVirtualHost()); - factory.setUsername(rabbitConfig.getUsername()); - factory.setPassword(rabbitConfig.getPassword()); + factory.setHost(config.getHost()); + factory.setPort(config.getPort()); + factory.setVirtualHost(config.getVirtualHost()); + factory.setUsername(config.getUsername()); + factory.setPassword(config.getPassword()); // 2. 创建连接 Connection connection = factory.newConnection(); @@ -71,20 +70,13 @@ public class IotRabbitMQDataBridgeExecute extends AbstractCacheableDataBridgeExe } @Override - protected void closeProducer(Object producer) { - if (producer instanceof Channel) { - try { - Channel channel = (Channel) producer; - if (channel.isOpen()) { - channel.close(); - } - Connection connection = channel.getConnection(); - if (connection.isOpen()) { - connection.close(); - } - } catch (Exception e) { - log.error("[closeProducer][关闭 RabbitMQ 连接异常]", e); - } + protected void closeProducer(Channel channel) throws Exception { + if (channel.isOpen()) { + channel.close(); + } + Connection connection = channel.getConnection(); + if (connection.isOpen()) { + connection.close(); } } @@ -124,4 +116,5 @@ public class IotRabbitMQDataBridgeExecute extends AbstractCacheableDataBridgeExe log.info("[main][第二次执行,应该会复用缓存的 channel]"); action.executeRabbitMQ(message, config); } + } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java index 552fc3425b..77b688606a 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java @@ -29,7 +29,8 @@ import java.time.LocalDateTime; */ @Component @Slf4j -public class IotRedisStreamMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute { +public class IotRedisStreamMQDataBridgeExecute extends + AbstractCacheableDataBridgeExecute> { @Override public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { @@ -46,7 +47,7 @@ public class IotRedisStreamMQDataBridgeExecute extends AbstractCacheableDataBrid private void executeRedisStream(IotDeviceMessage message, IotDataBridgeDO.RedisStreamMQConfig config) { try { // 1. 获取 RedisTemplate - RedisTemplate redisTemplate = (RedisTemplate) getProducer(config); + RedisTemplate redisTemplate = getProducer(config); // 2. 创建并发送 Stream 记录 ObjectRecord record = StreamRecords.newRecord() @@ -59,17 +60,15 @@ public class IotRedisStreamMQDataBridgeExecute extends AbstractCacheableDataBrid } @Override - protected Object initProducer(Object config) { - IotDataBridgeDO.RedisStreamMQConfig redisConfig = (IotDataBridgeDO.RedisStreamMQConfig) config; - + protected RedisTemplate initProducer(IotDataBridgeDO.RedisStreamMQConfig config) { // 1.1 创建 Redisson 配置 Config redissonConfig = new Config(); SingleServerConfig serverConfig = redissonConfig.useSingleServer() - .setAddress("redis://" + redisConfig.getHost() + ":" + redisConfig.getPort()) - .setDatabase(redisConfig.getDatabase()); + .setAddress("redis://" + config.getHost() + ":" + config.getPort()) + .setDatabase(config.getDatabase()); // 1.2 设置密码(如果有) - if (StrUtil.isNotBlank(redisConfig.getPassword())) { - serverConfig.setPassword(redisConfig.getPassword()); + if (StrUtil.isNotBlank(config.getPassword())) { + serverConfig.setPassword(config.getPassword()); } // TODO @huihui:看看能不能简化一些。按道理说,不用这么多的哈。 @@ -90,17 +89,10 @@ public class IotRedisStreamMQDataBridgeExecute extends AbstractCacheableDataBrid } @Override - protected void closeProducer(Object producer) { - // TODO @huihui:try catch 交给父类来做。子类不处理异常 - if (producer instanceof RedisTemplate) { - RedisConnectionFactory factory = ((RedisTemplate) producer).getConnectionFactory(); - try { - if (factory != null) { - ((RedissonConnectionFactory) factory).destroy(); - } - } catch (Exception e) { - log.error("[closeProducer][关闭 redisson 连接异常]", e); - } + protected void closeProducer(RedisTemplate producer) throws Exception { + RedisConnectionFactory factory = producer.getConnectionFactory(); + if (factory != null) { + ((RedissonConnectionFactory) factory).destroy(); } } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java index 3b53252539..061bbfc69f 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java @@ -20,7 +20,8 @@ import java.time.LocalDateTime; */ @Component @Slf4j -public class IotRocketMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute { +public class IotRocketMQDataBridgeExecute extends + AbstractCacheableDataBridgeExecute { @Override public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { @@ -35,7 +36,7 @@ public class IotRocketMQDataBridgeExecute extends AbstractCacheableDataBridgeExe private void executeRocketMQ(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) { try { // 1. 获取或创建 Producer - DefaultMQProducer producer = (DefaultMQProducer) getProducer(config); + DefaultMQProducer producer = getProducer(config); // 2.1 创建消息对象,指定Topic、Tag和消息体 Message msg = new Message( @@ -57,19 +58,16 @@ public class IotRocketMQDataBridgeExecute extends AbstractCacheableDataBridgeExe } @Override - protected Object initProducer(Object config) throws Exception { - IotDataBridgeDO.RocketMQConfig rocketMQConfig = (IotDataBridgeDO.RocketMQConfig) config; - DefaultMQProducer producer = new DefaultMQProducer(rocketMQConfig.getGroup()); - producer.setNamesrvAddr(rocketMQConfig.getNameServer()); + protected DefaultMQProducer initProducer(IotDataBridgeDO.RocketMQConfig config) throws Exception { + DefaultMQProducer producer = new DefaultMQProducer(config.getGroup()); + producer.setNamesrvAddr(config.getNameServer()); producer.start(); return producer; } @Override - protected void closeProducer(Object producer) { - if (producer instanceof DefaultMQProducer) { - ((DefaultMQProducer) producer).shutdown(); - } + protected void closeProducer(DefaultMQProducer producer) { + producer.shutdown(); } // TODO @芋艿:测试代码,后续清理