From a9733b4d2af6caf65445ce2ce4aac3295c106618 Mon Sep 17 00:00:00 2001 From: YunaiV Date: Sun, 16 Mar 2025 23:11:04 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90=E4=BB=A3=E7=A0=81=E8=AF=84=E5=AE=A1?= =?UTF-8?q?=E3=80=91IoT=EF=BC=9A=E6=95=B4=E4=BD=93=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../IotPluginCommonAutoConfiguration.java | 5 +- .../IotDeviceConfigSetVertxHandler.java | 10 ++- .../IotDeviceOtaUpgradeVertxHandler.java | 11 ++-- .../common/pojo/IotStandardResponse.java | 3 +- .../common/util/IotPluginCommonUtils.java | 1 + .../yudao-module-iot-plugin-emqx/pom.xml | 1 + .../IotPluginEmqxAutoConfiguration.java | 4 +- .../emqx/config/IotPluginEmqxProperties.java | 9 +-- .../IotDeviceDownstreamHandlerImpl.java | 9 ++- .../upstream/IotDeviceUpstreamServer.java | 62 +++++++++---------- .../router/IotDeviceAuthVertxHandler.java | 3 +- .../router/IotDeviceMqttMessageHandler.java | 2 +- .../router/IotDeviceWebhookVertxHandler.java | 5 +- .../http/config/IotHttpVertxPlugin.java | 4 +- .../router/IotDeviceUpstreamVertxHandler.java | 35 ++++++----- .../yudao-module-iot-plugin-mqtt/pom.xml | 1 + .../yudao/module/iot/plugin/MqttPlugin.java | 1 + .../iot/plugin/MqttServerExtension.java | 1 + 18 files changed, 83 insertions(+), 84 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/config/IotPluginCommonAutoConfiguration.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/config/IotPluginCommonAutoConfiguration.java index 111189875d..ba7d56fe61 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/config/IotPluginCommonAutoConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/config/IotPluginCommonAutoConfiguration.java @@ -43,8 +43,9 @@ public class IotPluginCommonAutoConfiguration { } @Bean(initMethod = "init", destroyMethod = "stop") - public IotPluginInstanceHeartbeatJob pluginInstanceHeartbeatJob( - IotDeviceUpstreamApi deviceDataApi, IotDeviceDownstreamServer deviceDownstreamServer, IotPluginCommonProperties commonProperties) { + public IotPluginInstanceHeartbeatJob pluginInstanceHeartbeatJob(IotDeviceUpstreamApi deviceDataApi, + IotDeviceDownstreamServer deviceDownstreamServer, + IotPluginCommonProperties commonProperties) { return new IotPluginInstanceHeartbeatJob(deviceDataApi, deviceDownstreamServer, commonProperties); } diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/downstream/router/IotDeviceConfigSetVertxHandler.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/downstream/router/IotDeviceConfigSetVertxHandler.java index b9bd4a52f1..1693f128d6 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/downstream/router/IotDeviceConfigSetVertxHandler.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/downstream/router/IotDeviceConfigSetVertxHandler.java @@ -25,6 +25,7 @@ import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeC @RequiredArgsConstructor public class IotDeviceConfigSetVertxHandler implements Handler { + // TODO @haohao:是不是可以把 PATH、Method 所有的,抽到一个枚举类里?因为 topic、path、method 相当于不同的几个表达? public static final String PATH = "/sys/:productKey/:deviceName/thing/service/config/set"; public static final String METHOD = "thing.service.config.set"; @@ -57,12 +58,9 @@ public class IotDeviceConfigSetVertxHandler implements Handler { CommonResult result = deviceDownstreamHandler.setDeviceConfig(reqDTO); // 3. 响应结果 - IotStandardResponse response; - if (result.isSuccess()) { - response = IotStandardResponse.success(reqDTO.getRequestId(), METHOD, result.getData()); - } else { - response = IotStandardResponse.error(reqDTO.getRequestId(), METHOD, result.getCode(), result.getMsg()); - } + IotStandardResponse response = result.isSuccess() ? + IotStandardResponse.success(reqDTO.getRequestId(), METHOD, result.getData()) + : IotStandardResponse.error(reqDTO.getRequestId(), METHOD, result.getCode(), result.getMsg()); IotPluginCommonUtils.writeJsonResponse(routingContext, response); } catch (Exception e) { log.error("[handle][请求参数({}) 配置设置异常]", reqDTO, e); diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/downstream/router/IotDeviceOtaUpgradeVertxHandler.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/downstream/router/IotDeviceOtaUpgradeVertxHandler.java index a49b84acca..b417229aae 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/downstream/router/IotDeviceOtaUpgradeVertxHandler.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/downstream/router/IotDeviceOtaUpgradeVertxHandler.java @@ -62,15 +62,14 @@ public class IotDeviceOtaUpgradeVertxHandler implements Handler CommonResult result = deviceDownstreamHandler.upgradeDeviceOta(reqDTO); // 3. 响应结果 - IotStandardResponse response; - if (result.isSuccess()) { - response = IotStandardResponse.success(reqDTO.getRequestId(), METHOD, result.getData()); - } else { - response = IotStandardResponse.error(reqDTO.getRequestId(), METHOD, result.getCode(), result.getMsg()); - } + // TODO @haohao:可以考虑 IotStandardResponse.of(requestId, method, CommonResult) + IotStandardResponse response = result.isSuccess() ? + IotStandardResponse.success(reqDTO.getRequestId(), METHOD, result.getData()) + :IotStandardResponse.error(reqDTO.getRequestId(), METHOD, result.getCode(), result.getMsg()); IotPluginCommonUtils.writeJsonResponse(routingContext, response); } catch (Exception e) { log.error("[handle][请求参数({}) OTA 升级异常]", reqDTO, e); + // TODO @haohao:可以考虑 IotStandardResponse.of(requestId, method, ErrorCode) IotStandardResponse errorResponse = IotStandardResponse.error( reqDTO.getRequestId(), METHOD, INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg()); IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse); diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/pojo/IotStandardResponse.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/pojo/IotStandardResponse.java index e31f40dd8d..131eb1b9ce 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/pojo/IotStandardResponse.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/pojo/IotStandardResponse.java @@ -1,7 +1,6 @@ package cn.iocoder.yudao.module.iot.plugin.common.pojo; import lombok.Data; -import lombok.experimental.Accessors; // TODO @芋艿:1)后续考虑,要不要叫 IoT 网关之类的 Response;2)包名 pojo /** @@ -12,7 +11,6 @@ import lombok.experimental.Accessors; * @author haohao */ @Data -@Accessors(chain = true) public class IotStandardResponse { /** @@ -92,4 +90,5 @@ public class IotStandardResponse { .setMethod(method) .setVersion("1.0"); } + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/util/IotPluginCommonUtils.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/util/IotPluginCommonUtils.java index 2e09c3c5c3..34c6c0fe2b 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/util/IotPluginCommonUtils.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/util/IotPluginCommonUtils.java @@ -72,4 +72,5 @@ public class IotPluginCommonUtils { .putHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE) .end(JsonUtils.toJsonString(response)); } + } diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/pom.xml b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/pom.xml index 818c08b333..8620ecaa65 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/pom.xml +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/pom.xml @@ -15,6 +15,7 @@ 1.0.0 ${project.artifactId} + 物联网 插件模块 - emqx 插件 diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/config/IotPluginEmqxAutoConfiguration.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/config/IotPluginEmqxAutoConfiguration.java index 382bb9ecf2..e1d11504cf 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/config/IotPluginEmqxAutoConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/config/IotPluginEmqxAutoConfiguration.java @@ -14,7 +14,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** - * IoT 插件 Emqx 的专用自动配置类 + * IoT 插件 EMQX 的专用自动配置类 * * @author haohao */ @@ -34,7 +34,7 @@ public class IotPluginEmqxAutoConfiguration { .setClientId("yudao-iot-downstream-" + IdUtil.fastSimpleUUID()) .setUsername(emqxProperties.getMqttUsername()) .setPassword(emqxProperties.getMqttPassword()) - .setSsl(emqxProperties.isMqttSsl()); + .setSsl(emqxProperties.getMqttSsl()); return MqttClient.create(vertx, options); } diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/config/IotPluginEmqxProperties.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/config/IotPluginEmqxProperties.java index 4117e71820..219fe0360f 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/config/IotPluginEmqxProperties.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/config/IotPluginEmqxProperties.java @@ -14,6 +14,8 @@ import org.springframework.validation.annotation.Validated; @Data public class IotPluginEmqxProperties { + // TODO @haohao:参数校验,加下,啊哈 + /** * 服务主机 */ @@ -21,12 +23,11 @@ public class IotPluginEmqxProperties { /** * 服务端口 */ - private int mqttPort; + private Integer mqttPort; /** * 服务用户名 */ private String mqttUsername; - /** * 服务密码 */ @@ -34,7 +35,7 @@ public class IotPluginEmqxProperties { /** * 是否启用 SSL */ - private boolean mqttSsl; + private Boolean mqttSsl; /** * 订阅的主题列表 @@ -44,6 +45,6 @@ public class IotPluginEmqxProperties { /** * 认证端口 */ - private int authPort; + private Integer authPort; } diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/downstream/IotDeviceDownstreamHandlerImpl.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/downstream/IotDeviceDownstreamHandlerImpl.java index c1e64afb97..f5c19224af 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/downstream/IotDeviceDownstreamHandlerImpl.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/downstream/IotDeviceDownstreamHandlerImpl.java @@ -25,6 +25,7 @@ public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandle private static final String SYS_TOPIC_PREFIX = "/sys/"; + // TODO @haohao:是不是可以类似 IotDeviceConfigSetVertxHandler 的建议,抽到统一的枚举类 // TODO @haohao:讨论,感觉 mqtt 和 http,可以做个相对统一的格式哈。;回复 都使用 Alink 格式,方便后续扩展。 // 设备服务调用 标准 JSON // 请求Topic:/sys/${productKey}/${deviceName}/thing/service/${tsl.service.identifier} @@ -63,7 +64,6 @@ public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandle // 构建请求消息 String requestId = reqDTO.getRequestId() != null ? reqDTO.getRequestId() : generateRequestId(); JSONObject request = buildServiceRequest(requestId, reqDTO.getIdentifier(), reqDTO.getParams()); - // 发送消息 publishMessage(topic, request); @@ -82,9 +82,8 @@ public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandle @Override public CommonResult setDeviceProperty(IotDevicePropertySetReqDTO reqDTO) { - log.info("[setProperty][开始设置设备属性][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO)); - // 验证参数 + log.info("[setProperty][开始设置设备属性][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO)); if (reqDTO.getProductKey() == null || reqDTO.getDeviceName() == null) { log.error("[setProperty][参数不完整][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO)); return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg()); @@ -96,7 +95,6 @@ public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandle // 构建请求消息 String requestId = reqDTO.getRequestId() != null ? reqDTO.getRequestId() : generateRequestId(); JSONObject request = buildPropertySetRequest(requestId, reqDTO.getProperties()); - // 发送消息 publishMessage(topic, request); @@ -132,6 +130,7 @@ public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandle return SYS_TOPIC_PREFIX + productKey + "/" + deviceName + PROPERTY_SET_TOPIC; } + // TODO @haohao:这个,后面搞个对象,会不会好点哈? /** * 构建服务调用请求 */ @@ -168,7 +167,7 @@ public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandle } /** - * 生成请求ID + * 生成请求 ID */ private String generateRequestId() { return IdUtil.fastSimpleUUID(); diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java index 040985ba9a..8911a76a80 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java @@ -36,10 +36,6 @@ public class IotDeviceUpstreamServer { * 连接超时时间(毫秒) */ private static final int CONNECTION_TIMEOUT_MS = 10000; - /** - * 主题分隔符 - */ - private static final String TOPIC_SEPARATOR = ","; /** * 默认 QoS 级别 */ @@ -84,42 +80,40 @@ public class IotDeviceUpstreamServer { */ public void start() { if (isRunning) { - log.warn("服务已经在运行中,请勿重复启动"); + log.warn("[start][服务已经在运行中,请勿重复启动]"); return; } - - log.info("[start] 开始启动服务"); + log.info("[start][开始启动服务]"); // 1. 启动 HTTP 服务器 CompletableFuture httpFuture = server.listen(emqxProperties.getAuthPort()) .toCompletionStage() .toCompletableFuture() - .thenAccept(v -> log.info("[start] HTTP服务器启动完成,端口: {}", server.actualPort())); + .thenAccept(v -> log.info("[start][HTTP服务器启动完成,端口: {}]", server.actualPort())); // 2. 连接 MQTT Broker CompletableFuture mqttFuture = connectMqtt() .toCompletionStage() .toCompletableFuture() .thenAccept(v -> { - // 3. 添加 MQTT 断开重连监听器 + // 2.1 添加 MQTT 断开重连监听器 client.closeHandler(closeEvent -> { - log.warn("[closeHandler] MQTT连接已断开,准备重连"); + log.warn("[closeHandler][MQTT连接已断开,准备重连]"); reconnectWithDelay(); }); - - // 4. 设置 MQTT 消息处理器 + // 2. 设置 MQTT 消息处理器 setupMessageHandler(); }); - // 等待所有服务启动完成 + // 3. 等待所有服务启动完成 CompletableFuture.allOf(httpFuture, mqttFuture) .orTimeout(CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS) .whenComplete((result, error) -> { if (error != null) { - log.error("[start] 服务启动失败", error); + log.error("[start][服务启动失败]", error); } else { isRunning = true; - log.info("[start] 所有服务启动完成"); + log.info("[start][所有服务启动完成]"); } }); } @@ -129,7 +123,7 @@ public class IotDeviceUpstreamServer { */ private void setupMessageHandler() { client.publishHandler(mqttMessageHandler::handle); - log.debug("[setupMessageHandler] MQTT消息处理器设置完成"); + log.debug("[setupMessageHandler][MQTT消息处理器设置完成]"); } /** @@ -137,12 +131,12 @@ public class IotDeviceUpstreamServer { */ private void reconnectWithDelay() { if (!isRunning) { - log.info("[reconnectWithDelay] 服务已停止,不再尝试重连"); + log.info("[reconnectWithDelay][服务已停止,不再尝试重连]"); return; } vertx.setTimer(RECONNECT_DELAY_MS, id -> { - log.info("[reconnectWithDelay] 开始重新连接MQTT"); + log.info("[reconnectWithDelay][开始重新连接 MQTT]"); connectMqtt(); }); } @@ -155,28 +149,28 @@ public class IotDeviceUpstreamServer { private Future connectMqtt() { return client.connect(emqxProperties.getMqttPort(), emqxProperties.getMqttHost()) .compose(connAck -> { - log.info("[connectMqtt] MQTT客户端连接成功"); + log.info("[connectMqtt][MQTT客户端连接成功]"); return subscribeToTopics(); }) - .recover(err -> { - log.error("[connectMqtt] 连接MQTT Broker失败: {}", err.getMessage()); + .recover(error -> { + log.error("[connectMqtt][连接MQTT Broker失败:]", error); reconnectWithDelay(); - return Future.failedFuture(err); + return Future.failedFuture(error); }); } /** * 订阅设备上行消息主题 * - * @return 订阅结果的Future + * @return 订阅结果的 Future */ private Future subscribeToTopics() { String[] topics = emqxProperties.getMqttTopics(); if (ArrayUtil.isEmpty(topics)) { - log.warn("[subscribeToTopics] 未配置MQTT主题,跳过订阅"); + log.warn("[subscribeToTopics][未配置MQTT主题,跳过订阅]"); return Future.succeededFuture(); } - log.info("[subscribeToTopics] 开始订阅设备上行消息主题"); + log.info("[subscribeToTopics][开始订阅设备上行消息主题]"); Future compositeFuture = Future.succeededFuture(); for (String topic : topics) { @@ -186,11 +180,11 @@ public class IotDeviceUpstreamServer { } compositeFuture = compositeFuture.compose(v -> client.subscribe(trimmedTopic, DEFAULT_QOS.value()) .map(ack -> { - log.info("[subscribeToTopics] 成功订阅主题: {}", trimmedTopic); + log.info("[subscribeToTopics][成功订阅主题: {}]", trimmedTopic); return null; }) - .recover(err -> { - log.error("[subscribeToTopics] 订阅主题失败: {}, 原因: {}", trimmedTopic, err.getMessage()); + .recover(error -> { + log.error("[subscribeToTopics][订阅主题失败: {}]", trimmedTopic, error); return Future.succeededFuture(); // 继续订阅其他主题 })); } @@ -202,10 +196,10 @@ public class IotDeviceUpstreamServer { */ public void stop() { if (!isRunning) { - log.warn("[stop] 服务未运行,无需停止"); + log.warn("[stop][服务未运行,无需停止]"); return; } - log.info("[stop] 开始关闭服务"); + log.info("[stop][开始关闭服务]"); isRunning = false; try { @@ -224,14 +218,14 @@ public class IotDeviceUpstreamServer { .orTimeout(CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS) .whenComplete((result, error) -> { if (error != null) { - log.error("[stop] 服务关闭过程中发生异常", error); + log.error("[stop][服务关闭过程中发生异常]", error); } else { - log.info("[stop] 所有服务关闭完成"); + log.info("[stop][所有服务关闭完成]"); } }); } catch (Exception e) { - log.error("[stop] 关闭服务异常", e); - throw new RuntimeException("关闭IoT设备上行服务失败", e); + log.error("[stop][关闭服务异常]", e); + throw new RuntimeException("关闭 IoT 设备上行服务失败", e); } } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceAuthVertxHandler.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceAuthVertxHandler.java index fcb2286158..e9206d5b64 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceAuthVertxHandler.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceAuthVertxHandler.java @@ -15,7 +15,7 @@ import java.util.Collections; /** * IoT EMQX 连接认证的 Vert.x Handler * - * EMQX HTTP + * 参考:EMQX HTTP * * 注意:该处理器需要返回特定格式:{"result": "allow"} 或 {"result": "deny"}, * 以符合 EMQX 认证插件的要求,因此不使用 IotStandardResponse 实体类 @@ -31,7 +31,6 @@ public class IotDeviceAuthVertxHandler implements Handler { private final IotDeviceUpstreamApi deviceUpstreamApi; @Override - @SuppressWarnings("unchecked") public void handle(RoutingContext routingContext) { try { // 构建认证请求 DTO diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceMqttMessageHandler.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceMqttMessageHandler.java index 6cf8d84c5c..00fa1b96d7 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceMqttMessageHandler.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceMqttMessageHandler.java @@ -23,7 +23,7 @@ import java.util.Map; /** * IoT 设备 MQTT 消息处理器 * - * 参考:"设备属性、事件、服务"> + * 参考:设备属性、事件、服务 */ @Slf4j public class IotDeviceMqttMessageHandler { diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceWebhookVertxHandler.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceWebhookVertxHandler.java index 93fb01bc0a..21b49e097c 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceWebhookVertxHandler.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceWebhookVertxHandler.java @@ -18,7 +18,7 @@ import java.util.Collections; /** * IoT EMQX Webhook 事件处理的 Vert.x Handler * - * EMQX Webhook + * 参考:EMQX Webhook * * 注意:该处理器需要返回特定格式:{"result": "success"} 或 {"result": "error"}, * 以符合 EMQX Webhook 插件的要求,因此不使用 IotStandardResponse 实体类。 @@ -51,8 +51,7 @@ public class IotDeviceWebhookVertxHandler implements Handler { handleClientDisconnected(clientId, username); break; default: - log.info("[handle][未处理的 Webhook 事件] event={}, clientId={}, username={}", event, clientId, - username); + log.info("[handle][未处理的 Webhook 事件] event={}, clientId={}, username={}", event, clientId, username); break; } diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-http/src/main/java/cn/iocoder/yudao/module/iot/plugin/http/config/IotHttpVertxPlugin.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-http/src/main/java/cn/iocoder/yudao/module/iot/plugin/http/config/IotHttpVertxPlugin.java index 674980d005..f704c18443 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-http/src/main/java/cn/iocoder/yudao/module/iot/plugin/http/config/IotHttpVertxPlugin.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-http/src/main/java/cn/iocoder/yudao/module/iot/plugin/http/config/IotHttpVertxPlugin.java @@ -23,10 +23,8 @@ public class IotHttpVertxPlugin extends SpringPlugin { public void start() { log.info("[HttpVertxPlugin][HttpVertxPlugin 插件启动开始...]"); try { - // 1. 获取插件上下文 ApplicationContext pluginContext = getApplicationContext(); Assert.notNull(pluginContext, "pluginContext 不能为空"); - log.info("[HttpVertxPlugin][HttpVertxPlugin 插件启动成功...]"); } catch (Exception e) { log.error("[HttpVertxPlugin][HttpVertxPlugin 插件开启动异常...]", e); @@ -43,6 +41,7 @@ public class IotHttpVertxPlugin extends SpringPlugin { } } + // TODO @芋艿:思考下,未来要不要。。。 @Override protected ApplicationContext createApplicationContext() { // 创建插件自己的 ApplicationContext @@ -52,6 +51,7 @@ public class IotHttpVertxPlugin extends SpringPlugin { // 继续使用插件自己的 ClassLoader 以加载插件内部的类 pluginContext.setClassLoader(getWrapper().getPluginClassLoader()); // 扫描当前插件的自动配置包 + // TODO @芋艿:后续看看,怎么配置类包 pluginContext.scan("cn.iocoder.yudao.module.iot.plugin.http.config"); pluginContext.refresh(); return pluginContext; diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-http/src/main/java/cn/iocoder/yudao/module/iot/plugin/http/upstream/router/IotDeviceUpstreamVertxHandler.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-http/src/main/java/cn/iocoder/yudao/module/iot/plugin/http/upstream/router/IotDeviceUpstreamVertxHandler.java index f6c7cc3a27..79d465ea03 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-http/src/main/java/cn/iocoder/yudao/module/iot/plugin/http/upstream/router/IotDeviceUpstreamVertxHandler.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-http/src/main/java/cn/iocoder/yudao/module/iot/plugin/http/upstream/router/IotDeviceUpstreamVertxHandler.java @@ -34,6 +34,7 @@ import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeC @Slf4j public class IotDeviceUpstreamVertxHandler implements Handler { + // TODO @haohao:要不要类似 IotDeviceConfigSetVertxHandler 写的,把这些 PATH、METHOD 之类的抽走 /** * 属性上报路径 */ @@ -49,6 +50,7 @@ public class IotDeviceUpstreamVertxHandler implements Handler { private final IotDeviceUpstreamApi deviceUpstreamApi; + // TODO @haohao:要不要分成多个 Handler?每个只解决一个问题哈。 @Override public void handle(RoutingContext routingContext) { String path = routingContext.request().path(); @@ -102,7 +104,6 @@ public class IotDeviceUpstreamVertxHandler implements Handler { IotPluginCommonUtils.writeJsonResponse(routingContext, response); } catch (Exception e) { log.error("[handle][处理上行请求异常] path={}", path, e); - // 构建错误响应 String method = path.contains("/property/") ? PROPERTY_METHOD : EVENT_METHOD_PREFIX + (routingContext.pathParams().containsKey("identifier") ? routingContext.pathParam("identifier") @@ -115,27 +116,28 @@ public class IotDeviceUpstreamVertxHandler implements Handler { /** * 更新设备状态 * - * @param productKey 产品Key + * @param productKey 产品 Key * @param deviceName 设备名称 */ private void updateDeviceState(String productKey, String deviceName) { - deviceUpstreamApi.updateDeviceState(((IotDeviceStateUpdateReqDTO) new IotDeviceStateUpdateReqDTO().setRequestId(IdUtil.fastSimpleUUID()).setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now()).setProductKey(productKey).setDeviceName(deviceName)).setState(IotDeviceStateEnum.ONLINE.getState())); + deviceUpstreamApi.updateDeviceState(((IotDeviceStateUpdateReqDTO) new IotDeviceStateUpdateReqDTO() + .setRequestId(IdUtil.fastSimpleUUID()).setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now()) + .setProductKey(productKey).setDeviceName(deviceName)).setState(IotDeviceStateEnum.ONLINE.getState())); } /** * 解析属性上报请求 * - * @param productKey 产品Key + * @param productKey 产品 Key * @param deviceName 设备名称 - * @param requestId 请求ID + * @param requestId 请求 ID * @param body 请求体 - * @return 属性上报请求DTO + * @return 属性上报请求 DTO */ @SuppressWarnings("unchecked") private IotDevicePropertyReportReqDTO parsePropertyReportRequest(String productKey, String deviceName, String requestId, JsonObject body) { // 按照标准 JSON 格式处理属性数据 Map properties = new HashMap<>(); - // 优先使用 params 字段,符合标准 Map params = body.getJsonObject("params") != null ? body.getJsonObject("params").getMap() : null; if (params != null) { // 将标准格式的 params 转换为平台需要的 properties 格式 @@ -153,24 +155,25 @@ public class IotDeviceUpstreamVertxHandler implements Handler { } // 构建属性上报请求 DTO - return ((IotDevicePropertyReportReqDTO) new IotDevicePropertyReportReqDTO().setRequestId(requestId).setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now()).setProductKey(productKey).setDeviceName(deviceName)).setProperties(properties); + return ((IotDevicePropertyReportReqDTO) new IotDevicePropertyReportReqDTO().setRequestId(requestId) + .setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now()) + .setProductKey(productKey).setDeviceName(deviceName)).setProperties(properties); } /** * 解析事件上报请求 * - * @param productKey 产品Key + * @param productKey 产品K ey * @param deviceName 设备名称 * @param identifier 事件标识符 - * @param requestId 请求ID + * @param requestId 请求 ID * @param body 请求体 - * @return 事件上报请求DTO + * @return 事件上报请求 DTO */ private IotDeviceEventReportReqDTO parseEventReportRequest(String productKey, String deviceName, String identifier, String requestId, JsonObject body) { - // 按照标准JSON格式处理事件参数 + // 按照标准 JSON 格式处理事件参数 Map params; - // 优先使用params字段,符合标准 - if (body.getJsonObject("params") != null) { + if (body.containsKey("params")) { params = body.getJsonObject("params").getMap(); } else { // 兼容旧格式 @@ -178,6 +181,8 @@ public class IotDeviceUpstreamVertxHandler implements Handler { } // 构建事件上报请求 DTO - return ((IotDeviceEventReportReqDTO) new IotDeviceEventReportReqDTO().setRequestId(requestId).setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now()).setProductKey(productKey).setDeviceName(deviceName)).setIdentifier(identifier).setParams(params); + return ((IotDeviceEventReportReqDTO) new IotDeviceEventReportReqDTO().setRequestId(requestId) + .setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now()) + .setProductKey(productKey).setDeviceName(deviceName)).setIdentifier(identifier).setParams(params); } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-mqtt/pom.xml b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-mqtt/pom.xml index e007596dc0..f1fba50590 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-mqtt/pom.xml +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-mqtt/pom.xml @@ -14,6 +14,7 @@ yudao-module-iot-plugin-mqtt ${project.artifactId} + 物联网 插件模块 - mqtt 插件 diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-mqtt/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttPlugin.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-mqtt/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttPlugin.java index 54ff31f36b..7883fa8b12 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-mqtt/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttPlugin.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-mqtt/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttPlugin.java @@ -4,6 +4,7 @@ import lombok.extern.slf4j.Slf4j; import org.pf4j.Plugin; import org.pf4j.PluginWrapper; +// TODO @芋艿:暂未实现 @Slf4j public class MqttPlugin extends Plugin { diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-mqtt/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttServerExtension.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-mqtt/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttServerExtension.java index 868d238ee9..dd0c5da372 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-mqtt/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttServerExtension.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-mqtt/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttServerExtension.java @@ -21,6 +21,7 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; +// TODO @芋艿:暂未实现 /** * 根据官方示例,整合常见 MQTT 功能到 PF4J 的 Extension 类中 */