diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java index b3a863c8be..213b0cda10 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java @@ -146,32 +146,28 @@ public class IotDataBridgeDO extends BaseDO { * RocketMQ 名称服务器地址 */ private String nameServer; - - /** - * 生产者/消费者组 - */ - private String group; - - /** - * 主题 - */ - private String topic; - - /** - * 标签 - */ - private String tags; - /** * 访问密钥 */ private String accessKey; - /** * 秘密钥匙 */ private String secretKey; + /** + * 生产者组 + */ + private String group; + /** + * 主题 + */ + private String topic; + /** + * 标签 + */ + private String tags; + } } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java index a70b6e8620..7668c05b94 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java @@ -143,39 +143,38 @@ public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction { } private void executeRocketMQ(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) { - // 1. 创建生产者实例,指定生产者组名 + // 1.1 创建生产者实例,指定生产者组名 DefaultMQProducer producer = new DefaultMQProducer(config.getGroup()); + // TODO @puhui999:可以考虑,基于 guava 做 cache,使用 config 作为 key,然后假个 listener 超时,销毁 producer try { - // 2. 设置 NameServer 地址 + // 1.2 设置 NameServer 地址 producer.setNamesrvAddr(config.getNameServer()); - // 3. 启动生产者 + // 1.3 启动生产者 producer.start(); - // 4. 创建消息对象,指定Topic、Tag和消息体 + + // 2.1 创建消息对象,指定Topic、Tag和消息体 Message msg = new Message( config.getTopic(), config.getTags(), message.toString().getBytes(RemotingHelper.DEFAULT_CHARSET) ); - - // 5. 发送同步消息并处理结果 + // 2.2 发送同步消息并处理结果 SendResult sendResult = producer.send(msg); - // 6. 处理发送结果 + // 2.3 处理发送结果 if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) { - log.info("[executeRocketMQ][message({}) config({}) 发送成功,结果({})]", - message, config, sendResult); + log.info("[executeRocketMQ][message({}) config({}) 发送成功,结果({})]", message, config, sendResult); } else { - log.error("[executeRocketMQ][message({}) config({}) 发送失败,结果({})]", - message, config, sendResult); + log.error("[executeRocketMQ][message({}) config({}) 发送失败,结果({})]", message, config, sendResult); } } catch (Exception e) { - log.error("[executeRocketMQ][message({}) config({}) 发送异常]", - message, config, e); + log.error("[executeRocketMQ][message({}) config({}) 发送异常]", message, config, e); } finally { - // 7. 关闭生产者 + // 3. 关闭生产者 producer.shutdown(); } } + // TODO @芋艿:测试代码,后续清理 public static void main(String[] args) { // 1. 创建 IotRuleSceneDataBridgeAction 实例 IotRuleSceneDataBridgeAction action = new IotRuleSceneDataBridgeAction();