diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java index ca44d11c45..d5707c8b69 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java @@ -103,39 +103,65 @@ public class IotGatewayProperties { */ @NotNull(message = "是否开启不能为空") private Boolean enabled; + /** - * HTTP 认证端口 + * HTTP 认证端口(默认:8090) */ private Integer httpAuthPort = 8090; + /** * MQTT 服务器地址 */ + @NotEmpty(message = "MQTT 服务器地址不能为空") private String mqttHost; + /** - * MQTT 服务器端口 + * MQTT 服务器端口(默认:1883) */ - private Integer mqttPort; + @NotNull(message = "MQTT 服务器端口不能为空") + private Integer mqttPort = 1883; + /** * MQTT 用户名 */ + @NotEmpty(message = "MQTT 用户名不能为空") private String mqttUsername; + /** * MQTT 密码 */ + @NotEmpty(message = "MQTT 密码不能为空") private String mqttPassword; + /** - * MQTT 是否开启 SSL + * MQTT 是否开启 SSL(默认:false) */ - private Boolean mqttSsl; + @NotNull(message = "MQTT 是否开启 SSL 不能为空") + private Boolean mqttSsl = false; + /** - * MQTT客户端 ID + * MQTT 客户端 ID(如果为空,系统将自动生成) */ private String mqttClientId; + /** - * MQTT 主题 + * MQTT 主题列表 */ + @NotEmpty(message = "MQTT 主题不能为空") private List mqttTopics; + /** + * 获取 MQTT 客户端 ID,如果未配置则自动生成 + * + * @return MQTT 客户端 ID + */ + public String getMqttClientId() { + if (cn.hutool.core.util.StrUtil.isBlank(mqttClientId)) { + mqttClientId = "iot-gateway-mqtt-" + System.currentTimeMillis(); + } + return mqttClientId; + } + } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java index f8edc1633a..9beeb42034 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java @@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt; import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONObject; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; @@ -47,19 +48,28 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber connectFuture = mqttClient.connect(finalPort, host) + log.info("[connectMqtt][开始连接 MQTT Broker][host: {}][port: {}]", host, port); + + CompletableFuture connectFuture = mqttClient.connect(port, host) .toCompletionStage() .toCompletableFuture() .thenAccept(connAck -> { - log.info("[connectMqtt][MQTT 客户端连接成功]"); + log.info("[connectMqtt][MQTT 客户端连接成功][host: {}][port: {}]", host, port); // 设置断开重连监听器 mqttClient.closeHandler(closeEvent -> { log.warn("[closeHandler][MQTT 连接已断开,准备重连]"); @@ -257,17 +268,19 @@ public class IotMqttUpstreamProtocol { subscribeToTopics(); }) .exceptionally(error -> { - log.error("[connectMqtt][连接 MQTT Broker 失败]", error); + log.error("[connectMqtt][连接 MQTT Broker 失败][host: {}][port: {}]", host, port, error); + // 连接失败时也要尝试重连 reconnectWithDelay(); return null; }); // 等待连接完成 try { - connectFuture.get(10, TimeUnit.SECONDS); + connectFuture.get(CONNECT_TIMEOUT_SECONDS, TimeUnit.SECONDS); log.info("[connectMqtt][MQTT 客户端启动完成]"); } catch (Exception e) { log.error("[connectMqtt][MQTT 客户端启动失败]", e); + throw new RuntimeException("MQTT 客户端启动失败", e); } } @@ -284,15 +297,19 @@ public class IotMqttUpstreamProtocol { */ private void subscribeToTopics() { List topicList = emqxProperties.getMqttTopics(); - if (CollUtil.isEmpty(topicList)) { - log.warn("[subscribeToTopics][没有配置要订阅的主题]"); - return; - } + // @NotEmpty 注解已保证 topicList 不为空,无需重复校验 + + log.info("[subscribeToTopics][开始订阅主题,共 {} 个]", topicList.size()); for (String topic : topicList) { + if (StrUtil.isBlank(topic)) { + log.warn("[subscribeToTopics][跳过空主题]"); + continue; + } + mqttClient.subscribe(topic, DEFAULT_QOS.value(), subscribeResult -> { if (subscribeResult.succeeded()) { - log.info("[subscribeToTopics][订阅主题成功: {}]", topic); + log.info("[subscribeToTopics][订阅主题成功: {}][QoS: {}]", topic, DEFAULT_QOS.value()); } else { log.error("[subscribeToTopics][订阅主题失败: {}]", topic, subscribeResult.cause()); } @@ -304,10 +321,16 @@ public class IotMqttUpstreamProtocol { * 延迟重连 */ private void reconnectWithDelay() { - vertx.setTimer(5000, timerId -> { + vertx.setTimer(RECONNECT_DELAY_MS, timerId -> { if (isRunning && (mqttClient == null || !mqttClient.isConnected())) { - log.info("[reconnectWithDelay][开始重连 MQTT Broker]"); - connectMqtt(); + log.info("[reconnectWithDelay][开始重连 MQTT Broker,延迟 {} 毫秒]", RECONNECT_DELAY_MS); + try { + connectMqtt(); + } catch (Exception e) { + log.error("[reconnectWithDelay][重连失败,将继续尝试重连]", e); + // 重连失败时继续尝试重连 + reconnectWithDelay(); + } } }); } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttAbstractHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttAbstractHandler.java new file mode 100644 index 0000000000..11ec92e6b5 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttAbstractHandler.java @@ -0,0 +1,94 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router; + +import cn.hutool.core.util.StrUtil; +import lombok.extern.slf4j.Slf4j; + +/** + * IoT 网关 MQTT 协议的处理器抽象基类 + *

+ * 提供通用的异常处理、参数校验等功能 + * + * @author 芋道源码 + */ +@Slf4j +public abstract class IotMqttAbstractHandler { + + /** + * 处理 MQTT 消息的模板方法 + * + * @param topic 主题 + * @param payload 消息内容 + */ + public final void handle(String topic, String payload) { + try { + // 1. 前置校验 + if (!validateInput(topic, payload)) { + return; + } + + // 2. 执行具体逻辑 + doHandle(topic, payload); + + } catch (Exception e) { + log.error("[handle][处理 MQTT 消息失败][topic: {}][payload: {}]", topic, payload, e); + handleException(topic, payload, e); + } + } + + /** + * 具体的处理逻辑,由子类实现 + * + * @param topic 主题 + * @param payload 消息内容 + */ + protected abstract void doHandle(String topic, String payload); + + /** + * 输入参数校验 + * + * @param topic 主题 + * @param payload 消息内容 + * @return 校验是否通过 + */ + protected boolean validateInput(String topic, String payload) { + if (StrUtil.isBlank(topic)) { + log.warn("[validateInput][主题为空,忽略消息]"); + return false; + } + + if (StrUtil.isBlank(payload)) { + log.warn("[validateInput][消息内容为空][topic: {}]", topic); + return false; + } + + return true; + } + + /** + * 异常处理 + * + * @param topic 主题 + * @param payload 消息内容 + * @param e 异常 + */ + protected void handleException(String topic, String payload, Exception e) { + // 默认实现:记录错误日志 + // 子类可以重写此方法,添加特定的异常处理逻辑 + log.error("[handleException][MQTT 消息处理异常][topic: {}]", topic, e); + } + + /** + * 解析主题,获取主题各部分 + * + * @param topic 主题 + * @return 主题各部分数组,如果解析失败返回 null + */ + protected String[] parseTopic(String topic) { + String[] topicParts = topic.split("/"); + if (topicParts.length < 7) { + log.warn("[parseTopic][主题格式不正确][topic: {}]", topic); + return null; + } + return topicParts; + } +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttHttpAuthHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttHttpAuthHandler.java index ce3cc83aea..6f74555b9a 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttHttpAuthHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttHttpAuthHandler.java @@ -13,16 +13,38 @@ import io.vertx.core.json.JsonObject; import io.vertx.ext.web.RoutingContext; import lombok.extern.slf4j.Slf4j; +import static cn.iocoder.yudao.module.iot.gateway.enums.ErrorCodeConstants.DEVICE_AUTH_FAIL; + /** * IoT 网关 MQTT HTTP 认证处理器 *

- * 处理 EMQX 的认证请求和事件钩子 + * 处理 EMQX 的认证请求和事件钩子,提供统一的错误处理和参数校验 * * @author 芋道源码 */ @Slf4j public class IotMqttHttpAuthHandler { + /** + * 认证成功状态码 + */ + private static final int SUCCESS_STATUS_CODE = 200; + + /** + * 参数错误状态码 + */ + private static final int BAD_REQUEST_STATUS_CODE = 400; + + /** + * 认证失败状态码 + */ + private static final int UNAUTHORIZED_STATUS_CODE = 401; + + /** + * 服务器错误状态码 + */ + private static final int INTERNAL_ERROR_STATUS_CODE = 500; + /** * EMQX 认证接口 */ @@ -44,7 +66,7 @@ public class IotMqttHttpAuthHandler { // 参数校验 if (StrUtil.isEmpty(clientid) || StrUtil.isEmpty(username) || StrUtil.isEmpty(password)) { log.warn("[authenticate][认证参数不完整][clientId: {}][username: {}]", clientid, username); - sendErrorResponse(context, 400, "认证参数不完整"); + sendErrorResponse(context, BAD_REQUEST_STATUS_CODE, "认证参数不完整"); return; } @@ -58,7 +80,7 @@ public class IotMqttHttpAuthHandler { result.checkError(); if (!BooleanUtil.isTrue(result.getData())) { log.warn("[authenticate][设备认证失败][clientId: {}][username: {}]", clientid, username); - sendErrorResponse(context, 401, "设备认证失败"); + sendErrorResponse(context, UNAUTHORIZED_STATUS_CODE, DEVICE_AUTH_FAIL.getMsg()); return; } @@ -67,7 +89,7 @@ public class IotMqttHttpAuthHandler { } catch (Exception e) { log.error("[authenticate][设备认证异常]", e); - sendErrorResponse(context, 500, "认证服务异常"); + sendErrorResponse(context, INTERNAL_ERROR_STATUS_CODE, "认证服务异常"); } } @@ -79,7 +101,7 @@ public class IotMqttHttpAuthHandler { // 解析请求体 JsonObject body = context.body().asJsonObject(); if (body == null) { - sendErrorResponse(context, 400, "请求体不能为空"); + sendErrorResponse(context, BAD_REQUEST_STATUS_CODE, "请求体不能为空"); return; } @@ -94,7 +116,7 @@ public class IotMqttHttpAuthHandler { } catch (Exception e) { log.error("[connected][处理设备连接事件失败]", e); - sendErrorResponse(context, 500, "处理失败"); + sendErrorResponse(context, INTERNAL_ERROR_STATUS_CODE, "处理失败"); } } @@ -106,7 +128,7 @@ public class IotMqttHttpAuthHandler { // 解析请求体 JsonObject body = context.body().asJsonObject(); if (body == null) { - sendErrorResponse(context, 400, "请求体不能为空"); + sendErrorResponse(context, BAD_REQUEST_STATUS_CODE, "请求体不能为空"); return; } @@ -123,7 +145,7 @@ public class IotMqttHttpAuthHandler { } catch (Exception e) { log.error("[disconnected][处理设备断开连接事件失败]", e); - sendErrorResponse(context, 500, "处理失败"); + sendErrorResponse(context, INTERNAL_ERROR_STATUS_CODE, "处理失败"); } } @@ -175,7 +197,7 @@ public class IotMqttHttpAuthHandler { */ private void sendSuccessResponse(RoutingContext context, String message) { context.response() - .setStatusCode(200) + .setStatusCode(SUCCESS_STATUS_CODE) .putHeader("Content-Type", "text/plain; charset=utf-8") .end(message); } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java index b7bffefa37..75d014c541 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java @@ -1,9 +1,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router; -import cn.hutool.core.util.StrUtil; import cn.hutool.extra.spring.SpringUtil; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer; import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.vertx.mqtt.messages.MqttPublishMessage; @@ -19,14 +17,12 @@ import java.nio.charset.StandardCharsets; * @author 芋道源码 */ @Slf4j -public class IotMqttUpstreamHandler { +public class IotMqttUpstreamHandler extends IotMqttAbstractHandler { - private final IotDeviceMessageProducer deviceMessageProducer; private final IotDeviceMessageService deviceMessageService; private final String serverId; public IotMqttUpstreamHandler(cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol protocol) { - this.deviceMessageProducer = SpringUtil.getBean(IotDeviceMessageProducer.class); this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class); this.serverId = protocol.getServerId(); } @@ -38,41 +34,22 @@ public class IotMqttUpstreamHandler { String topic = message.topicName(); String payload = message.payload().toString(StandardCharsets.UTF_8); - if (StrUtil.isBlank(topic)) { - log.warn("[handle][主题为空,忽略消息]"); - return; - } - - if (StrUtil.isBlank(payload)) { - log.warn("[handle][消息内容为空][topic: {}]", topic); - return; - } - log.debug("[handle][收到 MQTT 消息][topic: {}]", topic); + // 调用父类的 handle 方法,父类会进行参数校验 handle(topic, payload); } - /** - * 处理 MQTT 消息 - * - * @param topic 主题 - * @param payload 消息内容 - */ - public void handle(String topic, String payload) { - try { - // 1. 识别并验证消息类型 - String messageType = getMessageType(topic); - if (messageType == null) { - log.warn("[handle][未知的消息类型][topic: {}]", topic); - return; - } - - // 2. 处理消息 - processMessage(topic, payload, messageType); - - } catch (Exception e) { - log.error("[handle][处理消息失败][topic: {}][payload: {}]", topic, payload, e); + @Override + protected void doHandle(String topic, String payload) { + // 1. 识别并验证消息类型 + String messageType = getMessageType(topic); + if (messageType == null) { + log.warn("[doHandle][未知的消息类型][topic: {}]", topic); + return; } + + // 2. 处理消息 + processMessage(topic, payload, messageType); } /** @@ -109,39 +86,37 @@ public class IotMqttUpstreamHandler { * @return 消息类型描述,如果不支持返回 null */ private String getMessageType(String topic) { - if (StrUtil.isBlank(topic)) { - return null; - } + // 此方法由 doHandle 调用,topic 已经在父类中校验过,无需重复校验 // 按优先级匹配主题类型,避免误匹配 - // 1. 设备属性上报: /sys/{productKey}/{deviceName}/thing/event/property/post + // 1. 设备属性上报: /sys/{productKey}/{deviceName}/thing/property/post if (isPropertyPostTopic(topic)) { return IotDeviceTopicEnum.PROPERTY_POST_TOPIC.getDescription(); } - // 2. 设备事件上报: /sys/{productKey}/{deviceName}/thing/event/{eventIdentifier}/post + // 2. 设备事件上报: /sys/{productKey}/{deviceName}/thing/{eventIdentifier}/post if (isEventPostTopic(topic)) { return "设备事件上报"; } - // 3. 设备属性设置响应: /sys/{productKey}/{deviceName}/thing/service/property/set_reply + // 3. 设备属性设置响应: /sys/{productKey}/{deviceName}/thing/property/set_reply if (isPropertySetReplyTopic(topic)) { return "设备属性设置响应"; } - // 4. 设备属性获取响应: /sys/{productKey}/{deviceName}/thing/service/property/get_reply + // 4. 设备属性获取响应: /sys/{productKey}/{deviceName}/thing/property/get_reply if (isPropertyGetReplyTopic(topic)) { return "设备属性获取响应"; } - // 5. 设备配置设置响应: /sys/{productKey}/{deviceName}/thing/service/config/set_reply + // 5. 设备配置设置响应: /sys/{productKey}/{deviceName}/thing/config/set_reply if (isConfigSetReplyTopic(topic)) { return IotDeviceTopicEnum.CONFIG_SET_TOPIC.getDescription() + "响应"; } // 6. 设备 OTA 升级响应: - // /sys/{productKey}/{deviceName}/thing/service/ota/upgrade_reply + // /sys/{productKey}/{deviceName}/thing/ota/upgrade_reply if (isOtaUpgradeReplyTopic(topic)) { return IotDeviceTopicEnum.OTA_UPGRADE_TOPIC.getDescription() + "响应"; } @@ -214,18 +189,4 @@ public class IotMqttUpstreamHandler { && !topic.contains("ota"); } - /** - * 解析主题,获取主题各部分 - * - * @param topic 主题 - * @return 主题各部分数组,如果解析失败返回 null - */ - private String[] parseTopic(String topic) { - String[] topicParts = topic.split("/"); - if (topicParts.length < 7) { - log.warn("[parseTopic][主题格式不正确][topic: {}]", topic); - return null; - } - return topicParts; - } } \ No newline at end of file