【代码优化】IoT:设备上行时,增加 processId,用于设备与插件的映射

This commit is contained in:
YunaiV 2025-01-30 21:06:47 +08:00
parent 30ae986c1a
commit e650e75271
25 changed files with 277 additions and 190 deletions

View File

@ -21,9 +21,9 @@ public abstract class IotDeviceUpstreamAbstractReqDTO {
private String requestId;
/**
* 插件标识
* 插件实例的进程编号
*/
private String pluginKey;
private String processId;
/**
* 产品标识

View File

@ -7,8 +7,8 @@ import cn.iocoder.yudao.module.iot.controller.admin.plugin.vo.info.PluginInfoImp
import cn.iocoder.yudao.module.iot.controller.admin.plugin.vo.info.PluginInfoPageReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.plugin.vo.info.PluginInfoRespVO;
import cn.iocoder.yudao.module.iot.controller.admin.plugin.vo.info.PluginInfoSaveReqVO;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.PluginInfoDO;
import cn.iocoder.yudao.module.iot.service.plugin.PluginInfoService;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginInfoDO;
import cn.iocoder.yudao.module.iot.service.plugin.IotPluginInfoService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
@ -27,7 +27,7 @@ import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success;
public class PluginInfoController {
@Resource
private PluginInfoService pluginInfoService;
private IotPluginInfoService pluginInfoService;
@PostMapping("/create")
@Operation(summary = "创建插件信息")
@ -58,7 +58,7 @@ public class PluginInfoController {
@Parameter(name = "id", description = "编号", required = true, example = "1024")
@PreAuthorize("@ss.hasPermission('iot:plugin-info:query')")
public CommonResult<PluginInfoRespVO> getPluginInfo(@RequestParam("id") Long id) {
PluginInfoDO pluginInfo = pluginInfoService.getPluginInfo(id);
IotPluginInfoDO pluginInfo = pluginInfoService.getPluginInfo(id);
return success(BeanUtils.toBean(pluginInfo, PluginInfoRespVO.class));
}
@ -66,7 +66,7 @@ public class PluginInfoController {
@Operation(summary = "获得插件信息分页")
@PreAuthorize("@ss.hasPermission('iot:plugin-info:query')")
public CommonResult<PageResult<PluginInfoRespVO>> getPluginInfoPage(@Valid PluginInfoPageReqVO pageReqVO) {
PageResult<PluginInfoDO> pageResult = pluginInfoService.getPluginInfoPage(pageReqVO);
PageResult<IotPluginInfoDO> pageResult = pluginInfoService.getPluginInfoPage(pageReqVO);
return success(BeanUtils.toBean(pageResult, PluginInfoRespVO.class));
}

View File

@ -22,7 +22,7 @@ import lombok.*;
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PluginInfoDO extends BaseDO {
public class IotPluginInfoDO extends BaseDO {
/**
* 主键 ID

View File

@ -6,6 +6,7 @@ import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.*;
import java.time.LocalDateTime;
/**
* IoT 插件实例 DO
@ -20,40 +21,52 @@ import lombok.*;
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PluginInstanceDO extends BaseDO {
public class IotPluginInstanceDO extends BaseDO {
/**
* 主键ID
* 主键
*/
@TableId
private Long id;
// TODO @芋艿找一个更好的字段名
/**
* 插件主程序 ID
*/
private String mainId;
/**
* 插件 ID
* 插件编号
* <p>
* 关联 {@link PluginInfoDO#getId()}
* 关联 {@link IotPluginInfoDO#getId()}
*/
private Long pluginId;
/**
* 插件进程编号
*
* 一般格式是hostIp@processId@${uuid}
*/
private String processId;
/**
* 插件主程序所在 IP
* 插件实例所在 IP
*/
private String ip;
// TODO @芋艿这个 port 是否有必要记录
private String hostIp;
/**
* 插件主程序端口
* 设备下行端口
*/
private Integer port;
// TODO @芋艿downstreamPort 增加目的用于下行
private Integer downstreamPort;
// TODO @haohao字段改成 heartbeatTimeLocalDateTime
/**
* 心跳时间心路时间超过 30 秒需要剔除
* 是否在线
*/
private Long heartbeatAt;
private Boolean online;
/**
* 在线时间
*/
private LocalDateTime onlineTime;
/**
* 离线时间
*/
private LocalDateTime offlineTime;
/**
* 心跳时间
*
* 目的心路时间超过一定时间后会被进行下线处理
*/
private Long heartbeatTime;
}

View File

@ -0,0 +1,28 @@
package cn.iocoder.yudao.module.iot.dal.mysql.plugin;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.mybatis.core.mapper.BaseMapperX;
import cn.iocoder.yudao.framework.mybatis.core.query.LambdaQueryWrapperX;
import cn.iocoder.yudao.module.iot.controller.admin.plugin.vo.info.PluginInfoPageReqVO;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginInfoDO;
import org.apache.ibatis.annotations.Mapper;
import java.util.List;
@Mapper
public interface IotPluginInfoMapper extends BaseMapperX<IotPluginInfoDO> {
default PageResult<IotPluginInfoDO> selectPage(PluginInfoPageReqVO reqVO) {
return selectPage(reqVO, new LambdaQueryWrapperX<IotPluginInfoDO>()
.likeIfPresent(IotPluginInfoDO::getName, reqVO.getName())
.eqIfPresent(IotPluginInfoDO::getStatus, reqVO.getStatus())
.orderByDesc(IotPluginInfoDO::getId));
}
default List<IotPluginInfoDO> selectListByStatus(Integer status) {
return selectList(new LambdaQueryWrapperX<IotPluginInfoDO>()
.eq(IotPluginInfoDO::getStatus, status)
.orderByAsc(IotPluginInfoDO::getId));
}
}

View File

@ -0,0 +1,18 @@
package cn.iocoder.yudao.module.iot.dal.mysql.plugin;
import cn.iocoder.yudao.framework.mybatis.core.mapper.BaseMapperX;
import cn.iocoder.yudao.framework.mybatis.core.query.LambdaQueryWrapperX;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginInstanceDO;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface IotPluginInstanceMapper extends BaseMapperX<IotPluginInstanceDO> {
// TODO @芋艿方法名重构
default IotPluginInstanceDO selectByMainIdAndPluginId(String mainId, Long pluginId) {
return selectOne(new LambdaQueryWrapperX<IotPluginInstanceDO>()
.eq(IotPluginInstanceDO::getProcessId, mainId)
.eq(IotPluginInstanceDO::getPluginId, pluginId));
}
}

View File

@ -1,34 +0,0 @@
package cn.iocoder.yudao.module.iot.dal.mysql.plugin;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.mybatis.core.query.LambdaQueryWrapperX;
import cn.iocoder.yudao.framework.mybatis.core.mapper.BaseMapperX;
import cn.iocoder.yudao.module.iot.controller.admin.plugin.vo.info.PluginInfoPageReqVO;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.PluginInfoDO;
import java.util.List;
import org.apache.ibatis.annotations.Mapper;
/**
* IoT 插件信息 Mapper
*
* @author 芋道源码
*/
@Mapper
public interface PluginInfoMapper extends BaseMapperX<PluginInfoDO> {
default PageResult<PluginInfoDO> selectPage(PluginInfoPageReqVO reqVO) {
return selectPage(reqVO, new LambdaQueryWrapperX<PluginInfoDO>()
.likeIfPresent(PluginInfoDO::getName, reqVO.getName())
.eqIfPresent(PluginInfoDO::getStatus, reqVO.getStatus())
.orderByDesc(PluginInfoDO::getId));
}
default List<PluginInfoDO> selectListByStatus(Integer status) {
return selectList(new LambdaQueryWrapperX<PluginInfoDO>()
.eq(PluginInfoDO::getStatus, status)
.orderByAsc(PluginInfoDO::getId));
}
}

View File

@ -1,36 +0,0 @@
package cn.iocoder.yudao.module.iot.dal.mysql.plugin;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.mybatis.core.query.LambdaQueryWrapperX;
import cn.iocoder.yudao.framework.mybatis.core.mapper.BaseMapperX;
import cn.iocoder.yudao.module.iot.controller.admin.plugin.vo.instance.PluginInstancePageReqVO;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.PluginInstanceDO;
import org.apache.ibatis.annotations.Mapper;
/**
* IoT 插件实例 Mapper
*
* @author 芋道源码
*/
@Mapper
public interface PluginInstanceMapper extends BaseMapperX<PluginInstanceDO> {
default PluginInstanceDO selectByMainIdAndPluginId(String mainId, Long pluginId) {
return selectOne(new LambdaQueryWrapperX<PluginInstanceDO>()
.eq(PluginInstanceDO::getMainId, mainId)
.eq(PluginInstanceDO::getPluginId, pluginId));
}
// TODO @haohao这个还需要么相关不用的 VO 可以删除
default PageResult<PluginInstanceDO> selectPage(PluginInstancePageReqVO reqVO) {
return selectPage(reqVO, new LambdaQueryWrapperX<PluginInstanceDO>()
.eqIfPresent(PluginInstanceDO::getMainId, reqVO.getMainId())
.eqIfPresent(PluginInstanceDO::getPluginId, reqVO.getPluginId())
.eqIfPresent(PluginInstanceDO::getIp, reqVO.getIp())
.eqIfPresent(PluginInstanceDO::getPort, reqVO.getPort())
.eqIfPresent(PluginInstanceDO::getHeartbeatAt, reqVO.getHeartbeatAt())
.betweenIfPresent(PluginInstanceDO::getCreateTime, reqVO.getCreateTime())
.orderByDesc(PluginInstanceDO::getId));
}
}

View File

@ -1,6 +1,7 @@
package cn.iocoder.yudao.module.iot.dal.redis;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDevicePropertyDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginInstanceDO;
/**
* Iot Redis Key 枚举类
@ -42,4 +43,13 @@ public interface RedisKeyConstants {
*/
String THING_MODEL_LIST = "thing_model_list";
/**
* 设备插件的插件进程编号的映射采用 HASH 结构
*
* KEY 格式device_plugin_instance_process_ids
* HASH KEY${deviceKey}
* VALUE插件进程编号对应 {@link IotPluginInstanceDO#getProcessId()} 字段
*/
String DEVICE_PLUGIN_INSTANCE_PROCESS_IDS = "device_plugin_instance_process_ids";
}

View File

@ -33,7 +33,7 @@ public class DevicePropertyRedisDAO {
entry -> JsonUtils.parseObject((String) entry.getValue(), IotDevicePropertyDO.class));
}
public void set(String deviceKey, Map<String, IotDevicePropertyDO> properties) {
public void putAll(String deviceKey, Map<String, IotDevicePropertyDO> properties) {
if (CollUtil.isEmpty(properties)) {
return;
}
@ -43,11 +43,6 @@ public class DevicePropertyRedisDAO {
entry -> JsonUtils.toJsonString(entry.getValue())));
}
public void delete(String deviceKey) {
String redisKey = formatKey(deviceKey);
stringRedisTemplate.delete(redisKey);
}
private static String formatKey(String deviceKey) {
return String.format(DEVICE_PROPERTY, deviceKey);
}

View File

@ -0,0 +1,25 @@
package cn.iocoder.yudao.module.iot.dal.redis.plugin;
import cn.iocoder.yudao.module.iot.dal.redis.RedisKeyConstants;
import jakarta.annotation.Resource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Repository;
/**
* 设备插件的插件进程编号的缓存的 Redis DAO
*/
@Repository
public class DevicePluginProcessIdRedisDAO {
@Resource
private StringRedisTemplate stringRedisTemplate;
public void put(String deviceKey, String processId) {
stringRedisTemplate.opsForHash().put(RedisKeyConstants.DEVICE_PLUGIN_INSTANCE_PROCESS_IDS, deviceKey, processId);
}
public String get(String deviceKey) {
return (String) stringRedisTemplate.opsForHash().get(RedisKeyConstants.DEVICE_PLUGIN_INSTANCE_PROCESS_IDS, deviceKey);
}
}

View File

@ -2,9 +2,9 @@ package cn.iocoder.yudao.module.iot.framework.plugin.core;
import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.PluginInfoDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginInfoDO;
import cn.iocoder.yudao.module.iot.enums.plugin.IotPluginStatusEnum;
import cn.iocoder.yudao.module.iot.service.plugin.PluginInfoService;
import cn.iocoder.yudao.module.iot.service.plugin.IotPluginInfoService;
import lombok.extern.slf4j.Slf4j;
import org.pf4j.spring.SpringPluginManager;
import org.springframework.boot.ApplicationArguments;
@ -20,7 +20,7 @@ import java.util.List;
public class PluginStart implements ApplicationRunner {
@Resource
private PluginInfoService pluginInfoService;
private IotPluginInfoService pluginInfoService;
@Resource
private SpringPluginManager pluginManager;
@ -28,7 +28,7 @@ public class PluginStart implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) {
TenantUtils.executeIgnore(() -> { // 1. 忽略租户上下文执行
List<PluginInfoDO> pluginInfoList = pluginInfoService
List<IotPluginInfoDO> pluginInfoList = pluginInfoService
.getPluginInfoListByStatus(IotPluginStatusEnum.RUNNING.getStatus()); // 2. 获取运行中的插件列表
if (CollUtil.isEmpty(pluginInfoList)) { // 3. 检查插件列表是否为空
log.info("[run] 没有需要启动的插件"); // 4. 日志记录没有插件需要启动

View File

@ -1,7 +1,7 @@
package cn.iocoder.yudao.module.iot.job.plugin;
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
import cn.iocoder.yudao.module.iot.service.plugin.PluginInstanceService;
import cn.iocoder.yudao.module.iot.service.plugin.IotPluginInstanceService;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@ -18,7 +18,7 @@ import java.util.concurrent.TimeUnit;
public class PluginInstancesJob {
@Resource
private PluginInstanceService pluginInstanceService;
private IotPluginInstanceService pluginInstanceService;
@Scheduled(initialDelay = 60, fixedRate = 60, timeUnit = TimeUnit.SECONDS)
public void updatePluginInstances() {

View File

@ -20,6 +20,7 @@ import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.mq.producer.device.IotDeviceProducer;
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.service.device.data.IotDevicePropertyService;
import cn.iocoder.yudao.module.iot.service.plugin.IotPluginInstanceService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@ -43,6 +44,8 @@ public class IotDeviceUpstreamServiceImpl implements IotDeviceUpstreamService {
private IotDeviceService deviceService;
@Resource
private IotDevicePropertyService devicePropertyService;
@Resource
private IotPluginInstanceService pluginInstanceService;
@Resource
private IotDeviceProducer deviceProducer;
@ -144,10 +147,11 @@ public class IotDeviceUpstreamServiceImpl implements IotDeviceUpstreamService {
}
private void updateDeviceLastTime(IotDeviceDO device, IotDeviceUpstreamAbstractReqDTO reqDTO) {
// 1. TODO 芋艿插件状态
// 1. 异步记录设备与插件实例的映射
pluginInstanceService.updateDevicePluginInstanceProcessIdAsync(device.getDeviceKey(), reqDTO.getProcessId());
// 2. 更新设备的最后时间
devicePropertyService.updateDeviceReportTime(device.getDeviceKey(), LocalDateTime.now());
// 2. 异步更新设备的最后时间
devicePropertyService.updateDeviceReportTimeAsync(device.getDeviceKey(), LocalDateTime.now());
}
private void sendDeviceMessage(IotDeviceMessage message, IotDeviceDO device) {

View File

@ -61,11 +61,11 @@ public interface IotDevicePropertyService {
Set<String> getDeviceKeysByReportTime(LocalDateTime maxReportTime);
/**
* 更新设备上报时间
* 异步更新设备上报时间
*
* @param deviceKey 设备标识
* @param reportTime 上报时间
*/
void updateDeviceReportTime(String deviceKey, LocalDateTime reportTime);
void updateDeviceReportTimeAsync(String deviceKey, LocalDateTime reportTime);
}

View File

@ -27,6 +27,7 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
@ -151,7 +152,7 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
LocalDateTimeUtil.toEpochMilli(message.getReportTime()));
// 3.2 保存设备属性日志
deviceDataRedisDAO.set(message.getDeviceKey(), convertMap(properties.entrySet(), Map.Entry::getKey,
deviceDataRedisDAO.putAll(message.getDeviceKey(), convertMap(properties.entrySet(), Map.Entry::getKey,
entry -> IotDevicePropertyDO.builder().value(entry.getValue()).updateTime(message.getReportTime()).build()));
}
@ -190,7 +191,8 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
}
@Override
public void updateDeviceReportTime(String deviceKey, LocalDateTime reportTime) {
@Async
public void updateDeviceReportTimeAsync(String deviceKey, LocalDateTime reportTime) {
deviceReportTimeRedisDAO.update(deviceKey, reportTime);
}

View File

@ -3,7 +3,7 @@ package cn.iocoder.yudao.module.iot.service.plugin;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.module.iot.controller.admin.plugin.vo.info.PluginInfoPageReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.plugin.vo.info.PluginInfoSaveReqVO;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.PluginInfoDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginInfoDO;
import cn.iocoder.yudao.module.iot.enums.plugin.IotPluginStatusEnum;
import jakarta.validation.Valid;
import org.springframework.web.multipart.MultipartFile;
@ -15,10 +15,10 @@ import java.util.List;
*
* @author 芋道源码
*/
public interface PluginInfoService {
public interface IotPluginInfoService {
/**
* 创建IoT 插件信息
* 创建插件信息
*
* @param createReqVO 创建信息
* @return 编号
@ -26,34 +26,34 @@ public interface PluginInfoService {
Long createPluginInfo(@Valid PluginInfoSaveReqVO createReqVO);
/**
* 更新IoT 插件信息
* 更新插件信息
*
* @param updateReqVO 更新信息
*/
void updatePluginInfo(@Valid PluginInfoSaveReqVO updateReqVO);
/**
* 删除IoT 插件信息
* 删除插件信息
*
* @param id 编号
*/
void deletePluginInfo(Long id);
/**
* 获得IoT 插件信息
* 获得插件信息
*
* @param id 编号
* @return IoT 插件信息
* @return 插件信息
*/
PluginInfoDO getPluginInfo(Long id);
IotPluginInfoDO getPluginInfo(Long id);
/**
* 获得IoT 插件信息分页
* 获得插件信息分页
*
* @param pageReqVO 分页查询
* @return IoT 插件信息分页
* @return 插件信息分页
*/
PageResult<PluginInfoDO> getPluginInfoPage(PluginInfoPageReqVO pageReqVO);
PageResult<IotPluginInfoDO> getPluginInfoPage(PluginInfoPageReqVO pageReqVO);
/**
* 上传插件的 JAR
@ -76,7 +76,7 @@ public interface PluginInfoService {
*
* @return 插件信息列表
*/
List<PluginInfoDO> getPluginInfoList();
List<IotPluginInfoDO> getPluginInfoList();
/**
* 根据状态获得插件信息列表
@ -84,5 +84,5 @@ public interface PluginInfoService {
* @param status 状态 {@link IotPluginStatusEnum}
* @return 插件信息列表
*/
List<PluginInfoDO> getPluginInfoListByStatus(Integer status);
List<IotPluginInfoDO> getPluginInfoListByStatus(Integer status);
}

View File

@ -4,8 +4,8 @@ import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
import cn.iocoder.yudao.module.iot.controller.admin.plugin.vo.info.PluginInfoPageReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.plugin.vo.info.PluginInfoSaveReqVO;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.PluginInfoDO;
import cn.iocoder.yudao.module.iot.dal.mysql.plugin.PluginInfoMapper;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginInfoDO;
import cn.iocoder.yudao.module.iot.dal.mysql.plugin.IotPluginInfoMapper;
import cn.iocoder.yudao.module.iot.enums.plugin.IotPluginStatusEnum;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
@ -28,20 +28,20 @@ import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.PLUGIN_INFO_N
@Service
@Validated
@Slf4j
public class PluginInfoServiceImpl implements PluginInfoService {
public class IotPluginInfoServiceImpl implements IotPluginInfoService {
@Resource
private PluginInfoMapper pluginInfoMapper;
private IotPluginInfoMapper pluginInfoMapper;
@Resource
private PluginInstanceService pluginInstanceService;
private IotPluginInstanceService pluginInstanceService;
@Resource
private SpringPluginManager pluginManager;
@Override
public Long createPluginInfo(PluginInfoSaveReqVO createReqVO) {
PluginInfoDO pluginInfo = BeanUtils.toBean(createReqVO, PluginInfoDO.class);
IotPluginInfoDO pluginInfo = BeanUtils.toBean(createReqVO, IotPluginInfoDO.class);
pluginInfoMapper.insert(pluginInfo);
return pluginInfo.getId();
}
@ -51,14 +51,14 @@ public class PluginInfoServiceImpl implements PluginInfoService {
// 校验存在
validatePluginInfoExists(updateReqVO.getId());
// 更新
PluginInfoDO updateObj = BeanUtils.toBean(updateReqVO, PluginInfoDO.class);
IotPluginInfoDO updateObj = BeanUtils.toBean(updateReqVO, IotPluginInfoDO.class);
pluginInfoMapper.updateById(updateObj);
}
@Override
public void deletePluginInfo(Long id) {
// 1.1 校验存在
PluginInfoDO pluginInfoDO = validatePluginInfoExists(id);
IotPluginInfoDO pluginInfoDO = validatePluginInfoExists(id);
// 1.2 停止插件
if (IotPluginStatusEnum.RUNNING.getStatus().equals(pluginInfoDO.getStatus())) {
throw exception(PLUGIN_INFO_DELETE_FAILED_RUNNING);
@ -74,8 +74,8 @@ public class PluginInfoServiceImpl implements PluginInfoService {
pluginInfoMapper.deleteById(id);
}
private PluginInfoDO validatePluginInfoExists(Long id) {
PluginInfoDO pluginInfo = pluginInfoMapper.selectById(id);
private IotPluginInfoDO validatePluginInfoExists(Long id) {
IotPluginInfoDO pluginInfo = pluginInfoMapper.selectById(id);
if (pluginInfo == null) {
throw exception(PLUGIN_INFO_NOT_EXISTS);
}
@ -83,19 +83,19 @@ public class PluginInfoServiceImpl implements PluginInfoService {
}
@Override
public PluginInfoDO getPluginInfo(Long id) {
public IotPluginInfoDO getPluginInfo(Long id) {
return pluginInfoMapper.selectById(id);
}
@Override
public PageResult<PluginInfoDO> getPluginInfoPage(PluginInfoPageReqVO pageReqVO) {
public PageResult<IotPluginInfoDO> getPluginInfoPage(PluginInfoPageReqVO pageReqVO) {
return pluginInfoMapper.selectPage(pageReqVO);
}
@Override
public void uploadFile(Long id, MultipartFile file) {
// 1. 校验插件信息是否存在
PluginInfoDO pluginInfoDo = validatePluginInfoExists(id);
IotPluginInfoDO pluginInfoDo = validatePluginInfoExists(id);
// 2. 停止并卸载旧的插件
pluginInstanceService.stopAndUnloadPlugin(pluginInfoDo.getPluginKey());
@ -114,9 +114,9 @@ public class PluginInfoServiceImpl implements PluginInfoService {
* @param pluginKeyNew 插件标识符
* @param file 文件
*/
private void updatePluginInfo(PluginInfoDO pluginInfoDo, String pluginKeyNew, MultipartFile file) {
private void updatePluginInfo(IotPluginInfoDO pluginInfoDo, String pluginKeyNew, MultipartFile file) {
// 创建新的插件信息对象并链式设置属性
PluginInfoDO updatedPluginInfo = new PluginInfoDO()
IotPluginInfoDO updatedPluginInfo = new IotPluginInfoDO()
.setId(pluginInfoDo.getId())
.setPluginKey(pluginKeyNew)
.setStatus(IotPluginStatusEnum.STOPPED.getStatus())
@ -133,25 +133,25 @@ public class PluginInfoServiceImpl implements PluginInfoService {
@Override
public void updatePluginStatus(Long id, Integer status) {
// 1. 校验插件信息是否存在
PluginInfoDO pluginInfoDo = validatePluginInfoExists(id);
IotPluginInfoDO pluginInfoDo = validatePluginInfoExists(id);
// 2. 更新插件状态
pluginInstanceService.updatePluginStatus(pluginInfoDo, status);
// 3. 更新数据库中的插件状态
PluginInfoDO updatedPluginInfo = new PluginInfoDO();
IotPluginInfoDO updatedPluginInfo = new IotPluginInfoDO();
updatedPluginInfo.setId(id);
updatedPluginInfo.setStatus(status);
pluginInfoMapper.updateById(updatedPluginInfo);
}
@Override
public List<PluginInfoDO> getPluginInfoList() {
public List<IotPluginInfoDO> getPluginInfoList() {
return pluginInfoMapper.selectList();
}
@Override
public List<PluginInfoDO> getPluginInfoListByStatus(Integer status) {
public List<IotPluginInfoDO> getPluginInfoListByStatus(Integer status) {
return pluginInfoMapper.selectListByStatus(status);
}

View File

@ -1,6 +1,6 @@
package cn.iocoder.yudao.module.iot.service.plugin;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.PluginInfoDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginInfoDO;
import org.springframework.web.multipart.MultipartFile;
/**
@ -8,7 +8,7 @@ import org.springframework.web.multipart.MultipartFile;
*
* @author 芋道源码
*/
public interface PluginInstanceService {
public interface IotPluginInstanceService {
// TODO @芋艿这个是否应该放到 plugin 主动心跳而是 server 自己心跳
/**
@ -28,7 +28,7 @@ public interface PluginInstanceService {
*
* @param pluginInfoDo 插件信息
*/
void deletePluginFile(PluginInfoDO pluginInfoDo);
void deletePluginFile(IotPluginInfoDO pluginInfoDo);
/**
* 上传并加载新的插件文件
@ -44,6 +44,16 @@ public interface PluginInstanceService {
* @param pluginInfoDo 插件信息
* @param status 新状态
*/
void updatePluginStatus(PluginInfoDO pluginInfoDo, Integer status);
void updatePluginStatus(IotPluginInfoDO pluginInfoDo, Integer status);
// ========== 设备与插件的映射操作 ==========
/**
* 更新设备对应的插件实例的进程编号
*
* @param deviceKey 设备 Key
* @param processId 进程编号
*/
void updateDevicePluginInstanceProcessIdAsync(String deviceKey, String processId);
}

View File

@ -3,10 +3,11 @@ package cn.iocoder.yudao.module.iot.service.plugin;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.net.NetUtil;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.PluginInfoDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.PluginInstanceDO;
import cn.iocoder.yudao.module.iot.dal.mysql.plugin.PluginInfoMapper;
import cn.iocoder.yudao.module.iot.dal.mysql.plugin.PluginInstanceMapper;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginInfoDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginInstanceDO;
import cn.iocoder.yudao.module.iot.dal.mysql.plugin.IotPluginInfoMapper;
import cn.iocoder.yudao.module.iot.dal.mysql.plugin.IotPluginInstanceMapper;
import cn.iocoder.yudao.module.iot.dal.redis.plugin.DevicePluginProcessIdRedisDAO;
import cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants;
import cn.iocoder.yudao.module.iot.enums.plugin.IotPluginStatusEnum;
import jakarta.annotation.Resource;
@ -38,16 +39,20 @@ import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionU
@Service
@Validated
@Slf4j
public class PluginInstanceServiceImpl implements PluginInstanceService {
public class IotPluginInstanceServiceImpl implements IotPluginInstanceService {
// TODO @haohaomac@uuid
public static final String MAIN_ID = IdUtil.fastSimpleUUID();
// TODO @haohao不直接操作通过 Service
@Resource
private PluginInfoMapper pluginInfoMapper;
private IotPluginInfoMapper pluginInfoMapper;
@Resource
private PluginInstanceMapper pluginInstanceMapper;
private IotPluginInstanceMapper pluginInstanceMapper;
@Resource
private DevicePluginProcessIdRedisDAO devicePluginProcessIdRedisDAO;
@Resource
private SpringPluginManager pluginManager;
@ -73,7 +78,7 @@ public class PluginInstanceServiceImpl implements PluginInstanceService {
}
@Override
public void deletePluginFile(PluginInfoDO pluginInfoDO) {
public void deletePluginFile(IotPluginInfoDO pluginInfoDO) {
File file = new File(pluginsDir, pluginInfoDO.getFileName());
if (!file.exists()) {
return;
@ -115,7 +120,7 @@ public class PluginInstanceServiceImpl implements PluginInstanceService {
}
@Override
public void updatePluginStatus(PluginInfoDO pluginInfoDo, Integer status) {
public void updatePluginStatus(IotPluginInfoDO pluginInfoDo, Integer status) {
String pluginKey = pluginInfoDo.getPluginKey();
PluginWrapper plugin = pluginManager.getPlugin(pluginKey);
@ -147,9 +152,9 @@ public class PluginInstanceServiceImpl implements PluginInstanceService {
List<PluginWrapper> plugins = pluginManager.getPlugins();
// 1.2 获取插件信息列表并转换为 Map 以便快速查找
List<PluginInfoDO> pluginInfos = pluginInfoMapper.selectList();
Map<String, PluginInfoDO> pluginInfoMap = pluginInfos.stream()
.collect(Collectors.toMap(PluginInfoDO::getPluginKey, Function.identity()));
List<IotPluginInfoDO> pluginInfos = pluginInfoMapper.selectList();
Map<String, IotPluginInfoDO> pluginInfoMap = pluginInfos.stream()
.collect(Collectors.toMap(IotPluginInfoDO::getPluginKey, Function.identity()));
// 1.3 获取本机 IP MAC 地址,mac@uuid
String ip = NetUtil.getLocalhostStr();
@ -161,34 +166,42 @@ public class PluginInstanceServiceImpl implements PluginInstanceService {
String pluginKey = plugin.getPluginId();
// 2.1 查找插件信息
PluginInfoDO pluginInfo = pluginInfoMap.get(pluginKey);
IotPluginInfoDO pluginInfo = pluginInfoMap.get(pluginKey);
if (pluginInfo == null) {
log.error("插件信息不存在pluginKey = {}", pluginKey);
continue;
}
// 2.2 情况一如果插件实例不存在则创建
PluginInstanceDO pluginInstance = pluginInstanceMapper.selectByMainIdAndPluginId(mainId,
IotPluginInstanceDO pluginInstance = pluginInstanceMapper.selectByMainIdAndPluginId(mainId,
pluginInfo.getId());
if (pluginInstance == null) {
// 4.4 如果插件实例不存在则创建
pluginInstance = PluginInstanceDO.builder()
.pluginId(pluginInfo.getId())
.mainId(MAIN_ID + "-" + mac)
.ip(ip)
.port(port)
.heartbeatAt(System.currentTimeMillis())
.build();
pluginInstanceMapper.insert(pluginInstance);
} else {
// 2.2 情况二如果存在则更新 heartbeatAt
PluginInstanceDO updatePluginInstance = PluginInstanceDO.builder()
.id(pluginInstance.getId())
.heartbeatAt(System.currentTimeMillis())
.build();
pluginInstanceMapper.updateById(updatePluginInstance);
}
// TODO @芋艿稍后优化
// if (pluginInstance == null) {
// // 4.4 如果插件实例不存在则创建
// pluginInstance = PluginInstanceDO.builder()
// .pluginId(pluginInfo.getId())
// .mainId(MAIN_ID + "-" + mac)
// .hostIp(ip)
// .port(port)
// .heartbeatAt(System.currentTimeMillis())
// .build();
// pluginInstanceMapper.insert(pluginInstance);
// } else {
// // 2.2 情况二如果存在则更新 heartbeatAt
// PluginInstanceDO updatePluginInstance = PluginInstanceDO.builder()
// .id(pluginInstance.getId())
// .heartbeatAt(System.currentTimeMillis())
// .build();
// pluginInstanceMapper.updateById(updatePluginInstance);
// }
}
}
// ========== 设备与插件的映射操作 ==========
@Override
public void updateDevicePluginInstanceProcessIdAsync(String deviceKey, String processId) {
devicePluginProcessIdRedisDAO.put(deviceKey, processId);
}
}

View File

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.iocoder.yudao.module.iot.dal.mysql.plugin.PluginInstanceMapper">
<mapper namespace="cn.iocoder.yudao.module.iot.dal.mysql.plugin.IotPluginInstanceMapper">
<!--
一般情况下,尽可能使用 Mapper 进行 CRUD 增删改查即可。

View File

@ -0,0 +1,31 @@
package cn.iocoder.yudao.module.iot.plugin.common.util;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.system.SystemUtil;
/**
* IoT 插件的通用工具类
*
* 芋道源码
*/
public class IotPluginCommonUtils {
/**
* 流程实例的进程编号
*/
private static String processId;
public static String getProcessId() {
if (StrUtil.isEmpty(processId)) {
initProcessId();
}
return processId;
}
private synchronized static void initProcessId() {
processId = String.format("%s@%d@%s", // IP@PID@${uuid}
SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID(), IdUtil.fastSimpleUUID());
}
}

View File

@ -128,6 +128,12 @@
<version>${revision}</version>
</dependency>
<!-- Web 相关 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 工具类相关 -->
<dependency>
<groupId>io.vertx</groupId>

View File

@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.iot.plugin.http;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
@ -10,10 +11,12 @@ import org.springframework.context.ConfigurableApplicationContext;
*/
@Slf4j
@SpringBootApplication
public class HttpPluginSpringbootApplication {
public class IotHttpPluginApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(HttpPluginSpringbootApplication.class, args);
SpringApplication application = new SpringApplication(IotHttpPluginApplication.class);
application.setWebApplicationType(WebApplicationType.NONE);
ConfigurableApplicationContext context = application.run(args);
// 手动获取 VertxService 并启动
// TODO @haohao可以放在 bean init 里么回复会和插件模式冲突 @芋艿测试下

View File

@ -7,6 +7,7 @@ import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDevicePropertyReportReqDTO;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceStateUpdateReqDTO;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum;
import cn.iocoder.yudao.module.iot.plugin.common.util.IotPluginCommonUtils;
import io.vertx.core.Handler;
import io.vertx.ext.web.RequestBody;
import io.vertx.ext.web.RoutingContext;
@ -50,19 +51,17 @@ public class IotDevicePropertyReportVertxHandler implements Handler<RoutingConte
String id = jsonData.getStr("id");
try {
// TODO @haohaopluginKey 需要设置
// 设备上线
deviceUpstreamApi.updateDeviceState(((IotDeviceStateUpdateReqDTO)
new IotDeviceStateUpdateReqDTO().setRequestId(IdUtil.fastSimpleUUID())
.setPluginKey("http")
.setReportTime(LocalDateTime.now())
.setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now())
.setProductKey(productKey).setDeviceName(deviceName))
.setState(IotDeviceStateEnum.ONLINE.getState()));
// 属性上报
deviceUpstreamApi.reportDeviceProperty(((IotDevicePropertyReportReqDTO)
new IotDevicePropertyReportReqDTO().setRequestId(IdUtil.fastSimpleUUID())
.setPluginKey("http").setReportTime(LocalDateTime.now())
.setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now())
.setProductKey(productKey).setDeviceName(deviceName))
.setProperties((Map<String, Object>) requestBody.asJsonObject().getMap().get("properties")));