【代码优化】IoT: Redis Stream 数据桥梁执行器

This commit is contained in:
puhui999 2025-03-24 16:13:19 +08:00
parent cd656fad4b
commit 3266bf0f98
5 changed files with 19 additions and 35 deletions

View File

@ -18,7 +18,7 @@ import lombok.Data;
@JsonSubTypes({ @JsonSubTypes({
@JsonSubTypes.Type(value = IotDataBridgeHttpConfig.class, name = "1"), @JsonSubTypes.Type(value = IotDataBridgeHttpConfig.class, name = "1"),
@JsonSubTypes.Type(value = IotDataBridgeMqttConfig.class, name = "10"), @JsonSubTypes.Type(value = IotDataBridgeMqttConfig.class, name = "10"),
@JsonSubTypes.Type(value = IotDataBridgeRedisStreamMQConfig.class, name = "21"), @JsonSubTypes.Type(value = IotDataBridgeRedisStreamConfig.class, name = "21"),
@JsonSubTypes.Type(value = IotDataBridgeRocketMQConfig.class, name = "30"), @JsonSubTypes.Type(value = IotDataBridgeRocketMQConfig.class, name = "30"),
@JsonSubTypes.Type(value = IotDataBridgeRabbitMQConfig.class, name = "31"), @JsonSubTypes.Type(value = IotDataBridgeRabbitMQConfig.class, name = "31"),
@JsonSubTypes.Type(value = IotDataBridgeKafkaMQConfig.class, name = "32"), @JsonSubTypes.Type(value = IotDataBridgeKafkaMQConfig.class, name = "32"),

View File

@ -2,14 +2,13 @@ package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config;
import lombok.Data; import lombok.Data;
// TODO @puhui999MQ 可以去掉哈stream 更精准
/** /**
* IoT Redis Stream 配置 {@link IotDataBridgeAbstractConfig} 实现类 * IoT Redis Stream 配置 {@link IotDataBridgeAbstractConfig} 实现类
* *
* @author HUIHUI * @author HUIHUI
*/ */
@Data @Data
public class IotDataBridgeRedisStreamMQConfig extends IotDataBridgeAbstractConfig { public class IotDataBridgeRedisStreamConfig extends IotDataBridgeAbstractConfig {
/** /**
* Redis 服务器地址 * Redis 服务器地址

View File

@ -101,7 +101,7 @@ public abstract class AbstractCacheableDataBridgeExecute<Config, Producer> imple
@Override @Override
@SuppressWarnings({"unchecked"}) @SuppressWarnings({"unchecked"})
public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
if (ObjUtil.notEqual(message.getType(), getType())) { if (ObjUtil.notEqual(dataBridge.getType(), getType())) {
return; return;
} }
try { try {

View File

@ -1,12 +1,9 @@
package cn.iocoder.yudao.module.iot.service.rule.action.databridge; package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeRedisStreamMQConfig; import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeRedisStreamConfig;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum; import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson; import org.redisson.Redisson;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
@ -21,14 +18,14 @@ import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
/** /**
* Redis Stream MQ {@link IotDataBridgeExecute} 实现类 * Redis Stream {@link IotDataBridgeExecute} 实现类
* *
* @author HUIHUI * @author HUIHUI
*/ */
@Component @Component
@Slf4j @Slf4j
public class IotRedisStreamMQDataBridgeExecute extends public class IotRedisStreamDataBridgeExecute extends
AbstractCacheableDataBridgeExecute<IotDataBridgeRedisStreamMQConfig, RedisTemplate<String, Object>> { AbstractCacheableDataBridgeExecute<IotDataBridgeRedisStreamConfig, RedisTemplate<String, Object>> {
@Override @Override
public Integer getType() { public Integer getType() {
@ -36,7 +33,7 @@ public class IotRedisStreamMQDataBridgeExecute extends
} }
@Override @Override
public void execute0(IotDeviceMessage message, IotDataBridgeRedisStreamMQConfig config) throws Exception { public void execute0(IotDeviceMessage message, IotDataBridgeRedisStreamConfig config) throws Exception {
// 1. 获取 RedisTemplate // 1. 获取 RedisTemplate
RedisTemplate<String, Object> redisTemplate = getProducer(config); RedisTemplate<String, Object> redisTemplate = getProducer(config);
@ -48,7 +45,7 @@ public class IotRedisStreamMQDataBridgeExecute extends
} }
@Override @Override
protected RedisTemplate<String, Object> initProducer(IotDataBridgeRedisStreamMQConfig config) { protected RedisTemplate<String, Object> initProducer(IotDataBridgeRedisStreamConfig config) {
// 1.1 创建 Redisson 配置 // 1.1 创建 Redisson 配置
Config redissonConfig = new Config(); Config redissonConfig = new Config();
SingleServerConfig serverConfig = redissonConfig.useSingleServer() SingleServerConfig serverConfig = redissonConfig.useSingleServer()
@ -59,20 +56,17 @@ public class IotRedisStreamMQDataBridgeExecute extends
serverConfig.setPassword(config.getPassword()); serverConfig.setPassword(config.getPassword());
} }
// TODO @huihui看看能不能简化一些按道理说不用这么多的哈 // TODO @芋艿看看怎么优化
// 2.1 创建 RedissonClient // 创建 RedisTemplate 并配置
RedissonClient redisson = Redisson.create(redissonConfig); RedissonClient redisson = Redisson.create(redissonConfig);
// 2.2 创建并配置 RedisTemplate
RedisTemplate<String, Object> template = new RedisTemplate<>(); RedisTemplate<String, Object> template = new RedisTemplate<>();
// 设置 RedisConnection 工厂😈 它就是实现多种 Java Redis 客户端接入的秘密工厂感兴趣的胖友可以自己去撸下
template.setConnectionFactory(new RedissonConnectionFactory(redisson)); template.setConnectionFactory(new RedissonConnectionFactory(redisson));
// 使用 String 序列化方式序列化 KEY // 设置序列化器
template.setKeySerializer(RedisSerializer.string()); template.setKeySerializer(RedisSerializer.string());
template.setHashKeySerializer(RedisSerializer.string()); template.setHashKeySerializer(RedisSerializer.string());
// 使用 JSON 序列化方式库是 Jackson 序列化 VALUE template.setValueSerializer(RedisSerializer.json());
template.setValueSerializer(buildRedisSerializer()); template.setHashValueSerializer(RedisSerializer.json());
template.setHashValueSerializer(buildRedisSerializer()); template.afterPropertiesSet();
template.afterPropertiesSet();// 初始化
return template; return template;
} }
@ -84,13 +78,4 @@ public class IotRedisStreamMQDataBridgeExecute extends
} }
} }
// TODO @huihui看看能不能简化一些按道理说不用这么多的哈
public static RedisSerializer<?> buildRedisSerializer() {
RedisSerializer<Object> json = RedisSerializer.json();
// 解决 LocalDateTime 的序列化
ObjectMapper objectMapper = (ObjectMapper) ReflectUtil.getFieldValue(json, "mapper");
objectMapper.registerModules(new JavaTimeModule());
return json;
}
} }

View File

@ -92,12 +92,12 @@ public class IotDataBridgeExecuteTest extends BaseMockitoUnitTest {
} }
@Test @Test
public void testRedisStreamMQDataBridge() throws Exception { public void testRedisStreamDataBridge() throws Exception {
// 1. 创建执行器实例 // 1. 创建执行器实例
IotRedisStreamMQDataBridgeExecute action = new IotRedisStreamMQDataBridgeExecute(); IotRedisStreamDataBridgeExecute action = new IotRedisStreamDataBridgeExecute();
// 2. 创建配置 // 2. 创建配置
IotDataBridgeRedisMQConfig config = new IotDataBridgeRedisMQConfig() IotDataBridgeRedisStreamConfig config = new IotDataBridgeRedisStreamConfig()
.setHost("127.0.0.1") .setHost("127.0.0.1")
.setPort(6379) .setPort(6379)
.setDatabase(0) .setDatabase(0)
@ -105,7 +105,7 @@ public class IotDataBridgeExecuteTest extends BaseMockitoUnitTest {
.setTopic("test-stream"); .setTopic("test-stream");
// 3. 执行测试并验证缓存 // 3. 执行测试并验证缓存
executeAndVerifyCache(action, config, "RedisStreamMQ"); executeAndVerifyCache(action, config, "RedisStream");
} }
@Test @Test