diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java index f88f1f9db1..ba9f98d228 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java @@ -4,7 +4,6 @@ import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttDownstreamSubscriber; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttHttpAuthProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -45,8 +44,11 @@ public class IotGatewayConfiguration { @Slf4j public static class MqttProtocolConfiguration { + /** + * MQTT 统一协议:集成上行协议和HTTP认证协议 + */ @Bean - public IotMqttUpstreamProtocol iotMqttUpstreamProtocol(IotGatewayProperties gatewayProperties) { + public IotMqttUpstreamProtocol iotMqttUnifiedProtocol(IotGatewayProperties gatewayProperties) { return new IotMqttUpstreamProtocol(gatewayProperties.getProtocol().getEmqx()); } @@ -55,14 +57,6 @@ public class IotGatewayConfiguration { IotMessageBus messageBus) { return new IotMqttDownstreamSubscriber(mqttUpstreamProtocol, messageBus); } - - /** - * MQTT HTTP 认证协议:提供 HTTP 认证接口供 EMQX 调用 - */ - @Bean - public IotMqttHttpAuthProtocol mqttHttpAuthProtocol(IotGatewayProperties gatewayProperties) { - return new IotMqttHttpAuthProtocol(gatewayProperties.getProtocol().getEmqx()); - } } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java index 11fa28d272..7b4d4386a3 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java @@ -1,14 +1,14 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt; +import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONObject; -import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; -import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceGetReqDTO; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum; +import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; import jakarta.annotation.PostConstruct; import jakarta.annotation.Resource; import lombok.RequiredArgsConstructor; @@ -27,7 +27,7 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber 成功认证 -POST http://localhost:8090/mqtt/auth/authenticate -Content-Type: application/json - -{ - "clientid": "test_product.test_device", - "username": "test_device&test_product", - "password": "test_device_secret_hmac_password" -} - -### 2. MQTT 设备认证接口 => 参数不完整(clientid 为空) -POST http://localhost:8090/mqtt/auth/authenticate -Content-Type: application/json - -{ - "clientid": "", - "username": "test_device&test_product", - "password": "test_device_secret" -} - -### 3. MQTT 设备认证接口 => 参数不完整(username 为空) -POST http://localhost:8090/mqtt/auth/authenticate -Content-Type: application/json - -{ - "clientid": "test_product.test_device", - "username": "", - "password": "test_device_secret" -} - -### 4. MQTT 设备认证接口 => 参数不完整(password 为空) -POST http://localhost:8090/mqtt/auth/authenticate -Content-Type: application/json - -{ - "clientid": "test_product.test_device", - "username": "test_device&test_product", - "password": "" -} - -### 5. MQTT 设备认证接口 => 认证失败(错误的设备密钥) -POST http://localhost:8090/mqtt/auth/authenticate -Content-Type: application/json - -{ - "clientid": "test_product.test_device", - "username": "test_device&test_product", - "password": "wrong_password" -} - -### 6. EMQX 客户端连接事件钩子 => 设备上线 -POST http://localhost:8090/mqtt/auth/connected -Content-Type: application/json - -{ - "clientid": "test_product.test_device", - "username": "test_device&test_product", - "timestamp": 1703808000000, - "peername": "192.168.1.100:52036", - "sockname": "127.0.0.1:1883" -} - -### 7. EMQX 客户端连接事件钩子 => 用户名为空 -POST http://localhost:8090/mqtt/auth/connected -Content-Type: application/json - -{ - "clientid": "test_product.test_device", - "username": "", - "timestamp": 1703808000000, - "peername": "192.168.1.100:52036", - "sockname": "127.0.0.1:1883" -} - -### 8. EMQX 客户端断开连接事件钩子 => 设备下线(正常断开) -POST http://localhost:8090/mqtt/auth/disconnected -Content-Type: application/json - -{ - "clientid": "test_product.test_device", - "username": "test_device&test_product", - "reason": "normal", - "timestamp": 1703808060000 -} - -### 9. 使用自定义端口测试(如果配置了不同端口) -### 假设配置了端口 8091: -### POST http://localhost:8091/mqtt/auth/authenticate -### Content-Type: application/json -### -### { -### "clientid": "test_product.test_device", -### "username": "test_device&test_product", -### "password": "test_device_secret_hmac_password" -### } - -### EMQX 配置参考: -### authentication = [ -### { -### mechanism = http -### method = post -### url = "http://localhost:8090/mqtt/auth/authenticate" # 使用配置的端口 -### body = { -### clientid = "${clientid}" -### username = "${username}" -### password = "${password}" -### } -### headers = { -### "content-type" = "application/json" -### } -### } -### ] -### -### webhooks = [ -### { -### name = "client_connected" -### url = "http://localhost:8090/mqtt/auth/connected" # 使用配置的端口 -### events = ["client.connected"] -### }, -### { -### name = "client_disconnected" -### url = "http://localhost:8090/mqtt/auth/disconnected" # 使用配置的端口 -### events = ["client.disconnected"] -### } -### ] \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java index ddbd81d66a..60587acdbc 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java @@ -6,15 +6,18 @@ import cn.hutool.core.util.ObjUtil; import cn.hutool.core.util.StrUtil; import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttUpstreamRouter; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttHttpAuthHandler; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttUpstreamHandler; import io.netty.handler.codec.mqtt.MqttQoS; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpServer; +import io.vertx.ext.web.Router; +import io.vertx.ext.web.handler.BodyHandler; import io.vertx.mqtt.MqttClient; import io.vertx.mqtt.MqttClientOptions; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import java.util.List; @@ -22,11 +25,14 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; /** - * IoT 网关 MQTT 协议:接收设备上行消息 + * IoT 网关 MQTT 统一协议 + *

+ * 集成了 MQTT 上行协议和 HTTP 认证协议的功能: + * 1. MQTT 客户端:连接 EMQX,处理设备上行和下行消息 + * 2. HTTP 认证服务:为 EMQX 提供设备认证接口 * * @author 芋道源码 */ -@RequiredArgsConstructor @Slf4j public class IotMqttUpstreamProtocol { @@ -37,30 +43,143 @@ public class IotMqttUpstreamProtocol { private final IotGatewayProperties.EmqxProperties emqxProperties; + // 共享资源 private Vertx vertx; + + // MQTT 客户端相关 private MqttClient mqttClient; - private IotMqttUpstreamRouter messageRouter; + private IotMqttUpstreamHandler upstreamHandler; + + // HTTP 认证服务相关 + private HttpServer httpAuthServer; + private IotMqttHttpAuthHandler authHandler; /** * 服务运行状态标志 */ private volatile boolean isRunning = false; + /** + * 构造函数 + */ + public IotMqttUpstreamProtocol(IotGatewayProperties.EmqxProperties emqxProperties) { + this.emqxProperties = emqxProperties; + } + @PostConstruct public void start() { if (isRunning) { - log.warn("[start][MQTT 协议服务已经在运行中,请勿重复启动]"); + log.warn("[start][MQTT 统一协议服务已经在运行中,请勿重复启动]"); return; } - log.info("[start][开始启动 MQTT 协议服务]"); + log.info("[start][开始启动 MQTT 统一协议服务]"); - // 初始化组件 - this.vertx = Vertx.vertx(); - this.messageRouter = new IotMqttUpstreamRouter(this); + try { + // 1. 创建共享的 Vertx 实例 + this.vertx = Vertx.vertx(); + log.info("[start][共享 Vertx 实例创建成功]"); + + // 2. 启动 HTTP 认证服务 + startHttpAuthServer(); + + // 3. 启动 MQTT 客户端 + startMqttClient(); + + isRunning = true; + log.info("[start][MQTT 统一协议服务启动完成]"); + } catch (Exception e) { + log.error("[start][MQTT 统一协议服务启动失败]", e); + // 启动失败时清理资源 + stop(); + throw e; + } + } + + @PreDestroy + public void stop() { + if (!isRunning) { + log.warn("[stop][MQTT 统一协议服务已经停止,无需再次停止]"); + return; + } + log.info("[stop][开始停止 MQTT 统一协议服务]"); + + // 1. 停止 MQTT 客户端 + stopMqttClient(); + + // 2. 停止 HTTP 认证服务 + stopHttpAuthServer(); + + // 3. 关闭 Vertx 实例 + if (vertx != null) { + try { + vertx.close(); + log.info("[stop][Vertx 实例已关闭]"); + } catch (Exception e) { + log.warn("[stop][关闭 Vertx 实例失败]", e); + } + } + + isRunning = false; + log.info("[stop][MQTT 统一协议服务已停止]"); + } + + /** + * 启动 HTTP 认证服务 + */ + private void startHttpAuthServer() { + log.info("[startHttpAuthServer][开始启动 HTTP 认证服务]"); + + // 创建路由 + Router router = Router.router(vertx); + router.route().handler(BodyHandler.create()); + + // 创建认证处理器 + this.authHandler = new IotMqttHttpAuthHandler(); + + // 添加认证路由 + router.post("/mqtt/auth/authenticate").handler(authHandler::authenticate); + router.post("/mqtt/auth/connected").handler(authHandler::connected); + router.post("/mqtt/auth/disconnected").handler(authHandler::disconnected); + + // 启动 HTTP 服务器 + int authPort = emqxProperties.getHttpAuthPort(); + try { + httpAuthServer = vertx.createHttpServer() + .requestHandler(router) + .listen(authPort) + .result(); + log.info("[startHttpAuthServer][HTTP 认证服务启动成功,端口:{}]", authPort); + } catch (Exception e) { + log.error("[startHttpAuthServer][HTTP 认证服务启动失败]", e); + throw e; + } + } + + /** + * 停止 HTTP 认证服务 + */ + private void stopHttpAuthServer() { + if (httpAuthServer != null) { + try { + httpAuthServer.close().result(); + log.info("[stopHttpAuthServer][HTTP 认证服务已停止]"); + } catch (Exception e) { + log.error("[stopHttpAuthServer][HTTP 认证服务停止失败]", e); + } + } + } + + /** + * 启动 MQTT 客户端 + */ + private void startMqttClient() { + log.info("[startMqttClient][开始启动 MQTT 客户端]"); + + // 初始化消息处理器 + this.upstreamHandler = new IotMqttUpstreamHandler(); // 创建 MQTT 客户端 MqttClientOptions options = new MqttClientOptions() - // TODO @haohao:clientid 建议也配置文件;想通名字,会冲突哇? .setClientId("yudao-iot-gateway-" + IdUtil.fastSimpleUUID()) .setUsername(emqxProperties.getMqttUsername()) .setPassword(emqxProperties.getMqttPassword()) @@ -72,14 +191,10 @@ public class IotMqttUpstreamProtocol { connectMqtt(); } - @PreDestroy - public void stop() { - if (!isRunning) { - log.warn("[stop][MQTT 协议服务已经停止,无需再次停止]"); - return; - } - log.info("[stop][开始停止 MQTT 协议服务]"); - + /** + * 停止 MQTT 客户端 + */ + private void stopMqttClient() { // 1. 取消 MQTT 主题订阅 if (mqttClient != null && mqttClient.isConnected()) { List topicList = emqxProperties.getMqttTopics(); @@ -87,9 +202,9 @@ public class IotMqttUpstreamProtocol { for (String topic : topicList) { try { mqttClient.unsubscribe(topic); - log.debug("[stop][取消订阅主题: {}]", topic); + log.debug("[stopMqttClient][取消订阅主题: {}]", topic); } catch (Exception e) { - log.warn("[stop][取消订阅主题异常: {}]", topic, e); + log.warn("[stopMqttClient][取消订阅主题异常: {}]", topic, e); } } } @@ -99,33 +214,20 @@ public class IotMqttUpstreamProtocol { try { if (mqttClient != null && mqttClient.isConnected()) { mqttClient.disconnect(); + log.info("[stopMqttClient][MQTT 客户端已断开]"); } } catch (Exception e) { - log.warn("[stop][关闭 MQTT 客户端异常]", e); + log.warn("[stopMqttClient][关闭 MQTT 客户端异常]", e); } - - // 3. 关闭 Vertx - try { - if (vertx != null) { - vertx.close(); - } - } catch (Exception e) { - log.warn("[stop][关闭 Vertx 异常]", e); - } - - // 4. 更新状态 - isRunning = false; - log.info("[stop][MQTT 协议服务已停止]"); } /** * 连接 MQTT Broker 并订阅主题 */ private void connectMqtt() { - // 检查必要的 MQTT 配置 - // TODO @haohao:这些通过配置文件的 validate 去做哈,简化下; String host = emqxProperties.getMqttHost(); Integer port = emqxProperties.getMqttPort(); + if (StrUtil.isBlank(host)) { String msg = "[connectMqtt][MQTT Host 为空,无法连接]"; log.error(msg); @@ -133,7 +235,7 @@ public class IotMqttUpstreamProtocol { } if (port == null) { log.warn("[connectMqtt][MQTT Port 为 null,使用默认端口 1883]"); - port = 1883; // 默认 MQTT 端口 + port = 1883; } final Integer finalPort = port; @@ -152,7 +254,6 @@ public class IotMqttUpstreamProtocol { // 订阅主题 subscribeToTopics(); }) - // TODO @haohao:这个要不要改成,必须连接成功?不做重试;不然启动也蛮危险的? .exceptionally(error -> { log.error("[connectMqtt][连接 MQTT Broker 失败]", error); reconnectWithDelay(); @@ -162,10 +263,9 @@ public class IotMqttUpstreamProtocol { // 等待连接完成 try { connectFuture.get(10, TimeUnit.SECONDS); - isRunning = true; - log.info("[connectMqtt][MQTT 协议服务启动完成]"); + log.info("[connectMqtt][MQTT 客户端启动完成]"); } catch (Exception e) { - log.error("[connectMqtt][MQTT 协议服务启动失败]", e); + log.error("[connectMqtt][MQTT 客户端启动失败]", e); } } @@ -173,7 +273,7 @@ public class IotMqttUpstreamProtocol { * 设置 MQTT 消息处理器 */ private void setupMessageHandler() { - mqttClient.publishHandler(messageRouter::route); + mqttClient.publishHandler(upstreamHandler::handle); log.debug("[setupMessageHandler][MQTT 消息处理器设置完成]"); } @@ -182,76 +282,54 @@ public class IotMqttUpstreamProtocol { */ private void subscribeToTopics() { List topicList = emqxProperties.getMqttTopics(); - // TODO @haohao:建议 topicList 直接 validate 校验 if (CollUtil.isEmpty(topicList)) { - log.warn("[subscribeToTopics][未配置 MQTT 主题,使用默认主题]"); - topicList = List.of("/sys/#"); // 默认订阅所有系统主题 + log.warn("[subscribeToTopics][没有配置要订阅的主题]"); + return; } for (String topic : topicList) { - // TODO @haohao:直接 validate 校验;嘿嘿,主要保证核心逻辑,简单点 - if (StrUtil.isBlank(topic)) { - log.warn("[subscribeToTopics][跳过空主题]"); - continue; - } - - mqttClient.subscribe(topic, DEFAULT_QOS.value()) - .onSuccess(ack -> log.info("[subscribeToTopics][订阅主题成功: {}]", topic)) - .onFailure(err -> log.error("[subscribeToTopics][订阅主题失败: {}]", topic, err)); + mqttClient.subscribe(topic, DEFAULT_QOS.value(), subscribeResult -> { + if (subscribeResult.succeeded()) { + log.info("[subscribeToTopics][订阅主题成功: {}]", topic); + } else { + log.error("[subscribeToTopics][订阅主题失败: {}]", topic, subscribeResult.cause()); + } + }); } } /** - * 重连 MQTT 客户端 + * 延迟重连 */ private void reconnectWithDelay() { - if (!isRunning) { - log.info("[reconnectWithDelay][服务已停止,不再尝试重连]"); - return; - } - - // 默认重连延迟 5 秒 - int reconnectDelayMs = 5000; - vertx.setTimer(reconnectDelayMs, id -> { - log.info("[reconnectWithDelay][开始重新连接 MQTT]"); - connectMqtt(); + vertx.setTimer(5000, timerId -> { + if (isRunning && (mqttClient == null || !mqttClient.isConnected())) { + log.info("[reconnectWithDelay][开始重连 MQTT Broker]"); + connectMqtt(); + } }); } /** - * 发布消息到 MQTT + * 发布消息到 MQTT Broker * * @param topic 主题 * @param payload 消息内容 */ public void publishMessage(String topic, String payload) { - if (mqttClient == null || !mqttClient.isConnected()) { - log.warn("[publishMessage][MQTT 客户端未连接,无法发送消息][topic: {}]", topic); - return; + if (mqttClient != null && mqttClient.isConnected()) { + mqttClient.publish(topic, Buffer.buffer(payload), DEFAULT_QOS, false, false); + log.debug("[publishMessage][发布消息成功][topic: {}]", topic); + } else { + log.warn("[publishMessage][MQTT 客户端未连接,无法发布消息][topic: {}]", topic); } - - mqttClient.publish(topic, Buffer.buffer(payload), DEFAULT_QOS, false, false) - .onSuccess(v -> log.debug("[publishMessage][发送消息成功][topic: {}]", topic)) - .onFailure(err -> log.error("[publishMessage][发送消息失败][topic: {}]", topic, err)); } /** * 获取服务器 ID - * - * @return 服务器 ID */ public String getServerId() { return IotDeviceMessageUtils.generateServerId(emqxProperties.getMqttPort()); } - // TODO @haohao:这个要删除哇? - /** - * 获取 MQTT 客户端 - * - * @return MQTT 客户端 - */ - public MqttClient getMqttClient() { - return mqttClient; - } - } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttAbstractHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttAbstractHandler.java deleted file mode 100644 index 66b835bf03..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttAbstractHandler.java +++ /dev/null @@ -1,38 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router; - -import lombok.extern.slf4j.Slf4j; - -/** - * IoT 网关 MQTT 协议的路由处理器抽象基类 - *

- * 提供通用的处理方法,所有 MQTT 消息处理器都应继承此类 - * - * @author 芋道源码 - */ -@Slf4j -public abstract class IotMqttAbstractHandler { - - /** - * 处理 MQTT 消息 - * - * @param topic 主题 - * @param payload 消息内容 - */ - public abstract void handle(String topic, String payload); - - /** - * 解析主题,获取主题各部分 - * - * @param topic 主题 - * @return 主题各部分数组,如果解析失败返回 null - */ - protected String[] parseTopic(String topic) { - String[] topicParts = topic.split("/"); - if (topicParts.length < 7) { - log.warn("[parseTopic][主题格式不正确][topic: {}]", topic); - return null; - } - return topicParts; - } - -} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttEventHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttEventHandler.java deleted file mode 100644 index 074acfca41..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttEventHandler.java +++ /dev/null @@ -1,109 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router; - -import cn.hutool.json.JSONObject; -import cn.hutool.json.JSONUtil; -import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer; -import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; -import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -import java.nio.charset.StandardCharsets; - -/** - * IoT 网关 MQTT 事件处理器 - *

- * 处理设备事件相关的 MQTT 消息 - * - * @author 芋道源码 - */ -@RequiredArgsConstructor -@Slf4j -public class IotMqttEventHandler extends IotMqttAbstractHandler { - - private final IotMqttUpstreamProtocol protocol; - private final IotDeviceMessageProducer deviceMessageProducer; - private final IotDeviceMessageService deviceMessageService; - - @Override - public void handle(String topic, String payload) { - try { - log.info("[handle][接收到设备事件上报][topic: {}]", topic); - - // 解析消息内容 - String[] topicParts = parseTopic(topic); - if (topicParts == null) { - return; - } - - // 构建设备消息 - String productKey = topicParts[2]; - String deviceName = topicParts[3]; - String eventIdentifier = getEventIdentifier(topicParts, topic); - if (eventIdentifier == null) { - return; - } - - // 使用 IotDeviceMessageService 解码消息 - byte[] messageBytes = payload.getBytes(StandardCharsets.UTF_8); - IotDeviceMessage message = deviceMessageService.decodeDeviceMessage( - messageBytes, productKey, deviceName); - - // 发送消息 - deviceMessageProducer.sendDeviceMessage(message); - log.info("[handle][处理设备事件上报成功][topic: {}]", topic); - - // 发送响应消息 - // TODO @haohao:这里应该只 ack 哈;reply 在 biz 业务处理了。handleUpstreamDeviceMessage - String method = "thing.event." + eventIdentifier + ".post"; - sendResponse(topic, JSONUtil.parseObj(payload), method); - } catch (Exception e) { - log.error("[handle][处理设备事件上报失败][topic: {}][payload: {}]", topic, payload, e); - } - } - - /** - * 从主题部分中获取事件标识符 - * - * @param topicParts 主题各部分 - * @param topic 原始主题,用于日志 - * @return 事件标识符,如果获取失败返回 null - */ - private String getEventIdentifier(String[] topicParts, String topic) { - try { - return topicParts[6]; - } catch (ArrayIndexOutOfBoundsException e) { - log.warn("[getEventIdentifier][无法从主题中获取事件标识符][topic: {}]", topic); - return null; - } - } - - /** - * 发送响应消息 - * - * @param topic 原始主题 - * @param jsonObject 原始消息 JSON 对象 - * @param method 响应方法 - */ - private void sendResponse(String topic, JSONObject jsonObject, String method) { - try { - String replyTopic = IotDeviceTopicEnum.getReplyTopic(topic); - - // 构建响应消息 - JSONObject response = new JSONObject(); - response.set("id", jsonObject.getStr("id")); - response.set("code", 200); - response.set("method", method); - response.set("data", new JSONObject()); - - // 发送响应 - protocol.publishMessage(replyTopic, response.toString()); - log.debug("[sendResponse][发送响应消息成功][topic: {}]", replyTopic); - } catch (Exception e) { - log.error("[sendResponse][发送响应消息失败][topic: {}]", topic, e); - } - } - -} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttPropertyHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttPropertyHandler.java deleted file mode 100644 index 64acb82964..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttPropertyHandler.java +++ /dev/null @@ -1,171 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router; - -import cn.hutool.json.JSONObject; -import cn.hutool.json.JSONUtil; -import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer; -import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; -import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -import java.nio.charset.StandardCharsets; - -/** - * IoT 网关 MQTT 属性处理器 - *

- * 处理设备属性相关的 MQTT 消息 - * - * @author 芋道源码 - */ -@RequiredArgsConstructor -@Slf4j -public class IotMqttPropertyHandler extends IotMqttAbstractHandler { - - private final IotMqttUpstreamProtocol protocol; - private final IotDeviceMessageProducer deviceMessageProducer; - private final IotDeviceMessageService deviceMessageService; - - @Override - public void handle(String topic, String payload) { - if (topic.endsWith(IotDeviceTopicEnum.PROPERTY_POST_TOPIC.getTopic())) { - // 属性上报 - handlePropertyPost(topic, payload); - } else if (topic.contains(IotDeviceTopicEnum.PROPERTY_SET_TOPIC.getTopic())) { - // 属性设置响应 - handlePropertySetReply(topic, payload); - } else if (topic.contains(IotDeviceTopicEnum.PROPERTY_GET_TOPIC.getTopic())) { - // 属性获取响应 - handlePropertyGetReply(topic, payload); - } else { - log.warn("[handle][未知的属性主题][topic: {}]", topic); - } - } - - /** - * 处理设备属性上报消息 - * - * @param topic 主题 - * @param payload 消息内容 - */ - private void handlePropertyPost(String topic, String payload) { - try { - log.info("[handlePropertyPost][接收到设备属性上报][topic: {}]", topic); - - // 解析主题获取设备信息 - String[] topicParts = parseTopic(topic); - if (topicParts == null) { - return; - } - - String productKey = topicParts[2]; - String deviceName = topicParts[3]; - - // 使用 IotDeviceMessageService 解码消息 - byte[] messageBytes = payload.getBytes(StandardCharsets.UTF_8); - IotDeviceMessage message = deviceMessageService.decodeDeviceMessage( - messageBytes, productKey, deviceName); - - // 发送消息 - deviceMessageProducer.sendDeviceMessage(message); - log.info("[handlePropertyPost][处理设备属性上报成功][topic: {}]", topic); - - // 发送响应消息 - sendResponse(topic, JSONUtil.parseObj(payload), "thing.event.property.post"); - } catch (Exception e) { - log.error("[handlePropertyPost][处理设备属性上报失败][topic: {}][payload: {}]", topic, payload, e); - } - } - - /** - * 处理属性设置响应消息 - * - * @param topic 主题 - * @param payload 消息内容 - */ - private void handlePropertySetReply(String topic, String payload) { - try { - log.info("[handlePropertySetReply][接收到属性设置响应][topic: {}]", topic); - - // 解析主题获取设备信息 - String[] topicParts = parseTopic(topic); - if (topicParts == null) { - return; - } - - String productKey = topicParts[2]; - String deviceName = topicParts[3]; - - // 使用 IotDeviceMessageService 解码消息 - byte[] messageBytes = payload.getBytes(StandardCharsets.UTF_8); - IotDeviceMessage message = deviceMessageService.decodeDeviceMessage( - messageBytes, productKey, deviceName); - - // 发送消息 - deviceMessageProducer.sendDeviceMessage(message); - log.info("[handlePropertySetReply][处理属性设置响应成功][topic: {}]", topic); - } catch (Exception e) { - log.error("[handlePropertySetReply][处理属性设置响应失败][topic: {}][payload: {}]", topic, payload, e); - } - } - - /** - * 处理属性获取响应消息 - * - * @param topic 主题 - * @param payload 消息内容 - */ - private void handlePropertyGetReply(String topic, String payload) { - try { - log.info("[handlePropertyGetReply][接收到属性获取响应][topic: {}]", topic); - - // 解析主题获取设备信息 - String[] topicParts = parseTopic(topic); - if (topicParts == null) { - return; - } - - String productKey = topicParts[2]; - String deviceName = topicParts[3]; - - // 使用 IotDeviceMessageService 解码消息 - byte[] messageBytes = payload.getBytes(StandardCharsets.UTF_8); - IotDeviceMessage message = deviceMessageService.decodeDeviceMessage( - messageBytes, productKey, deviceName); - - // 发送消息 - deviceMessageProducer.sendDeviceMessage(message); - log.info("[handlePropertyGetReply][处理属性获取响应成功][topic: {}]", topic); - } catch (Exception e) { - log.error("[handlePropertyGetReply][处理属性获取响应失败][topic: {}][payload: {}]", topic, payload, e); - } - } - - /** - * 发送响应消息 - * - * @param topic 原始主题 - * @param jsonObject 原始消息 JSON 对象 - * @param method 响应方法 - */ - private void sendResponse(String topic, JSONObject jsonObject, String method) { - try { - String replyTopic = IotDeviceTopicEnum.getReplyTopic(topic); - - // 构建响应消息 - JSONObject response = new JSONObject(); - response.set("id", jsonObject.getStr("id")); - response.set("code", 200); - response.set("method", method); - response.set("data", new JSONObject()); - - // 发送响应 - protocol.publishMessage(replyTopic, response.toString()); - log.debug("[sendResponse][发送响应消息成功][topic: {}]", replyTopic); - } catch (Exception e) { - log.error("[sendResponse][发送响应消息失败][topic: {}]", topic, e); - } - } - -} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttServiceHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttServiceHandler.java deleted file mode 100644 index 85947e75d3..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttServiceHandler.java +++ /dev/null @@ -1,109 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router; - -import cn.hutool.json.JSONObject; -import cn.hutool.json.JSONUtil; -import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer; -import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; -import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -import java.nio.charset.StandardCharsets; - -/** - * IoT 网关 MQTT 服务处理器 - *

- * 处理设备服务调用相关的 MQTT 消息 - * - * @author 芋道源码 - */ -@RequiredArgsConstructor -@Slf4j -public class IotMqttServiceHandler extends IotMqttAbstractHandler { - - private final IotMqttUpstreamProtocol protocol; - private final IotDeviceMessageProducer deviceMessageProducer; - private final IotDeviceMessageService deviceMessageService; - - @Override - public void handle(String topic, String payload) { - try { - log.info("[handle][接收到设备服务调用响应][topic: {}]", topic); - - // 解析消息内容 - String[] topicParts = parseTopic(topic); - if (topicParts == null) { - return; - } - - // 构建设备消息 - String productKey = topicParts[2]; - String deviceName = topicParts[3]; - String serviceIdentifier = getServiceIdentifier(topicParts, topic); - if (serviceIdentifier == null) { - return; - } - - // 使用 IotDeviceMessageService 解码消息 - byte[] messageBytes = payload.getBytes(StandardCharsets.UTF_8); - IotDeviceMessage message = deviceMessageService.decodeDeviceMessage( - messageBytes, productKey, deviceName); - - // 发送消息 - deviceMessageProducer.sendDeviceMessage(message); - log.info("[handle][处理设备服务调用响应成功][topic: {}]", topic); - - // 发送响应消息 - String method = "thing.service." + serviceIdentifier; - sendResponse(topic, JSONUtil.parseObj(payload), method); - } catch (Exception e) { - log.error("[handle][处理设备服务调用响应失败][topic: {}][payload: {}]", topic, payload, e); - } - } - - /** - * 从主题部分中获取服务标识符 - * - * @param topicParts 主题各部分 - * @param topic 原始主题,用于日志 - * @return 服务标识符,如果获取失败返回 null - */ - private String getServiceIdentifier(String[] topicParts, String topic) { - try { - // 服务主题格式:/sys/{productKey}/{deviceName}/thing/service/{serviceIdentifier} - return topicParts[6]; - } catch (ArrayIndexOutOfBoundsException e) { - log.warn("[getServiceIdentifier][无法从主题中获取服务标识符][topic: {}]", topic); - return null; - } - } - - /** - * 发送响应消息 - * - * @param topic 原始主题 - * @param jsonObject 原始消息 JSON 对象 - * @param method 响应方法 - */ - private void sendResponse(String topic, JSONObject jsonObject, String method) { - try { - String replyTopic = IotDeviceTopicEnum.getReplyTopic(topic); - - // 构建响应消息 - JSONObject response = new JSONObject(); - response.set("id", jsonObject.getStr("id")); - response.set("code", 200); - response.set("method", method); - response.set("data", new JSONObject()); - - // 发送响应 - protocol.publishMessage(replyTopic, response.toString()); - log.debug("[sendResponse][发送响应消息成功][topic: {}]", replyTopic); - } catch (Exception e) { - log.error("[sendResponse][发送响应消息失败][topic: {}]", topic, e); - } - } - -} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java new file mode 100644 index 0000000000..9bfcb35c2d --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java @@ -0,0 +1,145 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router; + +import cn.hutool.core.util.StrUtil; +import cn.hutool.extra.spring.SpringUtil; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import io.vertx.mqtt.messages.MqttPublishMessage; +import lombok.extern.slf4j.Slf4j; + +import java.nio.charset.StandardCharsets; + +/** + * IoT 网关 MQTT 协议的【上行】处理器 + *

+ * 处理设备上行消息,包括事件上报、属性上报、服务调用响应等 + * + * @author 芋道源码 + */ +@Slf4j +public class IotMqttUpstreamHandler { + + private final IotDeviceMessageProducer deviceMessageProducer; + private final IotDeviceMessageService deviceMessageService; + + public IotMqttUpstreamHandler() { + this.deviceMessageProducer = SpringUtil.getBean(IotDeviceMessageProducer.class); + this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class); + } + + /** + * 处理 MQTT 发布消息 + */ + public void handle(MqttPublishMessage message) { + String topic = message.topicName(); + String payload = message.payload().toString(StandardCharsets.UTF_8); + + if (StrUtil.isBlank(topic)) { + log.warn("[handle][主题为空,忽略消息]"); + return; + } + + if (StrUtil.isBlank(payload)) { + log.warn("[handle][消息内容为空][topic: {}]", topic); + return; + } + + log.debug("[handle][收到 MQTT 消息][topic: {}]", topic); + handle(topic, payload); + } + + /** + * 处理 MQTT 消息 + * + * @param topic 主题 + * @param payload 消息内容 + */ + public void handle(String topic, String payload) { + try { + // 1. 识别并验证消息类型 + String messageType = getMessageType(topic); + if (messageType == null) { + log.warn("[handle][未知的消息类型][topic: {}]", topic); + return; + } + + // 2. 处理消息 + processMessage(topic, payload, messageType); + + } catch (Exception e) { + log.error("[handle][处理消息失败][topic: {}][payload: {}]", topic, payload, e); + } + } + + /** + * 处理消息的统一逻辑 + */ + private void processMessage(String topic, String payload, String messageType) { + log.info("[processMessage][接收到{}][topic: {}]", messageType, topic); + + // 解析主题获取设备信息 + String[] topicParts = parseTopic(topic); + if (topicParts == null) { + return; + } + + String productKey = topicParts[2]; + String deviceName = topicParts[3]; + + // 解码消息 + byte[] messageBytes = payload.getBytes(StandardCharsets.UTF_8); + IotDeviceMessage message = deviceMessageService.decodeDeviceMessage( + messageBytes, productKey, deviceName); + + // 发送消息到队列 + deviceMessageProducer.sendDeviceMessage(message); + + // 记录成功日志 + log.info("[processMessage][处理{}成功][topic: {}]", messageType, topic); + } + + /** + * 识别消息类型 + * + * @param topic 主题 + * @return 消息类型描述,如果不支持返回 null + */ + private String getMessageType(String topic) { + // 设备事件上报: /sys/{productKey}/{deviceName}/thing/event/{eventIdentifier}/post + if (topic.contains("/thing/event/") && topic.endsWith("/post")) { + return "设备事件上报"; + } + + // 设备属性操作: /sys/{productKey}/{deviceName}/thing/property/post + // 或属性响应: /sys/{productKey}/{deviceName}/thing/service/property/set_reply + if (topic.endsWith("/thing/property/post") || + topic.contains("/thing/service/property/set") || + topic.contains("/thing/service/property/get")) { + return "设备属性操作"; + } + + // 设备服务调用: /sys/{productKey}/{deviceName}/thing/service/{serviceIdentifier} + if (topic.contains("/thing/service/") && !topic.contains("/property/")) { + return "设备服务调用"; + } + + // 不支持的消息类型 + return null; + } + + /** + * 解析主题,获取主题各部分 + * + * @param topic 主题 + * @return 主题各部分数组,如果解析失败返回 null + */ + private String[] parseTopic(String topic) { + String[] topicParts = topic.split("/"); + if (topicParts.length < 7) { + log.warn("[parseTopic][主题格式不正确][topic: {}]", topic); + return null; + } + return topicParts; + } +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamRouter.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamRouter.java deleted file mode 100644 index 4b10ed0bc5..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamRouter.java +++ /dev/null @@ -1,108 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router; - -import cn.hutool.core.util.StrUtil; -import cn.hutool.extra.spring.SpringUtil; -import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer; -import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; -import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; -import io.vertx.mqtt.messages.MqttPublishMessage; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -/** - * IoT 网关 MQTT 上行路由器 - *

- * 根据消息主题路由到不同的处理器 - * - * @author 芋道源码 - */ -@RequiredArgsConstructor -@Slf4j -public class IotMqttUpstreamRouter { - - private final IotMqttUpstreamProtocol protocol; - private final IotDeviceMessageProducer deviceMessageProducer; - private final IotDeviceMessageService deviceMessageService; - - // 处理器实例 - private IotMqttPropertyHandler propertyHandler; - private IotMqttEventHandler eventHandler; - private IotMqttServiceHandler serviceHandler; - - public IotMqttUpstreamRouter(IotMqttUpstreamProtocol protocol) { - this.protocol = protocol; - this.deviceMessageProducer = SpringUtil.getBean(IotDeviceMessageProducer.class); - this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class); - // 初始化处理器 - this.propertyHandler = new IotMqttPropertyHandler(protocol, deviceMessageProducer, deviceMessageService); - this.eventHandler = new IotMqttEventHandler(protocol, deviceMessageProducer, deviceMessageService); - this.serviceHandler = new IotMqttServiceHandler(protocol, deviceMessageProducer, deviceMessageService); - } - - /** - * 路由 MQTT 消息 - * - * @param message MQTT 发布消息 - */ - public void route(MqttPublishMessage message) { - String topic = message.topicName(); - String payload = message.payload().toString(); - log.info("[route][接收到 MQTT 消息][topic: {}][payload: {}]", topic, payload); - - try { - if (StrUtil.isEmpty(payload)) { - log.warn("[route][消息内容为空][topic: {}]", topic); - return; - } - - // 根据主题路由到相应的处理器 - if (isPropertyTopic(topic)) { - propertyHandler.handle(topic, payload); - } else if (isEventTopic(topic)) { - eventHandler.handle(topic, payload); - } else if (isServiceTopic(topic)) { - serviceHandler.handle(topic, payload); - } else { - log.warn("[route][未知的消息类型][topic: {}]", topic); - } - } catch (Exception e) { - log.error("[route][处理 MQTT 消息失败][topic: {}][payload: {}]", topic, payload, e); - } - } - - /** - * 判断是否为属性相关主题 - * - * @param topic 主题 - * @return 是否为属性主题 - */ - private boolean isPropertyTopic(String topic) { - return topic.endsWith(IotDeviceTopicEnum.PROPERTY_POST_TOPIC.getTopic()) || - topic.contains(IotDeviceTopicEnum.PROPERTY_SET_TOPIC.getTopic()) || - topic.contains(IotDeviceTopicEnum.PROPERTY_GET_TOPIC.getTopic()); - } - - /** - * 判断是否为事件相关主题 - * - * @param topic 主题 - * @return 是否为事件主题 - */ - private boolean isEventTopic(String topic) { - return topic.contains(IotDeviceTopicEnum.EVENT_POST_TOPIC_PREFIX.getTopic()) && - topic.endsWith(IotDeviceTopicEnum.EVENT_POST_TOPIC_SUFFIX.getTopic()); - } - - /** - * 判断是否为服务相关主题 - * - * @param topic 主题 - * @return 是否为服务主题 - */ - private boolean isServiceTopic(String topic) { - return topic.contains(IotDeviceTopicEnum.SERVICE_TOPIC_PREFIX.getTopic()) && - !isPropertyTopic(topic); // 排除属性相关的服务调用 - } - -} \ No newline at end of file