【代码优化】IoT: 优化数据桥梁的执行器减少子类代码冗余
This commit is contained in:
parent
61ea09488e
commit
3b54deb989
|
@ -33,7 +33,7 @@ import java.time.Duration;
|
||||||
* @author HUIHUI
|
* @author HUIHUI
|
||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public abstract class AbstractCacheableDataBridgeExecute<Config, Producer> implements IotDataBridgeExecute {
|
public abstract class AbstractCacheableDataBridgeExecute<Config, Producer> implements IotDataBridgeExecute<Config> {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Producer 缓存
|
* Producer 缓存
|
||||||
|
|
|
@ -9,9 +9,14 @@ import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
|
||||||
*
|
*
|
||||||
* @author HUIHUI
|
* @author HUIHUI
|
||||||
*/
|
*/
|
||||||
public interface IotDataBridgeExecute {
|
public interface IotDataBridgeExecute<Config> {
|
||||||
|
|
||||||
// TODO @huihui:要不搞个 getType?然后 execute0 由子类实现。这样,子类的 executeRedisStream ,其实就是 execute0 了。
|
/**
|
||||||
|
* 获取数据桥梁类型
|
||||||
|
*
|
||||||
|
* @return 数据桥梁类型
|
||||||
|
*/
|
||||||
|
Integer getType();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 执行数据桥梁操作
|
* 执行数据桥梁操作
|
||||||
|
@ -19,6 +24,23 @@ public interface IotDataBridgeExecute {
|
||||||
* @param message 设备消息
|
* @param message 设备消息
|
||||||
* @param dataBridge 数据桥梁
|
* @param dataBridge 数据桥梁
|
||||||
*/
|
*/
|
||||||
void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge);
|
@SuppressWarnings({"unchecked"})
|
||||||
|
default void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
|
||||||
|
// 1.1 校验数据桥梁类型
|
||||||
|
if (!getType().equals(dataBridge.getType())) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 1.2 执行对应的数据桥梁发送消息
|
||||||
|
execute0(message, (Config) dataBridge.getConfig());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 【真正】执行数据桥梁操作
|
||||||
|
*
|
||||||
|
* @param message 设备消息
|
||||||
|
* @param config 桥梁配置
|
||||||
|
*/
|
||||||
|
void execute0(IotDeviceMessage message, Config config);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,23 +25,19 @@ import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_
|
||||||
*/
|
*/
|
||||||
@Component
|
@Component
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class IotHttpDataBridgeExecute implements IotDataBridgeExecute {
|
public class IotHttpDataBridgeExecute implements IotDataBridgeExecute<IotDataBridgeDO.HttpConfig> {
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private RestTemplate restTemplate;
|
private RestTemplate restTemplate;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
|
public Integer getType() {
|
||||||
// 1.1 校验数据桥梁的类型 == HTTP
|
return IotDataBridgTypeEnum.HTTP.getType();
|
||||||
if (!IotDataBridgTypeEnum.HTTP.getType().equals(dataBridge.getType())) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// 1.2 执行 HTTP 请求
|
|
||||||
executeHttp(message, (IotDataBridgeDO.HttpConfig) dataBridge.getConfig());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
@SuppressWarnings({"unchecked", "deprecation"})
|
@SuppressWarnings({"unchecked", "deprecation"})
|
||||||
private void executeHttp(IotDeviceMessage message, IotDataBridgeDO.HttpConfig config) {
|
public void execute0(IotDeviceMessage message, IotDataBridgeDO.HttpConfig config) {
|
||||||
String url = null;
|
String url = null;
|
||||||
HttpMethod method = HttpMethod.valueOf(config.getMethod().toUpperCase());
|
HttpMethod method = HttpMethod.valueOf(config.getMethod().toUpperCase());
|
||||||
HttpEntity<String> requestEntity = null;
|
HttpEntity<String> requestEntity = null;
|
||||||
|
|
|
@ -30,16 +30,12 @@ public class IotKafkaMQDataBridgeExecute extends
|
||||||
private static final Duration SEND_TIMEOUT = Duration.ofMillis(10);
|
private static final Duration SEND_TIMEOUT = Duration.ofMillis(10);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
|
public Integer getType() {
|
||||||
// 1. 校验数据桥梁的类型 == KAFKA
|
return IotDataBridgTypeEnum.KAFKA.getType();
|
||||||
if (!IotDataBridgTypeEnum.KAFKA.getType().equals(dataBridge.getType())) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// 2. 执行 Kafka 发送消息
|
|
||||||
executeKafka(message, (IotDataBridgeDO.KafkaMQConfig) dataBridge.getConfig());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void executeKafka(IotDeviceMessage message, IotDataBridgeDO.KafkaMQConfig config) {
|
@Override
|
||||||
|
public void execute0(IotDeviceMessage message, IotDataBridgeDO.KafkaMQConfig config) {
|
||||||
try {
|
try {
|
||||||
// 1. 获取或创建 KafkaTemplate
|
// 1. 获取或创建 KafkaTemplate
|
||||||
KafkaTemplate<String, String> kafkaTemplate = getProducer(config);
|
KafkaTemplate<String, String> kafkaTemplate = getProducer(config);
|
||||||
|
@ -113,10 +109,10 @@ public class IotKafkaMQDataBridgeExecute extends
|
||||||
|
|
||||||
// 4. 执行两次测试,验证缓存
|
// 4. 执行两次测试,验证缓存
|
||||||
log.info("[main][第一次执行,应该会创建新的 producer]");
|
log.info("[main][第一次执行,应该会创建新的 producer]");
|
||||||
action.executeKafka(message, config);
|
action.execute0(message, config);
|
||||||
|
|
||||||
log.info("[main][第二次执行,应该会复用缓存的 producer]");
|
log.info("[main][第二次执行,应该会复用缓存的 producer]");
|
||||||
action.executeKafka(message, config);
|
action.execute0(message, config);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,17 +22,14 @@ import java.time.LocalDateTime;
|
||||||
public class IotRabbitMQDataBridgeExecute extends
|
public class IotRabbitMQDataBridgeExecute extends
|
||||||
AbstractCacheableDataBridgeExecute<IotDataBridgeDO.RabbitMQConfig, Channel> {
|
AbstractCacheableDataBridgeExecute<IotDataBridgeDO.RabbitMQConfig, Channel> {
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
|
public Integer getType() {
|
||||||
// 1.1 校验数据桥梁的类型 == RABBITMQ
|
return IotDataBridgTypeEnum.RABBITMQ.getType();
|
||||||
if (!IotDataBridgTypeEnum.RABBITMQ.getType().equals(dataBridge.getType())) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// 1.2 执行 RabbitMQ 发送消息
|
|
||||||
executeRabbitMQ(message, (IotDataBridgeDO.RabbitMQConfig) dataBridge.getConfig());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void executeRabbitMQ(IotDeviceMessage message, IotDataBridgeDO.RabbitMQConfig config) {
|
@Override
|
||||||
|
public void execute0(IotDeviceMessage message, IotDataBridgeDO.RabbitMQConfig config) {
|
||||||
try {
|
try {
|
||||||
// 1. 获取或创建 Channel
|
// 1. 获取或创建 Channel
|
||||||
Channel channel = getProducer(config);
|
Channel channel = getProducer(config);
|
||||||
|
@ -111,10 +108,10 @@ public class IotRabbitMQDataBridgeExecute extends
|
||||||
|
|
||||||
// 4. 执行两次测试,验证缓存
|
// 4. 执行两次测试,验证缓存
|
||||||
log.info("[main][第一次执行,应该会创建新的 channel]");
|
log.info("[main][第一次执行,应该会创建新的 channel]");
|
||||||
action.executeRabbitMQ(message, config);
|
action.execute0(message, config);
|
||||||
|
|
||||||
log.info("[main][第二次执行,应该会复用缓存的 channel]");
|
log.info("[main][第二次执行,应该会复用缓存的 channel]");
|
||||||
action.executeRabbitMQ(message, config);
|
action.execute0(message, config);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,18 +33,12 @@ public class IotRedisStreamMQDataBridgeExecute extends
|
||||||
AbstractCacheableDataBridgeExecute<IotDataBridgeDO.RedisStreamMQConfig, RedisTemplate<String, Object>> {
|
AbstractCacheableDataBridgeExecute<IotDataBridgeDO.RedisStreamMQConfig, RedisTemplate<String, Object>> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
|
public Integer getType() {
|
||||||
// 1.1 校验数据桥梁类型
|
return IotDataBridgTypeEnum.REDIS_STREAM.getType();
|
||||||
if (!IotDataBridgTypeEnum.REDIS_STREAM.getType().equals(dataBridge.getType())) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// 1.2 执行消息发送
|
|
||||||
executeRedisStream(message, (IotDataBridgeDO.RedisStreamMQConfig) dataBridge.getConfig());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@Override
|
||||||
// TODO @huihui:try catch 交给父类来做,子类不处理异常
|
public void execute0(IotDeviceMessage message, IotDataBridgeDO.RedisStreamMQConfig config) {
|
||||||
private void executeRedisStream(IotDeviceMessage message, IotDataBridgeDO.RedisStreamMQConfig config) {
|
|
||||||
try {
|
try {
|
||||||
// 1. 获取 RedisTemplate
|
// 1. 获取 RedisTemplate
|
||||||
RedisTemplate<String, Object> redisTemplate = getProducer(config);
|
RedisTemplate<String, Object> redisTemplate = getProducer(config);
|
||||||
|
@ -133,10 +127,10 @@ public class IotRedisStreamMQDataBridgeExecute extends
|
||||||
|
|
||||||
// 4. 执行两次测试,验证缓存
|
// 4. 执行两次测试,验证缓存
|
||||||
log.info("[main][第一次执行,应该会创建新的 RedisTemplate]");
|
log.info("[main][第一次执行,应该会创建新的 RedisTemplate]");
|
||||||
action.executeRedisStream(message, config);
|
action.execute0(message, config);
|
||||||
|
|
||||||
log.info("[main][第二次执行,应该会复用缓存的 RedisTemplate]");
|
log.info("[main][第二次执行,应该会复用缓存的 RedisTemplate]");
|
||||||
action.executeRedisStream(message, config);
|
action.execute0(message, config);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -24,16 +24,12 @@ public class IotRocketMQDataBridgeExecute extends
|
||||||
AbstractCacheableDataBridgeExecute<IotDataBridgeDO.RocketMQConfig, DefaultMQProducer> {
|
AbstractCacheableDataBridgeExecute<IotDataBridgeDO.RocketMQConfig, DefaultMQProducer> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
|
public Integer getType() {
|
||||||
// 1.1 校验数据桥梁的类型 == ROCKETMQ
|
return IotDataBridgTypeEnum.ROCKETMQ.getType();
|
||||||
if (!IotDataBridgTypeEnum.ROCKETMQ.getType().equals(dataBridge.getType())) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// 1.2 执行 RocketMQ 发送消息
|
|
||||||
executeRocketMQ(message, (IotDataBridgeDO.RocketMQConfig) dataBridge.getConfig());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void executeRocketMQ(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) {
|
@Override
|
||||||
|
public void execute0(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) {
|
||||||
try {
|
try {
|
||||||
// 1. 获取或创建 Producer
|
// 1. 获取或创建 Producer
|
||||||
DefaultMQProducer producer = getProducer(config);
|
DefaultMQProducer producer = getProducer(config);
|
||||||
|
@ -97,10 +93,10 @@ public class IotRocketMQDataBridgeExecute extends
|
||||||
|
|
||||||
// 4. 执行两次测试,验证缓存
|
// 4. 执行两次测试,验证缓存
|
||||||
log.info("[main][第一次执行,应该会创建新的 producer]");
|
log.info("[main][第一次执行,应该会创建新的 producer]");
|
||||||
action.executeRocketMQ(message, config);
|
action.execute0(message, config);
|
||||||
|
|
||||||
log.info("[main][第二次执行,应该会复用缓存的 producer]");
|
log.info("[main][第二次执行,应该会复用缓存的 producer]");
|
||||||
action.executeRocketMQ(message, config);
|
action.execute0(message, config);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue