【代码优化】IoT: 优化数据桥梁执行器抽象类增加泛型,减少子类类型强转

This commit is contained in:
puhui999 2025-03-03 12:22:19 +08:00
parent 3c9985978b
commit 61ea09488e
5 changed files with 81 additions and 84 deletions

View File

@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
@ -15,22 +16,36 @@ import java.time.Duration;
/**
* 带缓存功能的数据桥梁执行器抽象类
*
* 该类提供了一个通用的缓存机制用于管理各类数据桥接的生产者(Producer)实例
*
* 主要特点:
* - 基于Guava Cache实现高效的生产者实例缓存管理
* - 自动处理生产者的生命周期创建获取关闭
* - 支持30分钟未访问自动过期清理机制
* - 异常处理与日志记录便于问题排查
*
* 子类需要实现:
* - initProducer(Config) - 初始化特定类型的生产者实例
* - closeProducer(Producer) - 关闭生产者实例并释放资源
*
* @param <Config> 配置信息类型用于初始化生产者
* @param <Producer> 生产者类型负责将数据发送到目标系统
* @author HUIHUI
*/
@Slf4j
public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridgeExecute {
public abstract class AbstractCacheableDataBridgeExecute<Config, Producer> implements IotDataBridgeExecute {
// TODO @huihuiAbstractCacheableDataBridgeExecute<Producer> 这样下面的 Object, Object 就有了类型另外 IotDataBridgeDO.Config 可以替代一个 Object
/**
* Producer 缓存
*/
private final LoadingCache<Object, Object> PRODUCER_CACHE = CacheBuilder.newBuilder()
.expireAfterAccess(Duration.ofMinutes(30))
.removalListener(notification -> {
Object producer = notification.getValue();
private final LoadingCache<Config, Producer> PRODUCER_CACHE = CacheBuilder.newBuilder()
.expireAfterAccess(Duration.ofMinutes(30)) // 30 分钟未访问就提前过期
.removalListener((RemovalListener<Config, Producer>) notification -> {
Producer producer = notification.getValue();
if (producer == null) {
return;
}
try {
closeProducer(producer);
log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已关闭]", notification.getKey());
@ -38,15 +53,18 @@ public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridg
log.error("[PRODUCER_CACHE][配置({}) 对应的 producer 关闭失败]", notification.getKey(), e);
}
})
.build(new CacheLoader<Object, Object>() {
.build(new CacheLoader<Config, Producer>() {
@Override
public Object load(Object config) throws Exception {
Object producer = initProducer(config);
log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已创建并启动]", config);
return producer;
public Producer load(Config config) throws Exception {
try {
Producer producer = initProducer(config);
log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已创建并启动]", config);
return producer;
} catch (Exception e) {
log.error("[PRODUCER_CACHE][配置({}) 对应的 producer 创建启动失败]", config, e);
throw e; // 抛出异常触发缓存加载失败机制
}
}
});
/**
@ -55,7 +73,7 @@ public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridg
* @param config 配置信息
* @return 生产者对象
*/
protected Object getProducer(Object config) throws Exception {
protected Producer getProducer(Config config) throws Exception {
return PRODUCER_CACHE.get(config);
}
@ -66,13 +84,13 @@ public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridg
* @return 生产者对象
* @throws Exception 如果初始化失败
*/
protected abstract Object initProducer(Object config) throws Exception;
protected abstract Producer initProducer(Config config) throws Exception;
/**
* 关闭生产者
*
* @param producer 生产者对象
*/
protected abstract void closeProducer(Object producer);
protected abstract void closeProducer(Producer producer) throws Exception;
}

View File

@ -24,7 +24,8 @@ import java.util.concurrent.TimeoutException;
*/
@Component
@Slf4j
public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute {
public class IotKafkaMQDataBridgeExecute extends
AbstractCacheableDataBridgeExecute<IotDataBridgeDO.KafkaMQConfig, KafkaTemplate<String, String>> {
private static final Duration SEND_TIMEOUT = Duration.ofMillis(10);
@ -38,11 +39,10 @@ public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExec
executeKafka(message, (IotDataBridgeDO.KafkaMQConfig) dataBridge.getConfig());
}
@SuppressWarnings("unchecked")
private void executeKafka(IotDeviceMessage message, IotDataBridgeDO.KafkaMQConfig config) {
try {
// 1. 获取或创建 KafkaTemplate
KafkaTemplate<String, String> kafkaTemplate = (KafkaTemplate<String, String>) getProducer(config);
KafkaTemplate<String, String> kafkaTemplate = getProducer(config);
// 2. 发送消息并等待结果
kafkaTemplate.send(config.getTopic(), message.toString())
@ -56,24 +56,22 @@ public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExec
}
@Override
protected Object initProducer(Object config) {
IotDataBridgeDO.KafkaMQConfig kafkaConfig = (IotDataBridgeDO.KafkaMQConfig) config;
protected KafkaTemplate<String, String> initProducer(IotDataBridgeDO.KafkaMQConfig config) {
// 1.1 构建生产者配置
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers());
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 1.2 如果配置了认证信息
if (kafkaConfig.getUsername() != null && kafkaConfig.getPassword() != null) {
if (config.getUsername() != null && config.getPassword() != null) {
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
+ kafkaConfig.getUsername() + "\" password=\"" + kafkaConfig.getPassword() + "\";");
+ config.getUsername() + "\" password=\"" + config.getPassword() + "\";");
}
// 1.3 如果启用 SSL
if (Boolean.TRUE.equals(kafkaConfig.getSsl())) {
if (Boolean.TRUE.equals(config.getSsl())) {
props.put("security.protocol", "SSL");
}
@ -83,10 +81,8 @@ public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExec
}
@Override
protected void closeProducer(Object producer) {
if (producer instanceof KafkaTemplate) {
((KafkaTemplate<?, ?>) producer).destroy();
}
protected void closeProducer(KafkaTemplate<String, String> producer) {
producer.destroy();
}
// TODO @芋艿测试代码后续清理

View File

@ -19,7 +19,8 @@ import java.time.LocalDateTime;
*/
@Component
@Slf4j
public class IotRabbitMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute {
public class IotRabbitMQDataBridgeExecute extends
AbstractCacheableDataBridgeExecute<IotDataBridgeDO.RabbitMQConfig, Channel> {
@Override
public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
@ -34,7 +35,7 @@ public class IotRabbitMQDataBridgeExecute extends AbstractCacheableDataBridgeExe
private void executeRabbitMQ(IotDeviceMessage message, IotDataBridgeDO.RabbitMQConfig config) {
try {
// 1. 获取或创建 Channel
Channel channel = (Channel) getProducer(config);
Channel channel = getProducer(config);
// 2.1 声明交换机队列和绑定关系
channel.exchangeDeclare(config.getExchange(), "direct", true);
@ -52,16 +53,14 @@ public class IotRabbitMQDataBridgeExecute extends AbstractCacheableDataBridgeExe
@Override
@SuppressWarnings("resource")
protected Object initProducer(Object config) throws Exception {
IotDataBridgeDO.RabbitMQConfig rabbitConfig = (IotDataBridgeDO.RabbitMQConfig) config;
protected Channel initProducer(IotDataBridgeDO.RabbitMQConfig config) throws Exception {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(rabbitConfig.getHost());
factory.setPort(rabbitConfig.getPort());
factory.setVirtualHost(rabbitConfig.getVirtualHost());
factory.setUsername(rabbitConfig.getUsername());
factory.setPassword(rabbitConfig.getPassword());
factory.setHost(config.getHost());
factory.setPort(config.getPort());
factory.setVirtualHost(config.getVirtualHost());
factory.setUsername(config.getUsername());
factory.setPassword(config.getPassword());
// 2. 创建连接
Connection connection = factory.newConnection();
@ -71,20 +70,13 @@ public class IotRabbitMQDataBridgeExecute extends AbstractCacheableDataBridgeExe
}
@Override
protected void closeProducer(Object producer) {
if (producer instanceof Channel) {
try {
Channel channel = (Channel) producer;
if (channel.isOpen()) {
channel.close();
}
Connection connection = channel.getConnection();
if (connection.isOpen()) {
connection.close();
}
} catch (Exception e) {
log.error("[closeProducer][关闭 RabbitMQ 连接异常]", e);
}
protected void closeProducer(Channel channel) throws Exception {
if (channel.isOpen()) {
channel.close();
}
Connection connection = channel.getConnection();
if (connection.isOpen()) {
connection.close();
}
}
@ -124,4 +116,5 @@ public class IotRabbitMQDataBridgeExecute extends AbstractCacheableDataBridgeExe
log.info("[main][第二次执行,应该会复用缓存的 channel]");
action.executeRabbitMQ(message, config);
}
}

View File

@ -29,7 +29,8 @@ import java.time.LocalDateTime;
*/
@Component
@Slf4j
public class IotRedisStreamMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute {
public class IotRedisStreamMQDataBridgeExecute extends
AbstractCacheableDataBridgeExecute<IotDataBridgeDO.RedisStreamMQConfig, RedisTemplate<String, Object>> {
@Override
public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
@ -46,7 +47,7 @@ public class IotRedisStreamMQDataBridgeExecute extends AbstractCacheableDataBrid
private void executeRedisStream(IotDeviceMessage message, IotDataBridgeDO.RedisStreamMQConfig config) {
try {
// 1. 获取 RedisTemplate
RedisTemplate<String, Object> redisTemplate = (RedisTemplate<String, Object>) getProducer(config);
RedisTemplate<String, Object> redisTemplate = getProducer(config);
// 2. 创建并发送 Stream 记录
ObjectRecord<String, IotDeviceMessage> record = StreamRecords.newRecord()
@ -59,17 +60,15 @@ public class IotRedisStreamMQDataBridgeExecute extends AbstractCacheableDataBrid
}
@Override
protected Object initProducer(Object config) {
IotDataBridgeDO.RedisStreamMQConfig redisConfig = (IotDataBridgeDO.RedisStreamMQConfig) config;
protected RedisTemplate<String, Object> initProducer(IotDataBridgeDO.RedisStreamMQConfig config) {
// 1.1 创建 Redisson 配置
Config redissonConfig = new Config();
SingleServerConfig serverConfig = redissonConfig.useSingleServer()
.setAddress("redis://" + redisConfig.getHost() + ":" + redisConfig.getPort())
.setDatabase(redisConfig.getDatabase());
.setAddress("redis://" + config.getHost() + ":" + config.getPort())
.setDatabase(config.getDatabase());
// 1.2 设置密码如果有
if (StrUtil.isNotBlank(redisConfig.getPassword())) {
serverConfig.setPassword(redisConfig.getPassword());
if (StrUtil.isNotBlank(config.getPassword())) {
serverConfig.setPassword(config.getPassword());
}
// TODO @huihui看看能不能简化一些按道理说不用这么多的哈
@ -90,17 +89,10 @@ public class IotRedisStreamMQDataBridgeExecute extends AbstractCacheableDataBrid
}
@Override
protected void closeProducer(Object producer) {
// TODO @huihuitry catch 交给父类来做子类不处理异常
if (producer instanceof RedisTemplate) {
RedisConnectionFactory factory = ((RedisTemplate<?, ?>) producer).getConnectionFactory();
try {
if (factory != null) {
((RedissonConnectionFactory) factory).destroy();
}
} catch (Exception e) {
log.error("[closeProducer][关闭 redisson 连接异常]", e);
}
protected void closeProducer(RedisTemplate<String, Object> producer) throws Exception {
RedisConnectionFactory factory = producer.getConnectionFactory();
if (factory != null) {
((RedissonConnectionFactory) factory).destroy();
}
}

View File

@ -20,7 +20,8 @@ import java.time.LocalDateTime;
*/
@Component
@Slf4j
public class IotRocketMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute {
public class IotRocketMQDataBridgeExecute extends
AbstractCacheableDataBridgeExecute<IotDataBridgeDO.RocketMQConfig, DefaultMQProducer> {
@Override
public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
@ -35,7 +36,7 @@ public class IotRocketMQDataBridgeExecute extends AbstractCacheableDataBridgeExe
private void executeRocketMQ(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) {
try {
// 1. 获取或创建 Producer
DefaultMQProducer producer = (DefaultMQProducer) getProducer(config);
DefaultMQProducer producer = getProducer(config);
// 2.1 创建消息对象指定TopicTag和消息体
Message msg = new Message(
@ -57,19 +58,16 @@ public class IotRocketMQDataBridgeExecute extends AbstractCacheableDataBridgeExe
}
@Override
protected Object initProducer(Object config) throws Exception {
IotDataBridgeDO.RocketMQConfig rocketMQConfig = (IotDataBridgeDO.RocketMQConfig) config;
DefaultMQProducer producer = new DefaultMQProducer(rocketMQConfig.getGroup());
producer.setNamesrvAddr(rocketMQConfig.getNameServer());
protected DefaultMQProducer initProducer(IotDataBridgeDO.RocketMQConfig config) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(config.getGroup());
producer.setNamesrvAddr(config.getNameServer());
producer.start();
return producer;
}
@Override
protected void closeProducer(Object producer) {
if (producer instanceof DefaultMQProducer) {
((DefaultMQProducer) producer).shutdown();
}
protected void closeProducer(DefaultMQProducer producer) {
producer.shutdown();
}
// TODO @芋艿测试代码后续清理