From 7670ac19e5b4cb096653919d18b2343602b844c3 Mon Sep 17 00:00:00 2001 From: YunaiV Date: Thu, 30 Jan 2025 23:36:18 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90=E4=BB=A3=E7=A0=81=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=E3=80=91IoT=EF=BC=9A=E5=A2=9E=E5=8A=A0=20plugin=20=E6=8F=92?= =?UTF-8?q?=E4=BB=B6=E7=9A=84=E5=BF=83=E8=B7=B3=E6=9C=BA=E5=88=B6=EF=BC=8C?= =?UTF-8?q?=E4=BB=A5=E5=8F=8A=20Job=20=E8=B6=85=E6=97=B6=E7=A6=BB=E7=BA=BF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/api/device/IotDeviceUpstreamApi.java | 13 ++ .../IotPluginInstanceHeartbeatReqDTO.java | 44 ++++++ .../api/device/IoTDeviceUpstreamApiImpl.java | 14 ++ .../plugin/IotPluginInstanceDO.java | 2 +- .../dal/mysql/plugin/IotPluginInfoMapper.java | 4 + .../mysql/plugin/IotPluginInstanceMapper.java | 17 ++- .../iot/job/plugin/IotPluginInstancesJob.java | 37 +++++ .../iot/job/plugin/PluginInstancesJob.java | 30 ---- .../service/plugin/IotPluginInfoService.java | 4 + .../plugin/IotPluginInfoServiceImpl.java | 6 + .../plugin/IotPluginInstanceService.java | 17 ++- .../plugin/IotPluginInstanceServiceImpl.java | 129 +++++++++--------- .../IotPluginCommonAutoConfiguration.java | 9 ++ .../IotPluginInstanceHeartbeatJob.java | 49 +++++++ .../upstream/IotDeviceUpstreamClient.java | 7 + .../src/main/resources/application.yaml | 2 + 16 files changed, 279 insertions(+), 105 deletions(-) create mode 100644 yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/api/device/dto/control/upstream/IotPluginInstanceHeartbeatReqDTO.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/job/plugin/IotPluginInstancesJob.java delete mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/job/plugin/PluginInstancesJob.java create mode 100644 yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/heartbeta/IotPluginInstanceHeartbeatJob.java diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/api/device/IotDeviceUpstreamApi.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/api/device/IotDeviceUpstreamApi.java index b1a59f6fe2..a6601d551b 100644 --- a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/api/device/IotDeviceUpstreamApi.java +++ b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/api/device/IotDeviceUpstreamApi.java @@ -4,6 +4,7 @@ import cn.iocoder.yudao.framework.common.pojo.CommonResult; import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceEventReportReqDTO; import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDevicePropertyReportReqDTO; import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceStateUpdateReqDTO; +import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotPluginInstanceHeartbeatReqDTO; import cn.iocoder.yudao.module.iot.enums.ApiConstants; import jakarta.validation.Valid; import org.springframework.web.bind.annotation.PostMapping; @@ -20,6 +21,8 @@ public interface IotDeviceUpstreamApi { String PREFIX = ApiConstants.PREFIX + "/device/upstream"; + // ========== 设备相关 ========== + /** * 更新设备状态 * @@ -44,4 +47,14 @@ public interface IotDeviceUpstreamApi { @PostMapping(PREFIX + "/report-event") CommonResult reportDeviceEvent(@Valid @RequestBody IotDeviceEventReportReqDTO reportReqDTO); + // ========== 插件相关 ========== + + /** + * 心跳插件实例 + * + * @param heartbeatReqDTO 心跳插件实例 DTO + */ + @PostMapping(PREFIX + "/heartbeat-plugin-instance") + CommonResult heartbeatPluginInstance(@Valid @RequestBody IotPluginInstanceHeartbeatReqDTO heartbeatReqDTO); + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/api/device/dto/control/upstream/IotPluginInstanceHeartbeatReqDTO.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/api/device/dto/control/upstream/IotPluginInstanceHeartbeatReqDTO.java new file mode 100644 index 0000000000..9125b5f242 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/api/device/dto/control/upstream/IotPluginInstanceHeartbeatReqDTO.java @@ -0,0 +1,44 @@ +package cn.iocoder.yudao.module.iot.api.device.dto.control.upstream; + +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import lombok.Data; + +/** + * IoT 插件实例心跳 Request DTO + * + * @author 芋道源码 + */ +@Data +public class IotPluginInstanceHeartbeatReqDTO { + + /** + * 请求编号 + */ + @NotEmpty(message = "请求编号不能为空") + private String processId; + + /** + * 插件包标识符 + */ + @NotEmpty(message = "插件包标识符不能为空") + private String pluginKey; + + /** + * 插件实例所在 IP + */ + @NotEmpty(message = "插件实例所在 IP 不能为空") + private String hostIp; + /** + * 插件实例的进程编号 + */ + @NotNull(message = "插件实例的进程编号不能为空") + private Integer downstreamPort; + + /** + * 是否在线 + */ + @NotNull(message = "是否在线不能为空") + private Boolean online; + +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/api/device/IoTDeviceUpstreamApiImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/api/device/IoTDeviceUpstreamApiImpl.java index e86ec5a66f..6a437dfca0 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/api/device/IoTDeviceUpstreamApiImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/api/device/IoTDeviceUpstreamApiImpl.java @@ -4,7 +4,9 @@ import cn.iocoder.yudao.framework.common.pojo.CommonResult; import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceEventReportReqDTO; import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDevicePropertyReportReqDTO; import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceStateUpdateReqDTO; +import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotPluginInstanceHeartbeatReqDTO; import cn.iocoder.yudao.module.iot.service.device.control.IotDeviceUpstreamService; +import cn.iocoder.yudao.module.iot.service.plugin.IotPluginInstanceService; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.RestController; @@ -21,6 +23,10 @@ public class IoTDeviceUpstreamApiImpl implements IotDeviceUpstreamApi { @Resource private IotDeviceUpstreamService deviceUpstreamService; + @Resource + private IotPluginInstanceService pluginInstanceService; + + // ========== 设备相关 ========== @Override public CommonResult updateDeviceState(IotDeviceStateUpdateReqDTO updateReqDTO) { @@ -40,4 +46,12 @@ public class IoTDeviceUpstreamApiImpl implements IotDeviceUpstreamApi { return success(true); } + // ========== 插件相关 ========== + + @Override + public CommonResult heartbeatPluginInstance(IotPluginInstanceHeartbeatReqDTO heartbeatReqDTO) { + pluginInstanceService.heartbeatPluginInstance(heartbeatReqDTO); + return success(true); + } + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/plugin/IotPluginInstanceDO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/plugin/IotPluginInstanceDO.java index e03aa9df8a..1def801855 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/plugin/IotPluginInstanceDO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/plugin/IotPluginInstanceDO.java @@ -67,6 +67,6 @@ public class IotPluginInstanceDO extends BaseDO { * * 目的:心路时间超过一定时间后,会被进行下线处理 */ - private Long heartbeatTime; + private LocalDateTime heartbeatTime; } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/plugin/IotPluginInfoMapper.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/plugin/IotPluginInfoMapper.java index 300339f77e..88058185f0 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/plugin/IotPluginInfoMapper.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/plugin/IotPluginInfoMapper.java @@ -25,4 +25,8 @@ public interface IotPluginInfoMapper extends BaseMapperX { .orderByAsc(IotPluginInfoDO::getId)); } + default IotPluginInfoDO selectByPluginKey(String pluginKey) { + return selectOne(IotPluginInfoDO::getPluginKey, pluginKey); + } + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/plugin/IotPluginInstanceMapper.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/plugin/IotPluginInstanceMapper.java index 3e47eabca0..9bd697dde9 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/plugin/IotPluginInstanceMapper.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/plugin/IotPluginInstanceMapper.java @@ -1,18 +1,23 @@ package cn.iocoder.yudao.module.iot.dal.mysql.plugin; import cn.iocoder.yudao.framework.mybatis.core.mapper.BaseMapperX; -import cn.iocoder.yudao.framework.mybatis.core.query.LambdaQueryWrapperX; import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginInstanceDO; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import org.apache.ibatis.annotations.Mapper; +import java.time.LocalDateTime; +import java.util.List; + @Mapper public interface IotPluginInstanceMapper extends BaseMapperX { - // TODO @芋艿:方法名,重构 - default IotPluginInstanceDO selectByMainIdAndPluginId(String mainId, Long pluginId) { - return selectOne(new LambdaQueryWrapperX() - .eq(IotPluginInstanceDO::getProcessId, mainId) - .eq(IotPluginInstanceDO::getPluginId, pluginId)); + default IotPluginInstanceDO selectByProcessId(String processId) { + return selectOne(IotPluginInstanceDO::getProcessId, processId); + } + + default List selectListByHeartbeatTimeLt(LocalDateTime heartbeatTime) { + return selectList(new LambdaQueryWrapper() + .lt(IotPluginInstanceDO::getHeartbeatTime, heartbeatTime)); } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/job/plugin/IotPluginInstancesJob.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/job/plugin/IotPluginInstancesJob.java new file mode 100644 index 0000000000..261af3e58a --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/job/plugin/IotPluginInstancesJob.java @@ -0,0 +1,37 @@ +package cn.iocoder.yudao.module.iot.job.plugin; + +import cn.hutool.core.util.StrUtil; +import cn.iocoder.yudao.framework.quartz.core.handler.JobHandler; +import cn.iocoder.yudao.module.iot.service.plugin.IotPluginInstanceService; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.time.Duration; +import java.time.LocalDateTime; + +/** + * IoT 插件实例离线检查 Job + * + * @author 芋道源码 + */ +@Component +public class IotPluginInstancesJob implements JobHandler { + + /** + * 插件离线超时时间 + * + * TODO 芋艿:暂定 10 分钟,后续看要不要做配置 + */ + public static final Duration OFFLINE_TIMEOUT = Duration.ofMinutes(10); + + @Resource + private IotPluginInstanceService pluginInstanceService; + + @Override + public String execute(String param) { + int count = pluginInstanceService.offlineTimeoutPluginInstance( + LocalDateTime.now().minus(OFFLINE_TIMEOUT)); + return StrUtil.format("离线超时插件实例数量为: {}", count); + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/job/plugin/PluginInstancesJob.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/job/plugin/PluginInstancesJob.java deleted file mode 100644 index ae6ed4271f..0000000000 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/job/plugin/PluginInstancesJob.java +++ /dev/null @@ -1,30 +0,0 @@ -package cn.iocoder.yudao.module.iot.job.plugin; - -import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils; -import cn.iocoder.yudao.module.iot.service.plugin.IotPluginInstanceService; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - -import javax.annotation.Resource; -import java.util.concurrent.TimeUnit; - -// TODO 芋艿:后续再看看 -/** - * 插件实例 Job - * - * @author 芋道源码 - */ -@Component -public class PluginInstancesJob { - - @Resource - private IotPluginInstanceService pluginInstanceService; - - @Scheduled(initialDelay = 60, fixedRate = 60, timeUnit = TimeUnit.SECONDS) - public void updatePluginInstances() { - TenantUtils.executeIgnore(() -> { - pluginInstanceService.reportPluginInstances(); - }); - } - -} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/IotPluginInfoService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/IotPluginInfoService.java index 5ee7dbab15..3ac8680b8d 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/IotPluginInfoService.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/IotPluginInfoService.java @@ -6,6 +6,7 @@ import cn.iocoder.yudao.module.iot.controller.admin.plugin.vo.info.PluginInfoSav import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginInfoDO; import cn.iocoder.yudao.module.iot.enums.plugin.IotPluginStatusEnum; import jakarta.validation.Valid; +import jakarta.validation.constraints.NotEmpty; import org.springframework.web.multipart.MultipartFile; import java.util.List; @@ -85,4 +86,7 @@ public interface IotPluginInfoService { * @return 插件信息列表 */ List getPluginInfoListByStatus(Integer status); + + IotPluginInfoDO getPluginInfoByPluginKey(@NotEmpty(message = "插件包标识符不能为空") String pluginKey); + } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/IotPluginInfoServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/IotPluginInfoServiceImpl.java index 258392b65c..c111da3267 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/IotPluginInfoServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/IotPluginInfoServiceImpl.java @@ -41,6 +41,7 @@ public class IotPluginInfoServiceImpl implements IotPluginInfoService { @Override public Long createPluginInfo(PluginInfoSaveReqVO createReqVO) { + // TODO @haohao:pluginKey 唯一值 IotPluginInfoDO pluginInfo = BeanUtils.toBean(createReqVO, IotPluginInfoDO.class); pluginInfoMapper.insert(pluginInfo); return pluginInfo.getId(); @@ -155,4 +156,9 @@ public class IotPluginInfoServiceImpl implements IotPluginInfoService { return pluginInfoMapper.selectListByStatus(status); } + @Override + public IotPluginInfoDO getPluginInfoByPluginKey(String pluginKey) { + return pluginInfoMapper.selectByPluginKey(pluginKey); + } + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/IotPluginInstanceService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/IotPluginInstanceService.java index 381bab1d49..482bad399b 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/IotPluginInstanceService.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/IotPluginInstanceService.java @@ -1,8 +1,11 @@ package cn.iocoder.yudao.module.iot.service.plugin; +import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotPluginInstanceHeartbeatReqDTO; import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginInfoDO; import org.springframework.web.multipart.MultipartFile; +import java.time.LocalDateTime; + /** * IoT 插件实例 Service 接口 * @@ -10,11 +13,19 @@ import org.springframework.web.multipart.MultipartFile; */ public interface IotPluginInstanceService { - // TODO @芋艿:这个是否应该放到 plugin 主动心跳,而是 server 自己心跳 /** - * 上报插件实例(心跳) + * 心跳插件实例 + * + * @param heartbeatReqDTO 心跳插件实例 DTO */ - void reportPluginInstances(); + void heartbeatPluginInstance(IotPluginInstanceHeartbeatReqDTO heartbeatReqDTO); + + /** + * 离线超时插件实例 + * + * @param maxHeartbeatTime 最大心跳时间 + */ + int offlineTimeoutPluginInstance(LocalDateTime maxHeartbeatTime); /** * 停止并卸载插件 diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/IotPluginInstanceServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/IotPluginInstanceServiceImpl.java index 8973a33670..d851399645 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/IotPluginInstanceServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/IotPluginInstanceServiceImpl.java @@ -1,11 +1,10 @@ package cn.iocoder.yudao.module.iot.service.plugin; +import cn.hutool.core.collection.CollUtil; import cn.hutool.core.io.FileUtil; -import cn.hutool.core.net.NetUtil; -import cn.hutool.core.util.IdUtil; +import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotPluginInstanceHeartbeatReqDTO; import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginInfoDO; import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginInstanceDO; -import cn.iocoder.yudao.module.iot.dal.mysql.plugin.IotPluginInfoMapper; import cn.iocoder.yudao.module.iot.dal.mysql.plugin.IotPluginInstanceMapper; import cn.iocoder.yudao.module.iot.dal.redis.plugin.DevicePluginProcessIdRedisDAO; import cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants; @@ -16,18 +15,20 @@ import org.pf4j.PluginState; import org.pf4j.PluginWrapper; import org.pf4j.spring.SpringPluginManager; import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.springframework.validation.annotation.Validated; import org.springframework.web.multipart.MultipartFile; import java.io.File; import java.io.IOException; -import java.nio.file.*; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.time.LocalDateTime; import java.util.List; -import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.stream.Collectors; import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception; @@ -41,12 +42,9 @@ import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionU @Slf4j public class IotPluginInstanceServiceImpl implements IotPluginInstanceService { - // TODO @haohao:mac@uuid - public static final String MAIN_ID = IdUtil.fastSimpleUUID(); - - // TODO @haohao:不直接操作,通过 Service 哈 @Resource - private IotPluginInfoMapper pluginInfoMapper; + @Lazy // 延迟加载,避免循环依赖 + private IotPluginInfoService pluginInfoService; @Resource private IotPluginInstanceMapper pluginInstanceMapper; @@ -59,8 +57,60 @@ public class IotPluginInstanceServiceImpl implements IotPluginInstanceService { @Value("${pf4j.pluginsDir}") private String pluginsDir; - @Value("${server.port:48080}") - private int port; + + @Override + public void heartbeatPluginInstance(IotPluginInstanceHeartbeatReqDTO heartbeatReqDTO) { + // 情况一:已存在,则进行更新 + IotPluginInstanceDO instance = pluginInstanceMapper.selectByProcessId(heartbeatReqDTO.getProcessId()); + if (instance != null) { + IotPluginInstanceDO.IotPluginInstanceDOBuilder updateObj = IotPluginInstanceDO.builder().id(instance.getId()) + .hostIp(heartbeatReqDTO.getHostIp()).downstreamPort(heartbeatReqDTO.getDownstreamPort()) + .online(heartbeatReqDTO.getOnline()).heartbeatTime(LocalDateTime.now()); + if (Boolean.TRUE.equals(heartbeatReqDTO.getOnline())) { + if (Boolean.FALSE.equals(instance.getOnline())) { // 当前处于离线时,才需要更新上线时间 + updateObj.onlineTime(LocalDateTime.now()); + } + } else { + updateObj.offlineTime(LocalDateTime.now()); + } + pluginInstanceMapper.updateById(updateObj.build()); + return; + } + + // 情况二:不存在,则创建 + IotPluginInfoDO info = pluginInfoService.getPluginInfoByPluginKey(heartbeatReqDTO.getPluginKey()); + if (info == null) { + log.error("[heartbeatPluginInstance][心跳({}) 对应的插件不存在]", heartbeatReqDTO); + return; + } + IotPluginInstanceDO.IotPluginInstanceDOBuilder insertObj = IotPluginInstanceDO.builder() + .pluginId(info.getId()).processId(heartbeatReqDTO.getProcessId()) + .hostIp(heartbeatReqDTO.getHostIp()).downstreamPort(heartbeatReqDTO.getDownstreamPort()) + .online(heartbeatReqDTO.getOnline()).heartbeatTime(LocalDateTime.now()); + if (Boolean.TRUE.equals(heartbeatReqDTO.getOnline())) { + insertObj.onlineTime(LocalDateTime.now()); + } else { + insertObj.offlineTime(LocalDateTime.now()); + } + pluginInstanceMapper.insert(insertObj.build()); + } + + @Override + public int offlineTimeoutPluginInstance(LocalDateTime maxHeartbeatTime) { + List list = pluginInstanceMapper.selectListByHeartbeatTimeLt(maxHeartbeatTime); + if (CollUtil.isEmpty(list)) { + return 0; + } + + // 更新插件实例为离线 + int count = 0; + for (IotPluginInstanceDO instance : list) { + pluginInstanceMapper.updateById(IotPluginInstanceDO.builder().id(instance.getId()) + .online(false).offlineTime(LocalDateTime.now()).build()); + count++; + } + return count; + } @Override public void stopAndUnloadPlugin(String pluginKey) { @@ -146,57 +196,6 @@ public class IotPluginInstanceServiceImpl implements IotPluginInstanceService { } } - @Override - public void reportPluginInstances() { - // 1.1 获取 pf4j 插件列表 - List plugins = pluginManager.getPlugins(); - - // 1.2 获取插件信息列表并转换为 Map 以便快速查找 - List pluginInfos = pluginInfoMapper.selectList(); - Map pluginInfoMap = pluginInfos.stream() - .collect(Collectors.toMap(IotPluginInfoDO::getPluginKey, Function.identity())); - - // 1.3 获取本机 IP 和 MAC 地址,mac@uuid - String ip = NetUtil.getLocalhostStr(); - String mac = NetUtil.getLocalMacAddress(); - String mainId = mac + "@" + MAIN_ID; - - // 2. 遍历插件列表,并保存为插件实例 - for (PluginWrapper plugin : plugins) { - String pluginKey = plugin.getPluginId(); - - // 2.1 查找插件信息 - IotPluginInfoDO pluginInfo = pluginInfoMap.get(pluginKey); - if (pluginInfo == null) { - log.error("插件信息不存在,pluginKey = {}", pluginKey); - continue; - } - - // 2.2 情况一:如果插件实例不存在,则创建 - IotPluginInstanceDO pluginInstance = pluginInstanceMapper.selectByMainIdAndPluginId(mainId, - pluginInfo.getId()); - // TODO @芋艿:稍后优化; -// if (pluginInstance == null) { -// // 4.4 如果插件实例不存在,则创建 -// pluginInstance = PluginInstanceDO.builder() -// .pluginId(pluginInfo.getId()) -// .mainId(MAIN_ID + "-" + mac) -// .hostIp(ip) -// .port(port) -// .heartbeatAt(System.currentTimeMillis()) -// .build(); -// pluginInstanceMapper.insert(pluginInstance); -// } else { -// // 2.2 情况二:如果存在,则更新 heartbeatAt -// PluginInstanceDO updatePluginInstance = PluginInstanceDO.builder() -// .id(pluginInstance.getId()) -// .heartbeatAt(System.currentTimeMillis()) -// .build(); -// pluginInstanceMapper.updateById(updatePluginInstance); -// } - } - } - // ========== 设备与插件的映射操作 ========== @Override diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/config/IotPluginCommonAutoConfiguration.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/config/IotPluginCommonAutoConfiguration.java index 9a763557b1..0e5c73b092 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/config/IotPluginCommonAutoConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/config/IotPluginCommonAutoConfiguration.java @@ -3,11 +3,13 @@ package cn.iocoder.yudao.module.iot.plugin.common.config; import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi; import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamHandler; import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamServer; +import cn.iocoder.yudao.module.iot.plugin.common.heartbeta.IotPluginInstanceHeartbeatJob; import cn.iocoder.yudao.module.iot.plugin.common.upstream.IotDeviceUpstreamClient; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.web.client.RestTemplateBuilder; import org.springframework.context.annotation.Bean; +import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.web.client.RestTemplate; import java.time.Duration; @@ -18,6 +20,7 @@ import java.time.Duration; * @author haohao */ @AutoConfiguration +@EnableScheduling // 开启定时任务,因为 IotPluginInstanceHeartbeatJob 是一个定时任务 public class IotPluginCommonAutoConfiguration { // TODO @haohao:这个要不搞个配置类哈 @@ -55,4 +58,10 @@ public class IotPluginCommonAutoConfiguration { return new IotDeviceDownstreamServer(deviceDownstreamHandler); } + @Bean(initMethod = "init", destroyMethod = "stop") + public IotPluginInstanceHeartbeatJob pluginInstanceHeartbeatJob( + IotDeviceUpstreamApi deviceDataApi, IotDeviceDownstreamServer deviceDownstreamServer) { + return new IotPluginInstanceHeartbeatJob(deviceDataApi, deviceDownstreamServer); + } + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/heartbeta/IotPluginInstanceHeartbeatJob.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/heartbeta/IotPluginInstanceHeartbeatJob.java new file mode 100644 index 0000000000..fe0244bc94 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/heartbeta/IotPluginInstanceHeartbeatJob.java @@ -0,0 +1,49 @@ +package cn.iocoder.yudao.module.iot.plugin.common.heartbeta; + +import cn.hutool.system.SystemUtil; +import cn.iocoder.yudao.framework.common.pojo.CommonResult; +import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi; +import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotPluginInstanceHeartbeatReqDTO; +import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamServer; +import cn.iocoder.yudao.module.iot.plugin.common.util.IotPluginCommonUtils; +import lombok.RequiredArgsConstructor; +import org.springframework.scheduling.annotation.Scheduled; + +import java.util.concurrent.TimeUnit; + +/** + * IoT 插件实例心跳 Job + * + * 用于定时发送心跳给服务端 + */ +@RequiredArgsConstructor +public class IotPluginInstanceHeartbeatJob { + + private final IotDeviceUpstreamApi deviceUpstreamApi; + private final IotDeviceDownstreamServer deviceDownstreamServer; + + public void init() { + CommonResult result = deviceUpstreamApi.heartbeatPluginInstance(buildPluginInstanceHeartbeatReqDTO(true)); + // TODO @芋艿:结果的处理 + } + + public void stop() { + CommonResult result = deviceUpstreamApi.heartbeatPluginInstance(buildPluginInstanceHeartbeatReqDTO(false)); + // TODO @芋艿:结果的处理 + } + + @Scheduled(initialDelay = 3, fixedRate = 3, timeUnit = TimeUnit.MINUTES) // 3 分钟执行一次 + public void execute() { + CommonResult result = deviceUpstreamApi.heartbeatPluginInstance(buildPluginInstanceHeartbeatReqDTO(true)); + // TODO @芋艿:结果的处理 + } + + private IotPluginInstanceHeartbeatReqDTO buildPluginInstanceHeartbeatReqDTO(Boolean online) { + // TODO @芋艿:pluginKey 的获取??? + return new IotPluginInstanceHeartbeatReqDTO() + .setPluginKey("yudao-module-iot-plugin-http").setProcessId(IotPluginCommonUtils.getProcessId()) + .setHostIp(SystemUtil.getHostInfo().getAddress()).setDownstreamPort(deviceDownstreamServer.getPort()) + .setOnline(online); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/upstream/IotDeviceUpstreamClient.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/upstream/IotDeviceUpstreamClient.java index a42fdaed89..8310bad5a9 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/upstream/IotDeviceUpstreamClient.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/upstream/IotDeviceUpstreamClient.java @@ -5,6 +5,7 @@ import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi; import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceEventReportReqDTO; import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDevicePropertyReportReqDTO; import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceStateUpdateReqDTO; +import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotPluginInstanceHeartbeatReqDTO; import lombok.extern.slf4j.Slf4j; import org.springframework.web.client.RestTemplate; @@ -51,6 +52,12 @@ public class IotDeviceUpstreamClient implements IotDeviceUpstreamApi { return doPost(url, reportReqDTO, "reportDevicePropertyData"); } + @Override + public CommonResult heartbeatPluginInstance(IotPluginInstanceHeartbeatReqDTO heartbeatReqDTO) { + String url = deviceDataUrl + URL_PREFIX + "/heartbeat-plugin-instance"; + return doPost(url, heartbeatReqDTO, "heartbeatPluginInstance"); + } + // TODO @haohao:未来可能有 get 类型哈 /** * 将与远程服务交互的通用逻辑抽取成一个私有方法 diff --git a/yudao-server/src/main/resources/application.yaml b/yudao-server/src/main/resources/application.yaml index aa1b91f0be..523d16deac 100644 --- a/yudao-server/src/main/resources/application.yaml +++ b/yudao-server/src/main/resources/application.yaml @@ -284,6 +284,8 @@ yudao: - infra_job - infra_job_log - infra_job_log + - iot_plugin_info + - iot_plugin_instance - infra_data_source_config - jimu_dict - jimu_dict_item