【代码评审】IoT:插件机制

This commit is contained in:
YunaiV 2025-01-06 20:24:47 +08:00
parent 603649d248
commit b5856c4cfc
15 changed files with 86 additions and 75 deletions

View File

@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.iot.api;
import java.util.HashMap;
import java.util.Map;
// TODO 芋艿纠结下
/**
* 服务注册表 - 插架模块使用无法使用 Spring 注入
*/

View File

@ -2,9 +2,12 @@ package cn.iocoder.yudao.module.iot.api.device;
/**
* 设备数据 API
*
* @author haohao
*/
public interface DeviceDataApi {
// TODO @haohao最好搞成 dto
/**
* 保存设备数据
*

View File

@ -5,9 +5,9 @@ import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
// TODO @芋艿要不要加个 mqtt 值了的前缀
/**
* MQTT RPC 请求
*
*/
@Data
@Builder
@ -23,6 +23,7 @@ public class RpcRequest {
/**
* 参数
*/
// TODO @haohaoobject 对象会不会不好序列化
private Object[] params;
/**
@ -34,4 +35,5 @@ public class RpcRequest {
* 回复地址
*/
private String replyTo;
}

View File

@ -7,7 +7,6 @@ import lombok.NoArgsConstructor;
/**
* MQTT RPC 响应
*
*/
@Data
@Builder
@ -23,10 +22,12 @@ public class RpcResponse {
/**
* 结果
*/
// TODO @haohaoobject 对象会不会不好反序列化
private Object result;
/**
* 错误
*/
private String error;
}

View File

@ -15,4 +15,5 @@ public class SerializationUtils {
public static <T> T deserialize(String json, Class<T> clazz) {
return JSONUtil.toBean(json, clazz);
}
}

View File

@ -9,7 +9,7 @@ import org.springframework.web.multipart.MultipartFile;
@Data
public class PluginInfoImportReqVO {
@Schema(description = "主键ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "11546")
@Schema(description = "主键 ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "11546")
private Long id;
@Schema(description = "插件文件", requiredMode = Schema.RequiredMode.REQUIRED)

View File

@ -1,7 +1,5 @@
package cn.iocoder.yudao.module.iot.controller.admin.plugin.vo;
import com.alibaba.excel.annotation.ExcelIgnoreUnannotated;
import com.alibaba.excel.annotation.ExcelProperty;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
@ -9,63 +7,48 @@ import java.time.LocalDateTime;
@Schema(description = "管理后台 - IoT 插件信息 Response VO")
@Data
@ExcelIgnoreUnannotated
public class PluginInfoRespVO {
@Schema(description = "主键 ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "11546")
@ExcelProperty("主键 ID")
private Long id;
@Schema(description = "插件包标识符", requiredMode = Schema.RequiredMode.REQUIRED, example = "24627")
@ExcelProperty("插件包标识符")
private String pluginKey;
@Schema(description = "插件名称", requiredMode = Schema.RequiredMode.REQUIRED, example = "赵六")
@ExcelProperty("插件名称")
private String name;
@Schema(description = "描述", example = "你猜")
@ExcelProperty("描述")
private String description;
@Schema(description = "部署方式", requiredMode = Schema.RequiredMode.REQUIRED, example = "2")
@ExcelProperty("部署方式")
private Integer deployType;
@Schema(description = "插件包文件名", requiredMode = Schema.RequiredMode.REQUIRED)
@ExcelProperty("插件包文件名")
private String fileName;
@Schema(description = "插件版本", requiredMode = Schema.RequiredMode.REQUIRED)
@ExcelProperty("插件版本")
private String version;
@Schema(description = "插件类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "2")
@ExcelProperty("插件类型")
private Integer type;
@Schema(description = "设备插件协议类型")
@ExcelProperty("设备插件协议类型")
private String protocol;
@Schema(description = "状态", requiredMode = Schema.RequiredMode.REQUIRED)
@ExcelProperty("状态")
private Integer status;
@Schema(description = "插件配置项描述信息")
@ExcelProperty("插件配置项描述信息")
private String configSchema;
@Schema(description = "插件配置信息")
@ExcelProperty("插件配置信息")
private String config;
@Schema(description = "插件脚本")
@ExcelProperty("插件脚本")
private String script;
@Schema(description = "创建时间", requiredMode = Schema.RequiredMode.REQUIRED)
@ExcelProperty("创建时间")
private LocalDateTime createTime;
}

View File

@ -7,6 +7,10 @@ import lombok.*;
@Data
public class PluginInfoSaveReqVO {
// TODO @haohao新增的字段有点多每个都需要哇
// TODO @haohao一些枚举字段需要加枚举校验例如说deployTypestatustype
@Schema(description = "主键ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "11546")
private Long id;

View File

@ -21,6 +21,7 @@ public interface PluginInstanceMapper extends BaseMapperX<PluginInstanceDO> {
.eq(PluginInstanceDO::getPluginId, pluginId));
}
// TODO @haohao这个还需要么相关不用的 VO 可以删除
default PageResult<PluginInstanceDO> selectPage(PluginInstancePageReqVO reqVO) {
return selectPage(reqVO, new LambdaQueryWrapperX<PluginInstanceDO>()
.eqIfPresent(PluginInstanceDO::getMainId, reqVO.getMainId())

View File

@ -1,6 +1,5 @@
package cn.iocoder.yudao.module.iot.job.plugin;
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
import cn.iocoder.yudao.module.iot.service.plugin.PluginInstanceService;
import org.springframework.scheduling.annotation.Scheduled;
@ -26,4 +25,5 @@ public class PluginInstancesJob {
pluginInstanceService.updatePluginInstances();
});
}
}

View File

@ -15,6 +15,8 @@ import javax.annotation.PreDestroy;
import java.util.HashMap;
import java.util.Map;
// TODO @芋艿server 逻辑再瞅瞅
// TODO @haohao如果只写在 iot biz 那么后续 server => client 貌似不方便微信再讨论下~
@Service
@Slf4j
public class RpcServer {
@ -90,6 +92,9 @@ public class RpcServer {
*/
@FunctionalInterface
public interface MethodInvoker {
Object invoke(Object[] params) throws Exception;
}
}

View File

@ -41,18 +41,18 @@ public class PluginInfoServiceImpl implements PluginInfoService {
@Resource
private PluginInfoMapper pluginInfoMapper;
@Resource
private SpringPluginManager pluginManager;
// TODO @芋艿要不要换位置
@Value("${pf4j.pluginsDir}")
private String pluginsDir;
@Override
public Long createPluginInfo(PluginInfoSaveReqVO createReqVO) {
// 插入
PluginInfoDO pluginInfo = BeanUtils.toBean(createReqVO, PluginInfoDO.class);
pluginInfoMapper.insert(pluginInfo);
// 返回
return pluginInfo.getId();
}
@ -67,29 +67,29 @@ public class PluginInfoServiceImpl implements PluginInfoService {
@Override
public void deletePluginInfo(Long id) {
// 校验存在
// 1.1 校验存在
PluginInfoDO pluginInfoDO = validatePluginInfoExists(id);
// 停止插件
// 1.2 停止插件
if (IotPluginStatusEnum.RUNNING.getStatus().equals(pluginInfoDO.getStatus())) {
throw exception(PLUGIN_INFO_DELETE_FAILED_RUNNING);
}
// 卸载插件
// 2. 卸载插件
// TODO @haohao可以复用 stopAndUnloadPlugin
PluginWrapper plugin = pluginManager.getPlugin(pluginInfoDO.getPluginKey());
if (plugin != null) {
// 查询插件是否是启动状态
if (plugin.getPluginState().equals(PluginState.STARTED)) {
// 停止插件
if (plugin.getPluginState().equals(PluginState.STARTED)) {
pluginManager.stopPlugin(plugin.getPluginId());
}
// 卸载插件
pluginManager.unloadPlugin(plugin.getPluginId());
}
// 删除
// 3.1 删除
pluginInfoMapper.deleteById(id);
// 删除插件文件
// 3.2 删除插件文件
// TODO @haohao这个直接主线程 sleep 就好了不用单独开线程池哈原因是低频操作另外只有存在的时候 sleep + 删除
Executors.newSingleThreadExecutor().submit(() -> {
try {
TimeUnit.SECONDS.sleep(1); // 等待 1 避免插件未卸载完毕
@ -101,7 +101,6 @@ public class PluginInfoServiceImpl implements PluginInfoService {
log.error("[deletePluginInfo][删除插件文件({}) 失败]", pluginInfoDO.getFileName(), e);
}
});
}
private PluginInfoDO validatePluginInfoExists(Long id) {
@ -127,22 +126,19 @@ public class PluginInfoServiceImpl implements PluginInfoService {
// 1. 校验插件信息是否存在
PluginInfoDO pluginInfoDo = validatePluginInfoExists(id);
// 2. 获取插件标识
String pluginKey = pluginInfoDo.getPluginKey();
// 2. 停止并卸载旧的插件
stopAndUnloadPlugin(pluginInfoDo.getPluginKey());
// 3. 停止并卸载旧的插件
stopAndUnloadPlugin(pluginKey);
// 4. 上传新的插件文件
// 3.1 上传新的插件文件
String pluginKeyNew = uploadAndLoadNewPlugin(file);
// 5. 更新插件启用状态文件
// 3.2 更新插件启用状态文件
updatePluginStatusFile(pluginKeyNew, false);
// 6. 更新插件信息
// 4. 更新插件信息
updatePluginInfo(pluginInfoDo, pluginKeyNew, file);
}
// TODO @haohao注释的格式
// 停止并卸载旧的插件
private void stopAndUnloadPlugin(String pluginKey) {
PluginWrapper plugin = pluginManager.getPlugin(pluginKey);
@ -154,10 +150,13 @@ public class PluginInfoServiceImpl implements PluginInfoService {
}
}
// TODO @haohao注释的格式
// 上传并加载新的插件文件
private String uploadAndLoadNewPlugin(MultipartFile file) {
// TODO @haohao多节点是不是要上传 s3 之类的存储器然后定时去加载
Path pluginsPath = Paths.get(pluginsDir);
try {
// TODO @haohao可以使用 FileUtil 简化
if (!Files.exists(pluginsPath)) {
Files.createDirectories(pluginsPath); // 创建插件目录
}
@ -166,16 +165,18 @@ public class PluginInfoServiceImpl implements PluginInfoService {
Path jarPath = pluginsPath.resolve(filename);
Files.copy(file.getInputStream(), jarPath, StandardCopyOption.REPLACE_EXISTING); // 保存上传的 JAR 文件
return pluginManager.loadPlugin(jarPath.toAbsolutePath()); // 加载插件
} else {
throw exception(PLUGIN_INSTALL_FAILED);
}
throw exception(PLUGIN_INSTALL_FAILED); // TODO @haohao这么抛的话貌似会被 catch (Exception e) {
} catch (Exception e) {
// TODO @haohao打个 error log方便排查
throw exception(PLUGIN_INSTALL_FAILED);
}
}
// TODO @haohao注释的格式
// 更新插件状态文件
private void updatePluginStatusFile(String pluginKeyNew, boolean isEnabled) {
// TODO @haohao疑问这里写 enabled.txt disabled.txt 的目的是啥哈
Path enabledFilePath = Paths.get(pluginsDir, "enabled.txt");
Path disabledFilePath = Paths.get(pluginsDir, "disabled.txt");
Path targetFilePath = isEnabled ? enabledFilePath : disabledFilePath;
@ -186,10 +187,8 @@ public class PluginInfoServiceImpl implements PluginInfoService {
if (pluginWrapper == null) {
throw exception(PLUGIN_INSTALL_FAILED);
}
List<String> targetLines = Files.exists(targetFilePath) ? Files.readAllLines(targetFilePath)
: new ArrayList<>();
List<String> oppositeLines = Files.exists(oppositeFilePath) ? Files.readAllLines(oppositeFilePath)
: new ArrayList<>();
List<String> targetLines = Files.exists(targetFilePath) ? Files.readAllLines(targetFilePath) : new ArrayList<>();
List<String> oppositeLines = Files.exists(oppositeFilePath) ? Files.readAllLines(oppositeFilePath) : new ArrayList<>();
if (!targetLines.contains(pluginKeyNew)) {
targetLines.add(pluginKeyNew);
@ -207,26 +206,33 @@ public class PluginInfoServiceImpl implements PluginInfoService {
}
}
// TODO @haohao注释的格式
// 更新插件信息
private void updatePluginInfo(PluginInfoDO pluginInfoDo, String pluginKeyNew, MultipartFile file) {
// TODO @haohao更新实体的时候最好 new 一个新的
// TODO @haohao可以链式调用简化下代码
pluginInfoDo.setPluginKey(pluginKeyNew);
pluginInfoDo.setStatus(IotPluginStatusEnum.STOPPED.getStatus());
pluginInfoDo.setFileName(file.getOriginalFilename());
pluginInfoDo.setScript("");
// 解析 pf4j 插件
PluginDescriptor pluginDescriptor = pluginManager.getPlugin(pluginKeyNew).getDescriptor();
pluginInfoDo.setConfigSchema(pluginDescriptor.getPluginDescription());
pluginInfoDo.setVersion(pluginDescriptor.getVersion());
pluginInfoDo.setDescription(pluginDescriptor.getPluginDescription());
// 执行更新
pluginInfoMapper.updateById(pluginInfoDo);
}
// TODO @haohaostatusstate 字段命名要统一下~
@Override
public void updatePluginStatus(Long id, Integer status) {
// 1. 校验插件信息是否存在
PluginInfoDO pluginInfoDo = validatePluginInfoExists(id);
// 2. 校验插件状态是否有效
// TODO @haohao直接参数校验掉通过 @InEnum
if (!IotPluginStatusEnum.contains(status)) {
throw exception(PLUGIN_STATUS_INVALID);
}
@ -237,17 +243,16 @@ public class PluginInfoServiceImpl implements PluginInfoService {
// 4. 根据状态更新插件
if (plugin != null) {
// 4.1 如果目标状态是运行且插件未启动则启动插件
// 4.1 启动如果目标状态是运行且插件未启动则启动插件
if (status.equals(IotPluginStatusEnum.RUNNING.getStatus())
&& plugin.getPluginState() != PluginState.STARTED) {
pluginManager.startPlugin(pluginKey);
updatePluginStatusFile(pluginKey, true); // 更新插件状态文件为启用
}
// 4.2 如果目标状态是停止且插件已启动则停止插件
else if (status.equals(IotPluginStatusEnum.STOPPED.getStatus())
updatePluginStatusFile(pluginKey, true);
// 4.2 停止如果目标状态是停止且插件已启动则停止插件
} else if (status.equals(IotPluginStatusEnum.STOPPED.getStatus())
&& plugin.getPluginState() == PluginState.STARTED) {
pluginManager.stopPlugin(pluginKey);
updatePluginStatusFile(pluginKey, false); // 更新插件状态文件为禁用
updatePluginStatusFile(pluginKey, false);
}
} else {
// 5. 插件不存在且状态为停止抛出异常
@ -257,17 +262,20 @@ public class PluginInfoServiceImpl implements PluginInfoService {
}
// 6. 更新数据库中的插件状态
// TODO @haohao新建新建 pluginInfoDo
pluginInfoDo.setStatus(status);
pluginInfoMapper.updateById(pluginInfoDo);
}
@Override
public List<PluginInfoDO> getPluginInfoList() {
return pluginInfoMapper.selectList(null);
return pluginInfoMapper.selectList();
}
// TODO @haohao可以改成 getPluginInfoListByStatus 更通用哈
@Override
public List<PluginInfoDO> getRunningPluginInfoList() {
return pluginInfoMapper.selectListByStatus(IotPluginStatusEnum.RUNNING.getStatus());
}
}

View File

@ -26,8 +26,9 @@ import java.util.List;
public class PluginInstanceServiceImpl implements PluginInstanceService {
/**
* 主程序id
* 主程序 ID
*/
// TODO @haohao这个可以后续确认下有没更合适的标识例如说 mac 地址之类的
public static final String MAIN_ID = IdUtil.fastSimpleUUID();
@Resource
@ -40,36 +41,37 @@ public class PluginInstanceServiceImpl implements PluginInstanceService {
@Value("${server.port:48080}")
private int port;
// TODO @haohao建议把 PluginInfoServiceImpl 里面 instance 相关的逻辑拿过来可能会更好info 处理信息instance 处理实例
// TODO @haohao这个改成 reportPluginInstance 会不会更合适哈
@Override
public void updatePluginInstances() {
// 1. 查询 pf4j 插件列表
// 1.1 查询 pf4j 插件列表
List<PluginWrapper> plugins = pluginManager.getPlugins();
// 2. 查询插件信息列表
// 1.2 查询插件信息列表
List<PluginInfoDO> pluginInfos = pluginInfoService.getPluginInfoList();
// 动态获取主程序的 IP 和端口
// 1.3 动态获取主程序的 IP 和端口
String mainIp = getLocalIpAddress();
// 3. 遍历插件列表并保存为插件实例
// 2. 遍历插件列表并保存为插件实例
for (PluginWrapper plugin : plugins) {
// 2.1 查找插件信息
String pluginKey = plugin.getPluginId();
// TODO @haohaoCollUtil.findOne() 简化
PluginInfoDO pluginInfo = pluginInfos.stream()
.filter(pluginInfoDO -> pluginInfoDO.getPluginKey().equals(pluginKey))
.findFirst()
.orElse(null);
// 4. 如果插件信息不存在则跳过
if (pluginInfo == null) {
// TODO @haohao建议打个 error log
continue;
}
// 5. 查询插件实例
// 2.2 查询插件实例
PluginInstanceDO pluginInstance = pluginInstanceMapper.selectByMainIdAndPluginId(MAIN_ID, pluginInfo.getId());
// 6. 如果插件实例不存在则创建
// 2.3.1 如果插件实例不存在则创建
if (pluginInstance == null) {
// TODO @haohao可以链式调用建议新建一个
pluginInstance = new PluginInstanceDO();
pluginInstance.setPluginId(pluginInfo.getId());
pluginInstance.setMainId(MAIN_ID);
@ -78,13 +80,14 @@ public class PluginInstanceServiceImpl implements PluginInstanceService {
pluginInstance.setHeartbeatAt(System.currentTimeMillis());
pluginInstanceMapper.insert(pluginInstance);
} else {
// 7. 如果插件实例存在则更新
// 2.3.2 如果插件实例存在则更新
pluginInstance.setHeartbeatAt(System.currentTimeMillis());
pluginInstanceMapper.updateById(pluginInstance);
}
}
}
// TODO @haohao这个目的是获取到第一个有效 ip 是哇
private String getLocalIpAddress() {
try {
List<String> ipList = NetUtil.localIpv4s().stream()

View File

@ -28,4 +28,5 @@ public class RpcController {
public CompletableFuture<Object> concat(@RequestParam String str1, @RequestParam String str2) throws Exception {
return rpcClient.call("concat", new Object[]{str1, str2}, 10);
}
}

View File

@ -1,17 +1,14 @@
package cn.iocoder.yudao.module.iot.mqttrpc.client;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import cn.iocoder.yudao.module.iot.mqttrpc.common.RpcRequest;
import cn.iocoder.yudao.module.iot.mqttrpc.common.RpcResponse;
import cn.iocoder.yudao.module.iot.mqttrpc.common.SerializationUtils;
import cn.iocoder.yudao.module.iot.mqttrpc.config.MqttConfig;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Service;
@ -20,6 +17,7 @@ import javax.annotation.PreDestroy;
import java.util.UUID;
import java.util.concurrent.*;
// TODO @芋艿需要考虑怎么公用
@Service
@Slf4j
public class RpcClient {