diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java new file mode 100644 index 0000000000..264bce553e --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java @@ -0,0 +1,70 @@ +package cn.iocoder.yudao.module.iot.service.rule.action.databridge; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import lombok.extern.slf4j.Slf4j; + +import java.time.Duration; + +/** + * 带缓存功能的数据桥接执行器抽象类 + * + * @author HUIHUI + */ +@Slf4j +public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridgeExecute { + + /** + * Producer 缓存 + */ + private final LoadingCache PRODUCER_CACHE = CacheBuilder.newBuilder() + .expireAfterAccess(Duration.ofMinutes(30)) + .removalListener(notification -> { + Object producer = notification.getValue(); + if (producer == null) { + return; + } + try { + closeProducer(producer); + log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已关闭]", notification.getKey()); + } catch (Exception e) { + log.error("[PRODUCER_CACHE][配置({}) 对应的 producer 关闭失败]", notification.getKey(), e); + } + }) + .build(new CacheLoader() { + @Override + public Object load(Object config) throws Exception { + Object producer = initProducer(config); + log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已创建并启动]", config); + return producer; + } + }); + + /** + * 获取生产者 + * + * @param config 配置信息 + * @return 生产者对象 + */ + protected Object getProducer(Object config) throws Exception { + return PRODUCER_CACHE.get(config); + } + + /** + * 初始化生产者 + * + * @param config 配置信息 + * @return 生产者对象 + * @throws Exception 如果初始化失败 + */ + protected abstract Object initProducer(Object config) throws Exception; + + /** + * 关闭生产者 + * + * @param producer 生产者对象 + */ + protected abstract void closeProducer(Object producer); + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java index 626208e187..af701cd903 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java @@ -3,9 +3,6 @@ package cn.iocoder.yudao.module.iot.service.rule.action.databridge; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum; import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; @@ -14,9 +11,7 @@ import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.springframework.stereotype.Component; -import java.time.Duration; import java.time.LocalDateTime; -import java.util.concurrent.Executors; /** * RocketMQ 的 {@link IotDataBridgeExecute} 实现类 @@ -25,41 +20,7 @@ import java.util.concurrent.Executors; */ @Component @Slf4j -public class IotRocketMQDataBridgeExecute implements IotDataBridgeExecute { - - /** - * 针对 {@link IotDataBridgeDO.RocketMQConfig} 的 DefaultMQProducer 缓存 - */ - // TODO @puhui999:因为 kafka 之类也存在这个情况,是不是得搞个抽象类。提供一个 initProducer,和 closeProducer 方法 - private final LoadingCache PRODUCER_CACHE = CacheBuilder.newBuilder() - .refreshAfterWrite(Duration.ofMinutes(10)) // TODO puhui999:应该是 read 30 分钟哈 - // 增加移除监听器,自动关闭 producer - .removalListener(notification -> { - DefaultMQProducer producer = (DefaultMQProducer) notification.getValue(); - // TODO puhui999:if return,更简短哈 - if (producer != null) { - try { - producer.shutdown(); - log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已关闭]", notification.getKey()); - } catch (Exception e) { - log.error("[PRODUCER_CACHE][配置({}) 对应的 producer 关闭失败]", notification.getKey(), e); - } - } - }) - // TODO @puhui999:就同步哈,不用异步处理。 - // 通过 asyncReloading 实现全异步加载,包括 refreshAfterWrite 被阻塞的加载线程 - .build(CacheLoader.asyncReloading(new CacheLoader() { - - @Override - public DefaultMQProducer load(IotDataBridgeDO.RocketMQConfig config) throws Exception { - DefaultMQProducer producer = new DefaultMQProducer(config.getGroup()); - producer.setNamesrvAddr(config.getNameServer()); - producer.start(); - log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已创建并启动]", config); - return producer; - } - - }, Executors.newCachedThreadPool())); +public class IotRocketMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute { @Override public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { @@ -74,7 +35,7 @@ public class IotRocketMQDataBridgeExecute implements IotDataBridgeExecute { private void executeRocketMQ(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) { try { // 1. 获取或创建 Producer - DefaultMQProducer producer = PRODUCER_CACHE.get(config); + DefaultMQProducer producer = (DefaultMQProducer) getProducer(config); // 2.1 创建消息对象,指定Topic、Tag和消息体 Message msg = new Message( @@ -95,6 +56,22 @@ public class IotRocketMQDataBridgeExecute implements IotDataBridgeExecute { } } + @Override + protected Object initProducer(Object config) throws Exception { + IotDataBridgeDO.RocketMQConfig rocketMQConfig = (IotDataBridgeDO.RocketMQConfig) config; + DefaultMQProducer producer = new DefaultMQProducer(rocketMQConfig.getGroup()); + producer.setNamesrvAddr(rocketMQConfig.getNameServer()); + producer.start(); + return producer; + } + + @Override + protected void closeProducer(Object producer) { + if (producer instanceof DefaultMQProducer) { + ((DefaultMQProducer) producer).shutdown(); + } + } + // TODO @芋艿:测试代码,后续清理 public static void main(String[] args) { // 1. 创建一个共享的实例