From af0ff1ce2672b67f0b66528fbd3c68299719134c Mon Sep 17 00:00:00 2001 From: YunaiV Date: Thu, 12 Jun 2025 09:55:17 +0800 Subject: [PATCH] =?UTF-8?q?review=EF=BC=9A=E3=80=90IoT=20=E7=89=A9?= =?UTF-8?q?=E8=81=94=E7=BD=91=E3=80=91mqtt=20broker=20=E5=8D=8F=E8=AE=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../protocol/mqtt/IotMqttDownstreamSubscriber.java | 5 +++-- .../gateway/protocol/mqtt/IotMqttHttpAuthProtocol.java | 2 ++ .../gateway/protocol/mqtt/IotMqttUpstreamProtocol.http | 5 +++-- .../gateway/protocol/mqtt/IotMqttUpstreamProtocol.java | 9 ++++++++- .../protocol/mqtt/router/IotMqttEventHandler.java | 1 + 5 files changed, 17 insertions(+), 5 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java index 8db8db0714..11fa28d272 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java @@ -48,15 +48,15 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber { log.error("[connectMqtt][连接 MQTT Broker 失败]", error); reconnectWithDelay(); @@ -178,12 +182,14 @@ public class IotMqttUpstreamProtocol { */ private void subscribeToTopics() { List topicList = emqxProperties.getMqttTopics(); + // TODO @haohao:建议 topicList 直接 validate 校验 if (CollUtil.isEmpty(topicList)) { log.warn("[subscribeToTopics][未配置 MQTT 主题,使用默认主题]"); topicList = List.of("/sys/#"); // 默认订阅所有系统主题 } for (String topic : topicList) { + // TODO @haohao:直接 validate 校验;嘿嘿,主要保证核心逻辑,简单点 if (StrUtil.isBlank(topic)) { log.warn("[subscribeToTopics][跳过空主题]"); continue; @@ -224,7 +230,7 @@ public class IotMqttUpstreamProtocol { return; } - mqttClient.publish(topic, io.vertx.core.buffer.Buffer.buffer(payload), DEFAULT_QOS, false, false) + mqttClient.publish(topic, Buffer.buffer(payload), DEFAULT_QOS, false, false) .onSuccess(v -> log.debug("[publishMessage][发送消息成功][topic: {}]", topic)) .onFailure(err -> log.error("[publishMessage][发送消息失败][topic: {}]", topic, err)); } @@ -238,6 +244,7 @@ public class IotMqttUpstreamProtocol { return IotDeviceMessageUtils.generateServerId(emqxProperties.getMqttPort()); } + // TODO @haohao:这个要删除哇? /** * 获取 MQTT 客户端 * diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttEventHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttEventHandler.java index 6b74b9e4e3..074acfca41 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttEventHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttEventHandler.java @@ -56,6 +56,7 @@ public class IotMqttEventHandler extends IotMqttAbstractHandler { log.info("[handle][处理设备事件上报成功][topic: {}]", topic); // 发送响应消息 + // TODO @haohao:这里应该只 ack 哈;reply 在 biz 业务处理了。handleUpstreamDeviceMessage String method = "thing.event." + eventIdentifier + ".post"; sendResponse(topic, JSONUtil.parseObj(payload), method); } catch (Exception e) {