From 02c3aa748b8ed45be1c438cdc91563422b388606 Mon Sep 17 00:00:00 2001 From: YunaiV Date: Fri, 30 May 2025 22:34:43 +0800 Subject: [PATCH] =?UTF-8?q?reactor=EF=BC=9A=E3=80=90IoT=20=E7=89=A9?= =?UTF-8?q?=E8=81=94=E7=BD=91=E3=80=91=E6=B8=85=E7=90=86=E5=BF=83=E8=B7=B3?= =?UTF-8?q?=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/api/device/IotDeviceUpstreamApi.java | 10 - .../IotPluginInstanceHeartbeatReqDTO.java | 44 --- .../api/device/IoTDeviceUpstreamApiImpl.java | 10 - .../plugin/IotPluginInstanceService.java | 8 - .../plugin/IotPluginInstanceServiceImpl.java | 48 --- ...otNetComponentCommonAutoConfiguration.java | 36 +- .../IotNetComponentCommonProperties.java | 34 -- .../IotDeviceDownstreamHandler.java | 55 ---- .../downstream/IotDeviceDownstreamServer.java | 80 ----- .../IotNetComponentInstanceHeartbeatJob.java | 111 ------- .../heartbeat/IotNetComponentRegistry.java | 98 ------ .../upstream/IotDeviceUpstreamClient.java | 4 - .../main/resources/META-INF/spring.factories | 2 - .../IotNetComponentEmqxAutoConfiguration.java | 53 +-- .../IotDeviceDownstreamHandlerImpl.java | 253 +++++++------- .../upstream/IotDeviceUpstreamServer.java | 6 +- .../IotNetComponentHttpAutoConfiguration.java | 12 - .../IotDeviceDownstreamHandlerImpl.java | 90 +++-- .../IotNetComponentServerConfiguration.java | 43 --- .../IotNetComponentServerProperties.java | 8 +- .../IotComponentDownstreamHandlerImpl.java | 65 ---- .../IotComponentDownstreamServer.java | 310 ------------------ .../heartbeat/IotComponentHeartbeatJob.java | 98 ------ .../upstream/IotComponentUpstreamClient.java | 8 +- .../src/main/resources/application.yml | 5 - 25 files changed, 168 insertions(+), 1323 deletions(-) delete mode 100644 yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/api/device/dto/control/upstream/IotPluginInstanceHeartbeatReqDTO.java delete mode 100644 yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/java/cn/iocoder/yudao/module/iot/net/component/core/downstream/IotDeviceDownstreamHandler.java delete mode 100644 yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/java/cn/iocoder/yudao/module/iot/net/component/core/downstream/IotDeviceDownstreamServer.java delete mode 100644 yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/java/cn/iocoder/yudao/module/iot/net/component/core/heartbeat/IotNetComponentInstanceHeartbeatJob.java delete mode 100644 yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/java/cn/iocoder/yudao/module/iot/net/component/core/heartbeat/IotNetComponentRegistry.java delete mode 100644 yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/resources/META-INF/spring.factories delete mode 100644 yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/downstream/IotComponentDownstreamHandlerImpl.java delete mode 100644 yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/downstream/IotComponentDownstreamServer.java delete mode 100644 yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/heartbeat/IotComponentHeartbeatJob.java diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/api/device/IotDeviceUpstreamApi.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/api/device/IotDeviceUpstreamApi.java index e88706ac59..9bc20f7c70 100644 --- a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/api/device/IotDeviceUpstreamApi.java +++ b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/api/device/IotDeviceUpstreamApi.java @@ -80,14 +80,4 @@ public interface IotDeviceUpstreamApi { @PostMapping(PREFIX + "/authenticate-emqx-connection") CommonResult authenticateEmqxConnection(@Valid @RequestBody IotDeviceEmqxAuthReqDTO authReqDTO); - // ========== 插件相关 ========== - - /** - * 心跳插件实例 - * - * @param heartbeatReqDTO 心跳插件实例 DTO - */ - @PostMapping(PREFIX + "/heartbeat-plugin-instance") - CommonResult heartbeatPluginInstance(@Valid @RequestBody IotPluginInstanceHeartbeatReqDTO heartbeatReqDTO); - } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/api/device/dto/control/upstream/IotPluginInstanceHeartbeatReqDTO.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/api/device/dto/control/upstream/IotPluginInstanceHeartbeatReqDTO.java deleted file mode 100644 index 9125b5f242..0000000000 --- a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/api/device/dto/control/upstream/IotPluginInstanceHeartbeatReqDTO.java +++ /dev/null @@ -1,44 +0,0 @@ -package cn.iocoder.yudao.module.iot.api.device.dto.control.upstream; - -import jakarta.validation.constraints.NotEmpty; -import jakarta.validation.constraints.NotNull; -import lombok.Data; - -/** - * IoT 插件实例心跳 Request DTO - * - * @author 芋道源码 - */ -@Data -public class IotPluginInstanceHeartbeatReqDTO { - - /** - * 请求编号 - */ - @NotEmpty(message = "请求编号不能为空") - private String processId; - - /** - * 插件包标识符 - */ - @NotEmpty(message = "插件包标识符不能为空") - private String pluginKey; - - /** - * 插件实例所在 IP - */ - @NotEmpty(message = "插件实例所在 IP 不能为空") - private String hostIp; - /** - * 插件实例的进程编号 - */ - @NotNull(message = "插件实例的进程编号不能为空") - private Integer downstreamPort; - - /** - * 是否在线 - */ - @NotNull(message = "是否在线不能为空") - private Boolean online; - -} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/api/device/IoTDeviceUpstreamApiImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/api/device/IoTDeviceUpstreamApiImpl.java index 9f637a6bee..6672fcf734 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/api/device/IoTDeviceUpstreamApiImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/api/device/IoTDeviceUpstreamApiImpl.java @@ -21,8 +21,6 @@ public class IoTDeviceUpstreamApiImpl implements IotDeviceUpstreamApi { @Resource private IotDeviceUpstreamService deviceUpstreamService; - @Resource - private IotPluginInstanceService pluginInstanceService; // ========== 设备相关 ========== @@ -68,12 +66,4 @@ public class IoTDeviceUpstreamApiImpl implements IotDeviceUpstreamApi { return success(result); } - // ========== 插件相关 ========== - - @Override - public CommonResult heartbeatPluginInstance(IotPluginInstanceHeartbeatReqDTO heartbeatReqDTO) { - pluginInstanceService.heartbeatPluginInstance(heartbeatReqDTO); - return success(true); - } - } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/IotPluginInstanceService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/IotPluginInstanceService.java index 56e1bf0f08..49351930fc 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/IotPluginInstanceService.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/IotPluginInstanceService.java @@ -1,6 +1,5 @@ package cn.iocoder.yudao.module.iot.service.plugin; -import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotPluginInstanceHeartbeatReqDTO; import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginConfigDO; import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginInstanceDO; import org.springframework.web.multipart.MultipartFile; @@ -14,13 +13,6 @@ import java.time.LocalDateTime; */ public interface IotPluginInstanceService { - /** - * 心跳插件实例 - * - * @param heartbeatReqDTO 心跳插件实例 DTO - */ - void heartbeatPluginInstance(IotPluginInstanceHeartbeatReqDTO heartbeatReqDTO); - /** * 离线超时插件实例 * diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/IotPluginInstanceServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/IotPluginInstanceServiceImpl.java index 14912edff7..ead0fa86d3 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/IotPluginInstanceServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/IotPluginInstanceServiceImpl.java @@ -2,8 +2,6 @@ package cn.iocoder.yudao.module.iot.service.plugin; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.StrUtil; -import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils; -import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotPluginInstanceHeartbeatReqDTO; import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginConfigDO; import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginInstanceDO; import cn.iocoder.yudao.module.iot.dal.mysql.plugin.IotPluginInstanceMapper; @@ -11,7 +9,6 @@ import cn.iocoder.yudao.module.iot.dal.redis.plugin.DevicePluginProcessIdRedisDA import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.springframework.validation.annotation.Validated; import org.springframework.web.multipart.MultipartFile; @@ -31,10 +28,6 @@ import java.util.concurrent.TimeUnit; @Slf4j public class IotPluginInstanceServiceImpl implements IotPluginInstanceService { - @Resource - @Lazy // 延迟加载,避免循环依赖 - private IotPluginConfigService pluginConfigService; - @Resource private IotPluginInstanceMapper pluginInstanceMapper; @@ -47,47 +40,6 @@ public class IotPluginInstanceServiceImpl implements IotPluginInstanceService { @Value("${pf4j.pluginsDir}") private String pluginsDir; - @Override - public void heartbeatPluginInstance(IotPluginInstanceHeartbeatReqDTO heartbeatReqDTO) { - // 情况一:已存在,则进行更新 - IotPluginInstanceDO instance = TenantUtils.executeIgnore( - () -> pluginInstanceMapper.selectByProcessId(heartbeatReqDTO.getProcessId())); - if (instance != null) { - IotPluginInstanceDO.IotPluginInstanceDOBuilder updateObj = IotPluginInstanceDO.builder().id(instance.getId()) - .hostIp(heartbeatReqDTO.getHostIp()).downstreamPort(heartbeatReqDTO.getDownstreamPort()) - .online(heartbeatReqDTO.getOnline()).heartbeatTime(LocalDateTime.now()); - if (Boolean.TRUE.equals(heartbeatReqDTO.getOnline())) { - if (Boolean.FALSE.equals(instance.getOnline())) { // 当前处于离线时,才需要更新上线时间 - updateObj.onlineTime(LocalDateTime.now()); - } - } else { - updateObj.offlineTime(LocalDateTime.now()); - } - TenantUtils.execute(instance.getTenantId(), - () -> pluginInstanceMapper.updateById(updateObj.build())); - return; - } - - // 情况二:不存在,则创建 - IotPluginConfigDO info = TenantUtils.executeIgnore( - () -> pluginConfigService.getPluginConfigByPluginKey(heartbeatReqDTO.getPluginKey())); - if (info == null) { - log.error("[heartbeatPluginInstance][心跳({}) 对应的插件不存在]", heartbeatReqDTO); - return; - } - IotPluginInstanceDO.IotPluginInstanceDOBuilder insertObj = IotPluginInstanceDO.builder() - .pluginId(info.getId()).processId(heartbeatReqDTO.getProcessId()) - .hostIp(heartbeatReqDTO.getHostIp()).downstreamPort(heartbeatReqDTO.getDownstreamPort()) - .online(heartbeatReqDTO.getOnline()).heartbeatTime(LocalDateTime.now()); - if (Boolean.TRUE.equals(heartbeatReqDTO.getOnline())) { - insertObj.onlineTime(LocalDateTime.now()); - } else { - insertObj.offlineTime(LocalDateTime.now()); - } - TenantUtils.execute(info.getTenantId(), - () -> pluginInstanceMapper.insert(insertObj.build())); - } - @Override public int offlineTimeoutPluginInstance(LocalDateTime maxHeartbeatTime) { List list = pluginInstanceMapper.selectListByHeartbeatTimeLt(maxHeartbeatTime); diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/java/cn/iocoder/yudao/module/iot/net/component/core/config/IotNetComponentCommonAutoConfiguration.java b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/java/cn/iocoder/yudao/module/iot/net/component/core/config/IotNetComponentCommonAutoConfiguration.java index 5208c1e66f..714b39e647 100644 --- a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/java/cn/iocoder/yudao/module/iot/net/component/core/config/IotNetComponentCommonAutoConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/java/cn/iocoder/yudao/module/iot/net/component/core/config/IotNetComponentCommonAutoConfiguration.java @@ -1,19 +1,11 @@ package cn.iocoder.yudao.module.iot.net.component.core.config; -import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi; -import cn.iocoder.yudao.module.iot.net.component.core.downstream.IotDeviceDownstreamHandler; -import cn.iocoder.yudao.module.iot.net.component.core.downstream.IotDeviceDownstreamServer; -import cn.iocoder.yudao.module.iot.net.component.core.heartbeat.IotNetComponentInstanceHeartbeatJob; -import cn.iocoder.yudao.module.iot.net.component.core.heartbeat.IotNetComponentRegistry; import cn.iocoder.yudao.module.iot.net.component.core.upstream.IotDeviceUpstreamClient; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.AutoConfiguration; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.scheduling.annotation.EnableScheduling; -// TODO @haohao:应该不用写 spring.factories 拉,因为被 imports 替代啦 /** * IoT 网络组件的通用自动配置类 * @@ -24,33 +16,6 @@ import org.springframework.scheduling.annotation.EnableScheduling; @EnableScheduling // 开启定时任务,因为 IotNetComponentInstanceHeartbeatJob 是一个定时任务 public class IotNetComponentCommonAutoConfiguration { - /** - * 创建 EMQX 设备下行服务器 - *

- * 当 yudao.iot.component.emqx.enabled = true 时,优先使用 emqxDeviceDownstreamHandler - */ - @Bean - @ConditionalOnProperty(prefix = "yudao.iot.component.emqx", name = "enabled", havingValue = "true") - public IotDeviceDownstreamServer emqxDeviceDownstreamServer( - IotNetComponentCommonProperties properties, - @Qualifier("emqxDeviceDownstreamHandler") IotDeviceDownstreamHandler deviceDownstreamHandler) { - return new IotDeviceDownstreamServer(properties, deviceDownstreamHandler); - } - - /** - * 创建网络组件实例心跳任务 - */ - @Bean(initMethod = "init", destroyMethod = "stop") - public IotNetComponentInstanceHeartbeatJob pluginInstanceHeartbeatJob( - IotDeviceUpstreamApi deviceUpstreamApi, - IotNetComponentCommonProperties commonProperties, - IotNetComponentRegistry componentRegistry) { - return new IotNetComponentInstanceHeartbeatJob( - deviceUpstreamApi, - commonProperties, - componentRegistry); - } - /** * 创建设备上行客户端 */ @@ -58,4 +23,5 @@ public class IotNetComponentCommonAutoConfiguration { public IotDeviceUpstreamClient deviceUpstreamClient() { return new IotDeviceUpstreamClient(); } + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/java/cn/iocoder/yudao/module/iot/net/component/core/config/IotNetComponentCommonProperties.java b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/java/cn/iocoder/yudao/module/iot/net/component/core/config/IotNetComponentCommonProperties.java index a4fb09e609..99312994f8 100644 --- a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/java/cn/iocoder/yudao/module/iot/net/component/core/config/IotNetComponentCommonProperties.java +++ b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/java/cn/iocoder/yudao/module/iot/net/component/core/config/IotNetComponentCommonProperties.java @@ -21,38 +21,4 @@ public class IotNetComponentCommonProperties { */ private String pluginKey; - /** - * 组件实例心跳超时时间,单位:毫秒 - *

- * 默认值:30 秒 - */ - private Long instanceHeartbeatTimeout = 30000L; - - /** - * 网络组件消息转发配置 - */ - // private ForwardMessage forwardMessage = new ForwardMessage(); - - /** - * 消息转发配置 - */ - /* - * @Data - * public static class ForwardMessage { - * - * /** - * 是否转发所有设备消息到 EMQX - *

- * 默认为 true 开启 - */ - // private boolean forwardAllDeviceMessageToEmqx = true; - - /** - * 是否转发所有设备消息到 HTTP - *

- * 默认为 false 关闭 - */ - // private boolean forwardAllDeviceMessageToHttp = false; - // } - } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/java/cn/iocoder/yudao/module/iot/net/component/core/downstream/IotDeviceDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/java/cn/iocoder/yudao/module/iot/net/component/core/downstream/IotDeviceDownstreamHandler.java deleted file mode 100644 index e69e4c41d4..0000000000 --- a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/java/cn/iocoder/yudao/module/iot/net/component/core/downstream/IotDeviceDownstreamHandler.java +++ /dev/null @@ -1,55 +0,0 @@ -package cn.iocoder.yudao.module.iot.net.component.core.downstream; - -import cn.iocoder.yudao.framework.common.pojo.CommonResult; -import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.*; - -/** - * IoT 设备下行处理器 - *

- * 目的:每个 plugin 需要实现,用于处理 server 下行的指令(请求),从而实现从 server => plugin => device 的下行流程 - * - * @author 芋道源码 - */ -public interface IotDeviceDownstreamHandler { - - /** - * 调用设备服务 - * - * @param invokeReqDTO 调用设备服务的请求 - * @return 是否成功 - */ - CommonResult invokeDeviceService(IotDeviceServiceInvokeReqDTO invokeReqDTO); - - /** - * 获取设备属性 - * - * @param getReqDTO 获取设备属性的请求 - * @return 是否成功 - */ - CommonResult getDeviceProperty(IotDevicePropertyGetReqDTO getReqDTO); - - /** - * 设置设备属性 - * - * @param setReqDTO 设置设备属性的请求 - * @return 是否成功 - */ - CommonResult setDeviceProperty(IotDevicePropertySetReqDTO setReqDTO); - - /** - * 设置设备配置 - * - * @param setReqDTO 设置设备配置的请求 - * @return 是否成功 - */ - CommonResult setDeviceConfig(IotDeviceConfigSetReqDTO setReqDTO); - - /** - * 升级设备 OTA - * - * @param upgradeReqDTO 升级设备 OTA 的请求 - * @return 是否成功 - */ - CommonResult upgradeDeviceOta(IotDeviceOtaUpgradeReqDTO upgradeReqDTO); - -} diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/java/cn/iocoder/yudao/module/iot/net/component/core/downstream/IotDeviceDownstreamServer.java b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/java/cn/iocoder/yudao/module/iot/net/component/core/downstream/IotDeviceDownstreamServer.java deleted file mode 100644 index 1f58eb2ed2..0000000000 --- a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/java/cn/iocoder/yudao/module/iot/net/component/core/downstream/IotDeviceDownstreamServer.java +++ /dev/null @@ -1,80 +0,0 @@ -package cn.iocoder.yudao.module.iot.net.component.core.downstream; - -import cn.iocoder.yudao.framework.common.pojo.CommonResult; -import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.*; -import cn.iocoder.yudao.module.iot.net.component.core.config.IotNetComponentCommonProperties; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -/** - * IoT 设备下行服务,直接转发给 device 设备 - * - * @author 芋道源码 - */ -@Slf4j -@RequiredArgsConstructor -public class IotDeviceDownstreamServer { - - private final IotNetComponentCommonProperties properties; - private final IotDeviceDownstreamHandler deviceDownstreamHandler; - - /** - * 调用设备服务 - * - * @param invokeReqDTO 调用设备服务的请求 - * @return 是否成功 - */ - public CommonResult invokeDeviceService(IotDeviceServiceInvokeReqDTO invokeReqDTO) { - return deviceDownstreamHandler.invokeDeviceService(invokeReqDTO); - } - - /** - * 获取设备属性 - * - * @param getReqDTO 获取设备属性的请求 - * @return 是否成功 - */ - public CommonResult getDeviceProperty(IotDevicePropertyGetReqDTO getReqDTO) { - return deviceDownstreamHandler.getDeviceProperty(getReqDTO); - } - - /** - * 设置设备属性 - * - * @param setReqDTO 设置设备属性的请求 - * @return 是否成功 - */ - public CommonResult setDeviceProperty(IotDevicePropertySetReqDTO setReqDTO) { - return deviceDownstreamHandler.setDeviceProperty(setReqDTO); - } - - /** - * 设置设备配置 - * - * @param setReqDTO 设置设备配置的请求 - * @return 是否成功 - */ - public CommonResult setDeviceConfig(IotDeviceConfigSetReqDTO setReqDTO) { - return deviceDownstreamHandler.setDeviceConfig(setReqDTO); - } - - /** - * 升级设备 OTA - * - * @param upgradeReqDTO 升级设备 OTA 的请求 - * @return 是否成功 - */ - public CommonResult upgradeDeviceOta(IotDeviceOtaUpgradeReqDTO upgradeReqDTO) { - return deviceDownstreamHandler.upgradeDeviceOta(upgradeReqDTO); - } - - /** - * 获得内部组件标识 - * - * @return 组件标识 - */ - public String getComponentId() { - return properties.getPluginKey(); - } - -} diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/java/cn/iocoder/yudao/module/iot/net/component/core/heartbeat/IotNetComponentInstanceHeartbeatJob.java b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/java/cn/iocoder/yudao/module/iot/net/component/core/heartbeat/IotNetComponentInstanceHeartbeatJob.java deleted file mode 100644 index 395b765b0f..0000000000 --- a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/java/cn/iocoder/yudao/module/iot/net/component/core/heartbeat/IotNetComponentInstanceHeartbeatJob.java +++ /dev/null @@ -1,111 +0,0 @@ -package cn.iocoder.yudao.module.iot.net.component.core.heartbeat; - -import cn.hutool.core.collection.CollUtil; -import cn.hutool.system.SystemUtil; -import cn.iocoder.yudao.framework.common.pojo.CommonResult; -import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi; -import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotPluginInstanceHeartbeatReqDTO; -import cn.iocoder.yudao.module.iot.net.component.core.config.IotNetComponentCommonProperties; -import cn.iocoder.yudao.module.iot.net.component.core.heartbeat.IotNetComponentRegistry.IotNetComponentInfo; -import cn.iocoder.yudao.module.iot.net.component.core.util.IotNetComponentCommonUtils; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.scheduling.annotation.Scheduled; - -import java.util.Collection; -import java.util.concurrent.TimeUnit; - -/** - * IoT 网络组件实例心跳定时任务 - *

- * 将组件的状态,定时上报给 server 服务器 - * - * @author haohao - */ -@RequiredArgsConstructor -@Slf4j -public class IotNetComponentInstanceHeartbeatJob { - - private final IotDeviceUpstreamApi deviceUpstreamApi; - private final IotNetComponentCommonProperties commonProperties; - private final IotNetComponentRegistry componentRegistry; - - /** - * 初始化方法,由 Spring 调用:注册当前组件并发送上线心跳 - */ - public void init() { - // 发送所有组件的上线心跳 - Collection components = componentRegistry.getAllComponents(); - if (CollUtil.isEmpty(components)) { - return; - } - for (IotNetComponentInfo component : components) { - try { - CommonResult result = deviceUpstreamApi.heartbeatPluginInstance( - buildPluginInstanceHeartbeatReqDTO(component, true)); - log.info("[init][组件({})上线结果:{}]", component.getPluginKey(), result); - } catch (Exception e) { - log.error("[init][组件({})上线发送异常]", component.getPluginKey(), e); - } - } - } - - /** - * 停止方法,由 Spring 调用:发送下线心跳并注销组件 - */ - public void stop() { - // 发送所有组件的下线心跳 - Collection components = componentRegistry.getAllComponents(); - if (CollUtil.isEmpty(components)) { - return; - } - for (IotNetComponentInfo component : components) { - try { - CommonResult result = deviceUpstreamApi.heartbeatPluginInstance( - buildPluginInstanceHeartbeatReqDTO(component, false)); - log.info("[stop][组件({})下线结果:{}]", component.getPluginKey(), result); - } catch (Exception e) { - log.error("[stop][组件({})下线发送异常]", component.getPluginKey(), e); - } - } - - // 注销当前组件 - componentRegistry.unregisterComponent(commonProperties.getPluginKey()); - } - - /** - * 定时发送心跳 - */ - @Scheduled(initialDelay = 1, fixedRate = 1, timeUnit = TimeUnit.MINUTES) // 1 分钟执行一次 - public void execute() { - // 发送所有组件的心跳 - Collection components = componentRegistry.getAllComponents(); - if (CollUtil.isEmpty(components)) { - return; - } - for (IotNetComponentInfo component : components) { - try { - CommonResult result = deviceUpstreamApi.heartbeatPluginInstance( - buildPluginInstanceHeartbeatReqDTO(component, true)); - log.info("[execute][组件({})心跳结果:{}]", component.getPluginKey(), result); - } catch (Exception e) { - log.error("[execute][组件({})心跳发送异常]", component.getPluginKey(), e); - } - } - } - - /** - * 构建心跳 DTO - * - * @param component 组件信息 - * @param online 是否在线 - * @return 心跳 DTO - */ - private IotPluginInstanceHeartbeatReqDTO buildPluginInstanceHeartbeatReqDTO(IotNetComponentInfo component, - Boolean online) { - return new IotPluginInstanceHeartbeatReqDTO() - .setPluginKey(component.getPluginKey()).setProcessId(component.getProcessId()) - .setHostIp(component.getHostIp()).setDownstreamPort(component.getDownstreamPort()) - .setOnline(online); - } -} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/java/cn/iocoder/yudao/module/iot/net/component/core/heartbeat/IotNetComponentRegistry.java b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/java/cn/iocoder/yudao/module/iot/net/component/core/heartbeat/IotNetComponentRegistry.java deleted file mode 100644 index ce8f4de66e..0000000000 --- a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/java/cn/iocoder/yudao/module/iot/net/component/core/heartbeat/IotNetComponentRegistry.java +++ /dev/null @@ -1,98 +0,0 @@ -package cn.iocoder.yudao.module.iot.net.component.core.heartbeat; - -import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.map.MapUtil; -import lombok.Data; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - * IoT 网络组件注册表 - *

- * 用于管理多个网络组件的注册信息,解决多组件心跳问题 - * - * @author haohao - */ -@Component -@Slf4j -public class IotNetComponentRegistry { - - /** - * 网络组件信息 - */ - @Data - public static class IotNetComponentInfo { - - /** - * 组件 Key - */ - private final String pluginKey; - - /** - * 主机 IP - */ - private final String hostIp; - - /** - * 下游端口 - */ - private final Integer downstreamPort; - - /** - * 进程 ID - */ - private final String processId; - } - - /** - * 组件映射表:key 为组件 Key - */ - private final Map components = new ConcurrentHashMap<>(); - - /** - * 注册网络组件 - * - * @param pluginKey 组件 Key - * @param hostIp 主机 IP - * @param downstreamPort 下游端口 - * @param processId 进程 ID - */ - public void registerComponent(String pluginKey, String hostIp, Integer downstreamPort, String processId) { - log.info("[registerComponent][注册网络组件, pluginKey={}, hostIp={}, downstreamPort={}, processId={}]", - pluginKey, hostIp, downstreamPort, processId); - components.put(pluginKey, new IotNetComponentInfo(pluginKey, hostIp, downstreamPort, processId)); - } - - /** - * 注销网络组件 - * - * @param pluginKey 组件 Key - */ - public void unregisterComponent(String pluginKey) { - log.info("[unregisterComponent][注销网络组件, pluginKey={}]", pluginKey); - components.remove(pluginKey); - } - - /** - * 获取所有网络组件 - * - * @return 所有组件集合 - */ - public Collection getAllComponents() { - return CollUtil.isEmpty(components) ? CollUtil.newArrayList() : components.values(); - } - - /** - * 获取指定网络组件 - * - * @param pluginKey 组件 Key - * @return 组件信息 - */ - public IotNetComponentInfo getComponent(String pluginKey) { - return MapUtil.isEmpty(components) ? null : components.get(pluginKey); - } -} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/java/cn/iocoder/yudao/module/iot/net/component/core/upstream/IotDeviceUpstreamClient.java b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/java/cn/iocoder/yudao/module/iot/net/component/core/upstream/IotDeviceUpstreamClient.java index 6364f5c72d..07315c2c83 100644 --- a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/java/cn/iocoder/yudao/module/iot/net/component/core/upstream/IotDeviceUpstreamClient.java +++ b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/java/cn/iocoder/yudao/module/iot/net/component/core/upstream/IotDeviceUpstreamClient.java @@ -53,8 +53,4 @@ public class IotDeviceUpstreamClient implements IotDeviceUpstreamApi { return deviceUpstreamApi.reportDeviceProperty(reportReqDTO); } - @Override - public CommonResult heartbeatPluginInstance(IotPluginInstanceHeartbeatReqDTO heartbeatReqDTO) { - return deviceUpstreamApi.heartbeatPluginInstance(heartbeatReqDTO); - } } diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/resources/META-INF/spring.factories b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/resources/META-INF/spring.factories deleted file mode 100644 index 1fb7cb13a7..0000000000 --- a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/resources/META-INF/spring.factories +++ /dev/null @@ -1,2 +0,0 @@ -org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ - cn.iocoder.yudao.module.iot.net.component.core.config.IotNetComponentCommonAutoConfiguration \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/net/component/emqx/config/IotNetComponentEmqxAutoConfiguration.java b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/net/component/emqx/config/IotNetComponentEmqxAutoConfiguration.java index a20daf2518..68b10ee17e 100644 --- a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/net/component/emqx/config/IotNetComponentEmqxAutoConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/net/component/emqx/config/IotNetComponentEmqxAutoConfiguration.java @@ -2,13 +2,7 @@ package cn.iocoder.yudao.module.iot.net.component.emqx.config; import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.ObjUtil; -import cn.hutool.system.SystemUtil; import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi; -import cn.iocoder.yudao.module.iot.net.component.core.config.IotNetComponentCommonProperties; -import cn.iocoder.yudao.module.iot.net.component.core.downstream.IotDeviceDownstreamHandler; -import cn.iocoder.yudao.module.iot.net.component.core.heartbeat.IotNetComponentRegistry; -import cn.iocoder.yudao.module.iot.net.component.core.util.IotNetComponentCommonUtils; -import cn.iocoder.yudao.module.iot.net.component.emqx.downstream.IotDeviceDownstreamHandlerImpl; import cn.iocoder.yudao.module.iot.net.component.emqx.upstream.IotDeviceUpstreamServer; import io.vertx.core.Vertx; import io.vertx.mqtt.MqttClient; @@ -37,16 +31,6 @@ import org.springframework.context.event.EventListener; @Slf4j public class IotNetComponentEmqxAutoConfiguration { - /** - * 组件 key - */ - private static final String PLUGIN_KEY = "emqx"; - - // TODO @haohao:这个是不是要去掉哈。 - public IotNetComponentEmqxAutoConfiguration() { - // 构造函数中不输出日志,移到 initialize 方法中 - } - /** * 初始化 EMQX 组件 * @@ -57,21 +41,7 @@ public class IotNetComponentEmqxAutoConfiguration { log.info("[IotNetComponentEmqxAutoConfiguration][开始初始化]"); // 从应用上下文中获取需要的 Bean - IotNetComponentRegistry componentRegistry = event.getApplicationContext() - .getBean(IotNetComponentRegistry.class); - IotNetComponentCommonProperties commonProperties = event.getApplicationContext() - .getBean(IotNetComponentCommonProperties.class); - - // 设置当前组件的核心标识 - // 注意:这里只为当前 EMQX 组件设置 pluginKey,不影响其他组件 - commonProperties.setPluginKey(PLUGIN_KEY); - - // 将 EMQX 组件注册到组件注册表 - componentRegistry.registerComponent( - PLUGIN_KEY, - SystemUtil.getHostInfo().getAddress(), - 0, // 内嵌模式固定为 0 - IotNetComponentCommonUtils.getProcessId()); + // TODO @芋艿:看看要不要监听下 log.info("[initialize][IoT EMQX 组件初始化完成]"); } @@ -89,15 +59,6 @@ public class IotNetComponentEmqxAutoConfiguration { */ @Bean public MqttClient mqttClient(@Qualifier("emqxVertx") Vertx vertx, IotNetComponentEmqxProperties emqxProperties) { - // 使用 debug 级别记录详细配置,减少生产环境日志 - if (log.isDebugEnabled()) { - log.debug("MQTT 配置: host={}, port={}, username={}, ssl={}", - emqxProperties.getMqttHost(), emqxProperties.getMqttPort(), - emqxProperties.getMqttUsername(), emqxProperties.getMqttSsl()); - } else { - log.info("MQTT 连接至: {}:{}", emqxProperties.getMqttHost(), emqxProperties.getMqttPort()); - } - MqttClientOptions options = new MqttClientOptions() .setClientId("yudao-iot-downstream-" + IdUtil.fastSimpleUUID()) .setUsername(emqxProperties.getMqttUsername()) @@ -115,16 +76,8 @@ public class IotNetComponentEmqxAutoConfiguration { IotDeviceUpstreamApi deviceUpstreamApi, IotNetComponentEmqxProperties emqxProperties, @Qualifier("emqxVertx") Vertx vertx, - MqttClient mqttClient, - IotNetComponentRegistry componentRegistry) { - return new IotDeviceUpstreamServer(emqxProperties, deviceUpstreamApi, vertx, mqttClient, componentRegistry); + MqttClient mqttClient) { + return new IotDeviceUpstreamServer(emqxProperties, deviceUpstreamApi, vertx, mqttClient); } - /** - * 创建设备下行处理器 - */ - @Bean(name = "emqxDeviceDownstreamHandler") - public IotDeviceDownstreamHandler deviceDownstreamHandler(MqttClient mqttClient) { - return new IotDeviceDownstreamHandlerImpl(mqttClient); - } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/net/component/emqx/downstream/IotDeviceDownstreamHandlerImpl.java b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/net/component/emqx/downstream/IotDeviceDownstreamHandlerImpl.java index 7dfcc4535a..d8e91a676f 100644 --- a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/net/component/emqx/downstream/IotDeviceDownstreamHandlerImpl.java +++ b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/net/component/emqx/downstream/IotDeviceDownstreamHandlerImpl.java @@ -1,136 +1,121 @@ package cn.iocoder.yudao.module.iot.net.component.emqx.downstream; -import cn.hutool.core.util.StrUtil; -import cn.hutool.json.JSONObject; -import cn.hutool.json.JSONUtil; -import cn.iocoder.yudao.framework.common.pojo.CommonResult; -import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.*; -import cn.iocoder.yudao.module.iot.net.component.core.constants.IotDeviceTopicEnum; -import cn.iocoder.yudao.module.iot.net.component.core.downstream.IotDeviceDownstreamHandler; -import cn.iocoder.yudao.module.iot.net.component.core.message.IotMqttMessage; -import cn.iocoder.yudao.module.iot.net.component.core.util.IotNetComponentCommonUtils; -import io.netty.handler.codec.mqtt.MqttQoS; -import io.vertx.core.buffer.Buffer; -import io.vertx.mqtt.MqttClient; -import lombok.extern.slf4j.Slf4j; - -import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.MQTT_TOPIC_ILLEGAL; - -/** - * EMQX 网络组件的 {@link IotDeviceDownstreamHandler} 实现类 - * - * @author 芋道源码 - */ -@Slf4j -public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandler { - - /** - * MQTT 客户端 - */ - private final MqttClient mqttClient; - - /** - * 构造函数 - * - * @param mqttClient MQTT 客户端 - */ - public IotDeviceDownstreamHandlerImpl(MqttClient mqttClient) { - this.mqttClient = mqttClient; - } - - @Override - public CommonResult invokeDeviceService(IotDeviceServiceInvokeReqDTO reqDTO) { - log.info("[invokeService][开始调用设备服务][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO)); - - // 验证参数 - if (reqDTO.getProductKey() == null || reqDTO.getDeviceName() == null || reqDTO.getIdentifier() == null) { - log.error("[invokeService][参数不完整][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO)); - return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg()); - } - - try { - // 构建请求主题 - String topic = IotDeviceTopicEnum.buildServiceTopic(reqDTO.getProductKey(), reqDTO.getDeviceName(), - reqDTO.getIdentifier()); - - // 构建请求消息 - String requestId = StrUtil.isNotEmpty(reqDTO.getRequestId()) ? reqDTO.getRequestId() - : IotNetComponentCommonUtils.generateRequestId(); - IotMqttMessage message = IotMqttMessage.createServiceInvokeMessage( - requestId, reqDTO.getIdentifier(), reqDTO.getParams()); - - // 发送消息 - publishMessage(topic, message.toJsonObject()); - - log.info("[invokeService][调用设备服务成功][requestId: {}][topic: {}]", requestId, topic); - return CommonResult.success(true); - } catch (Exception e) { - log.error("[invokeService][调用设备服务异常][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO), e); - return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg()); - } - } - - @Override - public CommonResult getDeviceProperty(IotDevicePropertyGetReqDTO getReqDTO) { - // 暂未实现,返回成功 - return CommonResult.success(true); - } - - @Override - public CommonResult setDeviceProperty(IotDevicePropertySetReqDTO reqDTO) { - log.info("[setProperty][开始设置设备属性][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO)); - - // 验证参数 - if (reqDTO.getProductKey() == null || reqDTO.getDeviceName() == null) { - log.error("[setProperty][参数不完整][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO)); - return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg()); - } - - try { - // 构建请求主题 - String topic = IotDeviceTopicEnum.buildPropertySetTopic(reqDTO.getProductKey(), reqDTO.getDeviceName()); - - // 构建请求消息 - String requestId = StrUtil.isNotEmpty(reqDTO.getRequestId()) ? reqDTO.getRequestId() - : IotNetComponentCommonUtils.generateRequestId(); - IotMqttMessage message = IotMqttMessage.createPropertySetMessage(requestId, reqDTO.getProperties()); - - // 发送消息 - publishMessage(topic, message.toJsonObject()); - - log.info("[setProperty][设置设备属性成功][requestId: {}][topic: {}]", requestId, topic); - return CommonResult.success(true); - } catch (Exception e) { - log.error("[setProperty][设置设备属性异常][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO), e); - return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg()); - } - } - - @Override - public CommonResult setDeviceConfig(IotDeviceConfigSetReqDTO setReqDTO) { - // 暂未实现,返回成功 - return CommonResult.success(true); - } - - @Override - public CommonResult upgradeDeviceOta(IotDeviceOtaUpgradeReqDTO upgradeReqDTO) { - // 暂未实现,返回成功 - return CommonResult.success(true); - } - - /** - * 发布 MQTT 消息 - * - * @param topic 主题 - * @param payload 消息内容 - */ - private void publishMessage(String topic, JSONObject payload) { - mqttClient.publish( - topic, - Buffer.buffer(payload.toString()), - MqttQoS.AT_LEAST_ONCE, - false, - false); - log.info("[publishMessage][发送消息成功][topic: {}][payload: {}]", topic, payload); - } -} \ No newline at end of file +// TODO @芋艿:后续再支持下;@haohao;改成消费者 +///** +// * EMQX 网络组件的 {@link IotDeviceDownstreamHandler} 实现类 +// * +// * @author 芋道源码 +// */ +//@Slf4j +//public class IotDeviceDownstreamHandlerImpl { +// +// /** +// * MQTT 客户端 +// */ +// private final MqttClient mqttClient; +// +// /** +// * 构造函数 +// * +// * @param mqttClient MQTT 客户端 +// */ +// public IotDeviceDownstreamHandlerImpl(MqttClient mqttClient) { +// this.mqttClient = mqttClient; +// } +// +// @Override +// public CommonResult invokeDeviceService(IotDeviceServiceInvokeReqDTO reqDTO) { +// log.info("[invokeService][开始调用设备服务][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO)); +// +// // 验证参数 +// if (reqDTO.getProductKey() == null || reqDTO.getDeviceName() == null || reqDTO.getIdentifier() == null) { +// log.error("[invokeService][参数不完整][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO)); +// return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg()); +// } +// +// try { +// // 构建请求主题 +// String topic = IotDeviceTopicEnum.buildServiceTopic(reqDTO.getProductKey(), reqDTO.getDeviceName(), +// reqDTO.getIdentifier()); +// +// // 构建请求消息 +// String requestId = StrUtil.isNotEmpty(reqDTO.getRequestId()) ? reqDTO.getRequestId() +// : IotNetComponentCommonUtils.generateRequestId(); +// IotMqttMessage message = IotMqttMessage.createServiceInvokeMessage( +// requestId, reqDTO.getIdentifier(), reqDTO.getParams()); +// +// // 发送消息 +// publishMessage(topic, message.toJsonObject()); +// +// log.info("[invokeService][调用设备服务成功][requestId: {}][topic: {}]", requestId, topic); +// return CommonResult.success(true); +// } catch (Exception e) { +// log.error("[invokeService][调用设备服务异常][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO), e); +// return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg()); +// } +// } +// +// @Override +// public CommonResult getDeviceProperty(IotDevicePropertyGetReqDTO getReqDTO) { +// // 暂未实现,返回成功 +// return CommonResult.success(true); +// } +// +// @Override +// public CommonResult setDeviceProperty(IotDevicePropertySetReqDTO reqDTO) { +// log.info("[setProperty][开始设置设备属性][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO)); +// +// // 验证参数 +// if (reqDTO.getProductKey() == null || reqDTO.getDeviceName() == null) { +// log.error("[setProperty][参数不完整][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO)); +// return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg()); +// } +// +// try { +// // 构建请求主题 +// String topic = IotDeviceTopicEnum.buildPropertySetTopic(reqDTO.getProductKey(), reqDTO.getDeviceName()); +// +// // 构建请求消息 +// String requestId = StrUtil.isNotEmpty(reqDTO.getRequestId()) ? reqDTO.getRequestId() +// : IotNetComponentCommonUtils.generateRequestId(); +// IotMqttMessage message = IotMqttMessage.createPropertySetMessage(requestId, reqDTO.getProperties()); +// +// // 发送消息 +// publishMessage(topic, message.toJsonObject()); +// +// log.info("[setProperty][设置设备属性成功][requestId: {}][topic: {}]", requestId, topic); +// return CommonResult.success(true); +// } catch (Exception e) { +// log.error("[setProperty][设置设备属性异常][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO), e); +// return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg()); +// } +// } +// +// @Override +// public CommonResult setDeviceConfig(IotDeviceConfigSetReqDTO setReqDTO) { +// // 暂未实现,返回成功 +// return CommonResult.success(true); +// } +// +// @Override +// public CommonResult upgradeDeviceOta(IotDeviceOtaUpgradeReqDTO upgradeReqDTO) { +// // 暂未实现,返回成功 +// return CommonResult.success(true); +// } +// +// /** +// * 发布 MQTT 消息 +// * +// * @param topic 主题 +// * @param payload 消息内容 +// */ +// private void publishMessage(String topic, JSONObject payload) { +// mqttClient.publish( +// topic, +// Buffer.buffer(payload.toString()), +// MqttQoS.AT_LEAST_ONCE, +// false, +// false); +// log.info("[publishMessage][发送消息成功][topic: {}][payload: {}]", topic, payload); +// } +//} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/net/component/emqx/upstream/IotDeviceUpstreamServer.java b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/net/component/emqx/upstream/IotDeviceUpstreamServer.java index 71aee5847b..6b83aa3795 100644 --- a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/net/component/emqx/upstream/IotDeviceUpstreamServer.java +++ b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/net/component/emqx/upstream/IotDeviceUpstreamServer.java @@ -3,7 +3,6 @@ package cn.iocoder.yudao.module.iot.net.component.emqx.upstream; import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.StrUtil; import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi; -import cn.iocoder.yudao.module.iot.net.component.core.heartbeat.IotNetComponentRegistry; import cn.iocoder.yudao.module.iot.net.component.emqx.config.IotNetComponentEmqxProperties; import cn.iocoder.yudao.module.iot.net.component.emqx.upstream.router.IotDeviceAuthVertxHandler; import cn.iocoder.yudao.module.iot.net.component.emqx.upstream.router.IotDeviceMqttMessageHandler; @@ -40,7 +39,6 @@ public class IotDeviceUpstreamServer { private final MqttClient client; private final IotNetComponentEmqxProperties emqxProperties; private final IotDeviceMqttMessageHandler mqttMessageHandler; - private final IotNetComponentRegistry componentRegistry; /** * 服务运行状态标志 @@ -50,12 +48,10 @@ public class IotDeviceUpstreamServer { public IotDeviceUpstreamServer(IotNetComponentEmqxProperties emqxProperties, IotDeviceUpstreamApi deviceUpstreamApi, Vertx vertx, - MqttClient client, - IotNetComponentRegistry componentRegistry) { + MqttClient client) { this.vertx = vertx; this.emqxProperties = emqxProperties; this.client = client; - this.componentRegistry = componentRegistry; // 创建 Router 实例 Router router = Router.router(vertx); diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-http/src/main/java/cn/iocoder/yudao/module/iot/net/component/http/config/IotNetComponentHttpAutoConfiguration.java b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-http/src/main/java/cn/iocoder/yudao/module/iot/net/component/http/config/IotNetComponentHttpAutoConfiguration.java index 686c0e25aa..d65a5025e5 100644 --- a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-http/src/main/java/cn/iocoder/yudao/module/iot/net/component/http/config/IotNetComponentHttpAutoConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-http/src/main/java/cn/iocoder/yudao/module/iot/net/component/http/config/IotNetComponentHttpAutoConfiguration.java @@ -5,8 +5,6 @@ import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBusSubscriber; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer; -import cn.iocoder.yudao.module.iot.net.component.core.downstream.IotDeviceDownstreamHandler; -import cn.iocoder.yudao.module.iot.net.component.http.downstream.IotDeviceDownstreamHandlerImpl; import cn.iocoder.yudao.module.iot.net.component.http.upstream.IotDeviceUpstreamServer; import io.vertx.core.Vertx; import lombok.extern.slf4j.Slf4j; @@ -89,14 +87,4 @@ public class IotNetComponentHttpAutoConfiguration { return new IotDeviceUpstreamServer(vertx, properties, deviceUpstreamApi, deviceMessageProducer); } - /** - * 创建设备下行处理器 - * - * @return 设备下行处理器 - */ - @Bean(name = "httpDeviceDownstreamHandler") - public IotDeviceDownstreamHandler deviceDownstreamHandler() { - return new IotDeviceDownstreamHandlerImpl(); - } - } diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-http/src/main/java/cn/iocoder/yudao/module/iot/net/component/http/downstream/IotDeviceDownstreamHandlerImpl.java b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-http/src/main/java/cn/iocoder/yudao/module/iot/net/component/http/downstream/IotDeviceDownstreamHandlerImpl.java index ed26bc02fa..f0994036f5 100644 --- a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-http/src/main/java/cn/iocoder/yudao/module/iot/net/component/http/downstream/IotDeviceDownstreamHandlerImpl.java +++ b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-http/src/main/java/cn/iocoder/yudao/module/iot/net/component/http/downstream/IotDeviceDownstreamHandlerImpl.java @@ -1,50 +1,44 @@ package cn.iocoder.yudao.module.iot.net.component.http.downstream; -import cn.iocoder.yudao.framework.common.pojo.CommonResult; -import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.*; -import cn.iocoder.yudao.module.iot.net.component.core.downstream.IotDeviceDownstreamHandler; -import lombok.extern.slf4j.Slf4j; - -import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.NOT_IMPLEMENTED; - -/** - * HTTP 网络组件的 {@link IotDeviceDownstreamHandler} 实现类 - *

- * 但是:由于设备通过 HTTP 短链接接入,导致其实无法下行指导给 device 设备,所以基本都是直接返回失败!!! - * 类似 MQTT、WebSocket、TCP 网络组件,是可以实现下行指令的。 - * - * @author 芋道源码 - */ -@Slf4j -public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandler { - - /** - * 不支持的错误消息 - */ - private static final String NOT_SUPPORTED_MSG = "HTTP 不支持设备下行通信"; - - @Override - public CommonResult invokeDeviceService(IotDeviceServiceInvokeReqDTO invokeReqDTO) { - return CommonResult.error(NOT_IMPLEMENTED.getCode(), NOT_SUPPORTED_MSG); - } - - @Override - public CommonResult getDeviceProperty(IotDevicePropertyGetReqDTO getReqDTO) { - return CommonResult.error(NOT_IMPLEMENTED.getCode(), NOT_SUPPORTED_MSG); - } - - @Override - public CommonResult setDeviceProperty(IotDevicePropertySetReqDTO setReqDTO) { - return CommonResult.error(NOT_IMPLEMENTED.getCode(), NOT_SUPPORTED_MSG); - } - - @Override - public CommonResult setDeviceConfig(IotDeviceConfigSetReqDTO setReqDTO) { - return CommonResult.error(NOT_IMPLEMENTED.getCode(), NOT_SUPPORTED_MSG); - } - - @Override - public CommonResult upgradeDeviceOta(IotDeviceOtaUpgradeReqDTO upgradeReqDTO) { - return CommonResult.error(NOT_IMPLEMENTED.getCode(), NOT_SUPPORTED_MSG); - } -} +// TODO @芋艿:实现下; +///** +// * HTTP 网络组件的 {@link IotDeviceDownstreamHandler} 实现类 +// *

+// * 但是:由于设备通过 HTTP 短链接接入,导致其实无法下行指导给 device 设备,所以基本都是直接返回失败!!! +// * 类似 MQTT、WebSocket、TCP 网络组件,是可以实现下行指令的。 +// * +// * @author 芋道源码 +// */ +//@Slf4j +//public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandler { +// +// /** +// * 不支持的错误消息 +// */ +// private static final String NOT_SUPPORTED_MSG = "HTTP 不支持设备下行通信"; +// +// @Override +// public CommonResult invokeDeviceService(IotDeviceServiceInvokeReqDTO invokeReqDTO) { +// return CommonResult.error(NOT_IMPLEMENTED.getCode(), NOT_SUPPORTED_MSG); +// } +// +// @Override +// public CommonResult getDeviceProperty(IotDevicePropertyGetReqDTO getReqDTO) { +// return CommonResult.error(NOT_IMPLEMENTED.getCode(), NOT_SUPPORTED_MSG); +// } +// +// @Override +// public CommonResult setDeviceProperty(IotDevicePropertySetReqDTO setReqDTO) { +// return CommonResult.error(NOT_IMPLEMENTED.getCode(), NOT_SUPPORTED_MSG); +// } +// +// @Override +// public CommonResult setDeviceConfig(IotDeviceConfigSetReqDTO setReqDTO) { +// return CommonResult.error(NOT_IMPLEMENTED.getCode(), NOT_SUPPORTED_MSG); +// } +// +// @Override +// public CommonResult upgradeDeviceOta(IotDeviceOtaUpgradeReqDTO upgradeReqDTO) { +// return CommonResult.error(NOT_IMPLEMENTED.getCode(), NOT_SUPPORTED_MSG); +// } +//} diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/config/IotNetComponentServerConfiguration.java b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/config/IotNetComponentServerConfiguration.java index abec49908d..33fd957993 100644 --- a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/config/IotNetComponentServerConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/config/IotNetComponentServerConfiguration.java @@ -1,10 +1,6 @@ package cn.iocoder.yudao.module.iot.net.component.server.config; import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi; -import cn.iocoder.yudao.module.iot.net.component.core.downstream.IotDeviceDownstreamHandler; -import cn.iocoder.yudao.module.iot.net.component.server.downstream.IotComponentDownstreamHandlerImpl; -import cn.iocoder.yudao.module.iot.net.component.server.downstream.IotComponentDownstreamServer; -import cn.iocoder.yudao.module.iot.net.component.server.heartbeat.IotComponentHeartbeatJob; import cn.iocoder.yudao.module.iot.net.component.server.upstream.IotComponentUpstreamClient; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -54,45 +50,6 @@ public class IotNetComponentServerConfiguration { return new IotComponentUpstreamClient(properties, restTemplate); } - /** - * 配置设备下行处理器 - * - * @return 下行处理器 - */ - @Bean - @Primary - public IotDeviceDownstreamHandler deviceDownstreamHandler() { - return new IotComponentDownstreamHandlerImpl(); - } - - /** - * 配置下行服务器 - * - * @param properties 配置 - * @param downstreamHandler 下行处理器 - * @return 下行服务器 - */ - @Bean(initMethod = "start", destroyMethod = "stop") - public IotComponentDownstreamServer deviceDownstreamServer(IotNetComponentServerProperties properties, - @org.springframework.beans.factory.annotation.Qualifier("deviceDownstreamHandler") IotDeviceDownstreamHandler downstreamHandler) { - return new IotComponentDownstreamServer(properties, downstreamHandler); - } - - /** - * 配置心跳任务 - * - * @param deviceUpstreamApi 上行接口 - * @param downstreamServer 下行服务器 - * @param properties 配置 - * @return 心跳任务 - */ - @Bean(initMethod = "init", destroyMethod = "stop") - public IotComponentHeartbeatJob heartbeatJob(IotDeviceUpstreamApi deviceUpstreamApi, - IotComponentDownstreamServer downstreamServer, - IotNetComponentServerProperties properties) { - return new IotComponentHeartbeatJob(deviceUpstreamApi, downstreamServer, properties); - } - /** * 配置默认的设备上行客户端,避免在独立运行模式下的循环依赖问题 * diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/config/IotNetComponentServerProperties.java b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/config/IotNetComponentServerProperties.java index 7b641debda..bb5a9731c9 100644 --- a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/config/IotNetComponentServerProperties.java +++ b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/config/IotNetComponentServerProperties.java @@ -47,10 +47,4 @@ public class IotNetComponentServerProperties { */ private String serverKey = "yudao-module-iot-net-component-server"; - /** - * 心跳发送频率,单位:毫秒 - *

- * 默认:30 秒 - */ - private Long heartbeatInterval = 30000L; -} \ No newline at end of file +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/downstream/IotComponentDownstreamHandlerImpl.java b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/downstream/IotComponentDownstreamHandlerImpl.java deleted file mode 100644 index c6509ada10..0000000000 --- a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/downstream/IotComponentDownstreamHandlerImpl.java +++ /dev/null @@ -1,65 +0,0 @@ -package cn.iocoder.yudao.module.iot.net.component.server.downstream; - -import cn.iocoder.yudao.framework.common.pojo.CommonResult; -import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.*; -import cn.iocoder.yudao.module.iot.net.component.core.downstream.IotDeviceDownstreamHandler; -import lombok.extern.slf4j.Slf4j; - -import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.SUCCESS; - -/** - * 网络组件下行处理器实现 - *

- * 处理来自主程序的设备控制指令 - * - * @author haohao - */ -@Slf4j -public class IotComponentDownstreamHandlerImpl implements IotDeviceDownstreamHandler { - - @Override - public CommonResult invokeDeviceService(IotDeviceServiceInvokeReqDTO invokeReqDTO) { - log.info("[invokeDeviceService][收到服务调用请求:{}]", invokeReqDTO); - // 在这里处理服务调用,可以根据设备类型转发到对应的处理器 - // 如 MQTT 设备、HTTP 设备等的具体实现 - - // 这里仅作为示例,实际应根据接入的组件进行转发 - return CommonResult.success(true); - } - - @Override - public CommonResult getDeviceProperty(IotDevicePropertyGetReqDTO getReqDTO) { - log.info("[getDeviceProperty][收到属性获取请求:{}]", getReqDTO); - // 在这里处理属性获取请求 - - // 这里仅作为示例,实际应根据接入的组件进行转发 - return CommonResult.success(true); - } - - @Override - public CommonResult setDeviceProperty(IotDevicePropertySetReqDTO setReqDTO) { - log.info("[setDeviceProperty][收到属性设置请求:{}]", setReqDTO); - // 在这里处理属性设置请求 - - // 这里仅作为示例,实际应根据接入的组件进行转发 - return CommonResult.success(true); - } - - @Override - public CommonResult setDeviceConfig(IotDeviceConfigSetReqDTO setReqDTO) { - log.info("[setDeviceConfig][收到配置设置请求:{}]", setReqDTO); - // 在这里处理配置设置请求 - - // 这里仅作为示例,实际应根据接入的组件进行转发 - return CommonResult.success(true); - } - - @Override - public CommonResult upgradeDeviceOta(IotDeviceOtaUpgradeReqDTO upgradeReqDTO) { - log.info("[upgradeDeviceOta][收到OTA升级请求:{}]", upgradeReqDTO); - // 在这里处理OTA升级请求 - - // 这里仅作为示例,实际应根据接入的组件进行转发 - return CommonResult.success(true); - } -} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/downstream/IotComponentDownstreamServer.java b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/downstream/IotComponentDownstreamServer.java deleted file mode 100644 index 388a50bdfb..0000000000 --- a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/downstream/IotComponentDownstreamServer.java +++ /dev/null @@ -1,310 +0,0 @@ -package cn.iocoder.yudao.module.iot.net.component.server.downstream; - -import cn.hutool.core.util.IdUtil; -import cn.iocoder.yudao.framework.common.pojo.CommonResult; -import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.*; -import cn.iocoder.yudao.module.iot.net.component.core.downstream.IotDeviceDownstreamHandler; -import cn.iocoder.yudao.module.iot.net.component.server.config.IotNetComponentServerProperties; -import io.vertx.core.Vertx; -import io.vertx.core.http.HttpServer; -import io.vertx.core.json.Json; -import io.vertx.core.json.JsonObject; -import io.vertx.ext.web.Router; -import io.vertx.ext.web.RoutingContext; -import io.vertx.ext.web.handler.BodyHandler; -import lombok.extern.slf4j.Slf4j; - -import java.util.List; -import java.util.Map; - -/** - * 组件下行服务器,接收来自主程序的控制指令 - * - * @author haohao - */ -@Slf4j -public class IotComponentDownstreamServer { - - public static final String SERVICE_INVOKE_PATH = "/sys/:productKey/:deviceName/thing/service/:identifier"; - public static final String PROPERTY_SET_PATH = "/sys/:productKey/:deviceName/thing/service/property/set"; - public static final String PROPERTY_GET_PATH = "/sys/:productKey/:deviceName/thing/service/property/get"; - public static final String CONFIG_SET_PATH = "/sys/:productKey/:deviceName/thing/service/config/set"; - public static final String OTA_UPGRADE_PATH = "/sys/:productKey/:deviceName/thing/service/ota/upgrade"; - - private final Vertx vertx; - private final HttpServer server; - private final IotNetComponentServerProperties properties; - private final IotDeviceDownstreamHandler downstreamHandler; - - public IotComponentDownstreamServer(IotNetComponentServerProperties properties, - IotDeviceDownstreamHandler downstreamHandler) { - this.properties = properties; - this.downstreamHandler = downstreamHandler; - // 创建 Vertx 实例 - this.vertx = Vertx.vertx(); - // 创建 Router 实例 - Router router = Router.router(vertx); - router.route().handler(BodyHandler.create()); // 处理 Body - - // 服务调用路由 - router.post(SERVICE_INVOKE_PATH).handler(this::handleServiceInvoke); - // 属性设置路由 - router.post(PROPERTY_SET_PATH).handler(this::handlePropertySet); - // 属性获取路由 - router.post(PROPERTY_GET_PATH).handler(this::handlePropertyGet); - // 配置设置路由 - router.post(CONFIG_SET_PATH).handler(this::handleConfigSet); - // OTA 升级路由 - router.post(OTA_UPGRADE_PATH).handler(this::handleOtaUpgrade); - - // 创建 HttpServer 实例 - this.server = vertx.createHttpServer().requestHandler(router); - } - - /** - * 启动服务器 - */ - public void start() { - log.info("[start][开始启动下行服务器]"); - server.listen(properties.getDownstreamPort()) - .toCompletionStage() - .toCompletableFuture() - .join(); - log.info("[start][下行服务器启动完成,端口({})]", server.actualPort()); - } - - /** - * 停止服务器 - */ - public void stop() { - log.info("[stop][开始关闭下行服务器]"); - try { - // 关闭 HTTP 服务器 - if (server != null) { - server.close() - .toCompletionStage() - .toCompletableFuture() - .join(); - } - - // 关闭 Vertx 实例 - if (vertx != null) { - vertx.close() - .toCompletionStage() - .toCompletableFuture() - .join(); - } - log.info("[stop][下行服务器关闭完成]"); - } catch (Exception e) { - log.error("[stop][下行服务器关闭异常]", e); - throw new RuntimeException(e); - } - } - - /** - * 获取服务器端口 - * - * @return 端口号 - */ - public int getPort() { - return server.actualPort(); - } - - /** - * 处理服务调用请求 - */ - private void handleServiceInvoke(RoutingContext ctx) { - try { - // 解析路径参数 - String productKey = ctx.pathParam("productKey"); - String deviceName = ctx.pathParam("deviceName"); - String identifier = ctx.pathParam("identifier"); - - // 解析请求体 - JsonObject body = ctx.body().asJsonObject(); - String requestId = body.getString("requestId", IdUtil.fastSimpleUUID()); - Object params = body.getMap().get("params"); - - // 创建请求对象 - IotDeviceServiceInvokeReqDTO reqDTO = new IotDeviceServiceInvokeReqDTO(); - reqDTO.setRequestId(requestId); - reqDTO.setProductKey(productKey); - reqDTO.setDeviceName(deviceName); - reqDTO.setIdentifier(identifier); - reqDTO.setParams((Map) params); - - // 调用处理器 - CommonResult result = downstreamHandler.invokeDeviceService(reqDTO); - - // 响应结果 - ctx.response() - .putHeader("Content-Type", "application/json") - .end(Json.encode(result)); - } catch (Exception e) { - log.error("[handleServiceInvoke][处理服务调用请求失败]", e); - ctx.response() - .setStatusCode(500) - .putHeader("Content-Type", "application/json") - .end(Json.encode(CommonResult.error(500, "处理服务调用请求失败:" + e.getMessage()))); - } - } - - /** - * 处理属性设置请求 - */ - private void handlePropertySet(RoutingContext ctx) { - try { - // 解析路径参数 - String productKey = ctx.pathParam("productKey"); - String deviceName = ctx.pathParam("deviceName"); - - // 解析请求体 - JsonObject body = ctx.body().asJsonObject(); - String requestId = body.getString("requestId", IdUtil.fastSimpleUUID()); - Object properties = body.getMap().get("properties"); - - // 创建请求对象 - IotDevicePropertySetReqDTO reqDTO = new IotDevicePropertySetReqDTO(); - reqDTO.setRequestId(requestId); - reqDTO.setProductKey(productKey); - reqDTO.setDeviceName(deviceName); - reqDTO.setProperties((Map) properties); - - // 调用处理器 - CommonResult result = downstreamHandler.setDeviceProperty(reqDTO); - - // 响应结果 - ctx.response() - .putHeader("Content-Type", "application/json") - .end(Json.encode(result)); - } catch (Exception e) { - log.error("[handlePropertySet][处理属性设置请求失败]", e); - ctx.response() - .setStatusCode(500) - .putHeader("Content-Type", "application/json") - .end(Json.encode(CommonResult.error(500, "处理属性设置请求失败:" + e.getMessage()))); - } - } - - /** - * 处理属性获取请求 - */ - private void handlePropertyGet(RoutingContext ctx) { - try { - // 解析路径参数 - String productKey = ctx.pathParam("productKey"); - String deviceName = ctx.pathParam("deviceName"); - - // 解析请求体 - JsonObject body = ctx.body().asJsonObject(); - String requestId = body.getString("requestId", IdUtil.fastSimpleUUID()); - Object identifiers = body.getMap().get("identifiers"); - - // 创建请求对象 - IotDevicePropertyGetReqDTO reqDTO = new IotDevicePropertyGetReqDTO(); - reqDTO.setRequestId(requestId); - reqDTO.setProductKey(productKey); - reqDTO.setDeviceName(deviceName); - reqDTO.setIdentifiers((List) identifiers); - - // 调用处理器 - CommonResult result = downstreamHandler.getDeviceProperty(reqDTO); - - // 响应结果 - ctx.response() - .putHeader("Content-Type", "application/json") - .end(Json.encode(result)); - } catch (Exception e) { - log.error("[handlePropertyGet][处理属性获取请求失败]", e); - ctx.response() - .setStatusCode(500) - .putHeader("Content-Type", "application/json") - .end(Json.encode(CommonResult.error(500, "处理属性获取请求失败:" + e.getMessage()))); - } - } - - /** - * 处理配置设置请求 - */ - private void handleConfigSet(RoutingContext ctx) { - try { - // 解析路径参数 - String productKey = ctx.pathParam("productKey"); - String deviceName = ctx.pathParam("deviceName"); - - // 解析请求体 - JsonObject body = ctx.body().asJsonObject(); - String requestId = body.getString("requestId", IdUtil.fastSimpleUUID()); - Object config = body.getMap().get("config"); - - // 创建请求对象 - IotDeviceConfigSetReqDTO reqDTO = new IotDeviceConfigSetReqDTO(); - reqDTO.setRequestId(requestId); - reqDTO.setProductKey(productKey); - reqDTO.setDeviceName(deviceName); - reqDTO.setConfig((Map) config); - - // 调用处理器 - CommonResult result = downstreamHandler.setDeviceConfig(reqDTO); - - // 响应结果 - ctx.response() - .putHeader("Content-Type", "application/json") - .end(Json.encode(result)); - } catch (Exception e) { - log.error("[handleConfigSet][处理配置设置请求失败]", e); - ctx.response() - .setStatusCode(500) - .putHeader("Content-Type", "application/json") - .end(Json.encode(CommonResult.error(500, "处理配置设置请求失败:" + e.getMessage()))); - } - } - - /** - * 处理 OTA 升级请求 - */ - private void handleOtaUpgrade(RoutingContext ctx) { - try { - // 解析路径参数 - String productKey = ctx.pathParam("productKey"); - String deviceName = ctx.pathParam("deviceName"); - - // 解析请求体 - JsonObject body = ctx.body().asJsonObject(); - String requestId = body.getString("requestId", IdUtil.fastSimpleUUID()); - Object data = body.getMap().get("data"); - - // 创建请求对象 - IotDeviceOtaUpgradeReqDTO reqDTO = new IotDeviceOtaUpgradeReqDTO(); - reqDTO.setRequestId(requestId); - reqDTO.setProductKey(productKey); - reqDTO.setDeviceName(deviceName); - - // 数据采用 IotDeviceOtaUpgradeReqDTO.build 方法转换 - if (data instanceof Map) { - IotDeviceOtaUpgradeReqDTO builtDTO = IotDeviceOtaUpgradeReqDTO.build((Map) data); - reqDTO.setFirmwareId(builtDTO.getFirmwareId()); - reqDTO.setVersion(builtDTO.getVersion()); - reqDTO.setSignMethod(builtDTO.getSignMethod()); - reqDTO.setFileSign(builtDTO.getFileSign()); - reqDTO.setFileSize(builtDTO.getFileSize()); - reqDTO.setFileUrl(builtDTO.getFileUrl()); - reqDTO.setInformation(builtDTO.getInformation()); - } - - // 调用处理器 - CommonResult result = downstreamHandler.upgradeDeviceOta(reqDTO); - - // 响应结果 - ctx.response() - .putHeader("Content-Type", "application/json") - .end(Json.encode(result)); - } catch (Exception e) { - log.error("[handleOtaUpgrade][处理OTA升级请求失败]", e); - ctx.response() - .setStatusCode(500) - .putHeader("Content-Type", "application/json") - .end(Json.encode(CommonResult.error(500, "处理OTA升级请求失败:" + e.getMessage()))); - } - } -} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/heartbeat/IotComponentHeartbeatJob.java b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/heartbeat/IotComponentHeartbeatJob.java deleted file mode 100644 index 624d8f1ba8..0000000000 --- a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/heartbeat/IotComponentHeartbeatJob.java +++ /dev/null @@ -1,98 +0,0 @@ -package cn.iocoder.yudao.module.iot.net.component.server.heartbeat; - -import cn.hutool.system.SystemUtil; -import cn.iocoder.yudao.framework.common.pojo.CommonResult; -import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi; -import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotPluginInstanceHeartbeatReqDTO; -import cn.iocoder.yudao.module.iot.net.component.server.config.IotNetComponentServerProperties; -import cn.iocoder.yudao.module.iot.net.component.server.downstream.IotComponentDownstreamServer; -import lombok.extern.slf4j.Slf4j; - -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -// TODO @haohao:有办法服用 yudao-module-iot-net-component-core 的么?就是 server,只是一个启动器,没什么特殊的功能; -/** - * IoT 组件心跳任务 - *

- * 定期向主程序发送心跳,报告组件服务状态 - * - * @author haohao - */ -@Slf4j -public class IotComponentHeartbeatJob { - - private final IotDeviceUpstreamApi deviceUpstreamApi; - private final IotComponentDownstreamServer downstreamServer; - private final IotNetComponentServerProperties properties; - private ScheduledExecutorService executorService; - - public IotComponentHeartbeatJob(IotDeviceUpstreamApi deviceUpstreamApi, - IotComponentDownstreamServer downstreamServer, - IotNetComponentServerProperties properties) { - this.deviceUpstreamApi = deviceUpstreamApi; - this.downstreamServer = downstreamServer; - this.properties = properties; - } - - /** - * 初始化心跳任务 - */ - public void init() { - log.info("[init][开始初始化心跳任务]"); - // 创建一个单线程的调度线程池 - executorService = new ScheduledThreadPoolExecutor(1); - // 延迟 5 秒后开始执行,避免服务刚启动就发送心跳 - executorService.scheduleAtFixedRate(this::sendHeartbeat, - 5000, properties.getHeartbeatInterval(), TimeUnit.MILLISECONDS); - log.info("[init][心跳任务初始化完成]"); - } - - /** - * 停止心跳任务 - */ - public void stop() { - log.info("[stop][开始停止心跳任务]"); - if (executorService != null) { - executorService.shutdown(); - executorService = null; - } - log.info("[stop][心跳任务已停止]"); - } - - /** - * 发送心跳 - */ - private void sendHeartbeat() { - try { - // 创建心跳请求 - IotPluginInstanceHeartbeatReqDTO heartbeatReqDTO = new IotPluginInstanceHeartbeatReqDTO(); - // 设置插件标识 - heartbeatReqDTO.setPluginKey(properties.getServerKey()); - // 设置进程ID - heartbeatReqDTO.setProcessId(String.valueOf(ProcessHandle.current().pid())); - // 设置IP和端口 - try { - String hostIp = SystemUtil.getHostInfo().getAddress(); - heartbeatReqDTO.setHostIp(hostIp); - heartbeatReqDTO.setDownstreamPort(downstreamServer.getPort()); - } catch (Exception e) { - log.warn("[sendHeartbeat][获取本地主机信息异常]", e); - } - // 设置在线状态 - heartbeatReqDTO.setOnline(true); - - // 发送心跳 - CommonResult result = deviceUpstreamApi.heartbeatPluginInstance(heartbeatReqDTO); - if (result != null && result.isSuccess()) { - log.debug("[sendHeartbeat][发送心跳成功:{}]", heartbeatReqDTO); - } else { - log.error("[sendHeartbeat][发送心跳失败:{}, 结果:{}]", heartbeatReqDTO, result); - } - } catch (Exception e) { - log.error("[sendHeartbeat][发送心跳异常]", e); - } - } - -} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/upstream/IotComponentUpstreamClient.java b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/upstream/IotComponentUpstreamClient.java index f39c1d0a35..53ea8f15b7 100644 --- a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/upstream/IotComponentUpstreamClient.java +++ b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/upstream/IotComponentUpstreamClient.java @@ -69,12 +69,6 @@ public class IotComponentUpstreamClient implements IotDeviceUpstreamApi { return doPost(url, reportReqDTO); } - @Override - public CommonResult heartbeatPluginInstance(IotPluginInstanceHeartbeatReqDTO heartbeatReqDTO) { - String url = properties.getUpstreamUrl() + URL_PREFIX + "/heartbeat-plugin-instance"; - return doPost(url, heartbeatReqDTO); - } - @SuppressWarnings("unchecked") private CommonResult doPost(String url, T requestBody) { try { @@ -87,4 +81,4 @@ public class IotComponentUpstreamClient implements IotDeviceUpstreamApi { return CommonResult.error(INTERNAL_SERVER_ERROR); } } -} \ No newline at end of file +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/resources/application.yml b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/resources/application.yml index 76385c51fe..f1b104bb9c 100644 --- a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/resources/application.yml +++ b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/resources/application.yml @@ -17,8 +17,6 @@ yudao: base-package: cn.iocoder.yudao # 主项目包路径,确保正确 iot: component: - # 这里可以覆盖或添加 component-core 中的通用配置 - instance-heartbeat-timeout: 30000 # 心跳超时时间 # 网络组件服务器专用配置 server: @@ -33,9 +31,6 @@ yudao: # 组件服务唯一标识 server-key: yudao-module-iot-net-component-server - # 心跳频率,单位:毫秒 - heartbeat-interval: 30000 - # ==================================== # 针对引入的 HTTP 组件的配置 # ====================================