From d1b2fb41ae70a06cc8374a33f6a94d6af8e80617 Mon Sep 17 00:00:00 2001 From: YunaiV Date: Thu, 12 Jun 2025 22:04:18 +0800 Subject: [PATCH] =?UTF-8?q?review=EF=BC=9A=E3=80=90IoT=20=E7=89=A9?= =?UTF-8?q?=E8=81=94=E7=BD=91=E3=80=91mqtt=20=E5=8D=8F=E8=AE=AE=E7=9A=84?= =?UTF-8?q?=20review?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../config/IotGatewayConfiguration.java | 3 -- .../http/IotHttpUpstreamProtocol.java | 15 ++++--- .../mqtt/IotMqttDownstreamSubscriber.java | 2 + .../mqtt/IotMqttUpstreamProtocol.java | 42 +++++++++---------- .../mqtt/router/IotMqttAbstractHandler.java | 1 + .../mqtt/router/IotMqttUpstreamHandler.java | 12 +++++- 6 files changed, 41 insertions(+), 34 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java index ba9f98d228..32bba95332 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java @@ -44,9 +44,6 @@ public class IotGatewayConfiguration { @Slf4j public static class MqttProtocolConfiguration { - /** - * MQTT 统一协议:集成上行协议和HTTP认证协议 - */ @Bean public IotMqttUpstreamProtocol iotMqttUnifiedProtocol(IotGatewayProperties gatewayProperties) { return new IotMqttUpstreamProtocol(gatewayProperties.getProtocol().getEmqx()); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/IotHttpUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/IotHttpUpstreamProtocol.java index a1ec8c9653..82d651db80 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/IotHttpUpstreamProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/IotHttpUpstreamProtocol.java @@ -11,7 +11,7 @@ import io.vertx.ext.web.Router; import io.vertx.ext.web.handler.BodyHandler; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; -import lombok.RequiredArgsConstructor; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; /** @@ -19,7 +19,6 @@ import lombok.extern.slf4j.Slf4j; * * @author 芋道源码 */ -@RequiredArgsConstructor @Slf4j public class IotHttpUpstreamProtocol extends AbstractVerticle { @@ -27,6 +26,14 @@ public class IotHttpUpstreamProtocol extends AbstractVerticle { private HttpServer httpServer; + @Getter + private final String serverId; + + public IotHttpUpstreamProtocol(IotGatewayProperties.HttpProperties httpProperties) { + this.httpProperties = httpProperties; + this.serverId = IotDeviceMessageUtils.generateServerId(httpProperties.getServerPort()); + } + @Override @PostConstruct public void start() { @@ -67,8 +74,4 @@ public class IotHttpUpstreamProtocol extends AbstractVerticle { } } - public String getServerId() { - return IotDeviceMessageUtils.generateServerId(httpProperties.getServerPort()); - } - } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java index 9beeb42034..59cfab0702 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java @@ -61,6 +61,7 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber - * 集成了 MQTT 上行协议和 HTTP 认证协议的功能: - * 1. MQTT 客户端:连接 EMQX,处理设备上行和下行消息 - * 2. HTTP 认证服务:为 EMQX 提供设备认证接口 + * 1. MQTT 客户端:连接 EMQX,消费处理设备上行和下行消息 + * 2. HTTP 认证服务:为 EMQX 提供设备认证、连接、断开接口 * * @author 芋道源码 */ @Slf4j public class IotMqttUpstreamProtocol { + // TODO @haohao:是不是也丢到配置里? /** * 默认 QoS 级别 - 至少一次 */ private static final MqttQoS DEFAULT_QOS = MqttQoS.AT_LEAST_ONCE; + // TODO @haohao:这个也是; /** * 连接超时时间(秒) */ private static final int CONNECT_TIMEOUT_SECONDS = 10; + // TODO @haohao:重连也是; /** * 重连延迟时间(毫秒) */ @@ -53,15 +57,18 @@ public class IotMqttUpstreamProtocol { private final IotGatewayProperties.EmqxProperties emqxProperties; - // 共享资源 private Vertx vertx; + @Getter + private final String serverId; + // MQTT 客户端相关 private MqttClient mqttClient; private IotMqttUpstreamHandler upstreamHandler; // HTTP 认证服务相关 private HttpServer httpAuthServer; + // TODO @haohao:authHandler 可以 local 哈; private IotMqttHttpAuthHandler authHandler; /** @@ -69,11 +76,9 @@ public class IotMqttUpstreamProtocol { */ private volatile boolean isRunning = false; - /** - * 构造函数 - */ public IotMqttUpstreamProtocol(IotGatewayProperties.EmqxProperties emqxProperties) { this.emqxProperties = emqxProperties; + this.serverId = IotDeviceMessageUtils.generateServerId(emqxProperties.getMqttPort()); } @PostConstruct @@ -98,6 +103,7 @@ public class IotMqttUpstreamProtocol { isRunning = true; log.info("[start][MQTT 统一协议服务启动完成]"); } catch (Exception e) { + // TODO @haohao:失败,是不是直接 System.exit 哈! log.error("[start][MQTT 统一协议服务启动失败]", e); // 启动失败时清理资源 stop(); @@ -169,6 +175,7 @@ public class IotMqttUpstreamProtocol { * 停止 HTTP 认证服务 */ private void stopHttpAuthServer() { + // TODO @haohao:一些 if return 最好搞下; if (httpAuthServer != null) { try { httpAuthServer.close().result(); @@ -195,8 +202,7 @@ public class IotMqttUpstreamProtocol { .setClientId(emqxProperties.getMqttClientId()) .setUsername(emqxProperties.getMqttUsername()) .setPassword(emqxProperties.getMqttPassword()) - .setSsl(ObjUtil.defaultIfNull(emqxProperties.getMqttSsl(), false)); - + .setSsl(emqxProperties.getMqttSsl()); this.mqttClient = MqttClient.create(vertx, options); // 连接 MQTT Broker @@ -256,6 +262,7 @@ public class IotMqttUpstreamProtocol { .toCompletionStage() .toCompletableFuture() .thenAccept(connAck -> { + // TODO @haohao:是不是可以连接完,然后在执行里面;不用 通过 thenAccept 哈; log.info("[connectMqtt][MQTT 客户端连接成功][host: {}][port: {}]", host, port); // 设置断开重连监听器 mqttClient.closeHandler(closeEvent -> { @@ -268,6 +275,7 @@ public class IotMqttUpstreamProtocol { subscribeToTopics(); }) .exceptionally(error -> { + // TODO @haohao:这里的异常,是不是不用重连哈?因为直接就退出了。然后有 closeHandler 监听重连了; log.error("[connectMqtt][连接 MQTT Broker 失败][host: {}][port: {}]", host, port, error); // 连接失败时也要尝试重连 reconnectWithDelay(); @@ -297,16 +305,10 @@ public class IotMqttUpstreamProtocol { */ private void subscribeToTopics() { List topicList = emqxProperties.getMqttTopics(); - // @NotEmpty 注解已保证 topicList 不为空,无需重复校验 log.info("[subscribeToTopics][开始订阅主题,共 {} 个]", topicList.size()); for (String topic : topicList) { - if (StrUtil.isBlank(topic)) { - log.warn("[subscribeToTopics][跳过空主题]"); - continue; - } - mqttClient.subscribe(topic, DEFAULT_QOS.value(), subscribeResult -> { if (subscribeResult.succeeded()) { log.info("[subscribeToTopics][订阅主题成功: {}][QoS: {}]", topic, DEFAULT_QOS.value()); @@ -322,6 +324,7 @@ public class IotMqttUpstreamProtocol { */ private void reconnectWithDelay() { vertx.setTimer(RECONNECT_DELAY_MS, timerId -> { + // TODO @haohao:if return,括号少一些; if (isRunning && (mqttClient == null || !mqttClient.isConnected())) { log.info("[reconnectWithDelay][开始重连 MQTT Broker,延迟 {} 毫秒]", RECONNECT_DELAY_MS); try { @@ -350,11 +353,4 @@ public class IotMqttUpstreamProtocol { } } - /** - * 获取服务器 ID - */ - public String getServerId() { - return IotDeviceMessageUtils.generateServerId(emqxProperties.getMqttPort()); - } - } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttAbstractHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttAbstractHandler.java index 11ec92e6b5..a0af65ba18 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttAbstractHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttAbstractHandler.java @@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router; import cn.hutool.core.util.StrUtil; import lombok.extern.slf4j.Slf4j; +// TODO @haohao:是不是不用基类哈; /** * IoT 网关 MQTT 协议的处理器抽象基类 *

diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java index 75d014c541..cbdab348f5 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java @@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router; import cn.hutool.extra.spring.SpringUtil; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.vertx.mqtt.messages.MqttPublishMessage; import lombok.extern.slf4j.Slf4j; @@ -20,9 +21,10 @@ import java.nio.charset.StandardCharsets; public class IotMqttUpstreamHandler extends IotMqttAbstractHandler { private final IotDeviceMessageService deviceMessageService; + private final String serverId; - public IotMqttUpstreamHandler(cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol protocol) { + public IotMqttUpstreamHandler(IotMqttUpstreamProtocol protocol) { this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class); this.serverId = protocol.getServerId(); } @@ -32,6 +34,7 @@ public class IotMqttUpstreamHandler extends IotMqttAbstractHandler { */ public void handle(MqttPublishMessage message) { String topic = message.topicName(); + // TODO @haohao: message.payload().getBytes(); String payload = message.payload().toString(StandardCharsets.UTF_8); log.debug("[handle][收到 MQTT 消息][topic: {}]", topic); @@ -44,6 +47,7 @@ public class IotMqttUpstreamHandler extends IotMqttAbstractHandler { // 1. 识别并验证消息类型 String messageType = getMessageType(topic); if (messageType == null) { + // TODO @haohao:log 是不是把 payload 也打印下哈 log.warn("[doHandle][未知的消息类型][topic: {}]", topic); return; } @@ -56,9 +60,11 @@ public class IotMqttUpstreamHandler extends IotMqttAbstractHandler { * 处理消息的统一逻辑 */ private void processMessage(String topic, String payload, String messageType) { + // TODO @haohao:messageType 解析,是不是作用不大哈? log.info("[processMessage][接收到{}][topic: {}]", messageType, topic); // 解析主题获取设备信息 + // TODO @haohao:不一定是 7 个哈;阿里云 topic 有点差异的;可以考虑解析到 topicParts[2]、topicParts[3] 的 topic String[] topicParts = parseTopic(topic); if (topicParts == null) { return; @@ -66,19 +72,21 @@ public class IotMqttUpstreamHandler extends IotMqttAbstractHandler { String productKey = topicParts[2]; String deviceName = topicParts[3]; + // TODO @haohao:解析不到,可以打个 error log; // 解码消息 byte[] messageBytes = payload.getBytes(StandardCharsets.UTF_8); IotDeviceMessage message = deviceMessageService.decodeDeviceMessage( messageBytes, productKey, deviceName); - // 发送消息到队列(需要补充设备信息) + // 发送消息到队列 deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId); // 记录成功日志 log.info("[processMessage][处理{}成功][topic: {}]", messageType, topic); } + // TODO @haohao:合并下处理;不搞成每个 topic 一个处理; /** * 识别消息类型 *