From cb16539b663b2e510f8e67aa9f37b78378c68599 Mon Sep 17 00:00:00 2001 From: puhui999 Date: Sat, 1 Mar 2025 17:43:46 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90=E5=8A=9F=E8=83=BD=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=E3=80=91IoT:=20=E6=96=B0=E5=A2=9E=20Redis=20Stream=20MQ=20?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E6=A1=A5=E6=A2=81=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/enums/rule/IotDataBridgTypeEnum.java | 2 +- .../dal/dataobject/rule/IotDataBridgeDO.java | 33 +++- .../AbstractCacheableDataBridgeExecute.java | 10 ++ .../databridge/IotDataBridgeExecute.java | 10 -- .../IotKafkaMQDataBridgeExecute.java | 1 + .../IotRabbitMQDataBridgeExecute.java | 2 +- .../IotRedisStreamMQDataBridgeExecute.java | 147 ++++++++++++++++++ 7 files changed, 189 insertions(+), 16 deletions(-) create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgTypeEnum.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgTypeEnum.java index cdec8d7979..295f35cffd 100644 --- a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgTypeEnum.java +++ b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgTypeEnum.java @@ -22,7 +22,7 @@ public enum IotDataBridgTypeEnum implements ArrayValuable { MQTT(10), DATABASE(20), - REDIS(21), + REDIS_STREAM(21), ROCKETMQ(30), RABBITMQ(31), diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java index e95cb695ee..bb0623a1ff 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java @@ -193,10 +193,6 @@ public class IotDataBridgeDO extends BaseDO { */ private Boolean ssl; - /** - * 生产者组 ID - */ - private String groupId; /** * 主题 */ @@ -245,4 +241,33 @@ public class IotDataBridgeDO extends BaseDO { private String queue; } + /** + * Redis Stream MQ 配置 + */ + @Data + public static class RedisStreamMQConfig implements Config { + + /** + * Redis 服务器地址 + */ + private String host; + /** + * 端口 + */ + private Integer port; + /** + * 密码 + */ + private String password; + /** + * 数据库索引 + */ + private Integer database; + + /** + * 主题 + */ + private String topic; + } + } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java index e8fbb0ccb3..96b1edd330 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java @@ -7,6 +7,16 @@ import lombok.extern.slf4j.Slf4j; import java.time.Duration; +// TODO @芋艿:因为下面的,都是有状态的,所以通过 guava 缓存连接,然后通过 RemovalNotification 实现关闭。例如说,一次新建有效期是 10 分钟; +// TODO @芋艿:mq-redis +// TODO @芋艿:mq-数据库 +// TODO @芋艿:kafka +// TODO @芋艿:rocketmq +// TODO @芋艿:rabbitmq +// TODO @芋艿:mqtt +// TODO @芋艿:tcp +// TODO @芋艿:websocket + /** * 带缓存功能的数据桥梁执行器抽象类 * diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java index 1617c3b091..a2593f3d9b 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java @@ -19,14 +19,4 @@ public interface IotDataBridgeExecute { */ void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge); - // TODO @芋艿:因为下面的,都是有状态的,所以通过 guava 缓存连接,然后通过 RemovalNotification 实现关闭。例如说,一次新建有效期是 10 分钟; - // TODO @芋艿:mq-redis - // TODO @芋艿:mq-数据库 - // TODO @芋艿:kafka - // TODO @芋艿:rocketmq - // TODO @芋艿:rabbitmq - // TODO @芋艿:mqtt - // TODO @芋艿:tcp - // TODO @芋艿:websocket - } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java index 8c0ef2b038..ec560122a8 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java @@ -35,6 +35,7 @@ public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExec executeKafka(message, (IotDataBridgeDO.KafkaMQConfig) dataBridge.getConfig()); } + @SuppressWarnings("unchecked") private void executeKafka(IotDeviceMessage message, IotDataBridgeDO.KafkaMQConfig config) { try { // 1. 获取或创建 KafkaTemplate diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java index 2c257051f6..78c1343c2d 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java @@ -87,7 +87,7 @@ public class IotRabbitMQDataBridgeExecute extends AbstractCacheableDataBridgeExe } } - // TODO @芋道源码:测试代码,后续清理 + // TODO @芋艿:测试代码,后续清理 public static void main(String[] args) { // 1. 创建一个共享的实例 IotRabbitMQDataBridgeExecute action = new IotRabbitMQDataBridgeExecute(); diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java new file mode 100644 index 0000000000..4eed88aec6 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java @@ -0,0 +1,147 @@ +package cn.iocoder.yudao.module.iot.service.rule.action.databridge; + +import cn.hutool.core.util.ReflectUtil; +import cn.hutool.core.util.StrUtil; +import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; +import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum; +import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import lombok.extern.slf4j.Slf4j; +import org.redisson.Redisson; +import org.redisson.api.RedissonClient; +import org.redisson.config.Config; +import org.redisson.config.SingleServerConfig; +import org.redisson.spring.data.connection.RedissonConnectionFactory; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.connection.stream.StreamRecords; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.serializer.RedisSerializer; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; + +/** + * Redis Stream MQ 的 {@link IotDataBridgeExecute} 实现类 + * + * @author HUIHUI + */ +@Component +@Slf4j +public class IotRedisStreamMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute { + + @Override + public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { + // 1.1 校验数据桥梁类型 + if (!IotDataBridgTypeEnum.REDIS_STREAM.getType().equals(dataBridge.getType())) { + return; + } + // 1.2 执行消息发送 + executeRedisStream(message, (IotDataBridgeDO.RedisStreamMQConfig) dataBridge.getConfig()); + } + + @SuppressWarnings("unchecked") + private void executeRedisStream(IotDeviceMessage message, IotDataBridgeDO.RedisStreamMQConfig config) { + try { + // 1. 获取 RedisTemplate + RedisTemplate redisTemplate = (RedisTemplate) getProducer(config); + + // 2. 创建并发送 Stream 记录 + ObjectRecord record = StreamRecords.newRecord() + .ofObject(message).withStreamKey(config.getTopic()); + String recordId = String.valueOf(redisTemplate.opsForStream().add(record)); + log.info("[executeRedisStream][消息发送成功] messageId: {}, config: {}", recordId, config); + } catch (Exception e) { + log.error("[executeRedisStream][消息发送失败] message: {}, config: {}", message, config, e); + } + } + + @Override + protected Object initProducer(Object config) { + IotDataBridgeDO.RedisStreamMQConfig redisConfig = (IotDataBridgeDO.RedisStreamMQConfig) config; + + // 1.1 创建 Redisson 配置 + Config redissonConfig = new Config(); + SingleServerConfig serverConfig = redissonConfig.useSingleServer() + .setAddress("redis://" + redisConfig.getHost() + ":" + redisConfig.getPort()) + .setDatabase(redisConfig.getDatabase()); + // 1.2 设置密码(如果有) + if (StrUtil.isNotBlank(redisConfig.getPassword())) { + serverConfig.setPassword(redisConfig.getPassword()); + } + + // 2.1 创建 RedissonClient + RedissonClient redisson = Redisson.create(redissonConfig); + // 2.2 创建并配置 RedisTemplate + RedisTemplate template = new RedisTemplate<>(); + // 设置 RedisConnection 工厂。😈 它就是实现多种 Java Redis 客户端接入的秘密工厂。感兴趣的胖友,可以自己去撸下。 + template.setConnectionFactory(new RedissonConnectionFactory(redisson)); + // 使用 String 序列化方式,序列化 KEY 。 + template.setKeySerializer(RedisSerializer.string()); + template.setHashKeySerializer(RedisSerializer.string()); + // 使用 JSON 序列化方式(库是 Jackson ),序列化 VALUE 。 + template.setValueSerializer(buildRedisSerializer()); + template.setHashValueSerializer(buildRedisSerializer()); + template.afterPropertiesSet();// 初始化 + return template; + } + + @Override + protected void closeProducer(Object producer) { + if (producer instanceof RedisTemplate) { + RedisConnectionFactory factory = ((RedisTemplate) producer).getConnectionFactory(); + try { + if (factory != null) { + ((RedissonConnectionFactory) factory).destroy(); + } + } catch (Exception e) { + log.error("[closeProducer][关闭 redisson 连接异常]", e); + } + } + } + + + public static RedisSerializer buildRedisSerializer() { + RedisSerializer json = RedisSerializer.json(); + // 解决 LocalDateTime 的序列化 + ObjectMapper objectMapper = (ObjectMapper) ReflectUtil.getFieldValue(json, "mapper"); + objectMapper.registerModules(new JavaTimeModule()); + return json; + } + + // TODO @芋艿:测试代码,后续清理 + public static void main(String[] args) { + // 1. 创建一个共享的实例 + IotRedisStreamMQDataBridgeExecute action = new IotRedisStreamMQDataBridgeExecute(); + + // 2. 创建共享的配置 + IotDataBridgeDO.RedisStreamMQConfig config = new IotDataBridgeDO.RedisStreamMQConfig(); + config.setHost("127.0.0.1"); + config.setPort(6379); + config.setDatabase(0); + config.setPassword("123456"); + config.setTopic("test-stream"); + + // 3. 创建共享的消息 + IotDeviceMessage message = IotDeviceMessage.builder() + .requestId("TEST-001") + .productKey("testProduct") + .deviceName("testDevice") + .deviceKey("testDeviceKey") + .type("property") + .identifier("temperature") + .data("{\"value\": 60}") + .reportTime(LocalDateTime.now()) + .tenantId(1L) + .build(); + + // 4. 执行两次测试,验证缓存 + log.info("[main][第一次执行,应该会创建新的 RedisTemplate]"); + action.executeRedisStream(message, config); + + log.info("[main][第二次执行,应该会复用缓存的 RedisTemplate]"); + action.executeRedisStream(message, config); + } + +} \ No newline at end of file