【代码评审】IoT:MQTT 插件

This commit is contained in:
YunaiV 2025-03-04 20:13:19 +08:00
parent 3c9985978b
commit 71b45a29a3
3 changed files with 11 additions and 18 deletions

View File

@ -50,10 +50,10 @@ public class IotPluginCommonUtils {
} }
/** /**
* 将对象转换为JSON字符串后写入响应 * 将对象转换为 JSON 字符串后写入响应
* *
* @param routingContext 路由上下文 * @param routingContext 路由上下文
* @param data 要转换为JSON的数据对象 * @param data 数据对象
*/ */
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
public static void writeJson(RoutingContext routingContext, Object data) { public static void writeJson(RoutingContext routingContext, Object data) {

View File

@ -1,5 +1,6 @@
package cn.iocoder.yudao.module.iot.plugin.emqx.upstream; package cn.iocoder.yudao.module.iot.plugin.emqx.upstream;
import cn.hutool.core.util.ArrayUtil;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi; import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.plugin.emqx.config.IotPluginEmqxProperties; import cn.iocoder.yudao.module.iot.plugin.emqx.config.IotPluginEmqxProperties;
import cn.iocoder.yudao.module.iot.plugin.emqx.upstream.router.IotDeviceAuthVertxHandler; import cn.iocoder.yudao.module.iot.plugin.emqx.upstream.router.IotDeviceAuthVertxHandler;
@ -167,33 +168,28 @@ public class IotDeviceUpstreamServer {
*/ */
private Future<Void> subscribeToTopics() { private Future<Void> subscribeToTopics() {
String[] topics = emqxProperties.getMqttTopics(); String[] topics = emqxProperties.getMqttTopics();
if (topics == null || topics.length == 0) { if (ArrayUtil.isEmpty(topics)) {
log.warn("[subscribeToTopics] 未配置MQTT主题跳过订阅"); log.warn("[subscribeToTopics] 未配置MQTT主题跳过订阅");
return Future.succeededFuture(); return Future.succeededFuture();
} }
log.info("[subscribeToTopics] 开始订阅设备上行消息主题"); log.info("[subscribeToTopics] 开始订阅设备上行消息主题");
Future<Void> compositeFuture = Future.succeededFuture(); Future<Void> compositeFuture = Future.succeededFuture();
for (String topic : topics) { for (String topic : topics) {
String trimmedTopic = topic.trim(); String trimmedTopic = topic.trim();
if (trimmedTopic.isEmpty()) { if (trimmedTopic.isEmpty()) {
continue; continue;
} }
compositeFuture = compositeFuture.compose(v -> client.subscribe(trimmedTopic, DEFAULT_QOS.value()) compositeFuture = compositeFuture.compose(v -> client.subscribe(trimmedTopic, DEFAULT_QOS.value())
.<Void>map(ack -> { .<Void>map(ack -> {
log.info("[subscribeToTopics] 成功订阅主题: {}", trimmedTopic); log.info("[subscribeToTopics] 成功订阅主题: {}", trimmedTopic);
return null; return null;
}) })
.recover(err -> { .recover(err -> {
log.error("[subscribeToTopics] 订阅主题失败: {}, 原因: {}", log.error("[subscribeToTopics] 订阅主题失败: {}, 原因: {}", trimmedTopic, err.getMessage());
trimmedTopic, err.getMessage());
return Future.<Void>succeededFuture(); // 继续订阅其他主题 return Future.<Void>succeededFuture(); // 继续订阅其他主题
})); }));
} }
return compositeFuture; return compositeFuture;
} }
@ -205,7 +201,6 @@ public class IotDeviceUpstreamServer {
log.warn("[stop] 服务未运行,无需停止"); log.warn("[stop] 服务未运行,无需停止");
return; return;
} }
log.info("[stop] 开始关闭服务"); log.info("[stop] 开始关闭服务");
isRunning = false; isRunning = false;
@ -213,11 +208,9 @@ public class IotDeviceUpstreamServer {
CompletableFuture<Void> serverFuture = server != null CompletableFuture<Void> serverFuture = server != null
? server.close().toCompletionStage().toCompletableFuture() ? server.close().toCompletionStage().toCompletableFuture()
: CompletableFuture.completedFuture(null); : CompletableFuture.completedFuture(null);
CompletableFuture<Void> clientFuture = client != null CompletableFuture<Void> clientFuture = client != null
? client.disconnect().toCompletionStage().toCompletableFuture() ? client.disconnect().toCompletionStage().toCompletableFuture()
: CompletableFuture.completedFuture(null); : CompletableFuture.completedFuture(null);
CompletableFuture<Void> vertxFuture = vertx != null CompletableFuture<Void> vertxFuture = vertx != null
? vertx.close().toCompletionStage().toCompletableFuture() ? vertx.close().toCompletionStage().toCompletableFuture()
: CompletableFuture.completedFuture(null); : CompletableFuture.completedFuture(null);

View File

@ -14,8 +14,7 @@ import java.util.Collections;
/** /**
* IoT Emqx 连接认证的 Vert.x Handler * IoT Emqx 连接认证的 Vert.x Handler
* <a href= * <a href="https://docs.emqx.com/zh/emqx/latest/access-control/authn/http.html">MQTT HTTP</a>
* "https://docs.emqx.com/zh/emqx/latest/access-control/authn/http.html">...</a>
* *
* @author haohao * @author haohao
*/ */
@ -31,28 +30,29 @@ public class IotDeviceAuthVertxHandler implements Handler<RoutingContext> {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void handle(RoutingContext routingContext) { public void handle(RoutingContext routingContext) {
try { try {
// 构建认证请求 DTO
JsonObject json = routingContext.body().asJsonObject(); JsonObject json = routingContext.body().asJsonObject();
String clientId = json.getString("clientid"); String clientId = json.getString("clientid");
String username = json.getString("username"); String username = json.getString("username");
String password = json.getString("password"); String password = json.getString("password");
// 构建认证请求DTO
IotDeviceEmqxAuthReqDTO authReqDTO = new IotDeviceEmqxAuthReqDTO() IotDeviceEmqxAuthReqDTO authReqDTO = new IotDeviceEmqxAuthReqDTO()
.setClientId(clientId) .setClientId(clientId)
.setUsername(username) .setUsername(username)
.setPassword(password); .setPassword(password);
// 调用认证API // 调用认证 API
CommonResult<Boolean> authResult = deviceUpstreamApi.authenticateEmqxConnection(authReqDTO); CommonResult<Boolean> authResult = deviceUpstreamApi.authenticateEmqxConnection(authReqDTO);
if (authResult.getCode() != 0 || !authResult.getData()) { if (authResult.getCode() != 0 || !authResult.getData()) {
IotPluginCommonUtils.writeJson(routingContext, Collections.singletonMap("result", "deny")); IotPluginCommonUtils.writeJson(routingContext, Collections.singletonMap("result", "deny"));
return; return;
} }
// 响应结果
IotPluginCommonUtils.writeJson(routingContext, Collections.singletonMap("result", "allow")); IotPluginCommonUtils.writeJson(routingContext, Collections.singletonMap("result", "allow"));
} catch (Exception e) { } catch (Exception e) {
log.error("[handle][EMQX认证异常]", e); log.error("[handle][EMQX 认证异常]", e);
IotPluginCommonUtils.writeJson(routingContext, Collections.singletonMap("result", "deny")); IotPluginCommonUtils.writeJson(routingContext, Collections.singletonMap("result", "deny"));
} }
} }
} }