diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java index 96b1edd330..ebd0f87764 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java @@ -7,12 +7,7 @@ import lombok.extern.slf4j.Slf4j; import java.time.Duration; -// TODO @芋艿:因为下面的,都是有状态的,所以通过 guava 缓存连接,然后通过 RemovalNotification 实现关闭。例如说,一次新建有效期是 10 分钟; -// TODO @芋艿:mq-redis -// TODO @芋艿:mq-数据库 -// TODO @芋艿:kafka -// TODO @芋艿:rocketmq -// TODO @芋艿:rabbitmq +// TODO @芋艿:数据库 // TODO @芋艿:mqtt // TODO @芋艿:tcp // TODO @芋艿:websocket @@ -25,6 +20,7 @@ import java.time.Duration; @Slf4j public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridgeExecute { + // TODO @huihui:AbstractCacheableDataBridgeExecute 这样,下面的 Object, Object 就有了类型;另外 IotDataBridgeDO.Config 可以替代一个 Object 哇, /** * Producer 缓存 */ @@ -43,12 +39,14 @@ public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridg } }) .build(new CacheLoader() { + @Override public Object load(Object config) throws Exception { Object producer = initProducer(config); log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已创建并启动]", config); return producer; } + }); /** @@ -77,4 +75,4 @@ public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridg */ protected abstract void closeProducer(Object producer); -} \ No newline at end of file +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java index a2593f3d9b..a10f75103c 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java @@ -11,6 +11,8 @@ import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; */ public interface IotDataBridgeExecute { + // TODO @huihui:要不搞个 getType?然后 execute0 由子类实现。这样,子类的 executeRedisStream ,其实就是 execute0 了。 + /** * 执行数据桥梁操作 * diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java index ec560122a8..e95d4ced95 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java @@ -10,6 +10,7 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; +import java.time.Duration; import java.time.LocalDateTime; import java.util.HashMap; import java.util.Map; @@ -25,13 +26,15 @@ import java.util.concurrent.TimeoutException; @Slf4j public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute { + private static final Duration SEND_TIMEOUT = Duration.ofMillis(10); + @Override public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { - // 1.1 校验数据桥梁的类型 == KAFKA + // 1. 校验数据桥梁的类型 == KAFKA if (!IotDataBridgTypeEnum.KAFKA.getType().equals(dataBridge.getType())) { return; } - // 1.2 执行 Kafka 发送消息 + // 2. 执行 Kafka 发送消息 executeKafka(message, (IotDataBridgeDO.KafkaMQConfig) dataBridge.getConfig()); } @@ -43,7 +46,7 @@ public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExec // 2. 发送消息并等待结果 kafkaTemplate.send(config.getTopic(), message.toString()) - .get(10, TimeUnit.SECONDS); // 添加超时等待 + .get(SEND_TIMEOUT.getSeconds(), TimeUnit.SECONDS); // 添加超时等待 log.info("[executeKafka][message({}) 发送成功]", message); } catch (TimeoutException e) { log.error("[executeKafka][message({}) config({}) 发送超时]", message, config, e); @@ -55,13 +58,12 @@ public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExec @Override protected Object initProducer(Object config) { IotDataBridgeDO.KafkaMQConfig kafkaConfig = (IotDataBridgeDO.KafkaMQConfig) config; - + // 1.1 构建生产者配置 Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - // 1.2 如果配置了认证信息 if (kafkaConfig.getUsername() != null && kafkaConfig.getPassword() != null) { props.put("security.protocol", "SASL_PLAINTEXT"); @@ -70,7 +72,6 @@ public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExec "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + kafkaConfig.getUsername() + "\" password=\"" + kafkaConfig.getPassword() + "\";"); } - // 1.3 如果启用 SSL if (Boolean.TRUE.equals(kafkaConfig.getSsl())) { props.put("security.protocol", "SSL"); diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java index 78c1343c2d..5611873155 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java @@ -51,6 +51,7 @@ public class IotRabbitMQDataBridgeExecute extends AbstractCacheableDataBridgeExe } @Override + @SuppressWarnings("resource") protected Object initProducer(Object config) throws Exception { IotDataBridgeDO.RabbitMQConfig rabbitConfig = (IotDataBridgeDO.RabbitMQConfig) config; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java index 4eed88aec6..552fc3425b 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java @@ -42,6 +42,7 @@ public class IotRedisStreamMQDataBridgeExecute extends AbstractCacheableDataBrid } @SuppressWarnings("unchecked") + // TODO @huihui:try catch 交给父类来做,子类不处理异常 private void executeRedisStream(IotDeviceMessage message, IotDataBridgeDO.RedisStreamMQConfig config) { try { // 1. 获取 RedisTemplate @@ -71,6 +72,7 @@ public class IotRedisStreamMQDataBridgeExecute extends AbstractCacheableDataBrid serverConfig.setPassword(redisConfig.getPassword()); } + // TODO @huihui:看看能不能简化一些。按道理说,不用这么多的哈。 // 2.1 创建 RedissonClient RedissonClient redisson = Redisson.create(redissonConfig); // 2.2 创建并配置 RedisTemplate @@ -89,6 +91,7 @@ public class IotRedisStreamMQDataBridgeExecute extends AbstractCacheableDataBrid @Override protected void closeProducer(Object producer) { + // TODO @huihui:try catch 交给父类来做。子类不处理异常 if (producer instanceof RedisTemplate) { RedisConnectionFactory factory = ((RedisTemplate) producer).getConnectionFactory(); try { @@ -101,7 +104,7 @@ public class IotRedisStreamMQDataBridgeExecute extends AbstractCacheableDataBrid } } - + // TODO @huihui:看看能不能简化一些。按道理说,不用这么多的哈。 public static RedisSerializer buildRedisSerializer() { RedisSerializer json = RedisSerializer.json(); // 解决 LocalDateTime 的序列化