reactor:【IoT 物联网】清理心跳逻辑

This commit is contained in:
YunaiV 2025-05-30 22:34:43 +08:00
parent b4035cb036
commit 02c3aa748b
25 changed files with 168 additions and 1323 deletions

View File

@ -80,14 +80,4 @@ public interface IotDeviceUpstreamApi {
@PostMapping(PREFIX + "/authenticate-emqx-connection")
CommonResult<Boolean> authenticateEmqxConnection(@Valid @RequestBody IotDeviceEmqxAuthReqDTO authReqDTO);
// ========== 插件相关 ==========
/**
* 心跳插件实例
*
* @param heartbeatReqDTO 心跳插件实例 DTO
*/
@PostMapping(PREFIX + "/heartbeat-plugin-instance")
CommonResult<Boolean> heartbeatPluginInstance(@Valid @RequestBody IotPluginInstanceHeartbeatReqDTO heartbeatReqDTO);
}

View File

@ -1,44 +0,0 @@
package cn.iocoder.yudao.module.iot.api.device.dto.control.upstream;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
/**
* IoT 插件实例心跳 Request DTO
*
* @author 芋道源码
*/
@Data
public class IotPluginInstanceHeartbeatReqDTO {
/**
* 请求编号
*/
@NotEmpty(message = "请求编号不能为空")
private String processId;
/**
* 插件包标识符
*/
@NotEmpty(message = "插件包标识符不能为空")
private String pluginKey;
/**
* 插件实例所在 IP
*/
@NotEmpty(message = "插件实例所在 IP 不能为空")
private String hostIp;
/**
* 插件实例的进程编号
*/
@NotNull(message = "插件实例的进程编号不能为空")
private Integer downstreamPort;
/**
* 是否在线
*/
@NotNull(message = "是否在线不能为空")
private Boolean online;
}

View File

@ -21,8 +21,6 @@ public class IoTDeviceUpstreamApiImpl implements IotDeviceUpstreamApi {
@Resource
private IotDeviceUpstreamService deviceUpstreamService;
@Resource
private IotPluginInstanceService pluginInstanceService;
// ========== 设备相关 ==========
@ -68,12 +66,4 @@ public class IoTDeviceUpstreamApiImpl implements IotDeviceUpstreamApi {
return success(result);
}
// ========== 插件相关 ==========
@Override
public CommonResult<Boolean> heartbeatPluginInstance(IotPluginInstanceHeartbeatReqDTO heartbeatReqDTO) {
pluginInstanceService.heartbeatPluginInstance(heartbeatReqDTO);
return success(true);
}
}

View File

@ -1,6 +1,5 @@
package cn.iocoder.yudao.module.iot.service.plugin;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotPluginInstanceHeartbeatReqDTO;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginConfigDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginInstanceDO;
import org.springframework.web.multipart.MultipartFile;
@ -14,13 +13,6 @@ import java.time.LocalDateTime;
*/
public interface IotPluginInstanceService {
/**
* 心跳插件实例
*
* @param heartbeatReqDTO 心跳插件实例 DTO
*/
void heartbeatPluginInstance(IotPluginInstanceHeartbeatReqDTO heartbeatReqDTO);
/**
* 离线超时插件实例
*

View File

@ -2,8 +2,6 @@ package cn.iocoder.yudao.module.iot.service.plugin;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotPluginInstanceHeartbeatReqDTO;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginConfigDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginInstanceDO;
import cn.iocoder.yudao.module.iot.dal.mysql.plugin.IotPluginInstanceMapper;
@ -11,7 +9,6 @@ import cn.iocoder.yudao.module.iot.dal.redis.plugin.DevicePluginProcessIdRedisDA
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.multipart.MultipartFile;
@ -31,10 +28,6 @@ import java.util.concurrent.TimeUnit;
@Slf4j
public class IotPluginInstanceServiceImpl implements IotPluginInstanceService {
@Resource
@Lazy // 延迟加载避免循环依赖
private IotPluginConfigService pluginConfigService;
@Resource
private IotPluginInstanceMapper pluginInstanceMapper;
@ -47,47 +40,6 @@ public class IotPluginInstanceServiceImpl implements IotPluginInstanceService {
@Value("${pf4j.pluginsDir}")
private String pluginsDir;
@Override
public void heartbeatPluginInstance(IotPluginInstanceHeartbeatReqDTO heartbeatReqDTO) {
// 情况一已存在则进行更新
IotPluginInstanceDO instance = TenantUtils.executeIgnore(
() -> pluginInstanceMapper.selectByProcessId(heartbeatReqDTO.getProcessId()));
if (instance != null) {
IotPluginInstanceDO.IotPluginInstanceDOBuilder updateObj = IotPluginInstanceDO.builder().id(instance.getId())
.hostIp(heartbeatReqDTO.getHostIp()).downstreamPort(heartbeatReqDTO.getDownstreamPort())
.online(heartbeatReqDTO.getOnline()).heartbeatTime(LocalDateTime.now());
if (Boolean.TRUE.equals(heartbeatReqDTO.getOnline())) {
if (Boolean.FALSE.equals(instance.getOnline())) { // 当前处于离线时才需要更新上线时间
updateObj.onlineTime(LocalDateTime.now());
}
} else {
updateObj.offlineTime(LocalDateTime.now());
}
TenantUtils.execute(instance.getTenantId(),
() -> pluginInstanceMapper.updateById(updateObj.build()));
return;
}
// 情况二不存在则创建
IotPluginConfigDO info = TenantUtils.executeIgnore(
() -> pluginConfigService.getPluginConfigByPluginKey(heartbeatReqDTO.getPluginKey()));
if (info == null) {
log.error("[heartbeatPluginInstance][心跳({}) 对应的插件不存在]", heartbeatReqDTO);
return;
}
IotPluginInstanceDO.IotPluginInstanceDOBuilder insertObj = IotPluginInstanceDO.builder()
.pluginId(info.getId()).processId(heartbeatReqDTO.getProcessId())
.hostIp(heartbeatReqDTO.getHostIp()).downstreamPort(heartbeatReqDTO.getDownstreamPort())
.online(heartbeatReqDTO.getOnline()).heartbeatTime(LocalDateTime.now());
if (Boolean.TRUE.equals(heartbeatReqDTO.getOnline())) {
insertObj.onlineTime(LocalDateTime.now());
} else {
insertObj.offlineTime(LocalDateTime.now());
}
TenantUtils.execute(info.getTenantId(),
() -> pluginInstanceMapper.insert(insertObj.build()));
}
@Override
public int offlineTimeoutPluginInstance(LocalDateTime maxHeartbeatTime) {
List<IotPluginInstanceDO> list = pluginInstanceMapper.selectListByHeartbeatTimeLt(maxHeartbeatTime);

View File

@ -1,19 +1,11 @@
package cn.iocoder.yudao.module.iot.net.component.core.config;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.net.component.core.downstream.IotDeviceDownstreamHandler;
import cn.iocoder.yudao.module.iot.net.component.core.downstream.IotDeviceDownstreamServer;
import cn.iocoder.yudao.module.iot.net.component.core.heartbeat.IotNetComponentInstanceHeartbeatJob;
import cn.iocoder.yudao.module.iot.net.component.core.heartbeat.IotNetComponentRegistry;
import cn.iocoder.yudao.module.iot.net.component.core.upstream.IotDeviceUpstreamClient;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling;
// TODO @haohao应该不用写 spring.factories 因为被 imports 替代啦
/**
* IoT 网络组件的通用自动配置类
*
@ -24,33 +16,6 @@ import org.springframework.scheduling.annotation.EnableScheduling;
@EnableScheduling // 开启定时任务因为 IotNetComponentInstanceHeartbeatJob 是一个定时任务
public class IotNetComponentCommonAutoConfiguration {
/**
* 创建 EMQX 设备下行服务器
* <p>
* yudao.iot.component.emqx.enabled = true 优先使用 emqxDeviceDownstreamHandler
*/
@Bean
@ConditionalOnProperty(prefix = "yudao.iot.component.emqx", name = "enabled", havingValue = "true")
public IotDeviceDownstreamServer emqxDeviceDownstreamServer(
IotNetComponentCommonProperties properties,
@Qualifier("emqxDeviceDownstreamHandler") IotDeviceDownstreamHandler deviceDownstreamHandler) {
return new IotDeviceDownstreamServer(properties, deviceDownstreamHandler);
}
/**
* 创建网络组件实例心跳任务
*/
@Bean(initMethod = "init", destroyMethod = "stop")
public IotNetComponentInstanceHeartbeatJob pluginInstanceHeartbeatJob(
IotDeviceUpstreamApi deviceUpstreamApi,
IotNetComponentCommonProperties commonProperties,
IotNetComponentRegistry componentRegistry) {
return new IotNetComponentInstanceHeartbeatJob(
deviceUpstreamApi,
commonProperties,
componentRegistry);
}
/**
* 创建设备上行客户端
*/
@ -58,4 +23,5 @@ public class IotNetComponentCommonAutoConfiguration {
public IotDeviceUpstreamClient deviceUpstreamClient() {
return new IotDeviceUpstreamClient();
}
}

View File

@ -21,38 +21,4 @@ public class IotNetComponentCommonProperties {
*/
private String pluginKey;
/**
* 组件实例心跳超时时间单位毫秒
* <p>
* 默认值30
*/
private Long instanceHeartbeatTimeout = 30000L;
/**
* 网络组件消息转发配置
*/
// private ForwardMessage forwardMessage = new ForwardMessage();
/**
* 消息转发配置
*/
/*
* @Data
* public static class ForwardMessage {
*
* /**
* 是否转发所有设备消息到 EMQX
* <p>
* 默认为 true 开启
*/
// private boolean forwardAllDeviceMessageToEmqx = true;
/**
* 是否转发所有设备消息到 HTTP
* <p>
* 默认为 false 关闭
*/
// private boolean forwardAllDeviceMessageToHttp = false;
// }
}

View File

@ -1,55 +0,0 @@
package cn.iocoder.yudao.module.iot.net.component.core.downstream;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.*;
/**
* IoT 设备下行处理器
* <p>
* 目的每个 plugin 需要实现用于处理 server 下行的指令请求从而实现从 server => plugin => device 的下行流程
*
* @author 芋道源码
*/
public interface IotDeviceDownstreamHandler {
/**
* 调用设备服务
*
* @param invokeReqDTO 调用设备服务的请求
* @return 是否成功
*/
CommonResult<Boolean> invokeDeviceService(IotDeviceServiceInvokeReqDTO invokeReqDTO);
/**
* 获取设备属性
*
* @param getReqDTO 获取设备属性的请求
* @return 是否成功
*/
CommonResult<Boolean> getDeviceProperty(IotDevicePropertyGetReqDTO getReqDTO);
/**
* 设置设备属性
*
* @param setReqDTO 设置设备属性的请求
* @return 是否成功
*/
CommonResult<Boolean> setDeviceProperty(IotDevicePropertySetReqDTO setReqDTO);
/**
* 设置设备配置
*
* @param setReqDTO 设置设备配置的请求
* @return 是否成功
*/
CommonResult<Boolean> setDeviceConfig(IotDeviceConfigSetReqDTO setReqDTO);
/**
* 升级设备 OTA
*
* @param upgradeReqDTO 升级设备 OTA 的请求
* @return 是否成功
*/
CommonResult<Boolean> upgradeDeviceOta(IotDeviceOtaUpgradeReqDTO upgradeReqDTO);
}

View File

@ -1,80 +0,0 @@
package cn.iocoder.yudao.module.iot.net.component.core.downstream;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.*;
import cn.iocoder.yudao.module.iot.net.component.core.config.IotNetComponentCommonProperties;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 设备下行服务直接转发给 device 设备
*
* @author 芋道源码
*/
@Slf4j
@RequiredArgsConstructor
public class IotDeviceDownstreamServer {
private final IotNetComponentCommonProperties properties;
private final IotDeviceDownstreamHandler deviceDownstreamHandler;
/**
* 调用设备服务
*
* @param invokeReqDTO 调用设备服务的请求
* @return 是否成功
*/
public CommonResult<Boolean> invokeDeviceService(IotDeviceServiceInvokeReqDTO invokeReqDTO) {
return deviceDownstreamHandler.invokeDeviceService(invokeReqDTO);
}
/**
* 获取设备属性
*
* @param getReqDTO 获取设备属性的请求
* @return 是否成功
*/
public CommonResult<Boolean> getDeviceProperty(IotDevicePropertyGetReqDTO getReqDTO) {
return deviceDownstreamHandler.getDeviceProperty(getReqDTO);
}
/**
* 设置设备属性
*
* @param setReqDTO 设置设备属性的请求
* @return 是否成功
*/
public CommonResult<Boolean> setDeviceProperty(IotDevicePropertySetReqDTO setReqDTO) {
return deviceDownstreamHandler.setDeviceProperty(setReqDTO);
}
/**
* 设置设备配置
*
* @param setReqDTO 设置设备配置的请求
* @return 是否成功
*/
public CommonResult<Boolean> setDeviceConfig(IotDeviceConfigSetReqDTO setReqDTO) {
return deviceDownstreamHandler.setDeviceConfig(setReqDTO);
}
/**
* 升级设备 OTA
*
* @param upgradeReqDTO 升级设备 OTA 的请求
* @return 是否成功
*/
public CommonResult<Boolean> upgradeDeviceOta(IotDeviceOtaUpgradeReqDTO upgradeReqDTO) {
return deviceDownstreamHandler.upgradeDeviceOta(upgradeReqDTO);
}
/**
* 获得内部组件标识
*
* @return 组件标识
*/
public String getComponentId() {
return properties.getPluginKey();
}
}

View File

@ -1,111 +0,0 @@
package cn.iocoder.yudao.module.iot.net.component.core.heartbeat;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.system.SystemUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotPluginInstanceHeartbeatReqDTO;
import cn.iocoder.yudao.module.iot.net.component.core.config.IotNetComponentCommonProperties;
import cn.iocoder.yudao.module.iot.net.component.core.heartbeat.IotNetComponentRegistry.IotNetComponentInfo;
import cn.iocoder.yudao.module.iot.net.component.core.util.IotNetComponentCommonUtils;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
/**
* IoT 网络组件实例心跳定时任务
* <p>
* 将组件的状态定时上报给 server 服务器
*
* @author haohao
*/
@RequiredArgsConstructor
@Slf4j
public class IotNetComponentInstanceHeartbeatJob {
private final IotDeviceUpstreamApi deviceUpstreamApi;
private final IotNetComponentCommonProperties commonProperties;
private final IotNetComponentRegistry componentRegistry;
/**
* 初始化方法 Spring 调用注册当前组件并发送上线心跳
*/
public void init() {
// 发送所有组件的上线心跳
Collection<IotNetComponentInfo> components = componentRegistry.getAllComponents();
if (CollUtil.isEmpty(components)) {
return;
}
for (IotNetComponentInfo component : components) {
try {
CommonResult<Boolean> result = deviceUpstreamApi.heartbeatPluginInstance(
buildPluginInstanceHeartbeatReqDTO(component, true));
log.info("[init][组件({})上线结果:{}]", component.getPluginKey(), result);
} catch (Exception e) {
log.error("[init][组件({})上线发送异常]", component.getPluginKey(), e);
}
}
}
/**
* 停止方法 Spring 调用发送下线心跳并注销组件
*/
public void stop() {
// 发送所有组件的下线心跳
Collection<IotNetComponentInfo> components = componentRegistry.getAllComponents();
if (CollUtil.isEmpty(components)) {
return;
}
for (IotNetComponentInfo component : components) {
try {
CommonResult<Boolean> result = deviceUpstreamApi.heartbeatPluginInstance(
buildPluginInstanceHeartbeatReqDTO(component, false));
log.info("[stop][组件({})下线结果:{}]", component.getPluginKey(), result);
} catch (Exception e) {
log.error("[stop][组件({})下线发送异常]", component.getPluginKey(), e);
}
}
// 注销当前组件
componentRegistry.unregisterComponent(commonProperties.getPluginKey());
}
/**
* 定时发送心跳
*/
@Scheduled(initialDelay = 1, fixedRate = 1, timeUnit = TimeUnit.MINUTES) // 1 分钟执行一次
public void execute() {
// 发送所有组件的心跳
Collection<IotNetComponentInfo> components = componentRegistry.getAllComponents();
if (CollUtil.isEmpty(components)) {
return;
}
for (IotNetComponentInfo component : components) {
try {
CommonResult<Boolean> result = deviceUpstreamApi.heartbeatPluginInstance(
buildPluginInstanceHeartbeatReqDTO(component, true));
log.info("[execute][组件({})心跳结果:{}]", component.getPluginKey(), result);
} catch (Exception e) {
log.error("[execute][组件({})心跳发送异常]", component.getPluginKey(), e);
}
}
}
/**
* 构建心跳 DTO
*
* @param component 组件信息
* @param online 是否在线
* @return 心跳 DTO
*/
private IotPluginInstanceHeartbeatReqDTO buildPluginInstanceHeartbeatReqDTO(IotNetComponentInfo component,
Boolean online) {
return new IotPluginInstanceHeartbeatReqDTO()
.setPluginKey(component.getPluginKey()).setProcessId(component.getProcessId())
.setHostIp(component.getHostIp()).setDownstreamPort(component.getDownstreamPort())
.setOnline(online);
}
}

View File

@ -1,98 +0,0 @@
package cn.iocoder.yudao.module.iot.net.component.core.heartbeat;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* IoT 网络组件注册表
* <p>
* 用于管理多个网络组件的注册信息解决多组件心跳问题
*
* @author haohao
*/
@Component
@Slf4j
public class IotNetComponentRegistry {
/**
* 网络组件信息
*/
@Data
public static class IotNetComponentInfo {
/**
* 组件 Key
*/
private final String pluginKey;
/**
* 主机 IP
*/
private final String hostIp;
/**
* 下游端口
*/
private final Integer downstreamPort;
/**
* 进程 ID
*/
private final String processId;
}
/**
* 组件映射表key 为组件 Key
*/
private final Map<String, IotNetComponentInfo> components = new ConcurrentHashMap<>();
/**
* 注册网络组件
*
* @param pluginKey 组件 Key
* @param hostIp 主机 IP
* @param downstreamPort 下游端口
* @param processId 进程 ID
*/
public void registerComponent(String pluginKey, String hostIp, Integer downstreamPort, String processId) {
log.info("[registerComponent][注册网络组件, pluginKey={}, hostIp={}, downstreamPort={}, processId={}]",
pluginKey, hostIp, downstreamPort, processId);
components.put(pluginKey, new IotNetComponentInfo(pluginKey, hostIp, downstreamPort, processId));
}
/**
* 注销网络组件
*
* @param pluginKey 组件 Key
*/
public void unregisterComponent(String pluginKey) {
log.info("[unregisterComponent][注销网络组件, pluginKey={}]", pluginKey);
components.remove(pluginKey);
}
/**
* 获取所有网络组件
*
* @return 所有组件集合
*/
public Collection<IotNetComponentInfo> getAllComponents() {
return CollUtil.isEmpty(components) ? CollUtil.newArrayList() : components.values();
}
/**
* 获取指定网络组件
*
* @param pluginKey 组件 Key
* @return 组件信息
*/
public IotNetComponentInfo getComponent(String pluginKey) {
return MapUtil.isEmpty(components) ? null : components.get(pluginKey);
}
}

View File

@ -53,8 +53,4 @@ public class IotDeviceUpstreamClient implements IotDeviceUpstreamApi {
return deviceUpstreamApi.reportDeviceProperty(reportReqDTO);
}
@Override
public CommonResult<Boolean> heartbeatPluginInstance(IotPluginInstanceHeartbeatReqDTO heartbeatReqDTO) {
return deviceUpstreamApi.heartbeatPluginInstance(heartbeatReqDTO);
}
}

View File

@ -1,2 +0,0 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cn.iocoder.yudao.module.iot.net.component.core.config.IotNetComponentCommonAutoConfiguration

View File

@ -2,13 +2,7 @@ package cn.iocoder.yudao.module.iot.net.component.emqx.config;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.ObjUtil;
import cn.hutool.system.SystemUtil;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.net.component.core.config.IotNetComponentCommonProperties;
import cn.iocoder.yudao.module.iot.net.component.core.downstream.IotDeviceDownstreamHandler;
import cn.iocoder.yudao.module.iot.net.component.core.heartbeat.IotNetComponentRegistry;
import cn.iocoder.yudao.module.iot.net.component.core.util.IotNetComponentCommonUtils;
import cn.iocoder.yudao.module.iot.net.component.emqx.downstream.IotDeviceDownstreamHandlerImpl;
import cn.iocoder.yudao.module.iot.net.component.emqx.upstream.IotDeviceUpstreamServer;
import io.vertx.core.Vertx;
import io.vertx.mqtt.MqttClient;
@ -37,16 +31,6 @@ import org.springframework.context.event.EventListener;
@Slf4j
public class IotNetComponentEmqxAutoConfiguration {
/**
* 组件 key
*/
private static final String PLUGIN_KEY = "emqx";
// TODO @haohao这个是不是要去掉哈
public IotNetComponentEmqxAutoConfiguration() {
// 构造函数中不输出日志移到 initialize 方法中
}
/**
* 初始化 EMQX 组件
*
@ -57,21 +41,7 @@ public class IotNetComponentEmqxAutoConfiguration {
log.info("[IotNetComponentEmqxAutoConfiguration][开始初始化]");
// 从应用上下文中获取需要的 Bean
IotNetComponentRegistry componentRegistry = event.getApplicationContext()
.getBean(IotNetComponentRegistry.class);
IotNetComponentCommonProperties commonProperties = event.getApplicationContext()
.getBean(IotNetComponentCommonProperties.class);
// 设置当前组件的核心标识
// 注意这里只为当前 EMQX 组件设置 pluginKey不影响其他组件
commonProperties.setPluginKey(PLUGIN_KEY);
// EMQX 组件注册到组件注册表
componentRegistry.registerComponent(
PLUGIN_KEY,
SystemUtil.getHostInfo().getAddress(),
0, // 内嵌模式固定为 0
IotNetComponentCommonUtils.getProcessId());
// TODO @芋艿看看要不要监听下
log.info("[initialize][IoT EMQX 组件初始化完成]");
}
@ -89,15 +59,6 @@ public class IotNetComponentEmqxAutoConfiguration {
*/
@Bean
public MqttClient mqttClient(@Qualifier("emqxVertx") Vertx vertx, IotNetComponentEmqxProperties emqxProperties) {
// 使用 debug 级别记录详细配置减少生产环境日志
if (log.isDebugEnabled()) {
log.debug("MQTT 配置: host={}, port={}, username={}, ssl={}",
emqxProperties.getMqttHost(), emqxProperties.getMqttPort(),
emqxProperties.getMqttUsername(), emqxProperties.getMqttSsl());
} else {
log.info("MQTT 连接至: {}:{}", emqxProperties.getMqttHost(), emqxProperties.getMqttPort());
}
MqttClientOptions options = new MqttClientOptions()
.setClientId("yudao-iot-downstream-" + IdUtil.fastSimpleUUID())
.setUsername(emqxProperties.getMqttUsername())
@ -115,16 +76,8 @@ public class IotNetComponentEmqxAutoConfiguration {
IotDeviceUpstreamApi deviceUpstreamApi,
IotNetComponentEmqxProperties emqxProperties,
@Qualifier("emqxVertx") Vertx vertx,
MqttClient mqttClient,
IotNetComponentRegistry componentRegistry) {
return new IotDeviceUpstreamServer(emqxProperties, deviceUpstreamApi, vertx, mqttClient, componentRegistry);
MqttClient mqttClient) {
return new IotDeviceUpstreamServer(emqxProperties, deviceUpstreamApi, vertx, mqttClient);
}
/**
* 创建设备下行处理器
*/
@Bean(name = "emqxDeviceDownstreamHandler")
public IotDeviceDownstreamHandler deviceDownstreamHandler(MqttClient mqttClient) {
return new IotDeviceDownstreamHandlerImpl(mqttClient);
}
}

View File

@ -1,136 +1,121 @@
package cn.iocoder.yudao.module.iot.net.component.emqx.downstream;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.*;
import cn.iocoder.yudao.module.iot.net.component.core.constants.IotDeviceTopicEnum;
import cn.iocoder.yudao.module.iot.net.component.core.downstream.IotDeviceDownstreamHandler;
import cn.iocoder.yudao.module.iot.net.component.core.message.IotMqttMessage;
import cn.iocoder.yudao.module.iot.net.component.core.util.IotNetComponentCommonUtils;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.buffer.Buffer;
import io.vertx.mqtt.MqttClient;
import lombok.extern.slf4j.Slf4j;
import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.MQTT_TOPIC_ILLEGAL;
/**
* EMQX 网络组件的 {@link IotDeviceDownstreamHandler} 实现类
*
* @author 芋道源码
*/
@Slf4j
public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandler {
/**
* MQTT 客户端
*/
private final MqttClient mqttClient;
/**
* 构造函数
*
* @param mqttClient MQTT 客户端
*/
public IotDeviceDownstreamHandlerImpl(MqttClient mqttClient) {
this.mqttClient = mqttClient;
}
@Override
public CommonResult<Boolean> invokeDeviceService(IotDeviceServiceInvokeReqDTO reqDTO) {
log.info("[invokeService][开始调用设备服务][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO));
// 验证参数
if (reqDTO.getProductKey() == null || reqDTO.getDeviceName() == null || reqDTO.getIdentifier() == null) {
log.error("[invokeService][参数不完整][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO));
return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg());
}
try {
// 构建请求主题
String topic = IotDeviceTopicEnum.buildServiceTopic(reqDTO.getProductKey(), reqDTO.getDeviceName(),
reqDTO.getIdentifier());
// 构建请求消息
String requestId = StrUtil.isNotEmpty(reqDTO.getRequestId()) ? reqDTO.getRequestId()
: IotNetComponentCommonUtils.generateRequestId();
IotMqttMessage message = IotMqttMessage.createServiceInvokeMessage(
requestId, reqDTO.getIdentifier(), reqDTO.getParams());
// 发送消息
publishMessage(topic, message.toJsonObject());
log.info("[invokeService][调用设备服务成功][requestId: {}][topic: {}]", requestId, topic);
return CommonResult.success(true);
} catch (Exception e) {
log.error("[invokeService][调用设备服务异常][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO), e);
return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg());
}
}
@Override
public CommonResult<Boolean> getDeviceProperty(IotDevicePropertyGetReqDTO getReqDTO) {
// 暂未实现返回成功
return CommonResult.success(true);
}
@Override
public CommonResult<Boolean> setDeviceProperty(IotDevicePropertySetReqDTO reqDTO) {
log.info("[setProperty][开始设置设备属性][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO));
// 验证参数
if (reqDTO.getProductKey() == null || reqDTO.getDeviceName() == null) {
log.error("[setProperty][参数不完整][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO));
return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg());
}
try {
// 构建请求主题
String topic = IotDeviceTopicEnum.buildPropertySetTopic(reqDTO.getProductKey(), reqDTO.getDeviceName());
// 构建请求消息
String requestId = StrUtil.isNotEmpty(reqDTO.getRequestId()) ? reqDTO.getRequestId()
: IotNetComponentCommonUtils.generateRequestId();
IotMqttMessage message = IotMqttMessage.createPropertySetMessage(requestId, reqDTO.getProperties());
// 发送消息
publishMessage(topic, message.toJsonObject());
log.info("[setProperty][设置设备属性成功][requestId: {}][topic: {}]", requestId, topic);
return CommonResult.success(true);
} catch (Exception e) {
log.error("[setProperty][设置设备属性异常][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO), e);
return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg());
}
}
@Override
public CommonResult<Boolean> setDeviceConfig(IotDeviceConfigSetReqDTO setReqDTO) {
// 暂未实现返回成功
return CommonResult.success(true);
}
@Override
public CommonResult<Boolean> upgradeDeviceOta(IotDeviceOtaUpgradeReqDTO upgradeReqDTO) {
// 暂未实现返回成功
return CommonResult.success(true);
}
/**
* 发布 MQTT 消息
*
* @param topic 主题
* @param payload 消息内容
*/
private void publishMessage(String topic, JSONObject payload) {
mqttClient.publish(
topic,
Buffer.buffer(payload.toString()),
MqttQoS.AT_LEAST_ONCE,
false,
false);
log.info("[publishMessage][发送消息成功][topic: {}][payload: {}]", topic, payload);
}
}
// TODO @芋艿后续再支持下@haohao改成消费者
///**
// * EMQX 网络组件的 {@link IotDeviceDownstreamHandler} 实现类
// *
// * @author 芋道源码
// */
//@Slf4j
//public class IotDeviceDownstreamHandlerImpl {
//
// /**
// * MQTT 客户端
// */
// private final MqttClient mqttClient;
//
// /**
// * 构造函数
// *
// * @param mqttClient MQTT 客户端
// */
// public IotDeviceDownstreamHandlerImpl(MqttClient mqttClient) {
// this.mqttClient = mqttClient;
// }
//
// @Override
// public CommonResult<Boolean> invokeDeviceService(IotDeviceServiceInvokeReqDTO reqDTO) {
// log.info("[invokeService][开始调用设备服务][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO));
//
// // 验证参数
// if (reqDTO.getProductKey() == null || reqDTO.getDeviceName() == null || reqDTO.getIdentifier() == null) {
// log.error("[invokeService][参数不完整][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO));
// return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg());
// }
//
// try {
// // 构建请求主题
// String topic = IotDeviceTopicEnum.buildServiceTopic(reqDTO.getProductKey(), reqDTO.getDeviceName(),
// reqDTO.getIdentifier());
//
// // 构建请求消息
// String requestId = StrUtil.isNotEmpty(reqDTO.getRequestId()) ? reqDTO.getRequestId()
// : IotNetComponentCommonUtils.generateRequestId();
// IotMqttMessage message = IotMqttMessage.createServiceInvokeMessage(
// requestId, reqDTO.getIdentifier(), reqDTO.getParams());
//
// // 发送消息
// publishMessage(topic, message.toJsonObject());
//
// log.info("[invokeService][调用设备服务成功][requestId: {}][topic: {}]", requestId, topic);
// return CommonResult.success(true);
// } catch (Exception e) {
// log.error("[invokeService][调用设备服务异常][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO), e);
// return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg());
// }
// }
//
// @Override
// public CommonResult<Boolean> getDeviceProperty(IotDevicePropertyGetReqDTO getReqDTO) {
// // 暂未实现返回成功
// return CommonResult.success(true);
// }
//
// @Override
// public CommonResult<Boolean> setDeviceProperty(IotDevicePropertySetReqDTO reqDTO) {
// log.info("[setProperty][开始设置设备属性][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO));
//
// // 验证参数
// if (reqDTO.getProductKey() == null || reqDTO.getDeviceName() == null) {
// log.error("[setProperty][参数不完整][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO));
// return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg());
// }
//
// try {
// // 构建请求主题
// String topic = IotDeviceTopicEnum.buildPropertySetTopic(reqDTO.getProductKey(), reqDTO.getDeviceName());
//
// // 构建请求消息
// String requestId = StrUtil.isNotEmpty(reqDTO.getRequestId()) ? reqDTO.getRequestId()
// : IotNetComponentCommonUtils.generateRequestId();
// IotMqttMessage message = IotMqttMessage.createPropertySetMessage(requestId, reqDTO.getProperties());
//
// // 发送消息
// publishMessage(topic, message.toJsonObject());
//
// log.info("[setProperty][设置设备属性成功][requestId: {}][topic: {}]", requestId, topic);
// return CommonResult.success(true);
// } catch (Exception e) {
// log.error("[setProperty][设置设备属性异常][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO), e);
// return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg());
// }
// }
//
// @Override
// public CommonResult<Boolean> setDeviceConfig(IotDeviceConfigSetReqDTO setReqDTO) {
// // 暂未实现返回成功
// return CommonResult.success(true);
// }
//
// @Override
// public CommonResult<Boolean> upgradeDeviceOta(IotDeviceOtaUpgradeReqDTO upgradeReqDTO) {
// // 暂未实现返回成功
// return CommonResult.success(true);
// }
//
// /**
// * 发布 MQTT 消息
// *
// * @param topic 主题
// * @param payload 消息内容
// */
// private void publishMessage(String topic, JSONObject payload) {
// mqttClient.publish(
// topic,
// Buffer.buffer(payload.toString()),
// MqttQoS.AT_LEAST_ONCE,
// false,
// false);
// log.info("[publishMessage][发送消息成功][topic: {}][payload: {}]", topic, payload);
// }
//}

View File

@ -3,7 +3,6 @@ package cn.iocoder.yudao.module.iot.net.component.emqx.upstream;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.net.component.core.heartbeat.IotNetComponentRegistry;
import cn.iocoder.yudao.module.iot.net.component.emqx.config.IotNetComponentEmqxProperties;
import cn.iocoder.yudao.module.iot.net.component.emqx.upstream.router.IotDeviceAuthVertxHandler;
import cn.iocoder.yudao.module.iot.net.component.emqx.upstream.router.IotDeviceMqttMessageHandler;
@ -40,7 +39,6 @@ public class IotDeviceUpstreamServer {
private final MqttClient client;
private final IotNetComponentEmqxProperties emqxProperties;
private final IotDeviceMqttMessageHandler mqttMessageHandler;
private final IotNetComponentRegistry componentRegistry;
/**
* 服务运行状态标志
@ -50,12 +48,10 @@ public class IotDeviceUpstreamServer {
public IotDeviceUpstreamServer(IotNetComponentEmqxProperties emqxProperties,
IotDeviceUpstreamApi deviceUpstreamApi,
Vertx vertx,
MqttClient client,
IotNetComponentRegistry componentRegistry) {
MqttClient client) {
this.vertx = vertx;
this.emqxProperties = emqxProperties;
this.client = client;
this.componentRegistry = componentRegistry;
// 创建 Router 实例
Router router = Router.router(vertx);

View File

@ -5,8 +5,6 @@ import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBusSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.net.component.core.downstream.IotDeviceDownstreamHandler;
import cn.iocoder.yudao.module.iot.net.component.http.downstream.IotDeviceDownstreamHandlerImpl;
import cn.iocoder.yudao.module.iot.net.component.http.upstream.IotDeviceUpstreamServer;
import io.vertx.core.Vertx;
import lombok.extern.slf4j.Slf4j;
@ -89,14 +87,4 @@ public class IotNetComponentHttpAutoConfiguration {
return new IotDeviceUpstreamServer(vertx, properties, deviceUpstreamApi, deviceMessageProducer);
}
/**
* 创建设备下行处理器
*
* @return 设备下行处理器
*/
@Bean(name = "httpDeviceDownstreamHandler")
public IotDeviceDownstreamHandler deviceDownstreamHandler() {
return new IotDeviceDownstreamHandlerImpl();
}
}

View File

@ -1,50 +1,44 @@
package cn.iocoder.yudao.module.iot.net.component.http.downstream;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.*;
import cn.iocoder.yudao.module.iot.net.component.core.downstream.IotDeviceDownstreamHandler;
import lombok.extern.slf4j.Slf4j;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.NOT_IMPLEMENTED;
/**
* HTTP 网络组件的 {@link IotDeviceDownstreamHandler} 实现类
* <p>
* 但是由于设备通过 HTTP 短链接接入导致其实无法下行指导给 device 设备所以基本都是直接返回失败
* 类似 MQTTWebSocketTCP 网络组件是可以实现下行指令的
*
* @author 芋道源码
*/
@Slf4j
public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandler {
/**
* 不支持的错误消息
*/
private static final String NOT_SUPPORTED_MSG = "HTTP 不支持设备下行通信";
@Override
public CommonResult<Boolean> invokeDeviceService(IotDeviceServiceInvokeReqDTO invokeReqDTO) {
return CommonResult.error(NOT_IMPLEMENTED.getCode(), NOT_SUPPORTED_MSG);
}
@Override
public CommonResult<Boolean> getDeviceProperty(IotDevicePropertyGetReqDTO getReqDTO) {
return CommonResult.error(NOT_IMPLEMENTED.getCode(), NOT_SUPPORTED_MSG);
}
@Override
public CommonResult<Boolean> setDeviceProperty(IotDevicePropertySetReqDTO setReqDTO) {
return CommonResult.error(NOT_IMPLEMENTED.getCode(), NOT_SUPPORTED_MSG);
}
@Override
public CommonResult<Boolean> setDeviceConfig(IotDeviceConfigSetReqDTO setReqDTO) {
return CommonResult.error(NOT_IMPLEMENTED.getCode(), NOT_SUPPORTED_MSG);
}
@Override
public CommonResult<Boolean> upgradeDeviceOta(IotDeviceOtaUpgradeReqDTO upgradeReqDTO) {
return CommonResult.error(NOT_IMPLEMENTED.getCode(), NOT_SUPPORTED_MSG);
}
}
// TODO @芋艿实现下
///**
// * HTTP 网络组件的 {@link IotDeviceDownstreamHandler} 实现类
// * <p>
// * 但是由于设备通过 HTTP 短链接接入导致其实无法下行指导给 device 设备所以基本都是直接返回失败
// * 类似 MQTTWebSocketTCP 网络组件是可以实现下行指令的
// *
// * @author 芋道源码
// */
//@Slf4j
//public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandler {
//
// /**
// * 不支持的错误消息
// */
// private static final String NOT_SUPPORTED_MSG = "HTTP 不支持设备下行通信";
//
// @Override
// public CommonResult<Boolean> invokeDeviceService(IotDeviceServiceInvokeReqDTO invokeReqDTO) {
// return CommonResult.error(NOT_IMPLEMENTED.getCode(), NOT_SUPPORTED_MSG);
// }
//
// @Override
// public CommonResult<Boolean> getDeviceProperty(IotDevicePropertyGetReqDTO getReqDTO) {
// return CommonResult.error(NOT_IMPLEMENTED.getCode(), NOT_SUPPORTED_MSG);
// }
//
// @Override
// public CommonResult<Boolean> setDeviceProperty(IotDevicePropertySetReqDTO setReqDTO) {
// return CommonResult.error(NOT_IMPLEMENTED.getCode(), NOT_SUPPORTED_MSG);
// }
//
// @Override
// public CommonResult<Boolean> setDeviceConfig(IotDeviceConfigSetReqDTO setReqDTO) {
// return CommonResult.error(NOT_IMPLEMENTED.getCode(), NOT_SUPPORTED_MSG);
// }
//
// @Override
// public CommonResult<Boolean> upgradeDeviceOta(IotDeviceOtaUpgradeReqDTO upgradeReqDTO) {
// return CommonResult.error(NOT_IMPLEMENTED.getCode(), NOT_SUPPORTED_MSG);
// }
//}

View File

@ -1,10 +1,6 @@
package cn.iocoder.yudao.module.iot.net.component.server.config;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.net.component.core.downstream.IotDeviceDownstreamHandler;
import cn.iocoder.yudao.module.iot.net.component.server.downstream.IotComponentDownstreamHandlerImpl;
import cn.iocoder.yudao.module.iot.net.component.server.downstream.IotComponentDownstreamServer;
import cn.iocoder.yudao.module.iot.net.component.server.heartbeat.IotComponentHeartbeatJob;
import cn.iocoder.yudao.module.iot.net.component.server.upstream.IotComponentUpstreamClient;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@ -54,45 +50,6 @@ public class IotNetComponentServerConfiguration {
return new IotComponentUpstreamClient(properties, restTemplate);
}
/**
* 配置设备下行处理器
*
* @return 下行处理器
*/
@Bean
@Primary
public IotDeviceDownstreamHandler deviceDownstreamHandler() {
return new IotComponentDownstreamHandlerImpl();
}
/**
* 配置下行服务器
*
* @param properties 配置
* @param downstreamHandler 下行处理器
* @return 下行服务器
*/
@Bean(initMethod = "start", destroyMethod = "stop")
public IotComponentDownstreamServer deviceDownstreamServer(IotNetComponentServerProperties properties,
@org.springframework.beans.factory.annotation.Qualifier("deviceDownstreamHandler") IotDeviceDownstreamHandler downstreamHandler) {
return new IotComponentDownstreamServer(properties, downstreamHandler);
}
/**
* 配置心跳任务
*
* @param deviceUpstreamApi 上行接口
* @param downstreamServer 下行服务器
* @param properties 配置
* @return 心跳任务
*/
@Bean(initMethod = "init", destroyMethod = "stop")
public IotComponentHeartbeatJob heartbeatJob(IotDeviceUpstreamApi deviceUpstreamApi,
IotComponentDownstreamServer downstreamServer,
IotNetComponentServerProperties properties) {
return new IotComponentHeartbeatJob(deviceUpstreamApi, downstreamServer, properties);
}
/**
* 配置默认的设备上行客户端避免在独立运行模式下的循环依赖问题
*

View File

@ -47,10 +47,4 @@ public class IotNetComponentServerProperties {
*/
private String serverKey = "yudao-module-iot-net-component-server";
/**
* 心跳发送频率单位毫秒
* <p>
* 默认30
*/
private Long heartbeatInterval = 30000L;
}
}

View File

@ -1,65 +0,0 @@
package cn.iocoder.yudao.module.iot.net.component.server.downstream;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.*;
import cn.iocoder.yudao.module.iot.net.component.core.downstream.IotDeviceDownstreamHandler;
import lombok.extern.slf4j.Slf4j;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.SUCCESS;
/**
* 网络组件下行处理器实现
* <p>
* 处理来自主程序的设备控制指令
*
* @author haohao
*/
@Slf4j
public class IotComponentDownstreamHandlerImpl implements IotDeviceDownstreamHandler {
@Override
public CommonResult<Boolean> invokeDeviceService(IotDeviceServiceInvokeReqDTO invokeReqDTO) {
log.info("[invokeDeviceService][收到服务调用请求:{}]", invokeReqDTO);
// 在这里处理服务调用可以根据设备类型转发到对应的处理器
// MQTT 设备HTTP 设备等的具体实现
// 这里仅作为示例实际应根据接入的组件进行转发
return CommonResult.success(true);
}
@Override
public CommonResult<Boolean> getDeviceProperty(IotDevicePropertyGetReqDTO getReqDTO) {
log.info("[getDeviceProperty][收到属性获取请求:{}]", getReqDTO);
// 在这里处理属性获取请求
// 这里仅作为示例实际应根据接入的组件进行转发
return CommonResult.success(true);
}
@Override
public CommonResult<Boolean> setDeviceProperty(IotDevicePropertySetReqDTO setReqDTO) {
log.info("[setDeviceProperty][收到属性设置请求:{}]", setReqDTO);
// 在这里处理属性设置请求
// 这里仅作为示例实际应根据接入的组件进行转发
return CommonResult.success(true);
}
@Override
public CommonResult<Boolean> setDeviceConfig(IotDeviceConfigSetReqDTO setReqDTO) {
log.info("[setDeviceConfig][收到配置设置请求:{}]", setReqDTO);
// 在这里处理配置设置请求
// 这里仅作为示例实际应根据接入的组件进行转发
return CommonResult.success(true);
}
@Override
public CommonResult<Boolean> upgradeDeviceOta(IotDeviceOtaUpgradeReqDTO upgradeReqDTO) {
log.info("[upgradeDeviceOta][收到OTA升级请求{}]", upgradeReqDTO);
// 在这里处理OTA升级请求
// 这里仅作为示例实际应根据接入的组件进行转发
return CommonResult.success(true);
}
}

View File

@ -1,310 +0,0 @@
package cn.iocoder.yudao.module.iot.net.component.server.downstream;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.*;
import cn.iocoder.yudao.module.iot.net.component.core.downstream.IotDeviceDownstreamHandler;
import cn.iocoder.yudao.module.iot.net.component.server.config.IotNetComponentServerProperties;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Map;
/**
* 组件下行服务器接收来自主程序的控制指令
*
* @author haohao
*/
@Slf4j
public class IotComponentDownstreamServer {
public static final String SERVICE_INVOKE_PATH = "/sys/:productKey/:deviceName/thing/service/:identifier";
public static final String PROPERTY_SET_PATH = "/sys/:productKey/:deviceName/thing/service/property/set";
public static final String PROPERTY_GET_PATH = "/sys/:productKey/:deviceName/thing/service/property/get";
public static final String CONFIG_SET_PATH = "/sys/:productKey/:deviceName/thing/service/config/set";
public static final String OTA_UPGRADE_PATH = "/sys/:productKey/:deviceName/thing/service/ota/upgrade";
private final Vertx vertx;
private final HttpServer server;
private final IotNetComponentServerProperties properties;
private final IotDeviceDownstreamHandler downstreamHandler;
public IotComponentDownstreamServer(IotNetComponentServerProperties properties,
IotDeviceDownstreamHandler downstreamHandler) {
this.properties = properties;
this.downstreamHandler = downstreamHandler;
// 创建 Vertx 实例
this.vertx = Vertx.vertx();
// 创建 Router 实例
Router router = Router.router(vertx);
router.route().handler(BodyHandler.create()); // 处理 Body
// 服务调用路由
router.post(SERVICE_INVOKE_PATH).handler(this::handleServiceInvoke);
// 属性设置路由
router.post(PROPERTY_SET_PATH).handler(this::handlePropertySet);
// 属性获取路由
router.post(PROPERTY_GET_PATH).handler(this::handlePropertyGet);
// 配置设置路由
router.post(CONFIG_SET_PATH).handler(this::handleConfigSet);
// OTA 升级路由
router.post(OTA_UPGRADE_PATH).handler(this::handleOtaUpgrade);
// 创建 HttpServer 实例
this.server = vertx.createHttpServer().requestHandler(router);
}
/**
* 启动服务器
*/
public void start() {
log.info("[start][开始启动下行服务器]");
server.listen(properties.getDownstreamPort())
.toCompletionStage()
.toCompletableFuture()
.join();
log.info("[start][下行服务器启动完成,端口({})]", server.actualPort());
}
/**
* 停止服务器
*/
public void stop() {
log.info("[stop][开始关闭下行服务器]");
try {
// 关闭 HTTP 服务器
if (server != null) {
server.close()
.toCompletionStage()
.toCompletableFuture()
.join();
}
// 关闭 Vertx 实例
if (vertx != null) {
vertx.close()
.toCompletionStage()
.toCompletableFuture()
.join();
}
log.info("[stop][下行服务器关闭完成]");
} catch (Exception e) {
log.error("[stop][下行服务器关闭异常]", e);
throw new RuntimeException(e);
}
}
/**
* 获取服务器端口
*
* @return 端口号
*/
public int getPort() {
return server.actualPort();
}
/**
* 处理服务调用请求
*/
private void handleServiceInvoke(RoutingContext ctx) {
try {
// 解析路径参数
String productKey = ctx.pathParam("productKey");
String deviceName = ctx.pathParam("deviceName");
String identifier = ctx.pathParam("identifier");
// 解析请求体
JsonObject body = ctx.body().asJsonObject();
String requestId = body.getString("requestId", IdUtil.fastSimpleUUID());
Object params = body.getMap().get("params");
// 创建请求对象
IotDeviceServiceInvokeReqDTO reqDTO = new IotDeviceServiceInvokeReqDTO();
reqDTO.setRequestId(requestId);
reqDTO.setProductKey(productKey);
reqDTO.setDeviceName(deviceName);
reqDTO.setIdentifier(identifier);
reqDTO.setParams((Map<String, Object>) params);
// 调用处理器
CommonResult<Boolean> result = downstreamHandler.invokeDeviceService(reqDTO);
// 响应结果
ctx.response()
.putHeader("Content-Type", "application/json")
.end(Json.encode(result));
} catch (Exception e) {
log.error("[handleServiceInvoke][处理服务调用请求失败]", e);
ctx.response()
.setStatusCode(500)
.putHeader("Content-Type", "application/json")
.end(Json.encode(CommonResult.error(500, "处理服务调用请求失败:" + e.getMessage())));
}
}
/**
* 处理属性设置请求
*/
private void handlePropertySet(RoutingContext ctx) {
try {
// 解析路径参数
String productKey = ctx.pathParam("productKey");
String deviceName = ctx.pathParam("deviceName");
// 解析请求体
JsonObject body = ctx.body().asJsonObject();
String requestId = body.getString("requestId", IdUtil.fastSimpleUUID());
Object properties = body.getMap().get("properties");
// 创建请求对象
IotDevicePropertySetReqDTO reqDTO = new IotDevicePropertySetReqDTO();
reqDTO.setRequestId(requestId);
reqDTO.setProductKey(productKey);
reqDTO.setDeviceName(deviceName);
reqDTO.setProperties((Map<String, Object>) properties);
// 调用处理器
CommonResult<Boolean> result = downstreamHandler.setDeviceProperty(reqDTO);
// 响应结果
ctx.response()
.putHeader("Content-Type", "application/json")
.end(Json.encode(result));
} catch (Exception e) {
log.error("[handlePropertySet][处理属性设置请求失败]", e);
ctx.response()
.setStatusCode(500)
.putHeader("Content-Type", "application/json")
.end(Json.encode(CommonResult.error(500, "处理属性设置请求失败:" + e.getMessage())));
}
}
/**
* 处理属性获取请求
*/
private void handlePropertyGet(RoutingContext ctx) {
try {
// 解析路径参数
String productKey = ctx.pathParam("productKey");
String deviceName = ctx.pathParam("deviceName");
// 解析请求体
JsonObject body = ctx.body().asJsonObject();
String requestId = body.getString("requestId", IdUtil.fastSimpleUUID());
Object identifiers = body.getMap().get("identifiers");
// 创建请求对象
IotDevicePropertyGetReqDTO reqDTO = new IotDevicePropertyGetReqDTO();
reqDTO.setRequestId(requestId);
reqDTO.setProductKey(productKey);
reqDTO.setDeviceName(deviceName);
reqDTO.setIdentifiers((List<String>) identifiers);
// 调用处理器
CommonResult<Boolean> result = downstreamHandler.getDeviceProperty(reqDTO);
// 响应结果
ctx.response()
.putHeader("Content-Type", "application/json")
.end(Json.encode(result));
} catch (Exception e) {
log.error("[handlePropertyGet][处理属性获取请求失败]", e);
ctx.response()
.setStatusCode(500)
.putHeader("Content-Type", "application/json")
.end(Json.encode(CommonResult.error(500, "处理属性获取请求失败:" + e.getMessage())));
}
}
/**
* 处理配置设置请求
*/
private void handleConfigSet(RoutingContext ctx) {
try {
// 解析路径参数
String productKey = ctx.pathParam("productKey");
String deviceName = ctx.pathParam("deviceName");
// 解析请求体
JsonObject body = ctx.body().asJsonObject();
String requestId = body.getString("requestId", IdUtil.fastSimpleUUID());
Object config = body.getMap().get("config");
// 创建请求对象
IotDeviceConfigSetReqDTO reqDTO = new IotDeviceConfigSetReqDTO();
reqDTO.setRequestId(requestId);
reqDTO.setProductKey(productKey);
reqDTO.setDeviceName(deviceName);
reqDTO.setConfig((Map<String, Object>) config);
// 调用处理器
CommonResult<Boolean> result = downstreamHandler.setDeviceConfig(reqDTO);
// 响应结果
ctx.response()
.putHeader("Content-Type", "application/json")
.end(Json.encode(result));
} catch (Exception e) {
log.error("[handleConfigSet][处理配置设置请求失败]", e);
ctx.response()
.setStatusCode(500)
.putHeader("Content-Type", "application/json")
.end(Json.encode(CommonResult.error(500, "处理配置设置请求失败:" + e.getMessage())));
}
}
/**
* 处理 OTA 升级请求
*/
private void handleOtaUpgrade(RoutingContext ctx) {
try {
// 解析路径参数
String productKey = ctx.pathParam("productKey");
String deviceName = ctx.pathParam("deviceName");
// 解析请求体
JsonObject body = ctx.body().asJsonObject();
String requestId = body.getString("requestId", IdUtil.fastSimpleUUID());
Object data = body.getMap().get("data");
// 创建请求对象
IotDeviceOtaUpgradeReqDTO reqDTO = new IotDeviceOtaUpgradeReqDTO();
reqDTO.setRequestId(requestId);
reqDTO.setProductKey(productKey);
reqDTO.setDeviceName(deviceName);
// 数据采用 IotDeviceOtaUpgradeReqDTO.build 方法转换
if (data instanceof Map) {
IotDeviceOtaUpgradeReqDTO builtDTO = IotDeviceOtaUpgradeReqDTO.build((Map<?, ?>) data);
reqDTO.setFirmwareId(builtDTO.getFirmwareId());
reqDTO.setVersion(builtDTO.getVersion());
reqDTO.setSignMethod(builtDTO.getSignMethod());
reqDTO.setFileSign(builtDTO.getFileSign());
reqDTO.setFileSize(builtDTO.getFileSize());
reqDTO.setFileUrl(builtDTO.getFileUrl());
reqDTO.setInformation(builtDTO.getInformation());
}
// 调用处理器
CommonResult<Boolean> result = downstreamHandler.upgradeDeviceOta(reqDTO);
// 响应结果
ctx.response()
.putHeader("Content-Type", "application/json")
.end(Json.encode(result));
} catch (Exception e) {
log.error("[handleOtaUpgrade][处理OTA升级请求失败]", e);
ctx.response()
.setStatusCode(500)
.putHeader("Content-Type", "application/json")
.end(Json.encode(CommonResult.error(500, "处理OTA升级请求失败" + e.getMessage())));
}
}
}

View File

@ -1,98 +0,0 @@
package cn.iocoder.yudao.module.iot.net.component.server.heartbeat;
import cn.hutool.system.SystemUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotPluginInstanceHeartbeatReqDTO;
import cn.iocoder.yudao.module.iot.net.component.server.config.IotNetComponentServerProperties;
import cn.iocoder.yudao.module.iot.net.component.server.downstream.IotComponentDownstreamServer;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
// TODO @haohao有办法服用 yudao-module-iot-net-component-core 的么就是 server只是一个启动器没什么特殊的功能
/**
* IoT 组件心跳任务
* <p>
* 定期向主程序发送心跳报告组件服务状态
*
* @author haohao
*/
@Slf4j
public class IotComponentHeartbeatJob {
private final IotDeviceUpstreamApi deviceUpstreamApi;
private final IotComponentDownstreamServer downstreamServer;
private final IotNetComponentServerProperties properties;
private ScheduledExecutorService executorService;
public IotComponentHeartbeatJob(IotDeviceUpstreamApi deviceUpstreamApi,
IotComponentDownstreamServer downstreamServer,
IotNetComponentServerProperties properties) {
this.deviceUpstreamApi = deviceUpstreamApi;
this.downstreamServer = downstreamServer;
this.properties = properties;
}
/**
* 初始化心跳任务
*/
public void init() {
log.info("[init][开始初始化心跳任务]");
// 创建一个单线程的调度线程池
executorService = new ScheduledThreadPoolExecutor(1);
// 延迟 5 秒后开始执行避免服务刚启动就发送心跳
executorService.scheduleAtFixedRate(this::sendHeartbeat,
5000, properties.getHeartbeatInterval(), TimeUnit.MILLISECONDS);
log.info("[init][心跳任务初始化完成]");
}
/**
* 停止心跳任务
*/
public void stop() {
log.info("[stop][开始停止心跳任务]");
if (executorService != null) {
executorService.shutdown();
executorService = null;
}
log.info("[stop][心跳任务已停止]");
}
/**
* 发送心跳
*/
private void sendHeartbeat() {
try {
// 创建心跳请求
IotPluginInstanceHeartbeatReqDTO heartbeatReqDTO = new IotPluginInstanceHeartbeatReqDTO();
// 设置插件标识
heartbeatReqDTO.setPluginKey(properties.getServerKey());
// 设置进程ID
heartbeatReqDTO.setProcessId(String.valueOf(ProcessHandle.current().pid()));
// 设置IP和端口
try {
String hostIp = SystemUtil.getHostInfo().getAddress();
heartbeatReqDTO.setHostIp(hostIp);
heartbeatReqDTO.setDownstreamPort(downstreamServer.getPort());
} catch (Exception e) {
log.warn("[sendHeartbeat][获取本地主机信息异常]", e);
}
// 设置在线状态
heartbeatReqDTO.setOnline(true);
// 发送心跳
CommonResult<Boolean> result = deviceUpstreamApi.heartbeatPluginInstance(heartbeatReqDTO);
if (result != null && result.isSuccess()) {
log.debug("[sendHeartbeat][发送心跳成功:{}]", heartbeatReqDTO);
} else {
log.error("[sendHeartbeat][发送心跳失败:{}, 结果:{}]", heartbeatReqDTO, result);
}
} catch (Exception e) {
log.error("[sendHeartbeat][发送心跳异常]", e);
}
}
}

View File

@ -69,12 +69,6 @@ public class IotComponentUpstreamClient implements IotDeviceUpstreamApi {
return doPost(url, reportReqDTO);
}
@Override
public CommonResult<Boolean> heartbeatPluginInstance(IotPluginInstanceHeartbeatReqDTO heartbeatReqDTO) {
String url = properties.getUpstreamUrl() + URL_PREFIX + "/heartbeat-plugin-instance";
return doPost(url, heartbeatReqDTO);
}
@SuppressWarnings("unchecked")
private <T> CommonResult<Boolean> doPost(String url, T requestBody) {
try {
@ -87,4 +81,4 @@ public class IotComponentUpstreamClient implements IotDeviceUpstreamApi {
return CommonResult.error(INTERNAL_SERVER_ERROR);
}
}
}
}

View File

@ -17,8 +17,6 @@ yudao:
base-package: cn.iocoder.yudao # 主项目包路径,确保正确
iot:
component:
# 这里可以覆盖或添加 component-core 中的通用配置
instance-heartbeat-timeout: 30000 # 心跳超时时间
# 网络组件服务器专用配置
server:
@ -33,9 +31,6 @@ yudao:
# 组件服务唯一标识
server-key: yudao-module-iot-net-component-server
# 心跳频率,单位:毫秒
heartbeat-interval: 30000
# ====================================
# 针对引入的 HTTP 组件的配置
# ====================================