From f58cf282dd0417b95c17ecd979017cf237d223e9 Mon Sep 17 00:00:00 2001 From: haohao <1036606149@qq.com> Date: Sun, 8 Jun 2025 22:24:32 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E3=80=90IoT=20=E7=89=A9=E8=81=94?= =?UTF-8?q?=E7=BD=91=E3=80=91=E6=96=B0=E5=A2=9E=20MQTT=20=E5=8D=8F?= =?UTF-8?q?=E8=AE=AE=E6=94=AF=E6=8C=81=EF=BC=8C=E5=8C=85=E5=90=AB=E4=B8=8A?= =?UTF-8?q?=E8=A1=8C=E5=92=8C=E4=B8=8B=E8=A1=8C=E6=B6=88=E6=81=AF=E5=A4=84?= =?UTF-8?q?=E7=90=86=E5=99=A8=EF=BC=8C=E5=AE=8C=E5=96=84=E8=AE=BE=E5=A4=87?= =?UTF-8?q?=E8=AE=A4=E8=AF=81=E5=92=8C=E5=B1=9E=E6=80=A7=E3=80=81=E4=BA=8B?= =?UTF-8?q?=E4=BB=B6=E3=80=81=E6=9C=8D=E5=8A=A1=E5=A4=84=E7=90=86=E9=80=BB?= =?UTF-8?q?=E8=BE=91=EF=BC=8C=E6=9B=B4=E6=96=B0=E9=85=8D=E7=BD=AE=E4=BB=A5?= =?UTF-8?q?=E5=90=AF=E7=94=A8=20EMQX=20=E7=BB=84=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../yudao-module-iot-gateway/pom.xml | 6 + .../config/IotGatewayConfiguration.java | 25 +- .../mqtt/IotMqttDownstreamSubscriber.java | 161 +++++++++++ .../mqtt/IotMqttUpstreamProtocol.java | 267 ++++++++++++++++++ .../mqtt/router/IotMqttAbstractHandler.java | 38 +++ .../mqtt/router/IotMqttAuthRouter.java | 146 ++++++++++ .../mqtt/router/IotMqttEventHandler.java | 120 ++++++++ .../mqtt/router/IotMqttPropertyHandler.java | 147 ++++++++++ .../mqtt/router/IotMqttServiceHandler.java | 121 ++++++++ .../mqtt/router/IotMqttUpstreamRouter.java | 105 +++++++ .../protocol/mqtt/router/package-info.java | 22 ++ .../src/main/resources/application.yaml | 2 +- 12 files changed, 1157 insertions(+), 3 deletions(-) create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttAbstractHandler.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttAuthRouter.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttEventHandler.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttPropertyHandler.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttServiceHandler.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamRouter.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/package-info.java diff --git a/yudao-module-iot/yudao-module-iot-gateway/pom.xml b/yudao-module-iot/yudao-module-iot-gateway/pom.xml index 2871738014..82fc691cda 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/pom.xml +++ b/yudao-module-iot/yudao-module-iot-gateway/pom.xml @@ -41,6 +41,12 @@ io.vertx vertx-web + + + + io.vertx + vertx-mqtt + diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java index 9a2e99dea0..d730e92782 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java @@ -1,9 +1,10 @@ package cn.iocoder.yudao.module.iot.gateway.config; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; -import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer; import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttDownstreamSubscriber; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -31,7 +32,27 @@ public class IotGatewayConfiguration { @Bean public IotHttpDownstreamSubscriber iotHttpDownstreamSubscriber(IotHttpUpstreamProtocol httpUpstreamProtocol, IotMessageBus messageBus) { - return new IotHttpDownstreamSubscriber(httpUpstreamProtocol,messageBus); + return new IotHttpDownstreamSubscriber(httpUpstreamProtocol, messageBus); + } + } + + /** + * IoT 网关 MQTT 协议配置类 + */ + @Configuration + @ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.emqx", name = "enabled", havingValue = "true") + @Slf4j + public static class MqttProtocolConfiguration { + + @Bean + public IotMqttUpstreamProtocol iotMqttUpstreamProtocol(IotGatewayProperties gatewayProperties) { + return new IotMqttUpstreamProtocol(gatewayProperties.getProtocol().getEmqx()); + } + + @Bean + public IotMqttDownstreamSubscriber iotMqttDownstreamSubscriber(IotMqttUpstreamProtocol mqttUpstreamProtocol, + IotMessageBus messageBus) { + return new IotMqttDownstreamSubscriber(mqttUpstreamProtocol, messageBus); } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java new file mode 100644 index 0000000000..53529eddba --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java @@ -0,0 +1,161 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt; + +import cn.hutool.json.JSONObject; +import cn.hutool.json.JSONUtil; +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; +import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; +import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum; +import jakarta.annotation.PostConstruct; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +/** + * IoT 网关 MQTT 订阅者:接收下行给设备的消息 + * + * @author 芋道源码 + */ +@RequiredArgsConstructor +@Slf4j +public class IotMqttDownstreamSubscriber implements IotMessageSubscriber { + + private final IotMqttUpstreamProtocol protocol; + private final IotMessageBus messageBus; + + @PostConstruct + public void init() { + messageBus.register(this); + } + + @Override + public String getTopic() { + return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(protocol.getServerId()); + } + + @Override + public String getGroup() { + // 保证点对点消费,需要保证独立的 Group,所以使用 Topic 作为 Group + return getTopic(); + } + + @Override + public void onMessage(IotDeviceMessage message) { + log.info("[onMessage][接收到下行消息:{}]", message); + + try { + // 根据消息类型处理不同的下行消息 + String messageType = message.getType(); + switch (messageType) { + case "property": + handlePropertyMessage(message); + break; + case "service": + handleServiceMessage(message); + break; + case "config": + handleConfigMessage(message); + break; + case "ota": + handleOtaMessage(message); + break; + default: + log.warn("[onMessage][未知的消息类型:{}]", messageType); + break; + } + } catch (Exception e) { + log.error("[onMessage][处理下行消息失败:{}]", message, e); + } + } + + /** + * 处理属性相关消息 + * + * @param message 设备消息 + */ + private void handlePropertyMessage(IotDeviceMessage message) { + String identifier = message.getIdentifier(); + String productKey = message.getProductKey(); + String deviceName = message.getDeviceName(); + + if ("set".equals(identifier)) { + // 属性设置 + String topic = IotDeviceTopicEnum.buildPropertySetTopic(productKey, deviceName); + JSONObject payload = buildDownstreamPayload(message, "thing.service.property.set"); + protocol.publishMessage(topic, payload.toString()); + log.info("[handlePropertyMessage][发送属性设置消息][topic: {}]", topic); + } else if ("get".equals(identifier)) { + // 属性获取 + String topic = IotDeviceTopicEnum.buildPropertyGetTopic(productKey, deviceName); + JSONObject payload = buildDownstreamPayload(message, "thing.service.property.get"); + protocol.publishMessage(topic, payload.toString()); + log.info("[handlePropertyMessage][发送属性获取消息][topic: {}]", topic); + } else { + log.warn("[handlePropertyMessage][未知的属性操作:{}]", identifier); + } + } + + /** + * 处理服务调用消息 + * + * @param message 设备消息 + */ + private void handleServiceMessage(IotDeviceMessage message) { + String identifier = message.getIdentifier(); + String productKey = message.getProductKey(); + String deviceName = message.getDeviceName(); + + String topic = IotDeviceTopicEnum.buildServiceTopic(productKey, deviceName, identifier); + JSONObject payload = buildDownstreamPayload(message, "thing.service." + identifier); + protocol.publishMessage(topic, payload.toString()); + log.info("[handleServiceMessage][发送服务调用消息][topic: {}]", topic); + } + + /** + * 处理配置设置消息 + * + * @param message 设备消息 + */ + private void handleConfigMessage(IotDeviceMessage message) { + String productKey = message.getProductKey(); + String deviceName = message.getDeviceName(); + + String topic = IotDeviceTopicEnum.buildConfigSetTopic(productKey, deviceName); + JSONObject payload = buildDownstreamPayload(message, "thing.service.config.set"); + protocol.publishMessage(topic, payload.toString()); + log.info("[handleConfigMessage][发送配置设置消息][topic: {}]", topic); + } + + /** + * 处理 OTA 升级消息 + * + * @param message 设备消息 + */ + private void handleOtaMessage(IotDeviceMessage message) { + String productKey = message.getProductKey(); + String deviceName = message.getDeviceName(); + + String topic = IotDeviceTopicEnum.buildOtaUpgradeTopic(productKey, deviceName); + JSONObject payload = buildDownstreamPayload(message, "thing.service.ota.upgrade"); + protocol.publishMessage(topic, payload.toString()); + log.info("[handleOtaMessage][发送 OTA 升级消息][topic: {}]", topic); + } + + /** + * 构建下行消息载荷 + * + * @param message 设备消息 + * @param method 方法名 + * @return JSON 载荷 + */ + private JSONObject buildDownstreamPayload(IotDeviceMessage message, String method) { + JSONObject payload = new JSONObject(); + payload.set("id", message.getMessageId()); + payload.set("version", "1.0"); + payload.set("method", method); + payload.set("params", message.getData()); + return payload; + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java new file mode 100644 index 0000000000..5d1c581794 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java @@ -0,0 +1,267 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.ArrayUtil; +import cn.hutool.core.util.IdUtil; +import cn.hutool.core.util.ObjUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.extra.spring.SpringUtil; +import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; +import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttAuthRouter; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttUpstreamRouter; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.vertx.core.Vertx; +import io.vertx.mqtt.MqttClient; +import io.vertx.mqtt.MqttClientOptions; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +/** + * IoT 网关 MQTT 协议:接收设备上行消息 + * + * @author 芋道源码 + */ +@RequiredArgsConstructor +@Slf4j +public class IotMqttUpstreamProtocol { + + /** + * 默认 QoS 级别 + */ + private static final MqttQoS DEFAULT_QOS = MqttQoS.AT_LEAST_ONCE; + + private final IotGatewayProperties.EmqxProperties emqxProperties; + + private Vertx vertx; + private MqttClient mqttClient; + private IotMqttUpstreamRouter messageRouter; + private IotMqttAuthRouter authRouter; + private IotDeviceMessageProducer deviceMessageProducer; + + /** + * 服务运行状态标志 + */ + private volatile boolean isRunning = false; + + @PostConstruct + public void start() { + if (isRunning) { + log.warn("[start][MQTT 协议服务已经在运行中,请勿重复启动]"); + return; + } + log.info("[start][开始启动 MQTT 协议服务]"); + + // 初始化组件 + this.vertx = Vertx.vertx(); + this.deviceMessageProducer = SpringUtil.getBean(IotDeviceMessageProducer.class); + this.messageRouter = new IotMqttUpstreamRouter(this); + this.authRouter = new IotMqttAuthRouter(this); + + // 创建 MQTT 客户端 + MqttClientOptions options = new MqttClientOptions() + .setClientId("yudao-iot-gateway-" + IdUtil.fastSimpleUUID()) + .setUsername(emqxProperties.getMqttUsername()) + .setPassword(emqxProperties.getMqttPassword()) + .setSsl(ObjUtil.defaultIfNull(emqxProperties.getMqttSsl(), false)); + + this.mqttClient = MqttClient.create(vertx, options); + + // 连接 MQTT Broker + connectMqtt(); + } + + @PreDestroy + public void stop() { + if (!isRunning) { + log.warn("[stop][MQTT 协议服务已经停止,无需再次停止]"); + return; + } + log.info("[stop][开始停止 MQTT 协议服务]"); + + // 1. 取消 MQTT 主题订阅 + if (mqttClient != null && mqttClient.isConnected()) { + List topicList = emqxProperties.getMqttTopics(); + if (CollUtil.isNotEmpty(topicList)) { + for (String topic : topicList) { + try { + mqttClient.unsubscribe(topic); + log.debug("[stop][取消订阅主题: {}]", topic); + } catch (Exception e) { + log.warn("[stop][取消订阅主题异常: {}]", topic, e); + } + } + } + } + + // 2. 关闭 MQTT 客户端 + try { + if (mqttClient != null && mqttClient.isConnected()) { + mqttClient.disconnect(); + } + } catch (Exception e) { + log.warn("[stop][关闭 MQTT 客户端异常]", e); + } + + // 3. 关闭 Vertx + try { + if (vertx != null) { + vertx.close(); + } + } catch (Exception e) { + log.warn("[stop][关闭 Vertx 异常]", e); + } + + // 4. 更新状态 + isRunning = false; + log.info("[stop][MQTT 协议服务已停止]"); + } + + /** + * 连接 MQTT Broker 并订阅主题 + */ + private void connectMqtt() { + // 检查必要的 MQTT 配置 + String host = emqxProperties.getMqttHost(); + Integer port = emqxProperties.getMqttPort(); + if (StrUtil.isBlank(host)) { + String msg = "[connectMqtt][MQTT Host 为空,无法连接]"; + log.error(msg); + return; + } + if (port == null) { + log.warn("[connectMqtt][MQTT Port 为 null,使用默认端口 1883]"); + port = 1883; // 默认 MQTT 端口 + } + + final Integer finalPort = port; + CompletableFuture connectFuture = mqttClient.connect(finalPort, host) + .toCompletionStage() + .toCompletableFuture() + .thenAccept(connAck -> { + log.info("[connectMqtt][MQTT 客户端连接成功]"); + // 设置断开重连监听器 + mqttClient.closeHandler(closeEvent -> { + log.warn("[closeHandler][MQTT 连接已断开,准备重连]"); + reconnectWithDelay(); + }); + // 设置消息处理器 + setupMessageHandler(); + // 订阅主题 + subscribeToTopics(); + }) + .exceptionally(error -> { + log.error("[connectMqtt][连接 MQTT Broker 失败]", error); + reconnectWithDelay(); + return null; + }); + + // 等待连接完成 + try { + connectFuture.get(10, TimeUnit.SECONDS); + isRunning = true; + log.info("[connectMqtt][MQTT 协议服务启动完成]"); + } catch (Exception e) { + log.error("[connectMqtt][MQTT 协议服务启动失败]", e); + } + } + + /** + * 设置 MQTT 消息处理器 + */ + private void setupMessageHandler() { + mqttClient.publishHandler(messageRouter::route); + log.debug("[setupMessageHandler][MQTT 消息处理器设置完成]"); + } + + /** + * 订阅设备上行消息主题 + */ + private void subscribeToTopics() { + List topicList = emqxProperties.getMqttTopics(); + if (CollUtil.isEmpty(topicList)) { + log.warn("[subscribeToTopics][未配置 MQTT 主题,使用默认主题]"); + topicList = List.of("/sys/#"); // 默认订阅所有系统主题 + } + + for (String topic : topicList) { + if (StrUtil.isBlank(topic)) { + log.warn("[subscribeToTopics][跳过空主题]"); + continue; + } + + mqttClient.subscribe(topic, DEFAULT_QOS.value()) + .onSuccess(ack -> log.info("[subscribeToTopics][订阅主题成功: {}]", topic)) + .onFailure(err -> log.error("[subscribeToTopics][订阅主题失败: {}]", topic, err)); + } + } + + /** + * 重连 MQTT 客户端 + */ + private void reconnectWithDelay() { + if (!isRunning) { + log.info("[reconnectWithDelay][服务已停止,不再尝试重连]"); + return; + } + + // 默认重连延迟 5 秒 + int reconnectDelayMs = 5000; + vertx.setTimer(reconnectDelayMs, id -> { + log.info("[reconnectWithDelay][开始重新连接 MQTT]"); + connectMqtt(); + }); + } + + /** + * 发布消息到 MQTT + * + * @param topic 主题 + * @param payload 消息内容 + */ + public void publishMessage(String topic, String payload) { + if (mqttClient == null || !mqttClient.isConnected()) { + log.warn("[publishMessage][MQTT 客户端未连接,无法发送消息][topic: {}]", topic); + return; + } + + mqttClient.publish(topic, io.vertx.core.buffer.Buffer.buffer(payload), DEFAULT_QOS, false, false) + .onSuccess(v -> log.debug("[publishMessage][发送消息成功][topic: {}]", topic)) + .onFailure(err -> log.error("[publishMessage][发送消息失败][topic: {}]", topic, err)); + } + + /** + * 获取服务器 ID + * + * @return 服务器 ID + */ + public String getServerId() { + return IotDeviceMessageUtils.generateServerId(emqxProperties.getMqttPort()); + } + + /** + * 获取 MQTT 客户端 + * + * @return MQTT 客户端 + */ + public MqttClient getMqttClient() { + return mqttClient; + } + + /** + * 获取认证路由器 + * + * @return 认证路由器 + */ + public IotMqttAuthRouter getAuthRouter() { + return authRouter; + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttAbstractHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttAbstractHandler.java new file mode 100644 index 0000000000..66b835bf03 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttAbstractHandler.java @@ -0,0 +1,38 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router; + +import lombok.extern.slf4j.Slf4j; + +/** + * IoT 网关 MQTT 协议的路由处理器抽象基类 + *

+ * 提供通用的处理方法,所有 MQTT 消息处理器都应继承此类 + * + * @author 芋道源码 + */ +@Slf4j +public abstract class IotMqttAbstractHandler { + + /** + * 处理 MQTT 消息 + * + * @param topic 主题 + * @param payload 消息内容 + */ + public abstract void handle(String topic, String payload); + + /** + * 解析主题,获取主题各部分 + * + * @param topic 主题 + * @return 主题各部分数组,如果解析失败返回 null + */ + protected String[] parseTopic(String topic) { + String[] topicParts = topic.split("/"); + if (topicParts.length < 7) { + log.warn("[parseTopic][主题格式不正确][topic: {}]", topic); + return null; + } + return topicParts; + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttAuthRouter.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttAuthRouter.java new file mode 100644 index 0000000000..d1dda7e563 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttAuthRouter.java @@ -0,0 +1,146 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router; + +import cn.hutool.core.util.StrUtil; +import cn.hutool.extra.spring.SpringUtil; +import cn.iocoder.yudao.framework.common.pojo.CommonResult; +import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; +import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +/** + * IoT 网关 MQTT 认证路由器 + *

+ * 处理设备的 MQTT 连接认证和连接状态管理 + * + * @author 芋道源码 + */ +@RequiredArgsConstructor +@Slf4j +public class IotMqttAuthRouter { + + private final IotMqttUpstreamProtocol protocol; + private final IotDeviceMessageProducer deviceMessageProducer; + private final IotDeviceTokenService deviceTokenService; + private final IotDeviceCommonApi deviceCommonApi; + + public IotMqttAuthRouter(IotMqttUpstreamProtocol protocol) { + this.protocol = protocol; + this.deviceMessageProducer = SpringUtil.getBean(IotDeviceMessageProducer.class); + this.deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class); + this.deviceCommonApi = SpringUtil.getBean(IotDeviceCommonApi.class); + } + + /** + * 处理设备认证 + * + * @param clientId 客户端 ID + * @param username 用户名 + * @param password 密码 + * @return 认证结果 + */ + public boolean authenticate(String clientId, String username, String password) { + try { + log.info("[authenticate][开始认证设备][clientId: {}][username: {}]", clientId, username); + + // 1. 参数校验 + if (StrUtil.isEmpty(clientId) || StrUtil.isEmpty(username) || StrUtil.isEmpty(password)) { + log.warn("[authenticate][认证参数不完整][clientId: {}][username: {}]", clientId, username); + return false; + } + + // 2. 执行认证 + CommonResult result = deviceCommonApi.authDevice(new IotDeviceAuthReqDTO() + .setClientId(clientId).setUsername(username).setPassword(password)); + result.checkError(); + if (!Boolean.TRUE.equals(result.getData())) { + log.warn("[authenticate][设备认证失败][clientId: {}][username: {}]", clientId, username); + return false; + } + + log.info("[authenticate][设备认证成功][clientId: {}][username: {}]", clientId, username); + return true; + } catch (Exception e) { + log.error("[authenticate][设备认证异常][clientId: {}][username: {}]", clientId, username, e); + return false; + } + } + + /** + * 处理设备连接事件 + * + * @param clientId 客户端 ID + * @param username 用户名 + */ + public void handleClientConnected(String clientId, String username) { + try { + log.info("[handleClientConnected][设备连接][clientId: {}][username: {}]", clientId, username); + + // 解析设备信息并发送上线消息 + handleDeviceStateChange(username, true); + } catch (Exception e) { + log.error("[handleClientConnected][处理设备连接事件异常][clientId: {}][username: {}]", clientId, username, e); + } + } + + /** + * 处理设备断开连接事件 + * + * @param clientId 客户端 ID + * @param username 用户名 + */ + public void handleClientDisconnected(String clientId, String username) { + try { + log.info("[handleClientDisconnected][设备断开连接][clientId: {}][username: {}]", clientId, username); + + // 解析设备信息并发送下线消息 + handleDeviceStateChange(username, false); + } catch (Exception e) { + log.error("[handleClientDisconnected][处理设备断开连接事件异常][clientId: {}][username: {}]", clientId, username, e); + } + } + + /** + * 处理设备状态变化 + * + * @param username 用户名 + * @param online 是否在线 + */ + private void handleDeviceStateChange(String username, boolean online) { + // 解析设备信息 + if (StrUtil.isEmpty(username) || "undefined".equals(username)) { + log.warn("[handleDeviceStateChange][用户名为空或未定义][username: {}]", username); + return; + } + + IotDeviceAuthUtils.DeviceInfo deviceInfo = deviceTokenService.parseUsername(username); + if (deviceInfo == null) { + log.warn("[handleDeviceStateChange][无法解析设备信息][username: {}]", username); + return; + } + + try { + // 发送设备状态消息 + IotDeviceMessage message = IotDeviceMessage.of( + deviceInfo.getProductKey(), deviceInfo.getDeviceName(), protocol.getServerId()); + + if (online) { + message = message.ofStateOnline(); + log.info("[handleDeviceStateChange][发送设备上线消息成功][username: {}]", username); + } else { + message = message.ofStateOffline(); + log.info("[handleDeviceStateChange][发送设备下线消息成功][username: {}]", username); + } + + deviceMessageProducer.sendDeviceMessage(message); + } catch (Exception e) { + log.error("[handleDeviceStateChange][发送设备状态消息失败][username: {}][online: {}]", username, online, e); + } + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttEventHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttEventHandler.java new file mode 100644 index 0000000000..49a77c0778 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttEventHandler.java @@ -0,0 +1,120 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router; + +import cn.hutool.json.JSONObject; +import cn.hutool.json.JSONUtil; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer; +import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.util.Map; + +/** + * IoT 网关 MQTT 事件处理器 + *

+ * 处理设备事件相关的 MQTT 消息 + * + * @author 芋道源码 + */ +@RequiredArgsConstructor +@Slf4j +public class IotMqttEventHandler extends IotMqttAbstractHandler { + + private final IotMqttUpstreamProtocol protocol; + private final IotDeviceMessageProducer deviceMessageProducer; + + @Override + public void handle(String topic, String payload) { + try { + log.info("[handle][接收到设备事件上报][topic: {}]", topic); + + // 解析消息内容 + JSONObject jsonObject = JSONUtil.parseObj(payload); + String[] topicParts = parseTopic(topic); + if (topicParts == null) { + return; + } + + // 构建设备消息 + String productKey = topicParts[2]; + String deviceName = topicParts[3]; + String eventIdentifier = getEventIdentifier(topicParts, topic); + if (eventIdentifier == null) { + return; + } + + Map eventData = parseEventDataFromPayload(jsonObject); + IotDeviceMessage message = IotDeviceMessage.of(productKey, deviceName, protocol.getServerId()); + // 设置事件消息类型和标识符 + message.setType("event"); + message.setIdentifier(eventIdentifier); + message.setData(eventData); + + // 发送消息 + deviceMessageProducer.sendDeviceMessage(message); + log.info("[handle][处理设备事件上报成功][topic: {}]", topic); + + // 发送响应消息 + String method = "thing.event." + eventIdentifier + ".post"; + sendResponse(topic, jsonObject, method); + } catch (Exception e) { + log.error("[handle][处理设备事件上报失败][topic: {}][payload: {}]", topic, payload, e); + } + } + + /** + * 从主题部分中获取事件标识符 + * + * @param topicParts 主题各部分 + * @param topic 原始主题,用于日志 + * @return 事件标识符,如果获取失败返回 null + */ + private String getEventIdentifier(String[] topicParts, String topic) { + try { + return topicParts[6]; + } catch (ArrayIndexOutOfBoundsException e) { + log.warn("[getEventIdentifier][无法从主题中获取事件标识符][topic: {}]", topic); + return null; + } + } + + /** + * 从消息载荷解析事件数据 + * + * @param jsonObject 消息 JSON 对象 + * @return 事件数据映射 + */ + private Map parseEventDataFromPayload(JSONObject jsonObject) { + JSONObject params = jsonObject.getJSONObject("params"); + if (params == null) { + log.warn("[parseEventDataFromPayload][消息格式不正确,缺少 params 字段][jsonObject: {}]", jsonObject); + return Map.of(); + } + return params; + } + + /** + * 发送响应消息 + * + * @param topic 原始主题 + * @param jsonObject 原始消息 JSON 对象 + * @param method 响应方法 + */ + private void sendResponse(String topic, JSONObject jsonObject, String method) { + String replyTopic = IotDeviceTopicEnum.getReplyTopic(topic); + + // 构建响应消息 + JSONObject response = new JSONObject(); + response.set("id", jsonObject.getStr("id")); + response.set("code", 200); + response.set("method", method); + response.set("data", new JSONObject()); + + // 发送响应 + protocol.publishMessage(replyTopic, response.toString()); + log.debug("[sendResponse][发送响应消息][topic: {}]", replyTopic); + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttPropertyHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttPropertyHandler.java new file mode 100644 index 0000000000..95eccb1aae --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttPropertyHandler.java @@ -0,0 +1,147 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router; + +import cn.hutool.json.JSONObject; +import cn.hutool.json.JSONUtil; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer; +import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.util.Map; + +/** + * IoT 网关 MQTT 属性处理器 + *

+ * 处理设备属性相关的 MQTT 消息 + * + * @author 芋道源码 + */ +@RequiredArgsConstructor +@Slf4j +public class IotMqttPropertyHandler extends IotMqttAbstractHandler { + + private final IotMqttUpstreamProtocol protocol; + private final IotDeviceMessageProducer deviceMessageProducer; + + @Override + public void handle(String topic, String payload) { + if (topic.endsWith(IotDeviceTopicEnum.PROPERTY_POST_TOPIC.getTopic())) { + // 属性上报 + handlePropertyPost(topic, payload); + } else if (topic.contains(IotDeviceTopicEnum.PROPERTY_SET_TOPIC.getTopic())) { + // 属性设置响应 + handlePropertySetReply(topic, payload); + } else if (topic.contains(IotDeviceTopicEnum.PROPERTY_GET_TOPIC.getTopic())) { + // 属性获取响应 + handlePropertyGetReply(topic, payload); + } else { + log.warn("[handle][未知的属性主题][topic: {}]", topic); + } + } + + /** + * 处理设备属性上报消息 + * + * @param topic 主题 + * @param payload 消息内容 + */ + private void handlePropertyPost(String topic, String payload) { + try { + log.info("[handlePropertyPost][接收到设备属性上报][topic: {}]", topic); + + // 解析消息内容 + JSONObject jsonObject = JSONUtil.parseObj(payload); + String[] topicParts = parseTopic(topic); + if (topicParts == null) { + return; + } + + // 构建设备消息 + String productKey = topicParts[2]; + String deviceName = topicParts[3]; + Map properties = parsePropertiesFromPayload(jsonObject); + + IotDeviceMessage message = IotDeviceMessage.of(productKey, deviceName, protocol.getServerId()) + .ofPropertyReport(properties); + + // 发送消息 + deviceMessageProducer.sendDeviceMessage(message); + log.info("[handlePropertyPost][处理设备属性上报成功][topic: {}]", topic); + + // 发送响应消息 + sendResponse(topic, jsonObject, "thing.event.property.post"); + } catch (Exception e) { + log.error("[handlePropertyPost][处理设备属性上报失败][topic: {}][payload: {}]", topic, payload, e); + } + } + + /** + * 处理属性设置响应消息 + * + * @param topic 主题 + * @param payload 消息内容 + */ + private void handlePropertySetReply(String topic, String payload) { + try { + log.info("[handlePropertySetReply][接收到属性设置响应][topic: {}]", topic); + // TODO: 处理属性设置响应逻辑 + } catch (Exception e) { + log.error("[handlePropertySetReply][处理属性设置响应失败][topic: {}][payload: {}]", topic, payload, e); + } + } + + /** + * 处理属性获取响应消息 + * + * @param topic 主题 + * @param payload 消息内容 + */ + private void handlePropertyGetReply(String topic, String payload) { + try { + log.info("[handlePropertyGetReply][接收到属性获取响应][topic: {}]", topic); + // TODO: 处理属性获取响应逻辑 + } catch (Exception e) { + log.error("[handlePropertyGetReply][处理属性获取响应失败][topic: {}][payload: {}]", topic, payload, e); + } + } + + /** + * 从消息载荷解析属性 + * + * @param jsonObject 消息 JSON 对象 + * @return 属性映射 + */ + private Map parsePropertiesFromPayload(JSONObject jsonObject) { + JSONObject params = jsonObject.getJSONObject("params"); + if (params == null) { + log.warn("[parsePropertiesFromPayload][消息格式不正确,缺少 params 字段][jsonObject: {}]", jsonObject); + return Map.of(); + } + return params; + } + + /** + * 发送响应消息 + * + * @param topic 原始主题 + * @param jsonObject 原始消息 JSON 对象 + * @param method 响应方法 + */ + private void sendResponse(String topic, JSONObject jsonObject, String method) { + String replyTopic = IotDeviceTopicEnum.getReplyTopic(topic); + + // 构建响应消息 + JSONObject response = new JSONObject(); + response.set("id", jsonObject.getStr("id")); + response.set("code", 200); + response.set("method", method); + response.set("data", new JSONObject()); + + // 发送响应 + protocol.publishMessage(replyTopic, response.toString()); + log.debug("[sendResponse][发送响应消息][topic: {}]", replyTopic); + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttServiceHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttServiceHandler.java new file mode 100644 index 0000000000..0a08f0c9e5 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttServiceHandler.java @@ -0,0 +1,121 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router; + +import cn.hutool.json.JSONObject; +import cn.hutool.json.JSONUtil; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer; +import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.util.Map; + +/** + * IoT 网关 MQTT 服务处理器 + *

+ * 处理设备服务调用相关的 MQTT 消息 + * + * @author 芋道源码 + */ +@RequiredArgsConstructor +@Slf4j +public class IotMqttServiceHandler extends IotMqttAbstractHandler { + + private final IotMqttUpstreamProtocol protocol; + private final IotDeviceMessageProducer deviceMessageProducer; + + @Override + public void handle(String topic, String payload) { + try { + log.info("[handle][接收到设备服务调用响应][topic: {}]", topic); + + // 解析消息内容 + JSONObject jsonObject = JSONUtil.parseObj(payload); + String[] topicParts = parseTopic(topic); + if (topicParts == null) { + return; + } + + // 构建设备消息 + String productKey = topicParts[2]; + String deviceName = topicParts[3]; + String serviceIdentifier = getServiceIdentifier(topicParts, topic); + if (serviceIdentifier == null) { + return; + } + + Map serviceData = parseServiceDataFromPayload(jsonObject); + IotDeviceMessage message = IotDeviceMessage.of(productKey, deviceName, protocol.getServerId()); + // 设置服务消息类型和标识符 + message.setType("service"); + message.setIdentifier(serviceIdentifier); + message.setData(serviceData); + + // 发送消息 + deviceMessageProducer.sendDeviceMessage(message); + log.info("[handle][处理设备服务调用响应成功][topic: {}]", topic); + + // 发送响应消息 + String method = "thing.service." + serviceIdentifier; + sendResponse(topic, jsonObject, method); + } catch (Exception e) { + log.error("[handle][处理设备服务调用响应失败][topic: {}][payload: {}]", topic, payload, e); + } + } + + /** + * 从主题部分中获取服务标识符 + * + * @param topicParts 主题各部分 + * @param topic 原始主题,用于日志 + * @return 服务标识符,如果获取失败返回 null + */ + private String getServiceIdentifier(String[] topicParts, String topic) { + try { + // 服务主题格式:/sys/{productKey}/{deviceName}/thing/service/{serviceIdentifier} + return topicParts[6]; + } catch (ArrayIndexOutOfBoundsException e) { + log.warn("[getServiceIdentifier][无法从主题中获取服务标识符][topic: {}]", topic); + return null; + } + } + + /** + * 从消息载荷解析服务数据 + * + * @param jsonObject 消息 JSON 对象 + * @return 服务数据映射 + */ + private Map parseServiceDataFromPayload(JSONObject jsonObject) { + JSONObject params = jsonObject.getJSONObject("params"); + if (params == null) { + log.warn("[parseServiceDataFromPayload][消息格式不正确,缺少 params 字段][jsonObject: {}]", jsonObject); + return Map.of(); + } + return params; + } + + /** + * 发送响应消息 + * + * @param topic 原始主题 + * @param jsonObject 原始消息 JSON 对象 + * @param method 响应方法 + */ + private void sendResponse(String topic, JSONObject jsonObject, String method) { + String replyTopic = IotDeviceTopicEnum.getReplyTopic(topic); + + // 构建响应消息 + JSONObject response = new JSONObject(); + response.set("id", jsonObject.getStr("id")); + response.set("code", 200); + response.set("method", method); + response.set("data", new JSONObject()); + + // 发送响应 + protocol.publishMessage(replyTopic, response.toString()); + log.debug("[sendResponse][发送响应消息][topic: {}]", replyTopic); + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamRouter.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamRouter.java new file mode 100644 index 0000000000..c4b37ad148 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamRouter.java @@ -0,0 +1,105 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router; + +import cn.hutool.core.util.StrUtil; +import cn.hutool.extra.spring.SpringUtil; +import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer; +import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; +import io.vertx.mqtt.messages.MqttPublishMessage; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +/** + * IoT 网关 MQTT 上行路由器 + *

+ * 根据消息主题路由到不同的处理器 + * + * @author 芋道源码 + */ +@RequiredArgsConstructor +@Slf4j +public class IotMqttUpstreamRouter { + + private final IotMqttUpstreamProtocol protocol; + private final IotDeviceMessageProducer deviceMessageProducer; + + // 处理器实例 + private IotMqttPropertyHandler propertyHandler; + private IotMqttEventHandler eventHandler; + private IotMqttServiceHandler serviceHandler; + + public IotMqttUpstreamRouter(IotMqttUpstreamProtocol protocol) { + this.protocol = protocol; + this.deviceMessageProducer = SpringUtil.getBean(IotDeviceMessageProducer.class); + // 初始化处理器 + this.propertyHandler = new IotMqttPropertyHandler(protocol, deviceMessageProducer); + this.eventHandler = new IotMqttEventHandler(protocol, deviceMessageProducer); + this.serviceHandler = new IotMqttServiceHandler(protocol, deviceMessageProducer); + } + + /** + * 路由 MQTT 消息 + * + * @param message MQTT 发布消息 + */ + public void route(MqttPublishMessage message) { + String topic = message.topicName(); + String payload = message.payload().toString(); + log.info("[route][接收到 MQTT 消息][topic: {}][payload: {}]", topic, payload); + + try { + if (StrUtil.isEmpty(payload)) { + log.warn("[route][消息内容为空][topic: {}]", topic); + return; + } + + // 根据主题路由到相应的处理器 + if (isPropertyTopic(topic)) { + propertyHandler.handle(topic, payload); + } else if (isEventTopic(topic)) { + eventHandler.handle(topic, payload); + } else if (isServiceTopic(topic)) { + serviceHandler.handle(topic, payload); + } else { + log.warn("[route][未知的消息类型][topic: {}]", topic); + } + } catch (Exception e) { + log.error("[route][处理 MQTT 消息失败][topic: {}][payload: {}]", topic, payload, e); + } + } + + /** + * 判断是否为属性相关主题 + * + * @param topic 主题 + * @return 是否为属性主题 + */ + private boolean isPropertyTopic(String topic) { + return topic.endsWith(IotDeviceTopicEnum.PROPERTY_POST_TOPIC.getTopic()) || + topic.contains(IotDeviceTopicEnum.PROPERTY_SET_TOPIC.getTopic()) || + topic.contains(IotDeviceTopicEnum.PROPERTY_GET_TOPIC.getTopic()); + } + + /** + * 判断是否为事件相关主题 + * + * @param topic 主题 + * @return 是否为事件主题 + */ + private boolean isEventTopic(String topic) { + return topic.contains(IotDeviceTopicEnum.EVENT_POST_TOPIC_PREFIX.getTopic()) && + topic.endsWith(IotDeviceTopicEnum.EVENT_POST_TOPIC_SUFFIX.getTopic()); + } + + /** + * 判断是否为服务相关主题 + * + * @param topic 主题 + * @return 是否为服务主题 + */ + private boolean isServiceTopic(String topic) { + return topic.contains(IotDeviceTopicEnum.SERVICE_TOPIC_PREFIX.getTopic()) && + !isPropertyTopic(topic); // 排除属性相关的服务调用 + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/package-info.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/package-info.java new file mode 100644 index 0000000000..57d68d7497 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/package-info.java @@ -0,0 +1,22 @@ +/** + * MQTT 协议路由器包 + *

+ * 包含 MQTT 协议的所有路由处理器和抽象基类: + *

    + *
  • {@link cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttAbstractHandler} + * - 抽象路由处理器基类
  • + *
  • {@link cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttUpstreamRouter} + * - 上行消息路由器
  • + *
  • {@link cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttAuthRouter} + * - 认证路由器
  • + *
  • {@link cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttPropertyHandler} + * - 属性处理器
  • + *
  • {@link cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttEventHandler} + * - 事件处理器
  • + *
  • {@link cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttServiceHandler} + * - 服务处理器
  • + *
+ * + * @author 芋道源码 + */ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router; \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml index b57cc266d1..678569f807 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml @@ -37,7 +37,7 @@ yudao: # 针对引入的 EMQX 组件的配置 # ==================================== emqx: - enabled: false + enabled: true mqtt-ssl: false mqtt-topics: - "/sys/#" # 系统主题(设备上报)