From 6639d371328a51bdc70ea43e09d74f35f3d6af0e Mon Sep 17 00:00:00 2001 From: YunaiV Date: Mon, 17 Mar 2025 18:50:12 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90=E4=BB=A3=E7=A0=81=E8=AF=84=E5=AE=A1?= =?UTF-8?q?=E3=80=91IoT=EF=BC=9A=E6=95=B4=E4=BD=93=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../module/iot/job/rule/IotRuleSceneJob.java | 3 +- .../service/device/IotDeviceGroupService.java | 3 +- .../device/data/IotDeviceLogServiceImpl.java | 5 +- .../upstream/IotDeviceUpstreamServer.java | 53 ++++++++++--------- 4 files changed, 36 insertions(+), 28 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/job/rule/IotRuleSceneJob.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/job/rule/IotRuleSceneJob.java index 2cda2fc20b..594f9ef0b0 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/job/rule/IotRuleSceneJob.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/job/rule/IotRuleSceneJob.java @@ -1,5 +1,6 @@ package cn.iocoder.yudao.module.iot.job.rule; +import cn.hutool.core.map.MapUtil; import cn.iocoder.yudao.module.iot.enums.rule.IotRuleSceneTriggerTypeEnum; import cn.iocoder.yudao.module.iot.service.rule.IotRuleSceneService; import jakarta.annotation.Resource; @@ -41,7 +42,7 @@ public class IotRuleSceneJob extends QuartzJobBean { * @return JobData Map */ public static Map buildJobDataMap(Long ruleSceneId) { - return Map.of(JOB_DATA_KEY_RULE_SCENE_ID, ruleSceneId); + return MapUtil.of(JOB_DATA_KEY_RULE_SCENE_ID, ruleSceneId); } /** diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceGroupService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceGroupService.java index 45e6ab25ef..5d074adb55 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceGroupService.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceGroupService.java @@ -1,6 +1,7 @@ package cn.iocoder.yudao.module.iot.service.device; import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.collection.ListUtil; import cn.iocoder.yudao.framework.common.pojo.PageResult; import cn.iocoder.yudao.module.iot.controller.admin.device.vo.group.IotDeviceGroupPageReqVO; import cn.iocoder.yudao.module.iot.controller.admin.device.vo.group.IotDeviceGroupSaveReqVO; @@ -48,7 +49,7 @@ public interface IotDeviceGroupService { */ default List validateDeviceGroupExists(Collection ids) { if (CollUtil.isEmpty(ids)) { - return List.of(); + return ListUtil.empty(); } return convertList(ids, this::validateDeviceGroupExists); } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/data/IotDeviceLogServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/data/IotDeviceLogServiceImpl.java index 1df4d4cd44..2ed2312bbe 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/data/IotDeviceLogServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/data/IotDeviceLogServiceImpl.java @@ -1,6 +1,7 @@ package cn.iocoder.yudao.module.iot.service.device.data; import cn.hutool.core.date.LocalDateTimeUtil; +import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.StrUtil; import cn.iocoder.yudao.framework.common.pojo.PageResult; @@ -87,7 +88,7 @@ public class IotDeviceLogServiceImpl implements IotDeviceLogService { Long timeMillis = timestamp.getTime(); // 消息数量转换 Integer count = ((Number) map.get("data")).intValue(); - return Map.of(timeMillis, count); + return MapUtil.of(timeMillis, count); }) .collect(Collectors.toList()); } @@ -103,7 +104,7 @@ public class IotDeviceLogServiceImpl implements IotDeviceLogService { Long timeMillis = timestamp.getTime(); // 消息数量转换 Integer count = ((Number) map.get("data")).intValue(); - return Map.of(timeMillis, count); + return MapUtil.of(timeMillis, count); }) .collect(Collectors.toList()); } diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java index 8911a76a80..00792ebcf9 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java @@ -85,11 +85,12 @@ public class IotDeviceUpstreamServer { } log.info("[start][开始启动服务]"); + // TODO @haohao:建议先启动 MQTT Broker,再启动 HTTP Server。类似 jdbc 先连接了,在启动 tomcat 的味道 // 1. 启动 HTTP 服务器 CompletableFuture httpFuture = server.listen(emqxProperties.getAuthPort()) .toCompletionStage() .toCompletableFuture() - .thenAccept(v -> log.info("[start][HTTP服务器启动完成,端口: {}]", server.actualPort())); + .thenAccept(v -> log.info("[start][HTTP 服务器启动完成,端口: {}]", server.actualPort())); // 2. 连接 MQTT Broker CompletableFuture mqttFuture = connectMqtt() @@ -98,16 +99,16 @@ public class IotDeviceUpstreamServer { .thenAccept(v -> { // 2.1 添加 MQTT 断开重连监听器 client.closeHandler(closeEvent -> { - log.warn("[closeHandler][MQTT连接已断开,准备重连]"); + log.warn("[closeHandler][MQTT 连接已断开,准备重连]"); reconnectWithDelay(); }); - // 2. 设置 MQTT 消息处理器 + // 2.2 设置 MQTT 消息处理器 setupMessageHandler(); }); // 3. 等待所有服务启动完成 CompletableFuture.allOf(httpFuture, mqttFuture) - .orTimeout(CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS) + .orTimeout(CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS) // TODO @芋艿:JDK8 不兼容 .whenComplete((result, error) -> { if (error != null) { log.error("[start][服务启动失败]", error); @@ -123,7 +124,7 @@ public class IotDeviceUpstreamServer { */ private void setupMessageHandler() { client.publishHandler(mqttMessageHandler::handle); - log.debug("[setupMessageHandler][MQTT消息处理器设置完成]"); + log.debug("[setupMessageHandler][MQTT 消息处理器设置完成]"); } /** @@ -203,26 +204,30 @@ public class IotDeviceUpstreamServer { isRunning = false; try { - CompletableFuture serverFuture = server != null - ? server.close().toCompletionStage().toCompletableFuture() - : CompletableFuture.completedFuture(null); - CompletableFuture clientFuture = client != null - ? client.disconnect().toCompletionStage().toCompletableFuture() - : CompletableFuture.completedFuture(null); - CompletableFuture vertxFuture = vertx != null - ? vertx.close().toCompletionStage().toCompletableFuture() - : CompletableFuture.completedFuture(null); + // 关闭 HTTP 服务器 + if (server != null) { + server.close() + .toCompletionStage() + .toCompletableFuture() + .join(); + } - // 等待所有资源关闭 - CompletableFuture.allOf(serverFuture, clientFuture, vertxFuture) - .orTimeout(CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS) - .whenComplete((result, error) -> { - if (error != null) { - log.error("[stop][服务关闭过程中发生异常]", error); - } else { - log.info("[stop][所有服务关闭完成]"); - } - }); + // 关闭 MQTT 客户端 + if (client != null) { + client.disconnect() + .toCompletionStage() + .toCompletableFuture() + .join(); + } + + // 关闭 Vertx 实例 + if (vertx!= null) { + vertx.close() + .toCompletionStage() + .toCompletableFuture() + .join(); + } + log.info("[stop][关闭完成]"); } catch (Exception e) { log.error("[stop][关闭服务异常]", e); throw new RuntimeException("关闭 IoT 设备上行服务失败", e);