diff --git a/yudao-dependencies/pom.xml b/yudao-dependencies/pom.xml index 05d9aa2b96..efdeddde13 100644 --- a/yudao-dependencies/pom.xml +++ b/yudao-dependencies/pom.xml @@ -34,7 +34,7 @@ 5.1.0 3.3.3 - 2.3.1 + 2.3.2 2.2.7 diff --git a/yudao-framework/yudao-spring-boot-starter-mq/pom.xml b/yudao-framework/yudao-spring-boot-starter-mq/pom.xml index c8972f16b2..15bbaa42de 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/pom.xml +++ b/yudao-framework/yudao-spring-boot-starter-mq/pom.xml @@ -36,7 +36,6 @@ org.apache.rocketmq rocketmq-spring-boot-starter - true diff --git a/yudao-module-iot/yudao-module-iot-biz/pom.xml b/yudao-module-iot/yudao-module-iot-biz/pom.xml index 780817848b..2a9669c0ee 100644 --- a/yudao-module-iot/yudao-module-iot-biz/pom.xml +++ b/yudao-module-iot/yudao-module-iot-biz/pom.xml @@ -75,6 +75,11 @@ + + + cn.iocoder.boot + yudao-spring-boot-starter-mq + org.pf4j diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java index 568377dd02..b3a863c8be 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java @@ -136,5 +136,42 @@ public class IotDataBridgeDO extends BaseDO { } + /** + * RocketMQ 配置 + */ + @Data + public static class RocketMQConfig implements Config { + + /** + * RocketMQ 名称服务器地址 + */ + private String nameServer; + + /** + * 生产者/消费者组 + */ + private String group; + + /** + * 主题 + */ + private String topic; + + /** + * 标签 + */ + private String tags; + + /** + * 访问密钥 + */ + private String accessKey; + + /** + * 秘密钥匙 + */ + private String secretKey; + + } } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java index 82b0467140..a70b6e8620 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java @@ -13,11 +13,17 @@ import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.service.rule.IotDataBridgeService; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.common.RemotingHelper; import org.springframework.http.*; import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; import org.springframework.web.util.UriComponentsBuilder; +import java.time.LocalDateTime; import java.util.HashMap; import java.util.Map; @@ -64,6 +70,11 @@ public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction { executeHttp(message, (IotDataBridgeDO.HttpConfig) dataBridge.getConfig()); return; } + // 2.2 执行 RocketMQ 发送消息 + if (IotDataBridgTypeEnum.ROCKETMQ.getType().equals(dataBridge.getType())) { + executeRocketMQ(message, (IotDataBridgeDO.RocketMQConfig) dataBridge.getConfig()); + return; + } // TODO @芋艿:因为下面的,都是有状态的,所以通过 guava 缓存连接,然后通过 RemovalNotification 实现关闭。例如说,一次新建有效期是 10 分钟; // TODO @芋艿:mq-redis @@ -131,4 +142,66 @@ public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction { } } + private void executeRocketMQ(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) { + // 1. 创建生产者实例,指定生产者组名 + DefaultMQProducer producer = new DefaultMQProducer(config.getGroup()); + try { + // 2. 设置 NameServer 地址 + producer.setNamesrvAddr(config.getNameServer()); + // 3. 启动生产者 + producer.start(); + // 4. 创建消息对象,指定Topic、Tag和消息体 + Message msg = new Message( + config.getTopic(), + config.getTags(), + message.toString().getBytes(RemotingHelper.DEFAULT_CHARSET) + ); + + // 5. 发送同步消息并处理结果 + SendResult sendResult = producer.send(msg); + // 6. 处理发送结果 + if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) { + log.info("[executeRocketMQ][message({}) config({}) 发送成功,结果({})]", + message, config, sendResult); + } else { + log.error("[executeRocketMQ][message({}) config({}) 发送失败,结果({})]", + message, config, sendResult); + } + } catch (Exception e) { + log.error("[executeRocketMQ][message({}) config({}) 发送异常]", + message, config, e); + } finally { + // 7. 关闭生产者 + producer.shutdown(); + } + } + + public static void main(String[] args) { + // 1. 创建 IotRuleSceneDataBridgeAction 实例 + IotRuleSceneDataBridgeAction action = new IotRuleSceneDataBridgeAction(); + + // 2. 创建测试消息 + IotDeviceMessage message = IotDeviceMessage.builder() + .requestId("TEST-001") + .productKey("testProduct") + .deviceName("testDevice") + .deviceKey("testDeviceKey") + .type("property") + .identifier("temperature") + .data("{\"value\": 60}") + .reportTime(LocalDateTime.now()) + .tenantId(1L) + .build(); + + // 3. 创建 RocketMQ 配置 + IotDataBridgeDO.RocketMQConfig config = new IotDataBridgeDO.RocketMQConfig(); + config.setNameServer("127.0.0.1:9876"); + config.setGroup("test-group"); + config.setTopic("test-topic"); + config.setTags("test-tag"); + + // 4. 执行测试 + action.executeRocketMQ(message, config); + } + }