# Conflicts:
#	yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceGroupService.java
This commit is contained in:
YunaiV 2025-03-17 18:50:27 +08:00
commit 0f4f5b484b
2 changed files with 31 additions and 26 deletions

View File

@ -6,8 +6,8 @@ import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.group.IotDeviceGroupPageReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.group.IotDeviceGroupSaveReqVO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceGroupDO;
import jakarta.validation.Valid;
import javax.validation.Valid;
import java.util.Collection;
import java.util.List;
@ -49,7 +49,7 @@ public interface IotDeviceGroupService {
*/
default List<IotDeviceGroupDO> validateDeviceGroupExists(Collection<Long> ids) {
if (CollUtil.isEmpty(ids)) {
return ListUtil.of();
return ListUtil.empty();
}
return convertList(ids, this::validateDeviceGroupExists);
}

View File

@ -85,11 +85,12 @@ public class IotDeviceUpstreamServer {
}
log.info("[start][开始启动服务]");
// TODO @haohao建议先启动 MQTT Broker再启动 HTTP Server类似 jdbc 先连接了在启动 tomcat 的味道
// 1. 启动 HTTP 服务器
CompletableFuture<Void> httpFuture = server.listen(emqxProperties.getAuthPort())
.toCompletionStage()
.toCompletableFuture()
.thenAccept(v -> log.info("[start][HTTP服务器启动完成,端口: {}]", server.actualPort()));
.thenAccept(v -> log.info("[start][HTTP 服务器启动完成,端口: {}]", server.actualPort()));
// 2. 连接 MQTT Broker
CompletableFuture<Void> mqttFuture = connectMqtt()
@ -98,16 +99,16 @@ public class IotDeviceUpstreamServer {
.thenAccept(v -> {
// 2.1 添加 MQTT 断开重连监听器
client.closeHandler(closeEvent -> {
log.warn("[closeHandler][MQTT连接已断开,准备重连]");
log.warn("[closeHandler][MQTT 连接已断开,准备重连]");
reconnectWithDelay();
});
// 2. 设置 MQTT 消息处理器
// 2.2 设置 MQTT 消息处理器
setupMessageHandler();
});
// 3. 等待所有服务启动完成
CompletableFuture.allOf(httpFuture, mqttFuture)
.orTimeout(CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.orTimeout(CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS) // TODO @芋艿JDK8 不兼容
.whenComplete((result, error) -> {
if (error != null) {
log.error("[start][服务启动失败]", error);
@ -123,7 +124,7 @@ public class IotDeviceUpstreamServer {
*/
private void setupMessageHandler() {
client.publishHandler(mqttMessageHandler::handle);
log.debug("[setupMessageHandler][MQTT消息处理器设置完成]");
log.debug("[setupMessageHandler][MQTT 消息处理器设置完成]");
}
/**
@ -203,26 +204,30 @@ public class IotDeviceUpstreamServer {
isRunning = false;
try {
CompletableFuture<Void> serverFuture = server != null
? server.close().toCompletionStage().toCompletableFuture()
: CompletableFuture.completedFuture(null);
CompletableFuture<Void> clientFuture = client != null
? client.disconnect().toCompletionStage().toCompletableFuture()
: CompletableFuture.completedFuture(null);
CompletableFuture<Void> vertxFuture = vertx != null
? vertx.close().toCompletionStage().toCompletableFuture()
: CompletableFuture.completedFuture(null);
// 关闭 HTTP 服务器
if (server != null) {
server.close()
.toCompletionStage()
.toCompletableFuture()
.join();
}
// 等待所有资源关闭
CompletableFuture.allOf(serverFuture, clientFuture, vertxFuture)
.orTimeout(CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.whenComplete((result, error) -> {
if (error != null) {
log.error("[stop][服务关闭过程中发生异常]", error);
} else {
log.info("[stop][所有服务关闭完成]");
}
});
// 关闭 MQTT 客户端
if (client != null) {
client.disconnect()
.toCompletionStage()
.toCompletableFuture()
.join();
}
// 关闭 Vertx 实例
if (vertx!= null) {
vertx.close()
.toCompletionStage()
.toCompletableFuture()
.join();
}
log.info("[stop][关闭完成]");
} catch (Exception e) {
log.error("[stop][关闭服务异常]", e);
throw new RuntimeException("关闭 IoT 设备上行服务失败", e);