diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java index 8db8db0714..11fa28d272 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java @@ -48,15 +48,15 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber { log.error("[connectMqtt][连接 MQTT Broker 失败]", error); reconnectWithDelay(); @@ -178,12 +182,14 @@ public class IotMqttUpstreamProtocol { */ private void subscribeToTopics() { List topicList = emqxProperties.getMqttTopics(); + // TODO @haohao:建议 topicList 直接 validate 校验 if (CollUtil.isEmpty(topicList)) { log.warn("[subscribeToTopics][未配置 MQTT 主题,使用默认主题]"); topicList = List.of("/sys/#"); // 默认订阅所有系统主题 } for (String topic : topicList) { + // TODO @haohao:直接 validate 校验;嘿嘿,主要保证核心逻辑,简单点 if (StrUtil.isBlank(topic)) { log.warn("[subscribeToTopics][跳过空主题]"); continue; @@ -224,7 +230,7 @@ public class IotMqttUpstreamProtocol { return; } - mqttClient.publish(topic, io.vertx.core.buffer.Buffer.buffer(payload), DEFAULT_QOS, false, false) + mqttClient.publish(topic, Buffer.buffer(payload), DEFAULT_QOS, false, false) .onSuccess(v -> log.debug("[publishMessage][发送消息成功][topic: {}]", topic)) .onFailure(err -> log.error("[publishMessage][发送消息失败][topic: {}]", topic, err)); } @@ -238,6 +244,7 @@ public class IotMqttUpstreamProtocol { return IotDeviceMessageUtils.generateServerId(emqxProperties.getMqttPort()); } + // TODO @haohao:这个要删除哇? /** * 获取 MQTT 客户端 * diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttEventHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttEventHandler.java index 6b74b9e4e3..074acfca41 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttEventHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttEventHandler.java @@ -56,6 +56,7 @@ public class IotMqttEventHandler extends IotMqttAbstractHandler { log.info("[handle][处理设备事件上报成功][topic: {}]", topic); // 发送响应消息 + // TODO @haohao:这里应该只 ack 哈;reply 在 biz 业务处理了。handleUpstreamDeviceMessage String method = "thing.event." + eventIdentifier + ".post"; sendResponse(topic, JSONUtil.parseObj(payload), method); } catch (Exception e) {