From 569eef4a743610606762e3ffd9f077a4d320ef25 Mon Sep 17 00:00:00 2001 From: haohao <1036606149@qq.com> Date: Fri, 13 Jun 2025 11:51:05 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E3=80=90IoT=20=E7=89=A9=E8=81=94?= =?UTF-8?q?=E7=BD=91=E3=80=91=E6=9B=B4=E6=96=B0=E8=AE=BE=E5=A4=87=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E5=A4=84=E7=90=86=E9=80=BB=E8=BE=91=EF=BC=8C=E9=87=8D?= =?UTF-8?q?=E6=9E=84=20MQTT=20=E4=B8=8B=E8=A1=8C=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E5=A4=84=E7=90=86=E5=99=A8=EF=BC=8C=E4=BC=98=E5=8C=96=E4=B8=BB?= =?UTF-8?q?=E9=A2=98=E6=9E=84=E5=BB=BA=E5=92=8C=E6=B6=88=E6=81=AF=E5=8F=91?= =?UTF-8?q?=E5=B8=83=E6=B5=81=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../device/IotDeviceMessageSubscriber.java | 2 +- .../message/IotDeviceMessageServiceImpl.java | 5 +- .../enums/IotDeviceMessageMethodEnum.java | 10 +- .../iot/core/mq/message/IotDeviceMessage.java | 6 +- .../config/IotGatewayConfiguration.java | 2 +- .../gateway/config/IotGatewayProperties.java | 23 +- .../iot/gateway/enums/IotDeviceTopicEnum.java | 262 ------------------ .../mqtt/IotMqttDownstreamSubscriber.java | 125 +-------- .../mqtt/IotMqttUpstreamProtocol.java | 166 ++++------- .../mqtt/router/IotMqttAbstractHandler.java | 95 ------- .../mqtt/router/IotMqttDownstreamHandler.java | 98 +++++++ .../mqtt/router/IotMqttUpstreamHandler.java | 209 ++++---------- .../iot/gateway/util/IotMqttTopicUtils.java | 90 ++++++ 13 files changed, 344 insertions(+), 749 deletions(-) delete mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/enums/IotDeviceTopicEnum.java delete mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttAbstractHandler.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttDownstreamHandler.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/util/IotMqttTopicUtils.java diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/device/IotDeviceMessageSubscriber.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/device/IotDeviceMessageSubscriber.java index 9f975ba32d..3eaa019fd2 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/device/IotDeviceMessageSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/device/IotDeviceMessageSubscriber.java @@ -22,7 +22,7 @@ import java.time.LocalDateTime; /** * 针对 {@link IotDeviceMessage} 的业务处理器:调用 method 对应的逻辑。例如说: - * 1. {@link IotDeviceMessageMethodEnum#PROPERTY_REPORT} 属性上报时,记录设备属性 + * 1. {@link IotDeviceMessageMethodEnum#PROPERTY_POST} 属性上报时,记录设备属性 * * @author alwayssuper */ diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/message/IotDeviceMessageServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/message/IotDeviceMessageServiceImpl.java index 30623f5760..f3382f36f7 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/message/IotDeviceMessageServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/message/IotDeviceMessageServiceImpl.java @@ -126,7 +126,7 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService { @Override public void handleUpstreamDeviceMessage(IotDeviceMessage message, IotDeviceDO device) { - // 1. 理消息 + // 1. 处理消息 Object replyData = null; ServiceException serviceException = null; try { @@ -175,8 +175,7 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService { } // 属性上报 - if (Objects.equal(message.getMethod(), IotDeviceMessageMethodEnum.PROPERTY_REPORT.getMethod()) || - Objects.equal(message.getMethod(), IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod())) { + if (Objects.equal(message.getMethod(), IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod())) { devicePropertyService.saveDeviceProperty(device, message); return null; } diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/enums/IotDeviceMessageMethodEnum.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/enums/IotDeviceMessageMethodEnum.java index 220b94ae27..267f7b8053 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/enums/IotDeviceMessageMethodEnum.java +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/enums/IotDeviceMessageMethodEnum.java @@ -23,15 +23,13 @@ public enum IotDeviceMessageMethodEnum implements ArrayValuable { STATE_OFFLINE("thing.state.offline", true), // ========== 设备属性 ========== - // 可参考 https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services - // TODO @haohao:使用 report 哈; + // 可参考 + // https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services PROPERTY_POST("thing.property.post", true), - PROPERTY_REPORT("thing.property.report", true), - PROPERTY_SET("thing.property.set", false), - PROPERTY_GET("thing.property.get", false), - // + // ========== 设备事件 ========== + EVENT_POST("thing.event.post", true), ; diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/mq/message/IotDeviceMessage.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/mq/message/IotDeviceMessage.java index adb66c7060..046e75f61f 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/mq/message/IotDeviceMessage.java +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/mq/message/IotDeviceMessage.java @@ -25,9 +25,9 @@ public class IotDeviceMessage { public static final String MESSAGE_BUS_DEVICE_MESSAGE_TOPIC = "iot_device_message"; /** - * 【消息总线】设备消息 Topic,由 iot-biz 发送给 iot-gateway 的某个 “server”(protocol) 进行消费 + * 【消息总线】设备消息 Topic,由 iot-biz 发送给 iot-gateway 的某个 "server"(protocol) 进行消费 * - * 其中,%s 就是该“server”(protocol) 的标识 + * 其中,%s 就是该"server"(protocol) 的标识 */ public static final String MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC = MESSAGE_BUS_DEVICE_MESSAGE_TOPIC + "_%s"; @@ -92,7 +92,7 @@ public class IotDeviceMessage { */ private String msg; - // ========== 基础方法:只传递“codec(编解码)字段” ========== + // ========== 基础方法:只传递"codec(编解码)字段" ========== public static IotDeviceMessage requestOf(String method) { return requestOf(null, method, null); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java index 32bba95332..d730e92782 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java @@ -45,7 +45,7 @@ public class IotGatewayConfiguration { public static class MqttProtocolConfiguration { @Bean - public IotMqttUpstreamProtocol iotMqttUnifiedProtocol(IotGatewayProperties gatewayProperties) { + public IotMqttUpstreamProtocol iotMqttUpstreamProtocol(IotGatewayProperties gatewayProperties) { return new IotMqttUpstreamProtocol(gatewayProperties.getProtocol().getEmqx()); } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java index d5707c8b69..608ed04138 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java @@ -134,7 +134,7 @@ public class IotGatewayProperties { private String mqttPassword; /** - * MQTT 是否开启 SSL(默认:false) + * MQTT 客户端的 SSL 开关 */ @NotNull(message = "MQTT 是否开启 SSL 不能为空") private Boolean mqttSsl = false; @@ -145,11 +145,30 @@ public class IotGatewayProperties { private String mqttClientId; /** - * MQTT 主题列表 + * MQTT 订阅的主题 */ @NotEmpty(message = "MQTT 主题不能为空") private List mqttTopics; + /** + * 默认 QoS 级别 + *

+ * 0 - 最多一次 + * 1 - 至少一次 + * 2 - 刚好一次 + */ + private Integer mqttQos = 1; + + /** + * 连接超时时间(秒) + */ + private Integer connectTimeoutSeconds = 10; + + /** + * 重连延迟时间(毫秒) + */ + private Long reconnectDelayMs = 5000L; + /** * 获取 MQTT 客户端 ID,如果未配置则自动生成 * diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/enums/IotDeviceTopicEnum.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/enums/IotDeviceTopicEnum.java deleted file mode 100644 index 5b26edc060..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/enums/IotDeviceTopicEnum.java +++ /dev/null @@ -1,262 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.enums; - -import lombok.Getter; -import lombok.RequiredArgsConstructor; - -// TODO @haohao:这个类,我们是不是可以删除了哈? -/** - * IoT 设备主题枚举 - *

- * 用于统一管理 MQTT 协议中的主题常量,基于 Alink 协议规范 - * - * @author haohao - */ -@RequiredArgsConstructor -@Getter -public enum IotDeviceTopicEnum { - - /** - * 设备属性设置主题 - * 请求 Topic:/sys/${productKey}/${deviceName}/thing/property/set - * 响应 Topic:/sys/${productKey}/${deviceName}/thing/property/set_reply - */ - PROPERTY_SET_TOPIC("/thing/property/set", "设备属性设置主题"), - - /** - * 设备属性获取主题 - * 请求 Topic:/sys/${productKey}/${deviceName}/thing/property/get - * 响应 Topic:/sys/${productKey}/${deviceName}/thing/property/get_reply - */ - PROPERTY_GET_TOPIC("/thing/property/get", "设备属性获取主题"), - - /** - * 设备配置设置主题 - * 请求 Topic:/sys/${productKey}/${deviceName}/thing/config/set - * 响应 Topic:/sys/${productKey}/${deviceName}/thing/config/set_reply - */ - CONFIG_SET_TOPIC("/thing/config/set", "设备配置设置主题"), - - /** - * 设备 OTA 升级主题 - * 请求 Topic:/sys/${productKey}/${deviceName}/thing/ota/upgrade - * 响应 Topic:/sys/${productKey}/${deviceName}/thing/ota/upgrade_reply - */ - OTA_UPGRADE_TOPIC("/thing/ota/upgrade", "设备 OTA 升级主题"), - - /** - * 设备属性上报主题 - * 请求 Topic:/sys/${productKey}/${deviceName}/thing/event/property/post - * 响应 Topic:/sys/${productKey}/${deviceName}/thing/event/property/post_reply - */ - PROPERTY_POST_TOPIC("/thing/property/post", "设备属性上报主题"), - - /** - * 设备事件上报主题前缀 - */ - EVENT_POST_TOPIC_PREFIX("/thing/event/", "设备事件上报主题前缀"), - - /** - * 设备事件上报主题后缀 - */ - EVENT_POST_TOPIC_SUFFIX("/post", "设备事件上报主题后缀"); - - // ========== 静态常量 ========== - - /** - * 系统主题前缀 - */ - public static final String SYS_TOPIC_PREFIX = "/sys/"; - - /** - * 服务调用主题前缀 - */ - public static final String SERVICE_TOPIC_PREFIX = "/thing/"; - - /** - * 响应主题后缀 - */ - public static final String REPLY_SUFFIX = "_reply"; - - // ========== 方法常量 ========== - - /** - * 服务方法前缀 - */ - public static final String SERVICE_METHOD_PREFIX = "thing."; - - /** - * 属性服务方法前缀 - */ - public static final String PROPERTY_SERVICE_METHOD_PREFIX = "thing.property."; - - /** - * 配置服务方法前缀 - */ - public static final String CONFIG_SERVICE_METHOD_PREFIX = "thing.config."; - - /** - * OTA 服务方法前缀 - */ - public static final String OTA_SERVICE_METHOD_PREFIX = "thing.ota."; - - /** - * 属性设置方法 - */ - public static final String PROPERTY_SET_METHOD = "thing.property.set"; - - /** - * 属性获取方法 - */ - public static final String PROPERTY_GET_METHOD = "thing.property.get"; - - // ========== 主题匹配常量 ========== - - /** - * 事件上报主题模式 - */ - public static final String EVENT_POST_TOPIC_PATTERN = "/thing/event/"; - - /** - * 主题后缀:post - */ - public static final String POST_SUFFIX = "/post"; - - /** - * 属性上报主题后缀 - */ - public static final String PROPERTY_POST_SUFFIX = "/thing/property/post"; - - /** - * 属性设置响应主题包含 - */ - public static final String PROPERTY_SET_TOPIC_CONTAINS = "/thing/property/set"; - - /** - * 属性获取响应主题包含 - */ - public static final String PROPERTY_GET_TOPIC_CONTAINS = "/thing/property/get"; - - // ========== MQTT 认证路径常量 ========== - - /** - * MQTT 认证路径 - */ - public static final String MQTT_AUTH_AUTHENTICATE_PATH = "/mqtt/auth/authenticate"; - - /** - * MQTT 连接事件路径 - */ - public static final String MQTT_AUTH_CONNECTED_PATH = "/mqtt/auth/connected"; - - /** - * MQTT 断开事件路径 - */ - public static final String MQTT_AUTH_DISCONNECTED_PATH = "/mqtt/auth/disconnected"; - - private final String topic; - private final String description; - - // ========== 工具方法 ========== - - /** - * 构建设备主题前缀 - * - * @param productKey 产品 Key - * @param deviceName 设备名称 - * @return 设备主题前缀:/sys/{productKey}/{deviceName} - */ - private static String buildDeviceTopicPrefix(String productKey, String deviceName) { - return SYS_TOPIC_PREFIX + productKey + "/" + deviceName; - } - - /** - * 构建设备服务调用主题 - * - * @param productKey 产品 Key - * @param deviceName 设备名称 - * @param serviceIdentifier 服务标识符 - * @return 完整的主题路径 - */ - public static String buildServiceTopic(String productKey, String deviceName, String serviceIdentifier) { - return buildDeviceTopicPrefix(productKey, deviceName) + SERVICE_TOPIC_PREFIX + serviceIdentifier; - } - - /** - * 构建设备属性设置主题 - * - * @param productKey 产品 Key - * @param deviceName 设备名称 - * @return 完整的主题路径 - */ - public static String buildPropertySetTopic(String productKey, String deviceName) { - return buildDeviceTopicPrefix(productKey, deviceName) + PROPERTY_SET_TOPIC.getTopic(); - } - - /** - * 构建设备属性获取主题 - * - * @param productKey 产品 Key - * @param deviceName 设备名称 - * @return 完整的主题路径 - */ - public static String buildPropertyGetTopic(String productKey, String deviceName) { - return buildDeviceTopicPrefix(productKey, deviceName) + PROPERTY_GET_TOPIC.getTopic(); - } - - /** - * 构建设备配置设置主题 - * - * @param productKey 产品 Key - * @param deviceName 设备名称 - * @return 完整的主题路径 - */ - public static String buildConfigSetTopic(String productKey, String deviceName) { - return buildDeviceTopicPrefix(productKey, deviceName) + CONFIG_SET_TOPIC.getTopic(); - } - - /** - * 构建设备 OTA 升级主题 - * - * @param productKey 产品 Key - * @param deviceName 设备名称 - * @return 完整的主题路径 - */ - public static String buildOtaUpgradeTopic(String productKey, String deviceName) { - return buildDeviceTopicPrefix(productKey, deviceName) + OTA_UPGRADE_TOPIC.getTopic(); - } - - /** - * 构建设备属性上报主题 - * - * @param productKey 产品 Key - * @param deviceName 设备名称 - * @return 完整的主题路径 - */ - public static String buildPropertyPostTopic(String productKey, String deviceName) { - return buildDeviceTopicPrefix(productKey, deviceName) + PROPERTY_POST_TOPIC.getTopic(); - } - - /** - * 构建设备事件上报主题 - * - * @param productKey 产品 Key - * @param deviceName 设备名称 - * @param eventIdentifier 事件标识符 - * @return 完整的主题路径 - */ - public static String buildEventPostTopic(String productKey, String deviceName, String eventIdentifier) { - return buildDeviceTopicPrefix(productKey, deviceName) + - EVENT_POST_TOPIC_PREFIX.getTopic() + eventIdentifier + EVENT_POST_TOPIC_SUFFIX.getTopic(); - } - - /** - * 获取响应主题 - * - * @param requestTopic 请求主题 - * @return 响应主题 - */ - public static String getReplyTopic(String requestTopic) { - return requestTopic + REPLY_SUFFIX; - } - -} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java index 59cfab0702..00e0905274 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java @@ -1,18 +1,12 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt; -import cn.hutool.core.util.StrUtil; -import cn.hutool.json.JSONObject; -import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; -import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum; -import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttDownstreamHandler; import jakarta.annotation.PostConstruct; -import jakarta.annotation.Resource; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; /** @@ -20,15 +14,18 @@ import lombok.extern.slf4j.Slf4j; * * @author 芋道源码 */ -@RequiredArgsConstructor @Slf4j public class IotMqttDownstreamSubscriber implements IotMessageSubscriber { - private final IotMqttUpstreamProtocol protocol; + private final IotMqttDownstreamHandler downstreamHandler; private final IotMessageBus messageBus; + private final IotMqttUpstreamProtocol protocol; - @Resource - private IotDeviceService deviceService; + public IotMqttDownstreamSubscriber(IotMqttUpstreamProtocol protocol, IotMessageBus messageBus) { + this.protocol = protocol; + this.messageBus = messageBus; + this.downstreamHandler = new IotMqttDownstreamHandler(protocol); + } @PostConstruct public void init() { @@ -51,7 +48,7 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber * 1. MQTT 客户端:连接 EMQX,消费处理设备上行和下行消息 * 2. HTTP 认证服务:为 EMQX 提供设备认证、连接、断开接口 @@ -37,24 +34,6 @@ import java.util.concurrent.TimeUnit; @Slf4j public class IotMqttUpstreamProtocol { - // TODO @haohao:是不是也丢到配置里? - /** - * 默认 QoS 级别 - 至少一次 - */ - private static final MqttQoS DEFAULT_QOS = MqttQoS.AT_LEAST_ONCE; - - // TODO @haohao:这个也是; - /** - * 连接超时时间(秒) - */ - private static final int CONNECT_TIMEOUT_SECONDS = 10; - - // TODO @haohao:重连也是; - /** - * 重连延迟时间(毫秒) - */ - private static final long RECONNECT_DELAY_MS = 5000; - private final IotGatewayProperties.EmqxProperties emqxProperties; private Vertx vertx; @@ -68,8 +47,6 @@ public class IotMqttUpstreamProtocol { // HTTP 认证服务相关 private HttpServer httpAuthServer; - // TODO @haohao:authHandler 可以 local 哈; - private IotMqttHttpAuthHandler authHandler; /** * 服务运行状态标志 @@ -92,7 +69,6 @@ public class IotMqttUpstreamProtocol { try { // 1. 创建共享的 Vertx 实例 this.vertx = Vertx.vertx(); - log.info("[start][共享 Vertx 实例创建成功]"); // 2. 启动 HTTP 认证服务 startHttpAuthServer(); @@ -103,7 +79,6 @@ public class IotMqttUpstreamProtocol { isRunning = true; log.info("[start][MQTT 统一协议服务启动完成]"); } catch (Exception e) { - // TODO @haohao:失败,是不是直接 System.exit 哈! log.error("[start][MQTT 统一协议服务启动失败]", e); // 启动失败时清理资源 stop(); @@ -150,12 +125,10 @@ public class IotMqttUpstreamProtocol { router.route().handler(BodyHandler.create()); // 创建认证处理器 - this.authHandler = new IotMqttHttpAuthHandler(); - - // 添加认证路由 - router.post(IotDeviceTopicEnum.MQTT_AUTH_AUTHENTICATE_PATH).handler(authHandler::authenticate); - router.post(IotDeviceTopicEnum.MQTT_AUTH_CONNECTED_PATH).handler(authHandler::connected); - router.post(IotDeviceTopicEnum.MQTT_AUTH_DISCONNECTED_PATH).handler(authHandler::disconnected); + IotMqttHttpAuthHandler authHandler = new IotMqttHttpAuthHandler(); + router.post(IotMqttTopicUtils.MQTT_AUTH_AUTHENTICATE_PATH).handler(authHandler::authenticate); + router.post(IotMqttTopicUtils.MQTT_AUTH_CONNECTED_PATH).handler(authHandler::connected); + router.post(IotMqttTopicUtils.MQTT_AUTH_DISCONNECTED_PATH).handler(authHandler::disconnected); // 启动 HTTP 服务器 int authPort = emqxProperties.getHttpAuthPort(); @@ -175,14 +148,14 @@ public class IotMqttUpstreamProtocol { * 停止 HTTP 认证服务 */ private void stopHttpAuthServer() { - // TODO @haohao:一些 if return 最好搞下; - if (httpAuthServer != null) { - try { - httpAuthServer.close().result(); - log.info("[stopHttpAuthServer][HTTP 认证服务已停止]"); - } catch (Exception e) { - log.error("[stopHttpAuthServer][HTTP 认证服务停止失败]", e); - } + if (httpAuthServer == null) { + return; + } + try { + httpAuthServer.close().result(); + log.info("[stopHttpAuthServer][HTTP 认证服务已停止]"); + } catch (Exception e) { + log.error("[stopHttpAuthServer][HTTP 认证服务停止失败]", e); } } @@ -192,10 +165,10 @@ public class IotMqttUpstreamProtocol { private void startMqttClient() { log.info("[startMqttClient][开始启动 MQTT 客户端]"); - // 初始化消息处理器 + // 1. 初始化消息处理器 this.upstreamHandler = new IotMqttUpstreamHandler(this); - // 创建 MQTT 客户端 + // 2. 创建 MQTT 客户端 log.info("[startMqttClient][使用 MQTT 客户端 ID: {}]", emqxProperties.getMqttClientId()); MqttClientOptions options = new MqttClientOptions() @@ -205,7 +178,7 @@ public class IotMqttUpstreamProtocol { .setSsl(emqxProperties.getMqttSsl()); this.mqttClient = MqttClient.create(vertx, options); - // 连接 MQTT Broker + // 3. 连接 MQTT Broker connectMqtt(); } @@ -229,13 +202,13 @@ public class IotMqttUpstreamProtocol { } // 2. 关闭 MQTT 客户端 - try { - if (mqttClient != null && mqttClient.isConnected()) { + if (mqttClient != null && mqttClient.isConnected()) { + try { mqttClient.disconnect(); log.info("[stopMqttClient][MQTT 客户端已断开]"); + } catch (Exception e) { + log.warn("[stopMqttClient][关闭 MQTT 客户端异常]", e); } - } catch (Exception e) { - log.warn("[stopMqttClient][关闭 MQTT 客户端异常]", e); } } @@ -243,10 +216,9 @@ public class IotMqttUpstreamProtocol { * 连接 MQTT Broker 并订阅主题 */ private void connectMqtt() { - // 参数校验 + // 1. 参数校验 String host = emqxProperties.getMqttHost(); Integer port = emqxProperties.getMqttPort(); - if (StrUtil.isBlank(host)) { log.error("[connectMqtt][MQTT Host 为空,无法连接]"); throw new IllegalArgumentException("MQTT Host 不能为空"); @@ -255,63 +227,47 @@ public class IotMqttUpstreamProtocol { log.error("[connectMqtt][MQTT Port 无效:{}]", port); throw new IllegalArgumentException("MQTT Port 必须为正整数"); } - log.info("[connectMqtt][开始连接 MQTT Broker][host: {}][port: {}]", host, port); - CompletableFuture connectFuture = mqttClient.connect(port, host) - .toCompletionStage() - .toCompletableFuture() - .thenAccept(connAck -> { - // TODO @haohao:是不是可以连接完,然后在执行里面;不用 通过 thenAccept 哈; - log.info("[connectMqtt][MQTT 客户端连接成功][host: {}][port: {}]", host, port); - // 设置断开重连监听器 - mqttClient.closeHandler(closeEvent -> { - log.warn("[closeHandler][MQTT 连接已断开,准备重连]"); - reconnectWithDelay(); - }); - // 设置消息处理器 - setupMessageHandler(); - // 订阅主题 - subscribeToTopics(); - }) - .exceptionally(error -> { - // TODO @haohao:这里的异常,是不是不用重连哈?因为直接就退出了。然后有 closeHandler 监听重连了; - log.error("[connectMqtt][连接 MQTT Broker 失败][host: {}][port: {}]", host, port, error); - // 连接失败时也要尝试重连 - reconnectWithDelay(); - return null; - }); - - // 等待连接完成 + // 2. 连接 try { - connectFuture.get(CONNECT_TIMEOUT_SECONDS, TimeUnit.SECONDS); - log.info("[connectMqtt][MQTT 客户端启动完成]"); + mqttClient.connect(port, host) + .toCompletionStage() + .toCompletableFuture() + .get(emqxProperties.getConnectTimeoutSeconds(), TimeUnit.SECONDS); + log.info("[connectMqtt][MQTT 客户端连接成功][host: {}][port: {}]", host, port); + + // 3. 设置处理器 + // 3.1 设置断开重连监听器 + mqttClient.closeHandler(closeEvent -> { + log.warn("[closeHandler][MQTT 连接已断开,准备重连]"); + reconnectWithDelay(); + }); + // 3.2 设置消息处理器 + mqttClient.publishHandler(upstreamHandler::handle); + log.debug("[setupMessageHandler][MQTT 消息处理器设置完成]"); + + // 4. 订阅主题 + subscribeToTopics(); } catch (Exception e) { - log.error("[connectMqtt][MQTT 客户端启动失败]", e); + log.error("[connectMqtt][连接 MQTT Broker 失败][host: {}][port: {}]", host, port, e); + reconnectWithDelay(); // 连接失败时,也要尝试重连 throw new RuntimeException("MQTT 客户端启动失败", e); } } - /** - * 设置 MQTT 消息处理器 - */ - private void setupMessageHandler() { - mqttClient.publishHandler(upstreamHandler::handle); - log.debug("[setupMessageHandler][MQTT 消息处理器设置完成]"); - } - /** * 订阅设备上行消息主题 */ private void subscribeToTopics() { List topicList = emqxProperties.getMqttTopics(); + int qos = emqxProperties.getMqttQos(); log.info("[subscribeToTopics][开始订阅主题,共 {} 个]", topicList.size()); - for (String topic : topicList) { - mqttClient.subscribe(topic, DEFAULT_QOS.value(), subscribeResult -> { + mqttClient.subscribe(topic, qos, subscribeResult -> { if (subscribeResult.succeeded()) { - log.info("[subscribeToTopics][订阅主题成功: {}][QoS: {}]", topic, DEFAULT_QOS.value()); + log.info("[subscribeToTopics][订阅主题成功: {}][QoS: {}]", topic, qos); } else { log.error("[subscribeToTopics][订阅主题失败: {}]", topic, subscribeResult.cause()); } @@ -323,17 +279,17 @@ public class IotMqttUpstreamProtocol { * 延迟重连 */ private void reconnectWithDelay() { - vertx.setTimer(RECONNECT_DELAY_MS, timerId -> { - // TODO @haohao:if return,括号少一些; - if (isRunning && (mqttClient == null || !mqttClient.isConnected())) { - log.info("[reconnectWithDelay][开始重连 MQTT Broker,延迟 {} 毫秒]", RECONNECT_DELAY_MS); - try { - connectMqtt(); - } catch (Exception e) { - log.error("[reconnectWithDelay][重连失败,将继续尝试重连]", e); - // 重连失败时继续尝试重连 - reconnectWithDelay(); - } + long delay = emqxProperties.getReconnectDelayMs(); + vertx.setTimer(delay, timerId -> { + if (!isRunning || (mqttClient != null && mqttClient.isConnected())) { + return; + } + log.info("[reconnectWithDelay][开始重连 MQTT Broker,延迟 {} 毫秒]", delay); + try { + connectMqtt(); + } catch (Exception e) { + log.error("[reconnectWithDelay][重连失败,将继续尝试重连]", e); + reconnectWithDelay(); // 失败后,继续尝试 } }); } @@ -345,12 +301,12 @@ public class IotMqttUpstreamProtocol { * @param payload 消息内容 */ public void publishMessage(String topic, String payload) { - if (mqttClient != null && mqttClient.isConnected()) { - mqttClient.publish(topic, Buffer.buffer(payload), DEFAULT_QOS, false, false); - log.debug("[publishMessage][发布消息成功][topic: {}]", topic); - } else { + if (mqttClient == null || !mqttClient.isConnected()) { log.warn("[publishMessage][MQTT 客户端未连接,无法发布消息][topic: {}]", topic); + return; } + MqttQoS qos = MqttQoS.valueOf(emqxProperties.getMqttQos()); + mqttClient.publish(topic, Buffer.buffer(payload), qos, false, false); } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttAbstractHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttAbstractHandler.java deleted file mode 100644 index a0af65ba18..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttAbstractHandler.java +++ /dev/null @@ -1,95 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router; - -import cn.hutool.core.util.StrUtil; -import lombok.extern.slf4j.Slf4j; - -// TODO @haohao:是不是不用基类哈; -/** - * IoT 网关 MQTT 协议的处理器抽象基类 - *

- * 提供通用的异常处理、参数校验等功能 - * - * @author 芋道源码 - */ -@Slf4j -public abstract class IotMqttAbstractHandler { - - /** - * 处理 MQTT 消息的模板方法 - * - * @param topic 主题 - * @param payload 消息内容 - */ - public final void handle(String topic, String payload) { - try { - // 1. 前置校验 - if (!validateInput(topic, payload)) { - return; - } - - // 2. 执行具体逻辑 - doHandle(topic, payload); - - } catch (Exception e) { - log.error("[handle][处理 MQTT 消息失败][topic: {}][payload: {}]", topic, payload, e); - handleException(topic, payload, e); - } - } - - /** - * 具体的处理逻辑,由子类实现 - * - * @param topic 主题 - * @param payload 消息内容 - */ - protected abstract void doHandle(String topic, String payload); - - /** - * 输入参数校验 - * - * @param topic 主题 - * @param payload 消息内容 - * @return 校验是否通过 - */ - protected boolean validateInput(String topic, String payload) { - if (StrUtil.isBlank(topic)) { - log.warn("[validateInput][主题为空,忽略消息]"); - return false; - } - - if (StrUtil.isBlank(payload)) { - log.warn("[validateInput][消息内容为空][topic: {}]", topic); - return false; - } - - return true; - } - - /** - * 异常处理 - * - * @param topic 主题 - * @param payload 消息内容 - * @param e 异常 - */ - protected void handleException(String topic, String payload, Exception e) { - // 默认实现:记录错误日志 - // 子类可以重写此方法,添加特定的异常处理逻辑 - log.error("[handleException][MQTT 消息处理异常][topic: {}]", topic, e); - } - - /** - * 解析主题,获取主题各部分 - * - * @param topic 主题 - * @return 主题各部分数组,如果解析失败返回 null - */ - protected String[] parseTopic(String topic) { - String[] topicParts = topic.split("/"); - if (topicParts.length < 7) { - log.warn("[parseTopic][主题格式不正确][topic: {}]", topic); - return null; - } - return topicParts; - } -} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttDownstreamHandler.java new file mode 100644 index 0000000000..619d4b4957 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttDownstreamHandler.java @@ -0,0 +1,98 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router; + +import cn.hutool.core.util.StrUtil; +import cn.hutool.extra.spring.SpringUtil; +import cn.hutool.json.JSONObject; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; +import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils; +import lombok.extern.slf4j.Slf4j; + +/** + * IoT 网关 MQTT 下行消息处理器 + *

+ * 从消息总线接收到下行消息,然后发布到 MQTT Broker + * + * @author 芋道源码 + */ +@Slf4j +public class IotMqttDownstreamHandler { + + private final IotMqttUpstreamProtocol protocol; + private final IotDeviceService deviceService; + private final IotDeviceMessageService deviceMessageService; + + public IotMqttDownstreamHandler(IotMqttUpstreamProtocol protocol) { + this.protocol = protocol; + this.deviceService = SpringUtil.getBean(IotDeviceService.class); + this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class); + } + + /** + * 处理下行消息 + * + * @param message 设备消息 + */ + public void handle(IotDeviceMessage message) { + // 1. 获取设备信息(使用缓存) + IotDeviceRespDTO deviceInfo = deviceService.getDeviceFromCache(message.getDeviceId()); + if (deviceInfo == null) { + log.warn("[handle][设备信息不存在][deviceId: {}]", message.getDeviceId()); + return; + } + + // 2. 根据方法构建主题 + String topic = buildTopicByMethod(message.getMethod(), deviceInfo.getProductKey(), deviceInfo.getDeviceName()); + if (StrUtil.isBlank(topic)) { + log.warn("[handle][未知的消息方法:{}]", message.getMethod()); + return; + } + + // 3. 构建载荷 + JSONObject payload = buildDownstreamPayload(message); + + // 4. 发布消息 + protocol.publishMessage(topic, payload.toString()); + log.info("[handle][发布下行消息成功][method: {}][topic: {}]", message.getMethod(), topic); + } + + /** + * 根据方法构建主题 + * + * @param method 消息方法 + * @param productKey 产品标识 + * @param deviceName 设备名称 + * @return 构建的主题,如果方法不支持返回 null + */ + private String buildTopicByMethod(String method, String productKey, String deviceName) { + IotDeviceMessageMethodEnum methodEnum = IotDeviceMessageMethodEnum.of(method); + if (methodEnum == null) { + return null; + } + return switch (methodEnum) { + case PROPERTY_POST -> IotMqttTopicUtils.buildPropertyPostReplyTopic(productKey, deviceName); + case PROPERTY_SET -> IotMqttTopicUtils.buildPropertySetTopic(productKey, deviceName); + default -> null; + }; + + } + + /** + * 构建下行消息载荷 + * + * @param message 设备消息 + * @return JSON 载荷 + */ + private JSONObject buildDownstreamPayload(IotDeviceMessage message) { + // 使用 IotDeviceMessageService 进行消息编码 + IotDeviceRespDTO device = deviceService.getDeviceFromCache(message.getDeviceId()); + byte[] encodedBytes = deviceMessageService.encodeDeviceMessage(message, device.getProductKey(), + device.getDeviceName()); + return new JSONObject(new String(encodedBytes)); + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java index cbdab348f5..14b36cbc54 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java @@ -1,27 +1,23 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router; +import cn.hutool.core.util.StrUtil; import cn.hutool.extra.spring.SpringUtil; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.vertx.mqtt.messages.MqttPublishMessage; import lombok.extern.slf4j.Slf4j; - -import java.nio.charset.StandardCharsets; +import org.springframework.util.Assert; /** - * IoT 网关 MQTT 协议的【上行】处理器 - *

- * 处理设备上行消息,包括事件上报、属性上报、服务调用响应等 + * IoT 网关 MQTT 上行消息处理器 * * @author 芋道源码 */ @Slf4j -public class IotMqttUpstreamHandler extends IotMqttAbstractHandler { +public class IotMqttUpstreamHandler { private final IotDeviceMessageService deviceMessageService; - private final String serverId; public IotMqttUpstreamHandler(IotMqttUpstreamProtocol protocol) { @@ -34,167 +30,66 @@ public class IotMqttUpstreamHandler extends IotMqttAbstractHandler { */ public void handle(MqttPublishMessage message) { String topic = message.topicName(); - // TODO @haohao: message.payload().getBytes(); - String payload = message.payload().toString(StandardCharsets.UTF_8); + byte[] payload = message.payload().getBytes(); log.debug("[handle][收到 MQTT 消息][topic: {}]", topic); - // 调用父类的 handle 方法,父类会进行参数校验 - handle(topic, payload); - } - @Override - protected void doHandle(String topic, String payload) { - // 1. 识别并验证消息类型 - String messageType = getMessageType(topic); - if (messageType == null) { - // TODO @haohao:log 是不是把 payload 也打印下哈 - log.warn("[doHandle][未知的消息类型][topic: {}]", topic); - return; + try { + // 1. 前置校验 + if (StrUtil.isBlank(topic)) { + log.warn("[validateInput][主题为空,忽略消息]"); + return; + } + // 注意:payload 可以为空 + + // 2. 识别并验证消息类型 + String messageType = getMessageType(topic); + Assert.notNull(messageType, String.format("未知的消息类型, topic(%s)", topic)); + log.info("[handle][接收到{}][topic: {}]", messageType, topic); + + // 3. 解析主题,获取 productKey 和 deviceName + String[] topicParts = topic.split("/"); + if (topicParts.length < 4) { + log.warn("[handle][主题格式不正确,无法解析 productKey 和 deviceName][topic: {}]", topic); + return; + } + String productKey = topicParts[2]; + String deviceName = topicParts[3]; + if (StrUtil.isAllBlank(productKey, deviceName)) { + log.warn("[handle][主题中 productKey 或 deviceName 为空][topic: {}]", topic); + return; + } + + // 4. 解码消息 + IotDeviceMessage deviceMessage = deviceMessageService.decodeDeviceMessage( + payload, productKey, deviceName); + if (deviceMessage == null) { + log.warn("[handle][消息解码失败][topic: {}]", topic); + return; + } + + // 5. 发送消息到队列 + deviceMessageService.sendDeviceMessage(deviceMessage, productKey, deviceName, serverId); + + // 6. 记录成功日志 + log.info("[handle][处理{}成功,已转发到 MQ][topic: {}]", messageType, topic); + } catch (Exception e) { + log.error("[handle][处理 MQTT 消息失败][topic: {}][payload: {}]", topic, new String(payload), e); } - - // 2. 处理消息 - processMessage(topic, payload, messageType); } /** - * 处理消息的统一逻辑 - */ - private void processMessage(String topic, String payload, String messageType) { - // TODO @haohao:messageType 解析,是不是作用不大哈? - log.info("[processMessage][接收到{}][topic: {}]", messageType, topic); - - // 解析主题获取设备信息 - // TODO @haohao:不一定是 7 个哈;阿里云 topic 有点差异的;可以考虑解析到 topicParts[2]、topicParts[3] 的 topic - String[] topicParts = parseTopic(topic); - if (topicParts == null) { - return; - } - - String productKey = topicParts[2]; - String deviceName = topicParts[3]; - // TODO @haohao:解析不到,可以打个 error log; - - // 解码消息 - byte[] messageBytes = payload.getBytes(StandardCharsets.UTF_8); - IotDeviceMessage message = deviceMessageService.decodeDeviceMessage( - messageBytes, productKey, deviceName); - - // 发送消息到队列 - deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId); - - // 记录成功日志 - log.info("[processMessage][处理{}成功][topic: {}]", messageType, topic); - } - - // TODO @haohao:合并下处理;不搞成每个 topic 一个处理; - /** - * 识别消息类型 + * 从主题中,获得消息类型 * * @param topic 主题 - * @return 消息类型描述,如果不支持返回 null + * @return 消息类型 */ private String getMessageType(String topic) { - // 此方法由 doHandle 调用,topic 已经在父类中校验过,无需重复校验 - - // 按优先级匹配主题类型,避免误匹配 - - // 1. 设备属性上报: /sys/{productKey}/{deviceName}/thing/property/post - if (isPropertyPostTopic(topic)) { - return IotDeviceTopicEnum.PROPERTY_POST_TOPIC.getDescription(); + String[] topicParts = topic.split("/"); + if (topicParts.length < 7) { + return null; } - - // 2. 设备事件上报: /sys/{productKey}/{deviceName}/thing/{eventIdentifier}/post - if (isEventPostTopic(topic)) { - return "设备事件上报"; - } - - // 3. 设备属性设置响应: /sys/{productKey}/{deviceName}/thing/property/set_reply - if (isPropertySetReplyTopic(topic)) { - return "设备属性设置响应"; - } - - // 4. 设备属性获取响应: /sys/{productKey}/{deviceName}/thing/property/get_reply - if (isPropertyGetReplyTopic(topic)) { - return "设备属性获取响应"; - } - - // 5. 设备配置设置响应: /sys/{productKey}/{deviceName}/thing/config/set_reply - if (isConfigSetReplyTopic(topic)) { - return IotDeviceTopicEnum.CONFIG_SET_TOPIC.getDescription() + "响应"; - } - - // 6. 设备 OTA 升级响应: - // /sys/{productKey}/{deviceName}/thing/ota/upgrade_reply - if (isOtaUpgradeReplyTopic(topic)) { - return IotDeviceTopicEnum.OTA_UPGRADE_TOPIC.getDescription() + "响应"; - } - - // 7. 其他服务调用响应: 通用服务调用响应 - if (isServiceReplyTopic(topic)) { - return "设备服务调用响应"; - } - - // 不支持的消息类型 - return null; - } - - /** - * 判断是否为属性上报主题 - */ - private boolean isPropertyPostTopic(String topic) { - return topic.contains(IotDeviceTopicEnum.PROPERTY_POST_TOPIC.getTopic()); - } - - /** - * 判断是否为事件上报主题 - */ - private boolean isEventPostTopic(String topic) { - return topic.contains(IotDeviceTopicEnum.EVENT_POST_TOPIC_PREFIX.getTopic()) - && topic.endsWith(IotDeviceTopicEnum.EVENT_POST_TOPIC_SUFFIX.getTopic()) - && !topic.contains("property"); // 排除属性上报主题 - } - - /** - * 判断是否为属性设置响应主题 - */ - private boolean isPropertySetReplyTopic(String topic) { - return topic.contains(IotDeviceTopicEnum.PROPERTY_SET_TOPIC.getTopic()) - && topic.endsWith(IotDeviceTopicEnum.REPLY_SUFFIX); - } - - /** - * 判断是否为属性获取响应主题 - */ - private boolean isPropertyGetReplyTopic(String topic) { - return topic.contains(IotDeviceTopicEnum.PROPERTY_GET_TOPIC.getTopic()) - && topic.endsWith(IotDeviceTopicEnum.REPLY_SUFFIX); - } - - /** - * 判断是否为配置设置响应主题 - */ - private boolean isConfigSetReplyTopic(String topic) { - return topic.contains(IotDeviceTopicEnum.CONFIG_SET_TOPIC.getTopic()) - && topic.endsWith(IotDeviceTopicEnum.REPLY_SUFFIX); - } - - /** - * 判断是否为 OTA 升级响应主题 - */ - private boolean isOtaUpgradeReplyTopic(String topic) { - return topic.contains(IotDeviceTopicEnum.OTA_UPGRADE_TOPIC.getTopic()) - && topic.endsWith(IotDeviceTopicEnum.REPLY_SUFFIX); - } - - /** - * 判断是否为服务调用响应主题(排除已处理的特殊服务) - */ - private boolean isServiceReplyTopic(String topic) { - return topic.contains(IotDeviceTopicEnum.SERVICE_TOPIC_PREFIX) - && topic.endsWith(IotDeviceTopicEnum.REPLY_SUFFIX) - && !topic.contains("property") - && !topic.contains("config") - && !topic.contains("ota"); + return topicParts[3]; } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/util/IotMqttTopicUtils.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/util/IotMqttTopicUtils.java new file mode 100644 index 0000000000..8e25fb14e6 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/util/IotMqttTopicUtils.java @@ -0,0 +1,90 @@ +package cn.iocoder.yudao.module.iot.gateway.util; + +/** + * IoT 网关 MQTT 主题工具类 + *

+ * 用于统一管理 MQTT 协议中的主题常量,基于 Alink 协议规范 + * + * @author 芋道源码 + */ +public final class IotMqttTopicUtils { + + // ========== 静态常量 ========== + + /** + * 系统主题前缀 + */ + private static final String SYS_TOPIC_PREFIX = "/sys/"; + + /** + * 服务调用主题前缀 + */ + private static final String SERVICE_TOPIC_PREFIX = "/thing/"; + + // ========== MQTT 认证路径常量 ========== + + /** + * MQTT 认证路径 + */ + public static final String MQTT_AUTH_AUTHENTICATE_PATH = "/mqtt/auth/authenticate"; + + /** + * MQTT 连接事件路径 + */ + public static final String MQTT_AUTH_CONNECTED_PATH = "/mqtt/auth/connected"; + + /** + * MQTT 断开事件路径 + */ + public static final String MQTT_AUTH_DISCONNECTED_PATH = "/mqtt/auth/disconnected"; + + // ========== 工具方法 ========== + + /** + * 构建设备主题前缀 + * + * @param productKey 产品 Key + * @param deviceName 设备名称 + * @return 设备主题前缀:/sys/{productKey}/{deviceName} + */ + private static String buildDeviceTopicPrefix(String productKey, String deviceName) { + return SYS_TOPIC_PREFIX + productKey + "/" + deviceName; + } + + /** + * 构建设备属性设置主题 + * + * @param productKey 产品 Key + * @param deviceName 设备名称 + * @return 完整的主题路径 + */ + public static String buildPropertySetTopic(String productKey, String deviceName) { + return buildDeviceTopicPrefix(productKey, deviceName) + "/thing/property/set"; + } + + /** + * 构建设备属性上报回复主题 + *

+ * 当设备上报属性时,会收到该主题的回复 + * + * @param productKey 产品 Key + * @param deviceName 设备名称 + * @return 完整的主题路径 + */ + public static String buildPropertyPostReplyTopic(String productKey, String deviceName) { + return buildDeviceTopicPrefix(productKey, deviceName) + "/thing/property/post_reply"; + } + + /** + * 构建设备服务调用主题 + * + * @param productKey 产品 Key + * @param deviceName 设备名称 + * @param serviceIdentifier 服务标识符 + * @return 完整的主题路径 + */ + public static String buildServiceTopic(String productKey, String deviceName, String serviceIdentifier) { + return buildDeviceTopicPrefix(productKey, deviceName) + SERVICE_TOPIC_PREFIX + serviceIdentifier; + } + +} \ No newline at end of file