From 7e49c901562889b22cbe85e153ad0cf23d51d9bd Mon Sep 17 00:00:00 2001 From: YunaiV Date: Fri, 13 Jun 2025 23:13:29 +0800 Subject: [PATCH] =?UTF-8?q?review=EF=BC=9A=E3=80=90IoT=20=E7=89=A9?= =?UTF-8?q?=E8=81=94=E7=BD=91=E3=80=91MqTT=20=E5=8D=8F=E8=AE=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../enums/IotDeviceMessageMethodEnum.java | 5 +- .../gateway/config/IotGatewayProperties.java | 12 ++--- .../mqtt/IotMqttUpstreamProtocol.java | 48 ++++++++----------- .../mqtt/router/IotMqttDownstreamHandler.java | 17 +++---- .../mqtt/router/IotMqttHttpAuthHandler.java | 43 +++++------------ .../mqtt/router/IotMqttUpstreamHandler.java | 47 ++++++++---------- 6 files changed, 68 insertions(+), 104 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/enums/IotDeviceMessageMethodEnum.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/enums/IotDeviceMessageMethodEnum.java index 267f7b8053..b714153b20 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/enums/IotDeviceMessageMethodEnum.java +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/enums/IotDeviceMessageMethodEnum.java @@ -23,12 +23,13 @@ public enum IotDeviceMessageMethodEnum implements ArrayValuable { STATE_OFFLINE("thing.state.offline", true), // ========== 设备属性 ========== - // 可参考 - // https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services + // 可参考:https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services + PROPERTY_POST("thing.property.post", true), PROPERTY_SET("thing.property.set", false), // ========== 设备事件 ========== + EVENT_POST("thing.event.post", true), ; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java index 88d1b36fe0..0101f32aaa 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java @@ -105,6 +105,7 @@ public class IotGatewayProperties { @NotNull(message = "是否开启不能为空") private Boolean enabled; + // TODO @haohao:是不是改成 httpPort?不只认证,目前看。 /** * HTTP 认证端口(默认:8090) */ @@ -115,42 +116,35 @@ public class IotGatewayProperties { */ @NotEmpty(message = "MQTT 服务器地址不能为空") private String mqttHost; - /** * MQTT 服务器端口(默认:1883) */ @NotNull(message = "MQTT 服务器端口不能为空") private Integer mqttPort = 1883; - /** * MQTT 用户名 */ @NotEmpty(message = "MQTT 用户名不能为空") private String mqttUsername; - /** * MQTT 密码 */ @NotEmpty(message = "MQTT 密码不能为空") private String mqttPassword; - /** * MQTT 客户端的 SSL 开关 */ @NotNull(message = "MQTT 是否开启 SSL 不能为空") private Boolean mqttSsl = false; - /** * MQTT 客户端 ID(如果为空,系统将自动生成) */ private String mqttClientId; - /** * MQTT 订阅的主题 */ @NotEmpty(message = "MQTT 主题不能为空") - private List mqttTopics; - + private List<@NotEmpty(message = "MQTT 主题不能为空") String> mqttTopics; /** * 默认 QoS 级别 *

@@ -164,12 +158,12 @@ public class IotGatewayProperties { * 连接超时时间(秒) */ private Integer connectTimeoutSeconds = 10; - /** * 重连延迟时间(毫秒) */ private Long reconnectDelayMs = 5000L; + // TODO @haohao:貌似可以通过配置文件 + el 表达式;尽量还是配置文件; /** * 获取 MQTT 客户端 ID,如果未配置则自动生成 * diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java index 398dbaa5dc..29dc0b59aa 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java @@ -35,6 +35,11 @@ public class IotMqttUpstreamProtocol { private final IotGatewayProperties.EmqxProperties emqxProperties; + /** + * 服务运行状态标志 + */ + private volatile boolean isRunning = false; + private Vertx vertx; @Getter @@ -47,11 +52,6 @@ public class IotMqttUpstreamProtocol { // HTTP 认证服务相关 private HttpServer httpAuthServer; - /** - * 服务运行状态标志 - */ - private volatile boolean isRunning = false; - public IotMqttUpstreamProtocol(IotGatewayProperties.EmqxProperties emqxProperties) { this.emqxProperties = emqxProperties; this.serverId = IotDeviceMessageUtils.generateServerId(emqxProperties.getMqttPort()); @@ -66,13 +66,12 @@ public class IotMqttUpstreamProtocol { log.info("[start][启动 MQTT 协议服务]"); try { - // 1. 创建共享的 Vertx 实例 this.vertx = Vertx.vertx(); - // 2. 启动 HTTP 认证服务 + // 1. 启动 HTTP 认证服务 startHttpAuthServer(); - // 3. 启动 MQTT 客户端 + // 2. 启动 MQTT 客户端 startMqttClient(); isRunning = true; @@ -119,16 +118,15 @@ public class IotMqttUpstreamProtocol { private void startHttpAuthServer() { log.info("[startHttpAuthServer][启动 HTTP 认证服务]"); - // 创建路由 + // 1.1 创建路由 Router router = Router.router(vertx); router.route().handler(BodyHandler.create()); - - // 创建认证处理器 + // 1.2 创建认证处理器 IotMqttHttpAuthHandler authHandler = new IotMqttHttpAuthHandler(this); router.post(IotMqttTopicUtils.MQTT_AUTH_PATH).handler(authHandler::handleAuth); router.post(IotMqttTopicUtils.MQTT_EVENT_PATH).handler(authHandler::handleEvent); - // 启动 HTTP 服务器 + // 2. 启动 HTTP 服务器 int authPort = emqxProperties.getHttpAuthPort(); try { httpAuthServer = vertx.createHttpServer() @@ -172,7 +170,7 @@ public class IotMqttUpstreamProtocol { createMqttClient(); // 3. 连接 MQTT Broker(异步连接,不会抛出异常) - connectMqtt(); + connectMqtt(false); log.info("[startMqttClient][MQTT 客户端启动完成,正在异步连接中...]"); } catch (Exception e) { @@ -211,13 +209,6 @@ public class IotMqttUpstreamProtocol { } } - /** - * 连接 MQTT Broker 并订阅主题 - */ - private void connectMqtt() { - connectMqtt(false); - } - /** * 连接 MQTT Broker 并订阅主题 * @@ -227,6 +218,7 @@ public class IotMqttUpstreamProtocol { // 1. 参数校验 String host = emqxProperties.getMqttHost(); Integer port = emqxProperties.getMqttPort(); + // TODO @haohao:这些参数校验,交给 validator; if (StrUtil.isBlank(host)) { log.error("[connectMqtt][MQTT Host 为空, 无法连接]"); throw new IllegalArgumentException("MQTT Host 不能为空"); @@ -246,6 +238,7 @@ public class IotMqttUpstreamProtocol { // 2. 异步连接 mqttClient.connect(port, host, connectResult -> { + // TODO @haohao:if return,减少括号哈; if (connectResult.succeeded()) { if (isReconnect) { log.info("[connectMqtt][MQTT 客户端重连成功, host: {}, port: {}]", host, port); @@ -253,16 +246,15 @@ public class IotMqttUpstreamProtocol { log.info("[connectMqtt][MQTT 客户端连接成功, host: {}, port: {}]", host, port); } - // 3. 设置处理器 + // 设置处理器 setupMqttHandlers(); - - // 4. 订阅主题 + // 订阅主题 subscribeToTopics(); - } else { log.error("[connectMqtt][连接 MQTT Broker 失败, host: {}, port: {}, isReconnect: {}]", host, port, isReconnect, connectResult.cause()); + // TODO @haohao:体感上,是不是首次必须连接成功?类似 mysql;首次要连接上,然后后续可以重连; if (!isReconnect) { // 首次连接失败时,也要尝试重连 log.warn("[connectMqtt][首次连接失败,将开始重连机制]"); @@ -279,14 +271,11 @@ public class IotMqttUpstreamProtocol { * 创建 MQTT 客户端 */ private void createMqttClient() { - log.debug("[createMqttClient][创建 MQTT 客户端, clientId: {}]", emqxProperties.getMqttClientId()); - MqttClientOptions options = new MqttClientOptions() .setClientId(emqxProperties.getMqttClientId()) .setUsername(emqxProperties.getMqttUsername()) .setPassword(emqxProperties.getMqttPassword()) .setSsl(emqxProperties.getMqttSsl()); - this.mqttClient = MqttClient.create(vertx, options); } @@ -294,6 +283,7 @@ public class IotMqttUpstreamProtocol { * 设置 MQTT 处理器 */ private void setupMqttHandlers() { + // TODO @haohao:mqttClient 一定非空; if (mqttClient == null) { log.warn("[setupMqttHandlers][MQTT 客户端为空,跳过处理器设置]"); return; @@ -311,6 +301,7 @@ public class IotMqttUpstreamProtocol { }); // 设置消息处理器 + // TODO @haohao:upstreamHandler 一定非空; if (upstreamHandler != null) { mqttClient.publishHandler(upstreamHandler::handle); log.debug("[setupMqttHandlers][MQTT 消息处理器设置完成]"); @@ -328,7 +319,6 @@ public class IotMqttUpstreamProtocol { log.warn("[subscribeToTopics][订阅主题列表为空, 跳过订阅]"); return; } - if (mqttClient == null || !mqttClient.isConnected()) { log.warn("[subscribeToTopics][MQTT 客户端未连接, 跳过订阅]"); return; @@ -337,10 +327,12 @@ public class IotMqttUpstreamProtocol { int qos = emqxProperties.getMqttQos(); log.info("[subscribeToTopics][开始订阅主题, 共 {} 个, QoS: {}]", topicList.size(), qos); + // TODO @haohao:使用 atomicinteger 会更合适; int[] successCount = { 0 }; // 使用数组以便在 lambda 中修改 int[] failCount = { 0 }; for (String topic : topicList) { + // TODO @haohao:MqttClient subscribe(Map topics, 是不是更简洁哈; mqttClient.subscribe(topic, qos, subscribeResult -> { if (subscribeResult.succeeded()) { successCount[0]++; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttDownstreamHandler.java index 0c19fef193..4599e1f071 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttDownstreamHandler.java @@ -15,7 +15,7 @@ import lombok.extern.slf4j.Slf4j; /** * IoT 网关 MQTT 下行消息处理器 *

- * 从消息总线接收到下行消息,然后发布到 MQTT Broker + * 从消息总线接收到下行消息,然后发布到 MQTT Broker,从而被设备所接收 * * @author 芋道源码 */ @@ -23,7 +23,9 @@ import lombok.extern.slf4j.Slf4j; public class IotMqttDownstreamHandler { private final IotMqttUpstreamProtocol protocol; + private final IotDeviceService deviceService; + private final IotDeviceMessageService deviceMessageService; public IotMqttDownstreamHandler(IotMqttUpstreamProtocol protocol) { @@ -41,25 +43,24 @@ public class IotMqttDownstreamHandler { // 1. 获取设备信息(使用缓存) IotDeviceRespDTO deviceInfo = deviceService.getDeviceFromCache(message.getDeviceId()); if (deviceInfo == null) { - log.warn("[handle][设备信息不存在, deviceId: {}]", message.getDeviceId()); + log.error("[handle][设备信息({})不存在]", message.getDeviceId()); return; } - // 2. 根据方法构建主题 + // 2.1 根据方法构建主题 String topic = buildTopicByMethod(message.getMethod(), deviceInfo.getProductKey(), deviceInfo.getDeviceName()); if (StrUtil.isBlank(topic)) { log.warn("[handle][未知的消息方法: {}]", message.getMethod()); return; } - - // 3. 构建载荷 + // 2.2 构建载荷 + // TODO @haohao:这里是不是 encode 就可以发拉?因为本身就 json 化了。 JSONObject payload = buildDownstreamPayload(message); - - // 4. 发布消息 + // 2.3 发布消息 protocol.publishMessage(topic, payload.toString()); - log.debug("[handle][发布下行消息成功, method: {}, topic: {}]", message.getMethod(), topic); } + // TODO @haohao:这个是不是也可以计算;IotDeviceMessageUtils 的 isReplyMessage;这样就直接生成了; /** * 根据方法构建主题 * diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttHttpAuthHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttHttpAuthHandler.java index 86e1c451b2..b9dcdb5cce 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttHttpAuthHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttHttpAuthHandler.java @@ -35,12 +35,10 @@ public class IotMqttHttpAuthHandler { * 认证允许结果 */ private static final String RESULT_ALLOW = "allow"; - /** * 认证拒绝结果 */ private static final String RESULT_DENY = "deny"; - /** * 认证忽略结果 */ @@ -53,14 +51,11 @@ public class IotMqttHttpAuthHandler { private static final String EVENT_CLIENT_DISCONNECTED = "client.disconnected"; private final IotMqttUpstreamProtocol protocol; + private final IotDeviceMessageService deviceMessageService; + private final IotDeviceCommonApi deviceApi; - /** - * 构造器 - * - * @param protocol MQTT 协议实例 - */ public IotMqttHttpAuthHandler(IotMqttUpstreamProtocol protocol) { this.protocol = protocol; this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class); @@ -72,19 +67,15 @@ public class IotMqttHttpAuthHandler { */ public void handleAuth(RoutingContext context) { try { - // 解析请求体 + // 参数校验 JsonObject body = parseRequestBody(context); if (body == null) { return; } - String clientId = body.getString("clientid"); String username = body.getString("username"); String password = body.getString("password"); - log.debug("[handleAuth][设备认证请求: clientId={}, username={}]", clientId, username); - - // 参数校验 if (StrUtil.hasEmpty(clientId, username, password)) { log.info("[handleAuth][认证参数不完整: clientId={}, username={}]", clientId, username); sendAuthResponse(context, RESULT_DENY, false, "认证参数不完整"); @@ -94,13 +85,13 @@ public class IotMqttHttpAuthHandler { // 执行设备认证 boolean authResult = performDeviceAuth(clientId, username, password); if (authResult) { + // TODO @haohao:是不是两条 info,直接打认证结果:authResult log.info("[handleAuth][设备认证成功: {}]", username); sendAuthResponse(context, RESULT_ALLOW, false, null); } else { log.info("[handleAuth][设备认证失败: {}]", username); sendAuthResponse(context, RESULT_DENY, false, DEVICE_AUTH_FAIL.getMsg()); } - } catch (Exception e) { log.error("[handleAuth][设备认证异常]", e); sendAuthResponse(context, RESULT_IGNORE, false, "认证服务异常"); @@ -119,10 +110,8 @@ public class IotMqttHttpAuthHandler { if (body == null) { return; } - String event = body.getString("event"); String username = body.getString("username"); - log.debug("[handleEvent][收到事件: {} - {}]", event, username); // 根据事件类型进行分发处理 @@ -138,12 +127,12 @@ public class IotMqttHttpAuthHandler { break; } - // EMQX Webhook 只需要200状态码,无需响应体 + // EMQX Webhook 只需要 200 状态码,无需响应体 context.response().setStatusCode(SUCCESS_STATUS_CODE).end(); - } catch (Exception e) { + // TODO @haohao:body 可以打印出来 log.error("[handleEvent][事件处理失败]", e); - // 即使处理失败,也返回200避免EMQX重试 + // 即使处理失败,也返回 200 避免EMQX重试 context.response().setStatusCode(SUCCESS_STATUS_CODE).end(); } } @@ -163,7 +152,6 @@ public class IotMqttHttpAuthHandler { private void handleClientDisconnected(JsonObject body) { String username = body.getString("username"); String reason = body.getString("reason"); - log.info("[handleClientDisconnected][设备下线: {} ({})]", username, reason); handleDeviceStateChange(username, false); } @@ -184,6 +172,7 @@ public class IotMqttHttpAuthHandler { } return body; } catch (Exception e) { + // TODO @haohao:最好把 body 打印出来; log.error("[parseRequestBody][解析请求体失败]", e); sendAuthResponse(context, RESULT_IGNORE, false, "请求体格式错误"); return null; @@ -201,10 +190,7 @@ public class IotMqttHttpAuthHandler { private boolean performDeviceAuth(String clientId, String username, String password) { try { CommonResult result = deviceApi.authDevice(new IotDeviceAuthReqDTO() - .setClientId(clientId) - .setUsername(username) - .setPassword(password)); - + .setClientId(clientId).setUsername(username).setPassword(password)); result.checkError(); return BooleanUtil.isTrue(result.getData()); } catch (Exception e) { @@ -220,11 +206,10 @@ public class IotMqttHttpAuthHandler { * @param online 是否在线 */ private void handleDeviceStateChange(String username, boolean online) { + // 解析设备信息 if (StrUtil.isEmpty(username) || "undefined".equals(username)) { return; } - - // 解析设备信息 IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(username); if (deviceInfo == null) { log.debug("[handleDeviceStateChange][跳过非设备连接: {}]", username); @@ -232,6 +217,7 @@ public class IotMqttHttpAuthHandler { } try { + // TODO @haohao:serverId 获取非空,可以忽略掉; String serverId = protocol.getServerId(); if (StrUtil.isEmpty(serverId)) { log.error("[handleDeviceStateChange][获取服务器ID失败]"); @@ -241,15 +227,14 @@ public class IotMqttHttpAuthHandler { // 构建设备状态消息 IotDeviceMessage message = online ? IotDeviceMessage.buildStateOnline() : IotDeviceMessage.buildStateOffline(); - // 发送消息到消息总线 deviceMessageService.sendDeviceMessage(message, deviceInfo.getProductKey(), deviceInfo.getDeviceName(), serverId); + // TODO @haohao:online 不用翻译 log.info("[handleDeviceStateChange][设备状态更新: {}/{} -> {}]", deviceInfo.getProductKey(), deviceInfo.getDeviceName(), online ? "在线" : "离线"); - } catch (Exception e) { log.error("[handleDeviceStateChange][发送设备状态消息失败: {}]", username, e); } @@ -277,10 +262,6 @@ public class IotMqttHttpAuthHandler { // response.put("expire_at", System.currentTimeMillis() / 1000 + 3600); // 记录详细的响应日志(message仅用于日志,不返回给EMQX) - if (StrUtil.isNotEmpty(message)) { - log.debug("[sendAuthResponse][响应详情: result={}, message={}]", result, message); - } - context.response() .setStatusCode(SUCCESS_STATUS_CODE) .putHeader("Content-Type", "application/json; charset=utf-8") diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java index 58aa1d4118..8098f54427 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java @@ -20,6 +20,7 @@ import java.util.Arrays; public class IotMqttUpstreamHandler { private final IotDeviceMessageService deviceMessageService; + private final String serverId; public IotMqttUpstreamHandler(IotMqttUpstreamProtocol protocol) { @@ -30,56 +31,50 @@ public class IotMqttUpstreamHandler { /** * 处理 MQTT 发布消息 */ - public void handle(MqttPublishMessage message) { - String topic = message.topicName(); - byte[] payload = message.payload().getBytes(); - - log.debug("[handle][收到 MQTT 消息, topic: {}]", topic); - + public void handle(MqttPublishMessage mqttMessage) { + String topic = mqttMessage.topicName(); + byte[] payload = mqttMessage.payload().getBytes(); try { // 1. 前置校验 if (StrUtil.isBlank(topic)) { log.warn("[handle][主题为空, 忽略消息]"); return; } - // 注意:payload 可以为空 - // 2. 识别并验证消息类型 + // 2.1 识别并验证消息类型 String messageType = getMessageType(topic); + // TODO @haohao:可以使用 hutool 的,它的字符串拼接更简单; Assert.notNull(messageType, String.format("未知的消息类型, topic(%s)", topic)); - log.debug("[handle][接收到上行消息({}), topic: {}]", messageType, topic); - - // 3. 解析主题,获取 productKey 和 deviceName + // 2.2 解析主题,获取 productKey 和 deviceName + // TODO @haohao:体感 getMessageType 和下面,都 split;是不是一次就 ok 拉;1)split 掉;2)2、3 位置是 productKey、deviceName;3)4 开始还是 method String[] topicParts = topic.split("/"); if (topicParts.length < 4) { - log.warn("[handle][主题格式不正确,无法解析 productKey 和 deviceName][topic: {}]", topic); + log.warn("[handle][topic({}) 格式不正确,无法解析 productKey 和 deviceName]", topic); return; } String productKey = topicParts[2]; String deviceName = topicParts[3]; + // TODO @haohao:是不是要判断,部分为空,就不行呀; if (StrUtil.isAllBlank(productKey, deviceName)) { - log.warn("[handle][主题中 productKey 或 deviceName 为空][topic: {}]", topic); + log.warn("[handle][topic({}) 格式不正确,productKey 和 deviceName 部分为空]", topic); return; } - // 4. 解码消息 - IotDeviceMessage deviceMessage = deviceMessageService.decodeDeviceMessage( - payload, productKey, deviceName); - if (deviceMessage == null) { - log.warn("[handle][消息解码失败][topic: {}]", topic); + // 3. 解码消息 + IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(payload, productKey, deviceName); + if (message == null) { + log.warn("[handle][topic({}) payload({}) 消息解码失败", topic, new String(payload)); return; } - // 5. 发送消息到队列 - deviceMessageService.sendDeviceMessage(deviceMessage, productKey, deviceName, serverId); - - // 6. 记录成功日志 - log.debug("[handle][处理上行消息({})成功, topic: {}]", messageType, topic); + // 4. 发送消息到队列 + deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId); } catch (Exception e) { - log.error("[handle][处理 MQTT 消息失败][topic: {}][payload: {}]", topic, new String(payload), e); + log.error("[handle][topic({}) payload({}) 处理异常]", topic, new String(payload), e); } } + // TODO @haohao:是不是 getMethodFromTopic? /** * 从主题中,获得消息类型 * @@ -89,9 +84,9 @@ public class IotMqttUpstreamHandler { private String getMessageType(String topic) { String[] topicParts = topic.split("/"); // 约定:topic 第 4 个部分开始为消息类型 - // 例如:/sys/{productKey}/{deviceName}/thing/property/post -> - // thing/property/post + // 例如:/sys/{productKey}/{deviceName}/thing/property/post -> thing/property/post if (topicParts.length > 4) { + // TODO @haohao:是不是 subString 前 3 个,性能更好; return String.join("/", Arrays.copyOfRange(topicParts, 4, topicParts.length)); } return topicParts[topicParts.length - 1];