【代码新增】IoT:增加 plugin 插件的心跳机制,以及 Job 超时离线

This commit is contained in:
YunaiV 2025-01-30 23:36:18 +08:00
parent e650e75271
commit 7670ac19e5
16 changed files with 279 additions and 105 deletions

View File

@ -4,6 +4,7 @@ import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceEventReportReqDTO;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDevicePropertyReportReqDTO;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceStateUpdateReqDTO;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotPluginInstanceHeartbeatReqDTO;
import cn.iocoder.yudao.module.iot.enums.ApiConstants;
import jakarta.validation.Valid;
import org.springframework.web.bind.annotation.PostMapping;
@ -20,6 +21,8 @@ public interface IotDeviceUpstreamApi {
String PREFIX = ApiConstants.PREFIX + "/device/upstream";
// ========== 设备相关 ==========
/**
* 更新设备状态
*
@ -44,4 +47,14 @@ public interface IotDeviceUpstreamApi {
@PostMapping(PREFIX + "/report-event")
CommonResult<Boolean> reportDeviceEvent(@Valid @RequestBody IotDeviceEventReportReqDTO reportReqDTO);
// ========== 插件相关 ==========
/**
* 心跳插件实例
*
* @param heartbeatReqDTO 心跳插件实例 DTO
*/
@PostMapping(PREFIX + "/heartbeat-plugin-instance")
CommonResult<Boolean> heartbeatPluginInstance(@Valid @RequestBody IotPluginInstanceHeartbeatReqDTO heartbeatReqDTO);
}

View File

@ -0,0 +1,44 @@
package cn.iocoder.yudao.module.iot.api.device.dto.control.upstream;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
/**
* IoT 插件实例心跳 Request DTO
*
* @author 芋道源码
*/
@Data
public class IotPluginInstanceHeartbeatReqDTO {
/**
* 请求编号
*/
@NotEmpty(message = "请求编号不能为空")
private String processId;
/**
* 插件包标识符
*/
@NotEmpty(message = "插件包标识符不能为空")
private String pluginKey;
/**
* 插件实例所在 IP
*/
@NotEmpty(message = "插件实例所在 IP 不能为空")
private String hostIp;
/**
* 插件实例的进程编号
*/
@NotNull(message = "插件实例的进程编号不能为空")
private Integer downstreamPort;
/**
* 是否在线
*/
@NotNull(message = "是否在线不能为空")
private Boolean online;
}

View File

@ -4,7 +4,9 @@ import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceEventReportReqDTO;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDevicePropertyReportReqDTO;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceStateUpdateReqDTO;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotPluginInstanceHeartbeatReqDTO;
import cn.iocoder.yudao.module.iot.service.device.control.IotDeviceUpstreamService;
import cn.iocoder.yudao.module.iot.service.plugin.IotPluginInstanceService;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.RestController;
@ -21,6 +23,10 @@ public class IoTDeviceUpstreamApiImpl implements IotDeviceUpstreamApi {
@Resource
private IotDeviceUpstreamService deviceUpstreamService;
@Resource
private IotPluginInstanceService pluginInstanceService;
// ========== 设备相关 ==========
@Override
public CommonResult<Boolean> updateDeviceState(IotDeviceStateUpdateReqDTO updateReqDTO) {
@ -40,4 +46,12 @@ public class IoTDeviceUpstreamApiImpl implements IotDeviceUpstreamApi {
return success(true);
}
// ========== 插件相关 ==========
@Override
public CommonResult<Boolean> heartbeatPluginInstance(IotPluginInstanceHeartbeatReqDTO heartbeatReqDTO) {
pluginInstanceService.heartbeatPluginInstance(heartbeatReqDTO);
return success(true);
}
}

View File

@ -67,6 +67,6 @@ public class IotPluginInstanceDO extends BaseDO {
*
* 目的心路时间超过一定时间后会被进行下线处理
*/
private Long heartbeatTime;
private LocalDateTime heartbeatTime;
}

View File

@ -25,4 +25,8 @@ public interface IotPluginInfoMapper extends BaseMapperX<IotPluginInfoDO> {
.orderByAsc(IotPluginInfoDO::getId));
}
default IotPluginInfoDO selectByPluginKey(String pluginKey) {
return selectOne(IotPluginInfoDO::getPluginKey, pluginKey);
}
}

View File

@ -1,18 +1,23 @@
package cn.iocoder.yudao.module.iot.dal.mysql.plugin;
import cn.iocoder.yudao.framework.mybatis.core.mapper.BaseMapperX;
import cn.iocoder.yudao.framework.mybatis.core.query.LambdaQueryWrapperX;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginInstanceDO;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import org.apache.ibatis.annotations.Mapper;
import java.time.LocalDateTime;
import java.util.List;
@Mapper
public interface IotPluginInstanceMapper extends BaseMapperX<IotPluginInstanceDO> {
// TODO @芋艿方法名重构
default IotPluginInstanceDO selectByMainIdAndPluginId(String mainId, Long pluginId) {
return selectOne(new LambdaQueryWrapperX<IotPluginInstanceDO>()
.eq(IotPluginInstanceDO::getProcessId, mainId)
.eq(IotPluginInstanceDO::getPluginId, pluginId));
default IotPluginInstanceDO selectByProcessId(String processId) {
return selectOne(IotPluginInstanceDO::getProcessId, processId);
}
default List<IotPluginInstanceDO> selectListByHeartbeatTimeLt(LocalDateTime heartbeatTime) {
return selectList(new LambdaQueryWrapper<IotPluginInstanceDO>()
.lt(IotPluginInstanceDO::getHeartbeatTime, heartbeatTime));
}
}

View File

@ -0,0 +1,37 @@
package cn.iocoder.yudao.module.iot.job.plugin;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.quartz.core.handler.JobHandler;
import cn.iocoder.yudao.module.iot.service.plugin.IotPluginInstanceService;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.time.Duration;
import java.time.LocalDateTime;
/**
* IoT 插件实例离线检查 Job
*
* @author 芋道源码
*/
@Component
public class IotPluginInstancesJob implements JobHandler {
/**
* 插件离线超时时间
*
* TODO 芋艿暂定 10 分钟后续看要不要做配置
*/
public static final Duration OFFLINE_TIMEOUT = Duration.ofMinutes(10);
@Resource
private IotPluginInstanceService pluginInstanceService;
@Override
public String execute(String param) {
int count = pluginInstanceService.offlineTimeoutPluginInstance(
LocalDateTime.now().minus(OFFLINE_TIMEOUT));
return StrUtil.format("离线超时插件实例数量为: {}", count);
}
}

View File

@ -1,30 +0,0 @@
package cn.iocoder.yudao.module.iot.job.plugin;
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
import cn.iocoder.yudao.module.iot.service.plugin.IotPluginInstanceService;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
// TODO 芋艿后续再看看
/**
* 插件实例 Job
*
* @author 芋道源码
*/
@Component
public class PluginInstancesJob {
@Resource
private IotPluginInstanceService pluginInstanceService;
@Scheduled(initialDelay = 60, fixedRate = 60, timeUnit = TimeUnit.SECONDS)
public void updatePluginInstances() {
TenantUtils.executeIgnore(() -> {
pluginInstanceService.reportPluginInstances();
});
}
}

View File

@ -6,6 +6,7 @@ import cn.iocoder.yudao.module.iot.controller.admin.plugin.vo.info.PluginInfoSav
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginInfoDO;
import cn.iocoder.yudao.module.iot.enums.plugin.IotPluginStatusEnum;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import org.springframework.web.multipart.MultipartFile;
import java.util.List;
@ -85,4 +86,7 @@ public interface IotPluginInfoService {
* @return 插件信息列表
*/
List<IotPluginInfoDO> getPluginInfoListByStatus(Integer status);
IotPluginInfoDO getPluginInfoByPluginKey(@NotEmpty(message = "插件包标识符不能为空") String pluginKey);
}

View File

@ -41,6 +41,7 @@ public class IotPluginInfoServiceImpl implements IotPluginInfoService {
@Override
public Long createPluginInfo(PluginInfoSaveReqVO createReqVO) {
// TODO @haohaopluginKey 唯一值
IotPluginInfoDO pluginInfo = BeanUtils.toBean(createReqVO, IotPluginInfoDO.class);
pluginInfoMapper.insert(pluginInfo);
return pluginInfo.getId();
@ -155,4 +156,9 @@ public class IotPluginInfoServiceImpl implements IotPluginInfoService {
return pluginInfoMapper.selectListByStatus(status);
}
@Override
public IotPluginInfoDO getPluginInfoByPluginKey(String pluginKey) {
return pluginInfoMapper.selectByPluginKey(pluginKey);
}
}

View File

@ -1,8 +1,11 @@
package cn.iocoder.yudao.module.iot.service.plugin;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotPluginInstanceHeartbeatReqDTO;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginInfoDO;
import org.springframework.web.multipart.MultipartFile;
import java.time.LocalDateTime;
/**
* IoT 插件实例 Service 接口
*
@ -10,11 +13,19 @@ import org.springframework.web.multipart.MultipartFile;
*/
public interface IotPluginInstanceService {
// TODO @芋艿这个是否应该放到 plugin 主动心跳而是 server 自己心跳
/**
* 上报插件实例心跳
* 心跳插件实例
*
* @param heartbeatReqDTO 心跳插件实例 DTO
*/
void reportPluginInstances();
void heartbeatPluginInstance(IotPluginInstanceHeartbeatReqDTO heartbeatReqDTO);
/**
* 离线超时插件实例
*
* @param maxHeartbeatTime 最大心跳时间
*/
int offlineTimeoutPluginInstance(LocalDateTime maxHeartbeatTime);
/**
* 停止并卸载插件

View File

@ -1,11 +1,10 @@
package cn.iocoder.yudao.module.iot.service.plugin;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.net.NetUtil;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotPluginInstanceHeartbeatReqDTO;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginInfoDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginInstanceDO;
import cn.iocoder.yudao.module.iot.dal.mysql.plugin.IotPluginInfoMapper;
import cn.iocoder.yudao.module.iot.dal.mysql.plugin.IotPluginInstanceMapper;
import cn.iocoder.yudao.module.iot.dal.redis.plugin.DevicePluginProcessIdRedisDAO;
import cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants;
@ -16,18 +15,20 @@ import org.pf4j.PluginState;
import org.pf4j.PluginWrapper;
import org.pf4j.spring.SpringPluginManager;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.multipart.MultipartFile;
import java.io.File;
import java.io.IOException;
import java.nio.file.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
@ -41,12 +42,9 @@ import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionU
@Slf4j
public class IotPluginInstanceServiceImpl implements IotPluginInstanceService {
// TODO @haohaomac@uuid
public static final String MAIN_ID = IdUtil.fastSimpleUUID();
// TODO @haohao不直接操作通过 Service
@Resource
private IotPluginInfoMapper pluginInfoMapper;
@Lazy // 延迟加载避免循环依赖
private IotPluginInfoService pluginInfoService;
@Resource
private IotPluginInstanceMapper pluginInstanceMapper;
@ -59,8 +57,60 @@ public class IotPluginInstanceServiceImpl implements IotPluginInstanceService {
@Value("${pf4j.pluginsDir}")
private String pluginsDir;
@Value("${server.port:48080}")
private int port;
@Override
public void heartbeatPluginInstance(IotPluginInstanceHeartbeatReqDTO heartbeatReqDTO) {
// 情况一已存在则进行更新
IotPluginInstanceDO instance = pluginInstanceMapper.selectByProcessId(heartbeatReqDTO.getProcessId());
if (instance != null) {
IotPluginInstanceDO.IotPluginInstanceDOBuilder updateObj = IotPluginInstanceDO.builder().id(instance.getId())
.hostIp(heartbeatReqDTO.getHostIp()).downstreamPort(heartbeatReqDTO.getDownstreamPort())
.online(heartbeatReqDTO.getOnline()).heartbeatTime(LocalDateTime.now());
if (Boolean.TRUE.equals(heartbeatReqDTO.getOnline())) {
if (Boolean.FALSE.equals(instance.getOnline())) { // 当前处于离线时才需要更新上线时间
updateObj.onlineTime(LocalDateTime.now());
}
} else {
updateObj.offlineTime(LocalDateTime.now());
}
pluginInstanceMapper.updateById(updateObj.build());
return;
}
// 情况二不存在则创建
IotPluginInfoDO info = pluginInfoService.getPluginInfoByPluginKey(heartbeatReqDTO.getPluginKey());
if (info == null) {
log.error("[heartbeatPluginInstance][心跳({}) 对应的插件不存在]", heartbeatReqDTO);
return;
}
IotPluginInstanceDO.IotPluginInstanceDOBuilder insertObj = IotPluginInstanceDO.builder()
.pluginId(info.getId()).processId(heartbeatReqDTO.getProcessId())
.hostIp(heartbeatReqDTO.getHostIp()).downstreamPort(heartbeatReqDTO.getDownstreamPort())
.online(heartbeatReqDTO.getOnline()).heartbeatTime(LocalDateTime.now());
if (Boolean.TRUE.equals(heartbeatReqDTO.getOnline())) {
insertObj.onlineTime(LocalDateTime.now());
} else {
insertObj.offlineTime(LocalDateTime.now());
}
pluginInstanceMapper.insert(insertObj.build());
}
@Override
public int offlineTimeoutPluginInstance(LocalDateTime maxHeartbeatTime) {
List<IotPluginInstanceDO> list = pluginInstanceMapper.selectListByHeartbeatTimeLt(maxHeartbeatTime);
if (CollUtil.isEmpty(list)) {
return 0;
}
// 更新插件实例为离线
int count = 0;
for (IotPluginInstanceDO instance : list) {
pluginInstanceMapper.updateById(IotPluginInstanceDO.builder().id(instance.getId())
.online(false).offlineTime(LocalDateTime.now()).build());
count++;
}
return count;
}
@Override
public void stopAndUnloadPlugin(String pluginKey) {
@ -146,57 +196,6 @@ public class IotPluginInstanceServiceImpl implements IotPluginInstanceService {
}
}
@Override
public void reportPluginInstances() {
// 1.1 获取 pf4j 插件列表
List<PluginWrapper> plugins = pluginManager.getPlugins();
// 1.2 获取插件信息列表并转换为 Map 以便快速查找
List<IotPluginInfoDO> pluginInfos = pluginInfoMapper.selectList();
Map<String, IotPluginInfoDO> pluginInfoMap = pluginInfos.stream()
.collect(Collectors.toMap(IotPluginInfoDO::getPluginKey, Function.identity()));
// 1.3 获取本机 IP MAC 地址,mac@uuid
String ip = NetUtil.getLocalhostStr();
String mac = NetUtil.getLocalMacAddress();
String mainId = mac + "@" + MAIN_ID;
// 2. 遍历插件列表并保存为插件实例
for (PluginWrapper plugin : plugins) {
String pluginKey = plugin.getPluginId();
// 2.1 查找插件信息
IotPluginInfoDO pluginInfo = pluginInfoMap.get(pluginKey);
if (pluginInfo == null) {
log.error("插件信息不存在pluginKey = {}", pluginKey);
continue;
}
// 2.2 情况一如果插件实例不存在则创建
IotPluginInstanceDO pluginInstance = pluginInstanceMapper.selectByMainIdAndPluginId(mainId,
pluginInfo.getId());
// TODO @芋艿稍后优化
// if (pluginInstance == null) {
// // 4.4 如果插件实例不存在则创建
// pluginInstance = PluginInstanceDO.builder()
// .pluginId(pluginInfo.getId())
// .mainId(MAIN_ID + "-" + mac)
// .hostIp(ip)
// .port(port)
// .heartbeatAt(System.currentTimeMillis())
// .build();
// pluginInstanceMapper.insert(pluginInstance);
// } else {
// // 2.2 情况二如果存在则更新 heartbeatAt
// PluginInstanceDO updatePluginInstance = PluginInstanceDO.builder()
// .id(pluginInstance.getId())
// .heartbeatAt(System.currentTimeMillis())
// .build();
// pluginInstanceMapper.updateById(updatePluginInstance);
// }
}
}
// ========== 设备与插件的映射操作 ==========
@Override

View File

@ -3,11 +3,13 @@ package cn.iocoder.yudao.module.iot.plugin.common.config;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamHandler;
import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamServer;
import cn.iocoder.yudao.module.iot.plugin.common.heartbeta.IotPluginInstanceHeartbeatJob;
import cn.iocoder.yudao.module.iot.plugin.common.upstream.IotDeviceUpstreamClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.web.client.RestTemplate;
import java.time.Duration;
@ -18,6 +20,7 @@ import java.time.Duration;
* @author haohao
*/
@AutoConfiguration
@EnableScheduling // 开启定时任务因为 IotPluginInstanceHeartbeatJob 是一个定时任务
public class IotPluginCommonAutoConfiguration {
// TODO @haohao这个要不搞个配置类哈
@ -55,4 +58,10 @@ public class IotPluginCommonAutoConfiguration {
return new IotDeviceDownstreamServer(deviceDownstreamHandler);
}
@Bean(initMethod = "init", destroyMethod = "stop")
public IotPluginInstanceHeartbeatJob pluginInstanceHeartbeatJob(
IotDeviceUpstreamApi deviceDataApi, IotDeviceDownstreamServer deviceDownstreamServer) {
return new IotPluginInstanceHeartbeatJob(deviceDataApi, deviceDownstreamServer);
}
}

View File

@ -0,0 +1,49 @@
package cn.iocoder.yudao.module.iot.plugin.common.heartbeta;
import cn.hutool.system.SystemUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotPluginInstanceHeartbeatReqDTO;
import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamServer;
import cn.iocoder.yudao.module.iot.plugin.common.util.IotPluginCommonUtils;
import lombok.RequiredArgsConstructor;
import org.springframework.scheduling.annotation.Scheduled;
import java.util.concurrent.TimeUnit;
/**
* IoT 插件实例心跳 Job
*
* 用于定时发送心跳给服务端
*/
@RequiredArgsConstructor
public class IotPluginInstanceHeartbeatJob {
private final IotDeviceUpstreamApi deviceUpstreamApi;
private final IotDeviceDownstreamServer deviceDownstreamServer;
public void init() {
CommonResult<Boolean> result = deviceUpstreamApi.heartbeatPluginInstance(buildPluginInstanceHeartbeatReqDTO(true));
// TODO @芋艿结果的处理
}
public void stop() {
CommonResult<Boolean> result = deviceUpstreamApi.heartbeatPluginInstance(buildPluginInstanceHeartbeatReqDTO(false));
// TODO @芋艿结果的处理
}
@Scheduled(initialDelay = 3, fixedRate = 3, timeUnit = TimeUnit.MINUTES) // 3 分钟执行一次
public void execute() {
CommonResult<Boolean> result = deviceUpstreamApi.heartbeatPluginInstance(buildPluginInstanceHeartbeatReqDTO(true));
// TODO @芋艿结果的处理
}
private IotPluginInstanceHeartbeatReqDTO buildPluginInstanceHeartbeatReqDTO(Boolean online) {
// TODO @芋艿pluginKey 的获取
return new IotPluginInstanceHeartbeatReqDTO()
.setPluginKey("yudao-module-iot-plugin-http").setProcessId(IotPluginCommonUtils.getProcessId())
.setHostIp(SystemUtil.getHostInfo().getAddress()).setDownstreamPort(deviceDownstreamServer.getPort())
.setOnline(online);
}
}

View File

@ -5,6 +5,7 @@ import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceEventReportReqDTO;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDevicePropertyReportReqDTO;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceStateUpdateReqDTO;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotPluginInstanceHeartbeatReqDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.client.RestTemplate;
@ -51,6 +52,12 @@ public class IotDeviceUpstreamClient implements IotDeviceUpstreamApi {
return doPost(url, reportReqDTO, "reportDevicePropertyData");
}
@Override
public CommonResult<Boolean> heartbeatPluginInstance(IotPluginInstanceHeartbeatReqDTO heartbeatReqDTO) {
String url = deviceDataUrl + URL_PREFIX + "/heartbeat-plugin-instance";
return doPost(url, heartbeatReqDTO, "heartbeatPluginInstance");
}
// TODO @haohao未来可能有 get 类型哈
/**
* 将与远程服务交互的通用逻辑抽取成一个私有方法

View File

@ -284,6 +284,8 @@ yudao:
- infra_job
- infra_job_log
- infra_job_log
- iot_plugin_info
- iot_plugin_instance
- infra_data_source_config
- jimu_dict
- jimu_dict_item