From a4e80d45fed7fe541e207510aea2ce7581b4c29d Mon Sep 17 00:00:00 2001 From: haohao <1036606149@qq.com> Date: Thu, 12 Jun 2025 18:59:16 +0800 Subject: [PATCH] =?UTF-8?q?fix=EF=BC=9A=E3=80=90IoT=20=E7=89=A9=E8=81=94?= =?UTF-8?q?=E7=BD=91=E3=80=91=E4=BF=AE=E5=A4=8D=E8=AE=BE=E5=A4=87=E5=B1=9E?= =?UTF-8?q?=E6=80=A7=E4=B8=8A=E6=8A=A5=E5=92=8C=E9=85=8D=E7=BD=AE=E8=AE=BE?= =?UTF-8?q?=E7=BD=AE=E4=B8=BB=E9=A2=98=EF=BC=8C=E4=BC=98=E5=8C=96=20MQTT?= =?UTF-8?q?=20=E6=B6=88=E6=81=AF=E5=A4=84=E7=90=86=E9=80=BB=E8=BE=91?= =?UTF-8?q?=EF=BC=8C=E6=9B=B4=E6=96=B0=E7=9B=B8=E5=85=B3=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../message/IotDeviceMessageServiceImpl.java | 10 +- .../enums/IotDeviceMessageMethodEnum.java | 4 +- .../gateway/config/IotGatewayProperties.java | 4 + .../iot/gateway/enums/IotDeviceTopicEnum.java | 177 +++++++++++++----- .../mqtt/IotMqttDownstreamSubscriber.java | 16 +- .../mqtt/IotMqttUpstreamProtocol.java | 14 +- .../mqtt/router/IotMqttUpstreamHandler.java | 114 +++++++++-- .../src/main/resources/application-local.yaml | 4 +- .../src/main/resources/application.yaml | 4 +- 9 files changed, 263 insertions(+), 84 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/message/IotDeviceMessageServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/message/IotDeviceMessageServiceImpl.java index 838295fc03..71c98b61f3 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/message/IotDeviceMessageServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/message/IotDeviceMessageServiceImpl.java @@ -144,12 +144,13 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService { // 3. 回复消息。前提:非 _reply 消息,并且非禁用回复的消息 if (IotDeviceMessageUtils.isReplyMessage(message) - || IotDeviceMessageMethodEnum.isReplyDisabled(message.getMethod()) - || StrUtil.isEmpty(message.getServerId())) { + || IotDeviceMessageMethodEnum.isReplyDisabled(message.getMethod()) + || StrUtil.isEmpty(message.getServerId())) { return; } try { - IotDeviceMessage replyMessage = IotDeviceMessage.replyOf(message.getRequestId(), message.getMethod(), replyData, + IotDeviceMessage replyMessage = IotDeviceMessage.replyOf(message.getRequestId(), message.getMethod(), + replyData, serviceException != null ? serviceException.getCode() : null, serviceException != null ? serviceException.getMessage() : null); sendDeviceMessage(replyMessage, device, message.getServerId()); @@ -175,7 +176,8 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService { } // 属性上报 - if (Objects.equal(message.getMethod(), IotDeviceMessageMethodEnum.PROPERTY_REPORT.getMethod())) { + if (Objects.equal(message.getMethod(), IotDeviceMessageMethodEnum.PROPERTY_REPORT.getMethod()) || + Objects.equal(message.getMethod(), IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod())) { devicePropertyService.saveDeviceProperty(device, message); return null; } diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/enums/IotDeviceMessageMethodEnum.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/enums/IotDeviceMessageMethodEnum.java index 61b05ad37a..e7947397ef 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/enums/IotDeviceMessageMethodEnum.java +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/enums/IotDeviceMessageMethodEnum.java @@ -24,6 +24,7 @@ public enum IotDeviceMessageMethodEnum implements ArrayValuable { // ========== 设备属性 ========== // 可参考 https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services + PROPERTY_POST("thing.property.post", true), PROPERTY_REPORT("thing.property.report", true), PROPERTY_SET("thing.property.set", false), @@ -33,7 +34,8 @@ public enum IotDeviceMessageMethodEnum implements ArrayValuable { ; - public static final String[] ARRAYS = Arrays.stream(values()).map(IotDeviceMessageMethodEnum::getMethod).toArray(String[]::new); + public static final String[] ARRAYS = Arrays.stream(values()).map(IotDeviceMessageMethodEnum::getMethod) + .toArray(String[]::new); /** * 不进行 reply 回复的方法集合 diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java index 590d1e17e4..ca44d11c45 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java @@ -127,6 +127,10 @@ public class IotGatewayProperties { * MQTT 是否开启 SSL */ private Boolean mqttSsl; + /** + * MQTT客户端 ID + */ + private String mqttClientId; /** * MQTT 主题 */ diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/enums/IotDeviceTopicEnum.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/enums/IotDeviceTopicEnum.java index 543b307f27..4a829485b0 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/enums/IotDeviceTopicEnum.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/enums/IotDeviceTopicEnum.java @@ -14,52 +14,40 @@ import lombok.RequiredArgsConstructor; @Getter public enum IotDeviceTopicEnum { - // TODO @haohao:SYS_TOPIC_PREFIX、SERVICE_TOPIC_PREFIX、REPLY_SUFFIX 类似这种,要不搞成这个里面的静态变量?不是枚举值 - /** - * 系统主题前缀 - */ - SYS_TOPIC_PREFIX("/sys/", "系统主题前缀"), - - /** - * 服务调用主题前缀 - */ - SERVICE_TOPIC_PREFIX("/thing/service/", "服务调用主题前缀"), - - // TODO @haohao:注释时,中英文之间,有个空格; /** * 设备属性设置主题 - * 请求 Topic:/sys/${productKey}/${deviceName}/thing/service/property/set - * 响应 Topic:/sys/${productKey}/${deviceName}/thing/service/property/set_reply + * 请求 Topic:/sys/${productKey}/${deviceName}/thing/property/set + * 响应 Topic:/sys/${productKey}/${deviceName}/thing/property/set_reply */ - PROPERTY_SET_TOPIC("/thing/service/property/set", "设备属性设置主题"), + PROPERTY_SET_TOPIC("/thing/property/set", "设备属性设置主题"), /** * 设备属性获取主题 - * 请求 Topic:/sys/${productKey}/${deviceName}/thing/service/property/get - * 响应 Topic:/sys/${productKey}/${deviceName}/thing/service/property/get_reply + * 请求 Topic:/sys/${productKey}/${deviceName}/thing/property/get + * 响应 Topic:/sys/${productKey}/${deviceName}/thing/property/get_reply */ - PROPERTY_GET_TOPIC("/thing/service/property/get", "设备属性获取主题"), + PROPERTY_GET_TOPIC("/thing/property/get", "设备属性获取主题"), /** * 设备配置设置主题 - * 请求 Topic:/sys/${productKey}/${deviceName}/thing/service/config/set - * 响应 Topic:/sys/${productKey}/${deviceName}/thing/service/config/set_reply + * 请求 Topic:/sys/${productKey}/${deviceName}/thing/config/set + * 响应 Topic:/sys/${productKey}/${deviceName}/thing/config/set_reply */ - CONFIG_SET_TOPIC("/thing/service/config/set", "设备配置设置主题"), + CONFIG_SET_TOPIC("/thing/config/set", "设备配置设置主题"), /** - * 设备OTA升级主题 - * 请求 Topic:/sys/${productKey}/${deviceName}/thing/service/ota/upgrade - * 响应 Topic:/sys/${productKey}/${deviceName}/thing/service/ota/upgrade_reply + * 设备 OTA 升级主题 + * 请求 Topic:/sys/${productKey}/${deviceName}/thing/ota/upgrade + * 响应 Topic:/sys/${productKey}/${deviceName}/thing/ota/upgrade_reply */ - OTA_UPGRADE_TOPIC("/thing/service/ota/upgrade", "设备OTA升级主题"), + OTA_UPGRADE_TOPIC("/thing/ota/upgrade", "设备 OTA 升级主题"), /** * 设备属性上报主题 * 请求 Topic:/sys/${productKey}/${deviceName}/thing/event/property/post * 响应 Topic:/sys/${productKey}/${deviceName}/thing/event/property/post_reply */ - PROPERTY_POST_TOPIC("/thing/event/property/post", "设备属性上报主题"), + PROPERTY_POST_TOPIC("/thing/property/post", "设备属性上报主题"), /** * 设备事件上报主题前缀 @@ -69,95 +57,194 @@ public enum IotDeviceTopicEnum { /** * 设备事件上报主题后缀 */ - EVENT_POST_TOPIC_SUFFIX("/post", "设备事件上报主题后缀"), + EVENT_POST_TOPIC_SUFFIX("/post", "设备事件上报主题后缀"); + + // ========== 静态常量 ========== + + /** + * 系统主题前缀 + */ + public static final String SYS_TOPIC_PREFIX = "/sys/"; + + /** + * 服务调用主题前缀 + */ + public static final String SERVICE_TOPIC_PREFIX = "/thing/"; /** * 响应主题后缀 */ - REPLY_SUFFIX("_reply", "响应主题后缀"); + public static final String REPLY_SUFFIX = "_reply"; + + // ========== 方法常量 ========== + + /** + * 服务方法前缀 + */ + public static final String SERVICE_METHOD_PREFIX = "thing."; + + /** + * 属性服务方法前缀 + */ + public static final String PROPERTY_SERVICE_METHOD_PREFIX = "thing.property."; + + /** + * 配置服务方法前缀 + */ + public static final String CONFIG_SERVICE_METHOD_PREFIX = "thing.config."; + + /** + * OTA 服务方法前缀 + */ + public static final String OTA_SERVICE_METHOD_PREFIX = "thing.ota."; + + /** + * 属性设置方法 + */ + public static final String PROPERTY_SET_METHOD = "thing.property.set"; + + /** + * 属性获取方法 + */ + public static final String PROPERTY_GET_METHOD = "thing.property.get"; + + // ========== 主题匹配常量 ========== + + /** + * 事件上报主题模式 + */ + public static final String EVENT_POST_TOPIC_PATTERN = "/thing/event/"; + + /** + * 主题后缀:post + */ + public static final String POST_SUFFIX = "/post"; + + /** + * 属性上报主题后缀 + */ + public static final String PROPERTY_POST_SUFFIX = "/thing/property/post"; + + /** + * 属性设置响应主题包含 + */ + public static final String PROPERTY_SET_TOPIC_CONTAINS = "/thing/property/set"; + + /** + * 属性获取响应主题包含 + */ + public static final String PROPERTY_GET_TOPIC_CONTAINS = "/thing/property/get"; + + // ========== MQTT 认证路径常量 ========== + + /** + * MQTT 认证路径 + */ + public static final String MQTT_AUTH_AUTHENTICATE_PATH = "/mqtt/auth/authenticate"; + + /** + * MQTT 连接事件路径 + */ + public static final String MQTT_AUTH_CONNECTED_PATH = "/mqtt/auth/connected"; + + /** + * MQTT 断开事件路径 + */ + public static final String MQTT_AUTH_DISCONNECTED_PATH = "/mqtt/auth/disconnected"; private final String topic; private final String description; + // ========== 工具方法 ========== + + /** + * 构建设备主题前缀 + * + * @param productKey 产品 Key + * @param deviceName 设备名称 + * @return 设备主题前缀:/sys/{productKey}/{deviceName} + */ + private static String buildDeviceTopicPrefix(String productKey, String deviceName) { + return SYS_TOPIC_PREFIX + productKey + "/" + deviceName; + } + /** * 构建设备服务调用主题 * - * @param productKey 产品Key + * @param productKey 产品 Key * @param deviceName 设备名称 * @param serviceIdentifier 服务标识符 * @return 完整的主题路径 */ public static String buildServiceTopic(String productKey, String deviceName, String serviceIdentifier) { - // TODO @haohao:貌似 SYS_TOPIC_PREFIX.getTopic() + productKey + "/" + deviceName 是统一的; - return SYS_TOPIC_PREFIX.getTopic() + productKey + "/" + deviceName + - SERVICE_TOPIC_PREFIX.getTopic() + serviceIdentifier; + return buildDeviceTopicPrefix(productKey, deviceName) + SERVICE_TOPIC_PREFIX + serviceIdentifier; } /** * 构建设备属性设置主题 * - * @param productKey 产品Key + * @param productKey 产品 Key * @param deviceName 设备名称 * @return 完整的主题路径 */ public static String buildPropertySetTopic(String productKey, String deviceName) { - return SYS_TOPIC_PREFIX.getTopic() + productKey + "/" + deviceName + PROPERTY_SET_TOPIC.getTopic(); + return buildDeviceTopicPrefix(productKey, deviceName) + PROPERTY_SET_TOPIC.getTopic(); } /** * 构建设备属性获取主题 * - * @param productKey 产品Key + * @param productKey 产品 Key * @param deviceName 设备名称 * @return 完整的主题路径 */ public static String buildPropertyGetTopic(String productKey, String deviceName) { - return SYS_TOPIC_PREFIX.getTopic() + productKey + "/" + deviceName + PROPERTY_GET_TOPIC.getTopic(); + return buildDeviceTopicPrefix(productKey, deviceName) + PROPERTY_GET_TOPIC.getTopic(); } /** * 构建设备配置设置主题 * - * @param productKey 产品Key + * @param productKey 产品 Key * @param deviceName 设备名称 * @return 完整的主题路径 */ public static String buildConfigSetTopic(String productKey, String deviceName) { - return SYS_TOPIC_PREFIX.getTopic() + productKey + "/" + deviceName + CONFIG_SET_TOPIC.getTopic(); + return buildDeviceTopicPrefix(productKey, deviceName) + CONFIG_SET_TOPIC.getTopic(); } /** * 构建设备 OTA 升级主题 * - * @param productKey 产品Key + * @param productKey 产品 Key * @param deviceName 设备名称 * @return 完整的主题路径 */ public static String buildOtaUpgradeTopic(String productKey, String deviceName) { - return SYS_TOPIC_PREFIX.getTopic() + productKey + "/" + deviceName + OTA_UPGRADE_TOPIC.getTopic(); + return buildDeviceTopicPrefix(productKey, deviceName) + OTA_UPGRADE_TOPIC.getTopic(); } /** * 构建设备属性上报主题 * - * @param productKey 产品Key + * @param productKey 产品 Key * @param deviceName 设备名称 * @return 完整的主题路径 */ public static String buildPropertyPostTopic(String productKey, String deviceName) { - return SYS_TOPIC_PREFIX.getTopic() + productKey + "/" + deviceName + PROPERTY_POST_TOPIC.getTopic(); + return buildDeviceTopicPrefix(productKey, deviceName) + PROPERTY_POST_TOPIC.getTopic(); } /** * 构建设备事件上报主题 * - * @param productKey 产品Key + * @param productKey 产品 Key * @param deviceName 设备名称 * @param eventIdentifier 事件标识符 * @return 完整的主题路径 */ public static String buildEventPostTopic(String productKey, String deviceName, String eventIdentifier) { - return SYS_TOPIC_PREFIX.getTopic() + productKey + "/" + deviceName + + return buildDeviceTopicPrefix(productKey, deviceName) + EVENT_POST_TOPIC_PREFIX.getTopic() + eventIdentifier + EVENT_POST_TOPIC_SUFFIX.getTopic(); } @@ -168,7 +255,7 @@ public enum IotDeviceTopicEnum { * @return 响应主题 */ public static String getReplyTopic(String requestTopic) { - return requestTopic + REPLY_SUFFIX.getTopic(); + return requestTopic + REPLY_SUFFIX; } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java index 7b4d4386a3..f8edc1633a 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java @@ -105,33 +105,33 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber