【代码优化】IoT: 优化数据桥梁的执行器减少子类代码冗余

This commit is contained in:
puhui999 2025-03-03 13:03:41 +08:00
parent 3b54deb989
commit ce5e64e0aa
8 changed files with 81 additions and 79 deletions

View File

@ -22,7 +22,7 @@ public interface IotRuleSceneAction {
* 2. 非空的情况设备触发 * 2. 非空的情况设备触发
* @param config 配置 * @param config 配置
*/ */
void execute(@Nullable IotDeviceMessage message, IotRuleSceneDO.ActionConfig config); void execute(@Nullable IotDeviceMessage message, IotRuleSceneDO.ActionConfig config) throws Exception;
/** /**
* 获得类型 * 获得类型

View File

@ -26,10 +26,10 @@ public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction {
@Resource @Resource
private IotDataBridgeService dataBridgeService; private IotDataBridgeService dataBridgeService;
@Resource @Resource
private List<IotDataBridgeExecute> dataBridgeExecutes; private List<IotDataBridgeExecute<?>> dataBridgeExecutes;
@Override @Override
public void execute(IotDeviceMessage message, IotRuleSceneDO.ActionConfig config) { public void execute(IotDeviceMessage message, IotRuleSceneDO.ActionConfig config) throws Exception {
// 1.1 如果消息为空直接返回 // 1.1 如果消息为空直接返回
if (message == null) { if (message == null) {
return; return;
@ -47,7 +47,9 @@ public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction {
} }
// 2. 执行数据桥接操作 // 2. 执行数据桥接操作
dataBridgeExecutes.forEach(execute -> execute.execute(message, dataBridge)); for (IotDataBridgeExecute<?> execute : dataBridgeExecutes) {
execute.execute(message, dataBridge);
}
} }
@Override @Override

View File

@ -1,5 +1,7 @@
package cn.iocoder.yudao.module.iot.service.rule.action.databridge; package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader; import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache; import com.google.common.cache.LoadingCache;
@ -93,4 +95,20 @@ public abstract class AbstractCacheableDataBridgeExecute<Config, Producer> imple
*/ */
protected abstract void closeProducer(Producer producer) throws Exception; protected abstract void closeProducer(Producer producer) throws Exception;
@Override
@SuppressWarnings({"unchecked"})
public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
// 1.1 校验数据桥梁类型
if (!getType().equals(dataBridge.getType())) {
return;
}
// 1.2 执行对应的数据桥梁发送消息
try {
execute0(message, (Config) dataBridge.getConfig());
} catch (Exception e) {
log.error("[execute][桥梁配置 config({}) 对应的 message({}) 发送异常]", dataBridge.getConfig(), message, e);
}
}
} }

View File

@ -25,7 +25,7 @@ public interface IotDataBridgeExecute<Config> {
* @param dataBridge 数据桥梁 * @param dataBridge 数据桥梁
*/ */
@SuppressWarnings({"unchecked"}) @SuppressWarnings({"unchecked"})
default void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { default void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) throws Exception {
// 1.1 校验数据桥梁类型 // 1.1 校验数据桥梁类型
if (!getType().equals(dataBridge.getType())) { if (!getType().equals(dataBridge.getType())) {
return; return;
@ -41,6 +41,6 @@ public interface IotDataBridgeExecute<Config> {
* @param message 设备消息 * @param message 设备消息
* @param config 桥梁配置 * @param config 桥梁配置
*/ */
void execute0(IotDeviceMessage message, Config config); void execute0(IotDeviceMessage message, Config config) throws Exception;
} }

View File

@ -15,7 +15,6 @@ import java.time.LocalDateTime;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/** /**
* Kafka {@link IotDataBridgeExecute} 实现类 * Kafka {@link IotDataBridgeExecute} 实现类
@ -35,20 +34,14 @@ public class IotKafkaMQDataBridgeExecute extends
} }
@Override @Override
public void execute0(IotDeviceMessage message, IotDataBridgeDO.KafkaMQConfig config) { public void execute0(IotDeviceMessage message, IotDataBridgeDO.KafkaMQConfig config) throws Exception {
try { // 1. 获取或创建 KafkaTemplate
// 1. 获取或创建 KafkaTemplate KafkaTemplate<String, String> kafkaTemplate = getProducer(config);
KafkaTemplate<String, String> kafkaTemplate = getProducer(config);
// 2. 发送消息并等待结果 // 2. 发送消息并等待结果
kafkaTemplate.send(config.getTopic(), message.toString()) kafkaTemplate.send(config.getTopic(), message.toString())
.get(SEND_TIMEOUT.getSeconds(), TimeUnit.SECONDS); // 添加超时等待 .get(SEND_TIMEOUT.getSeconds(), TimeUnit.SECONDS); // 添加超时等待
log.info("[executeKafka][message({}) 发送成功]", message); log.info("[execute0][message({}) 发送成功]", message);
} catch (TimeoutException e) {
log.error("[executeKafka][message({}) config({}) 发送超时]", message, config, e);
} catch (Exception e) {
log.error("[executeKafka][message({}) config({}) 发送异常]", message, config, e);
}
} }
@Override @Override
@ -109,10 +102,10 @@ public class IotKafkaMQDataBridgeExecute extends
// 4. 执行两次测试验证缓存 // 4. 执行两次测试验证缓存
log.info("[main][第一次执行,应该会创建新的 producer]"); log.info("[main][第一次执行,应该会创建新的 producer]");
action.execute0(message, config); action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config));
log.info("[main][第二次执行,应该会复用缓存的 producer]"); log.info("[main][第二次执行,应该会复用缓存的 producer]");
action.execute0(message, config); action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config));
} }
} }

View File

@ -29,23 +29,19 @@ public class IotRabbitMQDataBridgeExecute extends
} }
@Override @Override
public void execute0(IotDeviceMessage message, IotDataBridgeDO.RabbitMQConfig config) { public void execute0(IotDeviceMessage message, IotDataBridgeDO.RabbitMQConfig config) throws Exception {
try { // 1. 获取或创建 Channel
// 1. 获取或创建 Channel Channel channel = getProducer(config);
Channel channel = getProducer(config);
// 2.1 声明交换机队列和绑定关系 // 2.1 声明交换机队列和绑定关系
channel.exchangeDeclare(config.getExchange(), "direct", true); channel.exchangeDeclare(config.getExchange(), "direct", true);
channel.queueDeclare(config.getQueue(), true, false, false, null); channel.queueDeclare(config.getQueue(), true, false, false, null);
channel.queueBind(config.getQueue(), config.getExchange(), config.getRoutingKey()); channel.queueBind(config.getQueue(), config.getExchange(), config.getRoutingKey());
// 2.2 发送消息 // 2.2 发送消息
channel.basicPublish(config.getExchange(), config.getRoutingKey(), null, channel.basicPublish(config.getExchange(), config.getRoutingKey(), null,
message.toString().getBytes(StandardCharsets.UTF_8)); message.toString().getBytes(StandardCharsets.UTF_8));
log.info("[executeRabbitMQ][message({}) config({}) 发送成功]", message, config); log.info("[executeRabbitMQ][message({}) config({}) 发送成功]", message, config);
} catch (Exception e) {
log.error("[executeRabbitMQ][message({}) config({}) 发送异常]", message, config, e);
}
} }
@Override @Override
@ -107,11 +103,12 @@ public class IotRabbitMQDataBridgeExecute extends
.build(); .build();
// 4. 执行两次测试验证缓存 // 4. 执行两次测试验证缓存
log.info("[main][第一次执行,应该会创建新的 channel]"); // 4. 执行两次测试验证缓存
action.execute0(message, config); log.info("[main][第一次执行,应该会创建新的 producer]");
action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config));
log.info("[main][第二次执行,应该会复用缓存的 channel]"); log.info("[main][第二次执行,应该会复用缓存的 producer]");
action.execute0(message, config); action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config));
} }
} }

View File

@ -38,19 +38,15 @@ public class IotRedisStreamMQDataBridgeExecute extends
} }
@Override @Override
public void execute0(IotDeviceMessage message, IotDataBridgeDO.RedisStreamMQConfig config) { public void execute0(IotDeviceMessage message, IotDataBridgeDO.RedisStreamMQConfig config) throws Exception {
try { // 1. 获取 RedisTemplate
// 1. 获取 RedisTemplate RedisTemplate<String, Object> redisTemplate = getProducer(config);
RedisTemplate<String, Object> redisTemplate = getProducer(config);
// 2. 创建并发送 Stream 记录 // 2. 创建并发送 Stream 记录
ObjectRecord<String, IotDeviceMessage> record = StreamRecords.newRecord() ObjectRecord<String, IotDeviceMessage> record = StreamRecords.newRecord()
.ofObject(message).withStreamKey(config.getTopic()); .ofObject(message).withStreamKey(config.getTopic());
String recordId = String.valueOf(redisTemplate.opsForStream().add(record)); String recordId = String.valueOf(redisTemplate.opsForStream().add(record));
log.info("[executeRedisStream][消息发送成功] messageId: {}, config: {}", recordId, config); log.info("[executeRedisStream][消息发送成功] messageId: {}, config: {}", recordId, config);
} catch (Exception e) {
log.error("[executeRedisStream][消息发送失败] message: {}, config: {}", message, config, e);
}
} }
@Override @Override
@ -126,11 +122,11 @@ public class IotRedisStreamMQDataBridgeExecute extends
.build(); .build();
// 4. 执行两次测试验证缓存 // 4. 执行两次测试验证缓存
log.info("[main][第一次执行,应该会创建新的 RedisTemplate]"); log.info("[main][第一次执行,应该会创建新的 producer]");
action.execute0(message, config); action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config));
log.info("[main][第二次执行,应该会复用缓存的 RedisTemplate]"); log.info("[main][第二次执行,应该会复用缓存的 producer]");
action.execute0(message, config); action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config));
} }
} }

View File

@ -29,27 +29,23 @@ public class IotRocketMQDataBridgeExecute extends
} }
@Override @Override
public void execute0(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) { public void execute0(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) throws Exception {
try { // 1. 获取或创建 Producer
// 1. 获取或创建 Producer DefaultMQProducer producer = getProducer(config);
DefaultMQProducer producer = getProducer(config);
// 2.1 创建消息对象指定TopicTag和消息体 // 2.1 创建消息对象指定TopicTag和消息体
Message msg = new Message( Message msg = new Message(
config.getTopic(), config.getTopic(),
config.getTags(), config.getTags(),
message.toString().getBytes(RemotingHelper.DEFAULT_CHARSET) message.toString().getBytes(RemotingHelper.DEFAULT_CHARSET)
); );
// 2.2 发送同步消息并处理结果 // 2.2 发送同步消息并处理结果
SendResult sendResult = producer.send(msg); SendResult sendResult = producer.send(msg);
// 2.3 处理发送结果 // 2.3 处理发送结果
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) { if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
log.info("[executeRocketMQ][message({}) config({}) 发送成功,结果({})]", message, config, sendResult); log.info("[executeRocketMQ][message({}) config({}) 发送成功,结果({})]", message, config, sendResult);
} else { } else {
log.error("[executeRocketMQ][message({}) config({}) 发送失败,结果({})]", message, config, sendResult); log.error("[executeRocketMQ][message({}) config({}) 发送失败,结果({})]", message, config, sendResult);
}
} catch (Exception e) {
log.error("[executeRocketMQ][message({}) config({}) 发送异常]", message, config, e);
} }
} }
@ -93,10 +89,10 @@ public class IotRocketMQDataBridgeExecute extends
// 4. 执行两次测试验证缓存 // 4. 执行两次测试验证缓存
log.info("[main][第一次执行,应该会创建新的 producer]"); log.info("[main][第一次执行,应该会创建新的 producer]");
action.execute0(message, config); action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config));
log.info("[main][第二次执行,应该会复用缓存的 producer]"); log.info("[main][第二次执行,应该会复用缓存的 producer]");
action.execute0(message, config); action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config));
} }
} }