From 71b45a29a304e033ea29dafc546873caa50c6678 Mon Sep 17 00:00:00 2001 From: YunaiV Date: Tue, 4 Mar 2025 20:13:19 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90=E4=BB=A3=E7=A0=81=E8=AF=84=E5=AE=A1?= =?UTF-8?q?=E3=80=91IoT=EF=BC=9AMQTT=20=E6=8F=92=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../plugin/common/util/IotPluginCommonUtils.java | 4 ++-- .../emqx/upstream/IotDeviceUpstreamServer.java | 13 +++---------- .../upstream/router/IotDeviceAuthVertxHandler.java | 12 ++++++------ 3 files changed, 11 insertions(+), 18 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/util/IotPluginCommonUtils.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/util/IotPluginCommonUtils.java index 753e62c94b..d60df9cc0d 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/util/IotPluginCommonUtils.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/util/IotPluginCommonUtils.java @@ -50,10 +50,10 @@ public class IotPluginCommonUtils { } /** - * 将对象转换为JSON字符串后写入响应 + * 将对象转换为 JSON 字符串后写入响应 * * @param routingContext 路由上下文 - * @param data 要转换为JSON的数据对象 + * @param data 数据对象 */ @SuppressWarnings("deprecation") public static void writeJson(RoutingContext routingContext, Object data) { diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java index 54479df158..c9db423e99 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java @@ -1,5 +1,6 @@ package cn.iocoder.yudao.module.iot.plugin.emqx.upstream; +import cn.hutool.core.util.ArrayUtil; import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi; import cn.iocoder.yudao.module.iot.plugin.emqx.config.IotPluginEmqxProperties; import cn.iocoder.yudao.module.iot.plugin.emqx.upstream.router.IotDeviceAuthVertxHandler; @@ -167,33 +168,28 @@ public class IotDeviceUpstreamServer { */ private Future subscribeToTopics() { String[] topics = emqxProperties.getMqttTopics(); - if (topics == null || topics.length == 0) { + if (ArrayUtil.isEmpty(topics)) { log.warn("[subscribeToTopics] 未配置MQTT主题,跳过订阅"); return Future.succeededFuture(); } - log.info("[subscribeToTopics] 开始订阅设备上行消息主题"); Future compositeFuture = Future.succeededFuture(); - for (String topic : topics) { String trimmedTopic = topic.trim(); if (trimmedTopic.isEmpty()) { continue; } - compositeFuture = compositeFuture.compose(v -> client.subscribe(trimmedTopic, DEFAULT_QOS.value()) .map(ack -> { log.info("[subscribeToTopics] 成功订阅主题: {}", trimmedTopic); return null; }) .recover(err -> { - log.error("[subscribeToTopics] 订阅主题失败: {}, 原因: {}", - trimmedTopic, err.getMessage()); + log.error("[subscribeToTopics] 订阅主题失败: {}, 原因: {}", trimmedTopic, err.getMessage()); return Future.succeededFuture(); // 继续订阅其他主题 })); } - return compositeFuture; } @@ -205,7 +201,6 @@ public class IotDeviceUpstreamServer { log.warn("[stop] 服务未运行,无需停止"); return; } - log.info("[stop] 开始关闭服务"); isRunning = false; @@ -213,11 +208,9 @@ public class IotDeviceUpstreamServer { CompletableFuture serverFuture = server != null ? server.close().toCompletionStage().toCompletableFuture() : CompletableFuture.completedFuture(null); - CompletableFuture clientFuture = client != null ? client.disconnect().toCompletionStage().toCompletableFuture() : CompletableFuture.completedFuture(null); - CompletableFuture vertxFuture = vertx != null ? vertx.close().toCompletionStage().toCompletableFuture() : CompletableFuture.completedFuture(null); diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceAuthVertxHandler.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceAuthVertxHandler.java index 350de674cd..3f3cf94e9a 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceAuthVertxHandler.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceAuthVertxHandler.java @@ -14,8 +14,7 @@ import java.util.Collections; /** * IoT Emqx 连接认证的 Vert.x Handler - * ... + * MQTT HTTP * * @author haohao */ @@ -31,28 +30,29 @@ public class IotDeviceAuthVertxHandler implements Handler { @SuppressWarnings("unchecked") public void handle(RoutingContext routingContext) { try { + // 构建认证请求 DTO JsonObject json = routingContext.body().asJsonObject(); String clientId = json.getString("clientid"); String username = json.getString("username"); String password = json.getString("password"); - - // 构建认证请求DTO IotDeviceEmqxAuthReqDTO authReqDTO = new IotDeviceEmqxAuthReqDTO() .setClientId(clientId) .setUsername(username) .setPassword(password); - // 调用认证API + // 调用认证 API CommonResult authResult = deviceUpstreamApi.authenticateEmqxConnection(authReqDTO); if (authResult.getCode() != 0 || !authResult.getData()) { IotPluginCommonUtils.writeJson(routingContext, Collections.singletonMap("result", "deny")); return; } + // 响应结果 IotPluginCommonUtils.writeJson(routingContext, Collections.singletonMap("result", "allow")); } catch (Exception e) { - log.error("[handle][EMQX认证异常]", e); + log.error("[handle][EMQX 认证异常]", e); IotPluginCommonUtils.writeJson(routingContext, Collections.singletonMap("result", "deny")); } } + } \ No newline at end of file