diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java index 608ed04138..88d1b36fe0 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java @@ -1,5 +1,6 @@ package cn.iocoder.yudao.module.iot.gateway.config; +import cn.hutool.core.util.StrUtil; import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; import lombok.Data; @@ -175,7 +176,7 @@ public class IotGatewayProperties { * @return MQTT 客户端 ID */ public String getMqttClientId() { - if (cn.hutool.core.util.StrUtil.isBlank(mqttClientId)) { + if (StrUtil.isBlank(mqttClientId)) { mqttClientId = "iot-gateway-mqtt-" + System.currentTimeMillis(); } return mqttClientId; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java index 00e0905274..861c3a5496 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java @@ -1,6 +1,5 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt; -import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; @@ -45,13 +44,13 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber { - log.warn("[closeHandler][MQTT 连接已断开,准备重连]"); - reconnectWithDelay(); - }); - // 3.2 设置消息处理器 + // 2. 异步连接 + mqttClient.connect(port, host, connectResult -> { + if (connectResult.succeeded()) { + if (isReconnect) { + log.info("[connectMqtt][MQTT 客户端重连成功, host: {}, port: {}]", host, port); + } else { + log.info("[connectMqtt][MQTT 客户端连接成功, host: {}, port: {}]", host, port); + } + + // 3. 设置处理器 + setupMqttHandlers(); + + // 4. 订阅主题 + subscribeToTopics(); + + } else { + log.error("[connectMqtt][连接 MQTT Broker 失败, host: {}, port: {}, isReconnect: {}]", + host, port, isReconnect, connectResult.cause()); + + if (!isReconnect) { + // 首次连接失败时,也要尝试重连 + log.warn("[connectMqtt][首次连接失败,将开始重连机制]"); + reconnectWithDelay(); + } else { + // 重连失败时,继续尝试重连 + reconnectWithDelay(); + } + } + }); + } + + /** + * 创建 MQTT 客户端 + */ + private void createMqttClient() { + log.debug("[createMqttClient][创建 MQTT 客户端, clientId: {}]", emqxProperties.getMqttClientId()); + + MqttClientOptions options = new MqttClientOptions() + .setClientId(emqxProperties.getMqttClientId()) + .setUsername(emqxProperties.getMqttUsername()) + .setPassword(emqxProperties.getMqttPassword()) + .setSsl(emqxProperties.getMqttSsl()); + + this.mqttClient = MqttClient.create(vertx, options); + } + + /** + * 设置 MQTT 处理器 + */ + private void setupMqttHandlers() { + if (mqttClient == null) { + log.warn("[setupMqttHandlers][MQTT 客户端为空,跳过处理器设置]"); + return; + } + + // 设置断开重连监听器 + mqttClient.closeHandler(closeEvent -> { + log.warn("[closeHandler][MQTT 连接已断开, 准备重连]"); + reconnectWithDelay(); + }); + + // 设置异常处理器 + mqttClient.exceptionHandler(exception -> { + log.error("[exceptionHandler][MQTT 客户端异常]", exception); + }); + + // 设置消息处理器 + if (upstreamHandler != null) { mqttClient.publishHandler(upstreamHandler::handle); - log.debug("[setupMessageHandler][MQTT 消息处理器设置完成]"); - - // 4. 订阅主题 - subscribeToTopics(); - } catch (Exception e) { - log.error("[connectMqtt][连接 MQTT Broker 失败][host: {}][port: {}]", host, port, e); - reconnectWithDelay(); // 连接失败时,也要尝试重连 - throw new RuntimeException("MQTT 客户端启动失败", e); + log.debug("[setupMqttHandlers][MQTT 消息处理器设置完成]"); + } else { + log.warn("[setupMqttHandlers][上行消息处理器为空,跳过设置]"); } } @@ -261,15 +325,43 @@ public class IotMqttUpstreamProtocol { */ private void subscribeToTopics() { List topicList = emqxProperties.getMqttTopics(); - int qos = emqxProperties.getMqttQos(); + if (CollUtil.isEmpty(topicList)) { + log.warn("[subscribeToTopics][订阅主题列表为空, 跳过订阅]"); + return; + } + + if (mqttClient == null || !mqttClient.isConnected()) { + log.warn("[subscribeToTopics][MQTT 客户端未连接, 跳过订阅]"); + return; + } + + int qos = emqxProperties.getMqttQos(); + log.info("[subscribeToTopics][开始订阅主题, 共 {} 个, QoS: {}]", topicList.size(), qos); + + int[] successCount = {0}; // 使用数组以便在 lambda 中修改 + int[] failCount = {0}; - log.info("[subscribeToTopics][开始订阅主题,共 {} 个]", topicList.size()); for (String topic : topicList) { mqttClient.subscribe(topic, qos, subscribeResult -> { if (subscribeResult.succeeded()) { - log.info("[subscribeToTopics][订阅主题成功: {}][QoS: {}]", topic, qos); + successCount[0]++; + log.debug("[subscribeToTopics][订阅主题成功, topic: {}, qos: {}]", topic, qos); + + // 当所有主题都处理完成时,记录汇总日志 + if (successCount[0] + failCount[0] == topicList.size()) { + log.info("[subscribeToTopics][主题订阅完成, 成功: {}, 失败: {}, 总计: {}]", + successCount[0], failCount[0], topicList.size()); + } } else { - log.error("[subscribeToTopics][订阅主题失败: {}]", topic, subscribeResult.cause()); + failCount[0]++; + log.error("[subscribeToTopics][订阅主题失败, topic: {}, qos: {}, 原因: {}]", + topic, qos, subscribeResult.cause().getMessage(), subscribeResult.cause()); + + // 当所有主题都处理完成时,记录汇总日志 + if (successCount[0] + failCount[0] == topicList.size()) { + log.info("[subscribeToTopics][主题订阅完成, 成功: {}, 失败: {}, 总计: {}]", + successCount[0], failCount[0], topicList.size()); + } } }); } @@ -281,15 +373,21 @@ public class IotMqttUpstreamProtocol { private void reconnectWithDelay() { long delay = emqxProperties.getReconnectDelayMs(); vertx.setTimer(delay, timerId -> { - if (!isRunning || (mqttClient != null && mqttClient.isConnected())) { + if (!isRunning) { + log.debug("[reconnectWithDelay][服务已停止, 取消重连]"); return; } - log.info("[reconnectWithDelay][开始重连 MQTT Broker,延迟 {} 毫秒]", delay); + // 检查连接状态,如果已连接则无需重连 + if (mqttClient != null && mqttClient.isConnected()) { + log.debug("[reconnectWithDelay][MQTT 客户端已连接, 无需重连]"); + return; + } + log.info("[reconnectWithDelay][开始重连 MQTT Broker, 延迟: {} ms]", delay); try { - connectMqtt(); + connectMqtt(true); // 标记为重连 } catch (Exception e) { - log.error("[reconnectWithDelay][重连失败,将继续尝试重连]", e); - reconnectWithDelay(); // 失败后,继续尝试 + log.error("[reconnectWithDelay][重连失败, 将继续尝试]", e); + // 重连失败时,不需要重复调用,因为 connectMqtt(true) 内部已经处理了重连逻辑 } }); } @@ -302,7 +400,7 @@ public class IotMqttUpstreamProtocol { */ public void publishMessage(String topic, String payload) { if (mqttClient == null || !mqttClient.isConnected()) { - log.warn("[publishMessage][MQTT 客户端未连接,无法发布消息][topic: {}]", topic); + log.warn("[publishMessage][MQTT 客户端未连接, 无法发布消息到 topic({})]", topic); return; } MqttQoS qos = MqttQoS.valueOf(emqxProperties.getMqttQos()); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttDownstreamHandler.java index 619d4b4957..0c19fef193 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttDownstreamHandler.java @@ -41,14 +41,14 @@ public class IotMqttDownstreamHandler { // 1. 获取设备信息(使用缓存) IotDeviceRespDTO deviceInfo = deviceService.getDeviceFromCache(message.getDeviceId()); if (deviceInfo == null) { - log.warn("[handle][设备信息不存在][deviceId: {}]", message.getDeviceId()); + log.warn("[handle][设备信息不存在, deviceId: {}]", message.getDeviceId()); return; } // 2. 根据方法构建主题 String topic = buildTopicByMethod(message.getMethod(), deviceInfo.getProductKey(), deviceInfo.getDeviceName()); if (StrUtil.isBlank(topic)) { - log.warn("[handle][未知的消息方法:{}]", message.getMethod()); + log.warn("[handle][未知的消息方法: {}]", message.getMethod()); return; } @@ -57,7 +57,7 @@ public class IotMqttDownstreamHandler { // 4. 发布消息 protocol.publishMessage(topic, payload.toString()); - log.info("[handle][发布下行消息成功][method: {}][topic: {}]", message.getMethod(), topic); + log.debug("[handle][发布下行消息成功, method: {}, topic: {}]", message.getMethod(), topic); } /** diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttHttpAuthHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttHttpAuthHandler.java index 6f74555b9a..a24999c3cd 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttHttpAuthHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttHttpAuthHandler.java @@ -8,6 +8,7 @@ import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.vertx.core.json.JsonObject; import io.vertx.ext.web.RoutingContext; @@ -45,15 +46,28 @@ public class IotMqttHttpAuthHandler { */ private static final int INTERNAL_ERROR_STATUS_CODE = 500; + /** + * MQTT 协议实例,用于获取服务器ID + */ + private final IotMqttUpstreamProtocol protocol; + + /** + * 构造器 + * + * @param protocol MQTT 协议实例 + */ + public IotMqttHttpAuthHandler(IotMqttUpstreamProtocol protocol) { + this.protocol = protocol; + } + /** * EMQX 认证接口 */ public void authenticate(RoutingContext context) { try { // 解析请求体 - JsonObject body = context.body().asJsonObject(); + JsonObject body = parseRequestBody(context); if (body == null) { - sendErrorResponse(context, 400, "请求体不能为空"); return; } @@ -61,34 +75,23 @@ public class IotMqttHttpAuthHandler { String username = body.getString("username"); String password = body.getString("password"); - log.info("[authenticate][EMQX 设备认证请求][clientId: {}][username: {}]", clientid, username); + log.debug("[authenticate][EMQX 设备认证, clientId: {}, username: {}]", clientid, username); // 参数校验 - if (StrUtil.isEmpty(clientid) || StrUtil.isEmpty(username) || StrUtil.isEmpty(password)) { - log.warn("[authenticate][认证参数不完整][clientId: {}][username: {}]", clientid, username); - sendErrorResponse(context, BAD_REQUEST_STATUS_CODE, "认证参数不完整"); + if (!validateAuthParams(context, clientid, username, password)) { return; } // 执行设备认证 - IotDeviceCommonApi deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class); - CommonResult result = deviceApi.authDevice(new IotDeviceAuthReqDTO() - .setClientId(clientid) - .setUsername(username) - .setPassword(password)); - - result.checkError(); - if (!BooleanUtil.isTrue(result.getData())) { - log.warn("[authenticate][设备认证失败][clientId: {}][username: {}]", clientid, username); - sendErrorResponse(context, UNAUTHORIZED_STATUS_CODE, DEVICE_AUTH_FAIL.getMsg()); + if (!performDeviceAuth(context, clientid, username, password)) { return; } - log.info("[authenticate][设备认证成功][clientId: {}][username: {}]", clientid, username); + log.debug("[authenticate][设备认证成功, clientId: {}, username: {}]", clientid, username); sendSuccessResponse(context, "认证成功"); } catch (Exception e) { - log.error("[authenticate][设备认证异常]", e); + log.error("[authenticate][设备认证异常, 详细信息: {}]", e.getMessage(), e); sendErrorResponse(context, INTERNAL_ERROR_STATUS_CODE, "认证服务异常"); } } @@ -99,9 +102,8 @@ public class IotMqttHttpAuthHandler { public void connected(RoutingContext context) { try { // 解析请求体 - JsonObject body = context.body().asJsonObject(); + JsonObject body = parseRequestBody(context); if (body == null) { - sendErrorResponse(context, BAD_REQUEST_STATUS_CODE, "请求体不能为空"); return; } @@ -109,13 +111,14 @@ public class IotMqttHttpAuthHandler { String username = body.getString("username"); Long timestamp = body.getLong("timestamp"); - log.info("[connected][设备连接事件][clientId: {}][username: {}]", clientid, username); + log.debug("[connected][设备连接, clientId: {}, username: {}, timestamp: {}]", + clientid, username, timestamp); - handleDeviceStateChange(username, true); + handleDeviceStateChange(username, true, "设备连接"); sendSuccessResponse(context, "处理成功"); } catch (Exception e) { - log.error("[connected][处理设备连接事件失败]", e); + log.error("[connected][处理设备连接事件失败, 详细信息: {}]", e.getMessage(), e); sendErrorResponse(context, INTERNAL_ERROR_STATUS_CODE, "处理失败"); } } @@ -126,9 +129,8 @@ public class IotMqttHttpAuthHandler { public void disconnected(RoutingContext context) { try { // 解析请求体 - JsonObject body = context.body().asJsonObject(); + JsonObject body = parseRequestBody(context); if (body == null) { - sendErrorResponse(context, BAD_REQUEST_STATUS_CODE, "请求体不能为空"); return; } @@ -137,58 +139,144 @@ public class IotMqttHttpAuthHandler { String reason = body.getString("reason"); Long timestamp = body.getLong("timestamp"); - log.info("[disconnected][设备断开连接事件][clientId: {}][username: {}][reason: {}]", - clientid, username, reason); + log.debug("[disconnected][设备断开连接, clientId: {}, username: {}, reason: {}, timestamp: {}]", + clientid, username, reason, timestamp); - handleDeviceStateChange(username, false); + handleDeviceStateChange(username, false, "设备断开连接,原因:" + reason); sendSuccessResponse(context, "处理成功"); } catch (Exception e) { - log.error("[disconnected][处理设备断开连接事件失败]", e); + log.error("[disconnected][处理设备断开连接事件失败, 详细信息: {}]", e.getMessage(), e); sendErrorResponse(context, INTERNAL_ERROR_STATUS_CODE, "处理失败"); } } + /** + * 解析请求体 + * + * @param context 路由上下文 + * @return 请求体JSON对象,解析失败时返回null + */ + private JsonObject parseRequestBody(RoutingContext context) { + try { + JsonObject body = context.body().asJsonObject(); + if (body == null) { + log.warn("[parseRequestBody][请求体为空]"); + sendErrorResponse(context, BAD_REQUEST_STATUS_CODE, "请求体不能为空"); + return null; + } + return body; + } catch (Exception e) { + log.error("[parseRequestBody][解析请求体失败]", e); + sendErrorResponse(context, BAD_REQUEST_STATUS_CODE, "请求体格式错误"); + return null; + } + } + + /** + * 验证认证参数 + * + * @param context 路由上下文 + * @param clientid 客户端ID + * @param username 用户名 + * @param password 密码 + * @return 验证是否通过 + */ + private boolean validateAuthParams(RoutingContext context, String clientid, String username, String password) { + if (StrUtil.hasEmpty(clientid, username, password)) { + log.warn("[validateAuthParams][认证参数不完整, clientId: {}, username: {}, password: {}]", + clientid, username, StrUtil.isNotEmpty(password) ? "***" : "空"); + sendErrorResponse(context, BAD_REQUEST_STATUS_CODE, "认证参数不完整"); + return false; + } + return true; + } + + /** + * 执行设备认证 + * + * @param context 路由上下文 + * @param clientid 客户端ID + * @param username 用户名 + * @param password 密码 + * @return 认证是否成功 + */ + private boolean performDeviceAuth(RoutingContext context, String clientid, String username, String password) { + try { + IotDeviceCommonApi deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class); + CommonResult result = deviceApi.authDevice(new IotDeviceAuthReqDTO() + .setClientId(clientid) + .setUsername(username) + .setPassword(password)); + + result.checkError(); + if (!BooleanUtil.isTrue(result.getData())) { + log.warn("[performDeviceAuth][设备认证失败, clientId: {}, username: {}]", clientid, username); + sendErrorResponse(context, UNAUTHORIZED_STATUS_CODE, DEVICE_AUTH_FAIL.getMsg()); + return false; + } + return true; + } catch (Exception e) { + log.error("[performDeviceAuth][设备认证异常, clientId: {}, username: {}]", clientid, username, e); + sendErrorResponse(context, INTERNAL_ERROR_STATUS_CODE, "认证服务异常"); + return false; + } + } + /** * 处理设备状态变化 * - * @param username 用户名 - * @param online 是否在线 + * @param username 用户名 + * @param online 是否在线 + * @param actionDesc 操作描述 */ - private void handleDeviceStateChange(String username, boolean online) { + private void handleDeviceStateChange(String username, boolean online, String actionDesc) { if (StrUtil.isEmpty(username) || "undefined".equals(username)) { - log.warn("[handleDeviceStateChange][用户名为空或未定义][username: {}]", username); + log.warn("[handleDeviceStateChange][用户名为空或'undefined', username: {}, action: {}]", + username, actionDesc); return; } // 解析设备信息 IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(username); if (deviceInfo == null) { - log.warn("[handleDeviceStateChange][无法解析设备信息][username: {}]", username); + log.warn("[handleDeviceStateChange][无法从 username({}) 解析设备信息, action: {}]", + username, actionDesc); return; } try { - // 获取服务器 ID - String serverId = "mqtt_auth_gateway"; + // 从协议实例获取服务器 ID + String serverId = protocol.getServerId(); + if (StrUtil.isEmpty(serverId)) { + log.error("[handleDeviceStateChange][获取服务器ID失败, username: {}, action: {}]", + username, actionDesc); + return; + } // 构建设备状态消息 IotDeviceMessageService deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class); IotDeviceMessage message; if (online) { message = IotDeviceMessage.buildStateOnline(); - log.info("[handleDeviceStateChange][发送设备上线消息成功][username: {}]", username); + log.debug("[handleDeviceStateChange][发送设备上线消息, username: {}, serverId: {}]", + username, serverId); } else { message = IotDeviceMessage.buildStateOffline(); - log.info("[handleDeviceStateChange][发送设备下线消息成功][username: {}]", username); + log.debug("[handleDeviceStateChange][发送设备下线消息, username: {}, serverId: {}]", + username, serverId); } // 发送消息到消息总线 deviceMessageService.sendDeviceMessage(message, deviceInfo.getProductKey(), deviceInfo.getDeviceName(), serverId); + + log.info("[handleDeviceStateChange][{}处理成功, productKey: {}, deviceName: {}, serverId: {}]", + actionDesc, deviceInfo.getProductKey(), deviceInfo.getDeviceName(), serverId); + } catch (Exception e) { - log.error("[handleDeviceStateChange][发送设备状态消息失败][username: {}][online: {}]", - username, online, e); + log.error("[handleDeviceStateChange][发送设备状态消息失败, username: {}, online: {}, action: {}]", + username, online, actionDesc, e); } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java index 14b36cbc54..58aa1d4118 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java @@ -9,6 +9,8 @@ import io.vertx.mqtt.messages.MqttPublishMessage; import lombok.extern.slf4j.Slf4j; import org.springframework.util.Assert; +import java.util.Arrays; + /** * IoT 网关 MQTT 上行消息处理器 * @@ -32,12 +34,12 @@ public class IotMqttUpstreamHandler { String topic = message.topicName(); byte[] payload = message.payload().getBytes(); - log.debug("[handle][收到 MQTT 消息][topic: {}]", topic); + log.debug("[handle][收到 MQTT 消息, topic: {}]", topic); try { // 1. 前置校验 if (StrUtil.isBlank(topic)) { - log.warn("[validateInput][主题为空,忽略消息]"); + log.warn("[handle][主题为空, 忽略消息]"); return; } // 注意:payload 可以为空 @@ -45,7 +47,7 @@ public class IotMqttUpstreamHandler { // 2. 识别并验证消息类型 String messageType = getMessageType(topic); Assert.notNull(messageType, String.format("未知的消息类型, topic(%s)", topic)); - log.info("[handle][接收到{}][topic: {}]", messageType, topic); + log.debug("[handle][接收到上行消息({}), topic: {}]", messageType, topic); // 3. 解析主题,获取 productKey 和 deviceName String[] topicParts = topic.split("/"); @@ -72,7 +74,7 @@ public class IotMqttUpstreamHandler { deviceMessageService.sendDeviceMessage(deviceMessage, productKey, deviceName, serverId); // 6. 记录成功日志 - log.info("[handle][处理{}成功,已转发到 MQ][topic: {}]", messageType, topic); + log.debug("[handle][处理上行消息({})成功, topic: {}]", messageType, topic); } catch (Exception e) { log.error("[handle][处理 MQTT 消息失败][topic: {}][payload: {}]", topic, new String(payload), e); } @@ -86,10 +88,13 @@ public class IotMqttUpstreamHandler { */ private String getMessageType(String topic) { String[] topicParts = topic.split("/"); - if (topicParts.length < 7) { - return null; + // 约定:topic 第 4 个部分开始为消息类型 + // 例如:/sys/{productKey}/{deviceName}/thing/property/post -> + // thing/property/post + if (topicParts.length > 4) { + return String.join("/", Arrays.copyOfRange(topicParts, 4, topicParts.length)); } - return topicParts[3]; + return topicParts[topicParts.length - 1]; } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application-local.yaml b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application-local.yaml index d8a2c80116..21514ddabd 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application-local.yaml +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application-local.yaml @@ -37,6 +37,7 @@ yudao: mqtt-port: 1883 # MQTT Broker 端口 mqtt-username: admin # MQTT 用户名 mqtt-password: public # MQTT 密码 + mqtt-client-id: iot-gateway-mqtt # MQTT 客户端 ID mqtt-ssl: false # 是否开启 SSL mqtt-topics: - "/sys/#" # 系统主题