【功能新增】IoT: 新增网络组件服务器,支持设备上行和下行处理,添加健康检查接口,配置心跳任务,更新相关依赖和配置文件。

This commit is contained in:
安浩浩 2025-04-09 23:08:24 +08:00
parent 2954445d34
commit 44b835bd4a
13 changed files with 959 additions and 29 deletions

View File

@ -123,16 +123,16 @@
<!-- </dependency>-->
<!-- IoT 网络组件:接收来自设备的上行数据 -->
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-module-iot-net-component-http</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-module-iot-net-component-emqx</artifactId>
<version>${revision}</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>cn.iocoder.boot</groupId>-->
<!-- <artifactId>yudao-module-iot-net-component-http</artifactId>-->
<!-- <version>${revision}</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>cn.iocoder.boot</groupId>-->
<!-- <artifactId>yudao-module-iot-net-component-emqx</artifactId>-->
<!-- <version>${revision}</version>-->
<!-- </dependency>-->
<!-- 脚本相关 -->
<dependency>

View File

@ -1,7 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>yudao-module-iot</artifactId>
<groupId>cn.iocoder.boot</groupId>
@ -21,6 +20,7 @@
<module>yudao-module-iot-net-component-core</module>
<module>yudao-module-iot-net-component-http</module>
<module>yudao-module-iot-net-component-emqx</module>
<module>yudao-module-iot-net-component-server</module>
</modules>
</project>

View File

@ -31,26 +31,28 @@ public class IotNetComponentCommonProperties {
/**
* 网络组件消息转发配置
*/
private ForwardMessage forwardMessage = new ForwardMessage();
// private ForwardMessage forwardMessage = new ForwardMessage();
/**
* 消息转发配置
*/
@Data
public static class ForwardMessage {
/*
* @Data
* public static class ForwardMessage {
*
* /**
* 是否转发所有设备消息到 EMQX
* <p>
* 默认为 true 开启
*/
// private boolean forwardAllDeviceMessageToEmqx = true;
/**
* 是否转发所有设备消息到 EMQX
* <p>
* 默认为 true 开启
*/
private boolean forwardAllDeviceMessageToEmqx = true;
/**
* 是否转发所有设备消息到 HTTP
* <p>
* 默认为 false 关闭
*/
private boolean forwardAllDeviceMessageToHttp = false;
}
/**
* 是否转发所有设备消息到 HTTP
* <p>
* 默认为 false 关闭
*/
// private boolean forwardAllDeviceMessageToHttp = false;
// }
}

View File

@ -0,0 +1,75 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-module-iot-net-components</artifactId>
<version>${revision}</version>
</parent>
<artifactId>yudao-module-iot-net-component-server</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>
IoT 网络组件的独立运行服务,用于聚合和启动多个网络组件实例。
</description>
<dependencies>
<!-- Spring Boot 核心 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- Web 相关 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Net Component 核心 -->
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-module-iot-net-component-core</artifactId>
<version>${revision}</version>
</dependency>
<!-- Net Component HTTP -->
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-module-iot-net-component-http</artifactId>
<version>${revision}</version>
</dependency>
<!-- Net Component EMQX -->
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-module-iot-net-component-emqx</artifactId>
<version>${revision}</version>
</dependency>
</dependencies>
<build>
<!-- 设置构建的 jar 包名 -->
<finalName>${project.artifactId}</finalName>
<plugins>
<!-- 打包 -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring.boot.version}</version> <!-- 需要确认父 POM 中有定义 -->
<executions>
<execution>
<goals>
<goal>repackage</goal> <!-- 将引入的 jar 打入其中 -->
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,18 @@
package cn.iocoder.yudao.module.iot.net.component.server;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* IoT 网络组件聚合启动服务
*
* @author haohao
*/
@SpringBootApplication(scanBasePackages = {"${yudao.info.base-package}.module.iot.net.component"})
public class NetComponentServerApplication {
public static void main(String[] args) {
SpringApplication.run(NetComponentServerApplication.class, args);
}
}

View File

@ -0,0 +1,118 @@
package cn.iocoder.yudao.module.iot.net.component.server.config;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.net.component.core.downstream.IotDeviceDownstreamHandler;
import cn.iocoder.yudao.module.iot.net.component.server.downstream.IotComponentDownstreamHandlerImpl;
import cn.iocoder.yudao.module.iot.net.component.server.downstream.IotComponentDownstreamServer;
import cn.iocoder.yudao.module.iot.net.component.server.heartbeat.IotComponentHeartbeatJob;
import cn.iocoder.yudao.module.iot.net.component.server.upstream.IotComponentUpstreamClient;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.web.client.RestTemplate;
/**
* IoT 网络组件服务器配置类
*
* @author haohao
*/
@Configuration
@EnableConfigurationProperties(IotNetComponentServerProperties.class)
@EnableScheduling
public class IotNetComponentServerConfiguration {
/**
* 配置 RestTemplate
*
* @param properties 配置
* @return RestTemplate
*/
@Bean
public RestTemplate restTemplate(IotNetComponentServerProperties properties) {
return new RestTemplateBuilder()
.connectTimeout(properties.getUpstreamConnectTimeout())
.readTimeout(properties.getUpstreamReadTimeout())
.build();
}
/**
* 配置设备上行客户端
*
* @param properties 配置
* @param restTemplate RestTemplate
* @return 上行客户端
*/
@Bean
@Primary
public IotDeviceUpstreamApi deviceUpstreamApi(IotNetComponentServerProperties properties,
RestTemplate restTemplate) {
return new IotComponentUpstreamClient(properties, restTemplate);
}
/**
* 配置设备下行处理器
*
* @return 下行处理器
*/
@Bean
@Primary
public IotDeviceDownstreamHandler deviceDownstreamHandler() {
return new IotComponentDownstreamHandlerImpl();
}
/**
* 配置下行服务器
*
* @param properties 配置
* @param downstreamHandler 下行处理器
* @return 下行服务器
*/
@Bean(initMethod = "start", destroyMethod = "stop")
public IotComponentDownstreamServer deviceDownstreamServer(IotNetComponentServerProperties properties,
@org.springframework.beans.factory.annotation.Qualifier("deviceDownstreamHandler") IotDeviceDownstreamHandler downstreamHandler) {
return new IotComponentDownstreamServer(properties, downstreamHandler);
}
/**
* 配置心跳任务
*
* @param deviceUpstreamApi 上行接口
* @param downstreamServer 下行服务器
* @param properties 配置
* @return 心跳任务
*/
@Bean(initMethod = "init", destroyMethod = "stop")
public IotComponentHeartbeatJob heartbeatJob(IotDeviceUpstreamApi deviceUpstreamApi,
IotComponentDownstreamServer downstreamServer,
IotNetComponentServerProperties properties) {
return new IotComponentHeartbeatJob(deviceUpstreamApi, downstreamServer, properties);
}
/**
* 配置默认的设备上行客户端避免在独立运行模式下的循环依赖问题
*
* @return 设备上行客户端
*/
@Bean
@ConditionalOnMissingBean(name = "serverDeviceUpstreamClient")
public Object serverDeviceUpstreamClient() {
// 返回一个空对象避免找不到类的问题
return new Object();
}
/**
* 配置默认的组件实例注册客户端
*
* @return 插件实例注册客户端
*/
@Bean
@ConditionalOnMissingBean(name = "serverPluginInstanceRegistryClient")
public Object serverPluginInstanceRegistryClient() {
// 返回一个空对象避免找不到类的问题
return new Object();
}
}

View File

@ -0,0 +1,56 @@
package cn.iocoder.yudao.module.iot.net.component.server.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;
import java.time.Duration;
/**
* IoT 网络组件服务配置属性
*
* @author haohao
*/
@ConfigurationProperties(prefix = "yudao.iot.component.server")
@Validated
@Data
public class IotNetComponentServerProperties {
/**
* 上行 URL用于向主应用程序上报数据
* <p>
* 默认http://127.0.0.1:48080
*/
private String upstreamUrl = "http://127.0.0.1:48080";
/**
* 上行连接超时时间
*/
private Duration upstreamConnectTimeout = Duration.ofSeconds(30);
/**
* 上行读取超时时间
*/
private Duration upstreamReadTimeout = Duration.ofSeconds(30);
/**
* 下行服务端口用于接收主应用程序的请求
* <p>
* 默认18888
*/
private Integer downstreamPort = 18888;
/**
* 组件服务器唯一标识
* <p>
* 默认yudao-module-iot-net-component-server
*/
private String serverKey = "yudao-module-iot-net-component-server";
/**
* 心跳发送频率单位毫秒
* <p>
* 默认30
*/
private Long heartbeatInterval = 30000L;
}

View File

@ -0,0 +1,32 @@
package cn.iocoder.yudao.module.iot.net.component.server.controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
/**
* 健康检查接口
*
* @author haohao
*/
@RestController
@RequestMapping("/health")
public class HealthController {
/**
* 健康检查接口
*
* @return 返回服务状态信息
*/
@GetMapping("/status")
public Map<String, Object> status() {
Map<String, Object> result = new HashMap<>();
result.put("status", "UP");
result.put("message", "IoT 网络组件服务运行正常");
result.put("timestamp", System.currentTimeMillis());
return result;
}
}

View File

@ -0,0 +1,65 @@
package cn.iocoder.yudao.module.iot.net.component.server.downstream;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.*;
import cn.iocoder.yudao.module.iot.net.component.core.downstream.IotDeviceDownstreamHandler;
import lombok.extern.slf4j.Slf4j;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.SUCCESS;
/**
* 网络组件下行处理器实现
* <p>
* 处理来自主程序的设备控制指令
*
* @author haohao
*/
@Slf4j
public class IotComponentDownstreamHandlerImpl implements IotDeviceDownstreamHandler {
@Override
public CommonResult<Boolean> invokeDeviceService(IotDeviceServiceInvokeReqDTO invokeReqDTO) {
log.info("[invokeDeviceService][收到服务调用请求:{}]", invokeReqDTO);
// 在这里处理服务调用可以根据设备类型转发到对应的处理器
// MQTT 设备HTTP 设备等的具体实现
// 这里仅作为示例实际应根据接入的组件进行转发
return CommonResult.success(true);
}
@Override
public CommonResult<Boolean> getDeviceProperty(IotDevicePropertyGetReqDTO getReqDTO) {
log.info("[getDeviceProperty][收到属性获取请求:{}]", getReqDTO);
// 在这里处理属性获取请求
// 这里仅作为示例实际应根据接入的组件进行转发
return CommonResult.success(true);
}
@Override
public CommonResult<Boolean> setDeviceProperty(IotDevicePropertySetReqDTO setReqDTO) {
log.info("[setDeviceProperty][收到属性设置请求:{}]", setReqDTO);
// 在这里处理属性设置请求
// 这里仅作为示例实际应根据接入的组件进行转发
return CommonResult.success(true);
}
@Override
public CommonResult<Boolean> setDeviceConfig(IotDeviceConfigSetReqDTO setReqDTO) {
log.info("[setDeviceConfig][收到配置设置请求:{}]", setReqDTO);
// 在这里处理配置设置请求
// 这里仅作为示例实际应根据接入的组件进行转发
return CommonResult.success(true);
}
@Override
public CommonResult<Boolean> upgradeDeviceOta(IotDeviceOtaUpgradeReqDTO upgradeReqDTO) {
log.info("[upgradeDeviceOta][收到OTA升级请求{}]", upgradeReqDTO);
// 在这里处理OTA升级请求
// 这里仅作为示例实际应根据接入的组件进行转发
return CommonResult.success(true);
}
}

View File

@ -0,0 +1,310 @@
package cn.iocoder.yudao.module.iot.net.component.server.downstream;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.*;
import cn.iocoder.yudao.module.iot.net.component.core.downstream.IotDeviceDownstreamHandler;
import cn.iocoder.yudao.module.iot.net.component.server.config.IotNetComponentServerProperties;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Map;
/**
* 组件下行服务器接收来自主程序的控制指令
*
* @author haohao
*/
@Slf4j
public class IotComponentDownstreamServer {
public static final String SERVICE_INVOKE_PATH = "/sys/:productKey/:deviceName/thing/service/:identifier";
public static final String PROPERTY_SET_PATH = "/sys/:productKey/:deviceName/thing/service/property/set";
public static final String PROPERTY_GET_PATH = "/sys/:productKey/:deviceName/thing/service/property/get";
public static final String CONFIG_SET_PATH = "/sys/:productKey/:deviceName/thing/service/config/set";
public static final String OTA_UPGRADE_PATH = "/sys/:productKey/:deviceName/thing/service/ota/upgrade";
private final Vertx vertx;
private final HttpServer server;
private final IotNetComponentServerProperties properties;
private final IotDeviceDownstreamHandler downstreamHandler;
public IotComponentDownstreamServer(IotNetComponentServerProperties properties,
IotDeviceDownstreamHandler downstreamHandler) {
this.properties = properties;
this.downstreamHandler = downstreamHandler;
// 创建 Vertx 实例
this.vertx = Vertx.vertx();
// 创建 Router 实例
Router router = Router.router(vertx);
router.route().handler(BodyHandler.create()); // 处理 Body
// 服务调用路由
router.post(SERVICE_INVOKE_PATH).handler(this::handleServiceInvoke);
// 属性设置路由
router.post(PROPERTY_SET_PATH).handler(this::handlePropertySet);
// 属性获取路由
router.post(PROPERTY_GET_PATH).handler(this::handlePropertyGet);
// 配置设置路由
router.post(CONFIG_SET_PATH).handler(this::handleConfigSet);
// OTA 升级路由
router.post(OTA_UPGRADE_PATH).handler(this::handleOtaUpgrade);
// 创建 HttpServer 实例
this.server = vertx.createHttpServer().requestHandler(router);
}
/**
* 启动服务器
*/
public void start() {
log.info("[start][开始启动下行服务器]");
server.listen(properties.getDownstreamPort())
.toCompletionStage()
.toCompletableFuture()
.join();
log.info("[start][下行服务器启动完成,端口({})]", server.actualPort());
}
/**
* 停止服务器
*/
public void stop() {
log.info("[stop][开始关闭下行服务器]");
try {
// 关闭 HTTP 服务器
if (server != null) {
server.close()
.toCompletionStage()
.toCompletableFuture()
.join();
}
// 关闭 Vertx 实例
if (vertx != null) {
vertx.close()
.toCompletionStage()
.toCompletableFuture()
.join();
}
log.info("[stop][下行服务器关闭完成]");
} catch (Exception e) {
log.error("[stop][下行服务器关闭异常]", e);
throw new RuntimeException(e);
}
}
/**
* 获取服务器端口
*
* @return 端口号
*/
public int getPort() {
return server.actualPort();
}
/**
* 处理服务调用请求
*/
private void handleServiceInvoke(RoutingContext ctx) {
try {
// 解析路径参数
String productKey = ctx.pathParam("productKey");
String deviceName = ctx.pathParam("deviceName");
String identifier = ctx.pathParam("identifier");
// 解析请求体
JsonObject body = ctx.body().asJsonObject();
String requestId = body.getString("requestId", IdUtil.fastSimpleUUID());
Object params = body.getMap().get("params");
// 创建请求对象
IotDeviceServiceInvokeReqDTO reqDTO = new IotDeviceServiceInvokeReqDTO();
reqDTO.setRequestId(requestId);
reqDTO.setProductKey(productKey);
reqDTO.setDeviceName(deviceName);
reqDTO.setIdentifier(identifier);
reqDTO.setParams((Map<String, Object>) params);
// 调用处理器
CommonResult<Boolean> result = downstreamHandler.invokeDeviceService(reqDTO);
// 响应结果
ctx.response()
.putHeader("Content-Type", "application/json")
.end(Json.encode(result));
} catch (Exception e) {
log.error("[handleServiceInvoke][处理服务调用请求失败]", e);
ctx.response()
.setStatusCode(500)
.putHeader("Content-Type", "application/json")
.end(Json.encode(CommonResult.error(500, "处理服务调用请求失败:" + e.getMessage())));
}
}
/**
* 处理属性设置请求
*/
private void handlePropertySet(RoutingContext ctx) {
try {
// 解析路径参数
String productKey = ctx.pathParam("productKey");
String deviceName = ctx.pathParam("deviceName");
// 解析请求体
JsonObject body = ctx.body().asJsonObject();
String requestId = body.getString("requestId", IdUtil.fastSimpleUUID());
Object properties = body.getMap().get("properties");
// 创建请求对象
IotDevicePropertySetReqDTO reqDTO = new IotDevicePropertySetReqDTO();
reqDTO.setRequestId(requestId);
reqDTO.setProductKey(productKey);
reqDTO.setDeviceName(deviceName);
reqDTO.setProperties((Map<String, Object>) properties);
// 调用处理器
CommonResult<Boolean> result = downstreamHandler.setDeviceProperty(reqDTO);
// 响应结果
ctx.response()
.putHeader("Content-Type", "application/json")
.end(Json.encode(result));
} catch (Exception e) {
log.error("[handlePropertySet][处理属性设置请求失败]", e);
ctx.response()
.setStatusCode(500)
.putHeader("Content-Type", "application/json")
.end(Json.encode(CommonResult.error(500, "处理属性设置请求失败:" + e.getMessage())));
}
}
/**
* 处理属性获取请求
*/
private void handlePropertyGet(RoutingContext ctx) {
try {
// 解析路径参数
String productKey = ctx.pathParam("productKey");
String deviceName = ctx.pathParam("deviceName");
// 解析请求体
JsonObject body = ctx.body().asJsonObject();
String requestId = body.getString("requestId", IdUtil.fastSimpleUUID());
Object identifiers = body.getMap().get("identifiers");
// 创建请求对象
IotDevicePropertyGetReqDTO reqDTO = new IotDevicePropertyGetReqDTO();
reqDTO.setRequestId(requestId);
reqDTO.setProductKey(productKey);
reqDTO.setDeviceName(deviceName);
reqDTO.setIdentifiers((List<String>) identifiers);
// 调用处理器
CommonResult<Boolean> result = downstreamHandler.getDeviceProperty(reqDTO);
// 响应结果
ctx.response()
.putHeader("Content-Type", "application/json")
.end(Json.encode(result));
} catch (Exception e) {
log.error("[handlePropertyGet][处理属性获取请求失败]", e);
ctx.response()
.setStatusCode(500)
.putHeader("Content-Type", "application/json")
.end(Json.encode(CommonResult.error(500, "处理属性获取请求失败:" + e.getMessage())));
}
}
/**
* 处理配置设置请求
*/
private void handleConfigSet(RoutingContext ctx) {
try {
// 解析路径参数
String productKey = ctx.pathParam("productKey");
String deviceName = ctx.pathParam("deviceName");
// 解析请求体
JsonObject body = ctx.body().asJsonObject();
String requestId = body.getString("requestId", IdUtil.fastSimpleUUID());
Object config = body.getMap().get("config");
// 创建请求对象
IotDeviceConfigSetReqDTO reqDTO = new IotDeviceConfigSetReqDTO();
reqDTO.setRequestId(requestId);
reqDTO.setProductKey(productKey);
reqDTO.setDeviceName(deviceName);
reqDTO.setConfig((Map<String, Object>) config);
// 调用处理器
CommonResult<Boolean> result = downstreamHandler.setDeviceConfig(reqDTO);
// 响应结果
ctx.response()
.putHeader("Content-Type", "application/json")
.end(Json.encode(result));
} catch (Exception e) {
log.error("[handleConfigSet][处理配置设置请求失败]", e);
ctx.response()
.setStatusCode(500)
.putHeader("Content-Type", "application/json")
.end(Json.encode(CommonResult.error(500, "处理配置设置请求失败:" + e.getMessage())));
}
}
/**
* 处理 OTA 升级请求
*/
private void handleOtaUpgrade(RoutingContext ctx) {
try {
// 解析路径参数
String productKey = ctx.pathParam("productKey");
String deviceName = ctx.pathParam("deviceName");
// 解析请求体
JsonObject body = ctx.body().asJsonObject();
String requestId = body.getString("requestId", IdUtil.fastSimpleUUID());
Object data = body.getMap().get("data");
// 创建请求对象
IotDeviceOtaUpgradeReqDTO reqDTO = new IotDeviceOtaUpgradeReqDTO();
reqDTO.setRequestId(requestId);
reqDTO.setProductKey(productKey);
reqDTO.setDeviceName(deviceName);
// 数据采用 IotDeviceOtaUpgradeReqDTO.build 方法转换
if (data instanceof Map) {
IotDeviceOtaUpgradeReqDTO builtDTO = IotDeviceOtaUpgradeReqDTO.build((Map<?, ?>) data);
reqDTO.setFirmwareId(builtDTO.getFirmwareId());
reqDTO.setVersion(builtDTO.getVersion());
reqDTO.setSignMethod(builtDTO.getSignMethod());
reqDTO.setFileSign(builtDTO.getFileSign());
reqDTO.setFileSize(builtDTO.getFileSize());
reqDTO.setFileUrl(builtDTO.getFileUrl());
reqDTO.setInformation(builtDTO.getInformation());
}
// 调用处理器
CommonResult<Boolean> result = downstreamHandler.upgradeDeviceOta(reqDTO);
// 响应结果
ctx.response()
.putHeader("Content-Type", "application/json")
.end(Json.encode(result));
} catch (Exception e) {
log.error("[handleOtaUpgrade][处理OTA升级请求失败]", e);
ctx.response()
.setStatusCode(500)
.putHeader("Content-Type", "application/json")
.end(Json.encode(CommonResult.error(500, "处理OTA升级请求失败" + e.getMessage())));
}
}
}

View File

@ -0,0 +1,100 @@
package cn.iocoder.yudao.module.iot.net.component.server.heartbeat;
import cn.hutool.system.SystemUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotPluginInstanceHeartbeatReqDTO;
import cn.iocoder.yudao.module.iot.net.component.server.config.IotNetComponentServerProperties;
import cn.iocoder.yudao.module.iot.net.component.server.downstream.IotComponentDownstreamServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import java.time.LocalDateTime;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.lang.ProcessHandle;
/**
* IoT 组件心跳任务
* <p>
* 定期向主程序发送心跳报告组件服务状态
*
* @author haohao
*/
@Slf4j
public class IotComponentHeartbeatJob {
private final IotDeviceUpstreamApi deviceUpstreamApi;
private final IotComponentDownstreamServer downstreamServer;
private final IotNetComponentServerProperties properties;
private ScheduledExecutorService executorService;
public IotComponentHeartbeatJob(IotDeviceUpstreamApi deviceUpstreamApi,
IotComponentDownstreamServer downstreamServer,
IotNetComponentServerProperties properties) {
this.deviceUpstreamApi = deviceUpstreamApi;
this.downstreamServer = downstreamServer;
this.properties = properties;
}
/**
* 初始化心跳任务
*/
public void init() {
log.info("[init][开始初始化心跳任务]");
// 创建一个单线程的调度线程池
executorService = new ScheduledThreadPoolExecutor(1);
// 延迟 5 秒后开始执行避免服务刚启动就发送心跳
executorService.scheduleAtFixedRate(this::sendHeartbeat,
5000, properties.getHeartbeatInterval(), TimeUnit.MILLISECONDS);
log.info("[init][心跳任务初始化完成]");
}
/**
* 停止心跳任务
*/
public void stop() {
log.info("[stop][开始停止心跳任务]");
if (executorService != null) {
executorService.shutdown();
executorService = null;
}
log.info("[stop][心跳任务已停止]");
}
/**
* 发送心跳
*/
private void sendHeartbeat() {
try {
// 创建心跳请求
IotPluginInstanceHeartbeatReqDTO heartbeatReqDTO = new IotPluginInstanceHeartbeatReqDTO();
// 设置插件标识
heartbeatReqDTO.setPluginKey(properties.getServerKey());
// 设置进程ID
heartbeatReqDTO.setProcessId(String.valueOf(ProcessHandle.current().pid()));
// 设置IP和端口
try {
String hostIp = SystemUtil.getHostInfo().getAddress();
heartbeatReqDTO.setHostIp(hostIp);
heartbeatReqDTO.setDownstreamPort(downstreamServer.getPort());
} catch (Exception e) {
log.warn("[sendHeartbeat][获取本地主机信息异常]", e);
}
// 设置在线状态
heartbeatReqDTO.setOnline(true);
// 发送心跳
CommonResult<Boolean> result = deviceUpstreamApi.heartbeatPluginInstance(heartbeatReqDTO);
if (result != null && result.isSuccess()) {
log.debug("[sendHeartbeat][发送心跳成功:{}]", heartbeatReqDTO);
} else {
log.error("[sendHeartbeat][发送心跳失败:{}, 结果:{}]", heartbeatReqDTO, result);
}
} catch (Exception e) {
log.error("[sendHeartbeat][发送心跳异常]", e);
}
}
}

View File

@ -0,0 +1,90 @@
package cn.iocoder.yudao.module.iot.net.component.server.upstream;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.*;
import cn.iocoder.yudao.module.iot.net.component.server.config.IotNetComponentServerProperties;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.client.RestTemplate;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR;
/**
* 组件上行客户端用于向主程序上报设备数据
* <p>
* 通过 HTTP 调用远程的 IotDeviceUpstreamApi 接口
*
* @author haohao
*/
@RequiredArgsConstructor
@Slf4j
public class IotComponentUpstreamClient implements IotDeviceUpstreamApi {
public static final String URL_PREFIX = "/rpc-api/iot/device/upstream";
private final IotNetComponentServerProperties properties;
private final RestTemplate restTemplate;
@Override
public CommonResult<Boolean> updateDeviceState(IotDeviceStateUpdateReqDTO updateReqDTO) {
String url = properties.getUpstreamUrl() + URL_PREFIX + "/update-state";
return doPost(url, updateReqDTO);
}
@Override
public CommonResult<Boolean> reportDeviceEvent(IotDeviceEventReportReqDTO reportReqDTO) {
String url = properties.getUpstreamUrl() + URL_PREFIX + "/report-event";
return doPost(url, reportReqDTO);
}
@Override
public CommonResult<Boolean> registerDevice(IotDeviceRegisterReqDTO registerReqDTO) {
String url = properties.getUpstreamUrl() + URL_PREFIX + "/register-device";
return doPost(url, registerReqDTO);
}
@Override
public CommonResult<Boolean> registerSubDevice(IotDeviceRegisterSubReqDTO registerReqDTO) {
String url = properties.getUpstreamUrl() + URL_PREFIX + "/register-sub-device";
return doPost(url, registerReqDTO);
}
@Override
public CommonResult<Boolean> addDeviceTopology(IotDeviceTopologyAddReqDTO addReqDTO) {
String url = properties.getUpstreamUrl() + URL_PREFIX + "/add-device-topology";
return doPost(url, addReqDTO);
}
@Override
public CommonResult<Boolean> authenticateEmqxConnection(IotDeviceEmqxAuthReqDTO authReqDTO) {
String url = properties.getUpstreamUrl() + URL_PREFIX + "/authenticate-emqx-connection";
return doPost(url, authReqDTO);
}
@Override
public CommonResult<Boolean> reportDeviceProperty(IotDevicePropertyReportReqDTO reportReqDTO) {
String url = properties.getUpstreamUrl() + URL_PREFIX + "/report-property";
return doPost(url, reportReqDTO);
}
@Override
public CommonResult<Boolean> heartbeatPluginInstance(IotPluginInstanceHeartbeatReqDTO heartbeatReqDTO) {
String url = properties.getUpstreamUrl() + URL_PREFIX + "/heartbeat-plugin-instance";
return doPost(url, heartbeatReqDTO);
}
@SuppressWarnings("unchecked")
private <T> CommonResult<Boolean> doPost(String url, T requestBody) {
try {
CommonResult<Boolean> result = restTemplate.postForObject(url, requestBody,
(Class<CommonResult<Boolean>>) (Class<?>) CommonResult.class);
log.info("[doPost][url({}) requestBody({}) result({})]", url, requestBody, result);
return result;
} catch (Exception e) {
log.error("[doPost][url({}) requestBody({}) 发生异常]", url, requestBody, e);
return CommonResult.error(INTERNAL_SERVER_ERROR);
}
}
}

View File

@ -0,0 +1,64 @@
# 服务器配置
server:
port: 18080 # 修改端口避免与主应用的8080端口冲突
# Spring 配置
spring:
application:
name: iot-component-server
# 允许循环引用
main:
allow-circular-references: true
allow-bean-definition-overriding: true
# Yudao 配置
yudao:
info:
base-package: cn.iocoder.yudao # 主项目包路径,确保正确
iot:
component:
# 这里可以覆盖或添加 component-core 中的通用配置
instance-heartbeat-timeout: 30000 # 心跳超时时间
# 网络组件服务器专用配置
server:
# 上行通信配置,用于向主程序上报数据
upstream-url: http://127.0.0.1:48080 # 主程序 API 地址
upstream-connect-timeout: 30s # 连接超时
upstream-read-timeout: 30s # 读取超时
# 下行通信配置,用于接收主程序的控制指令
downstream-port: 18888 # 下行服务器端口
# 组件服务唯一标识
server-key: yudao-module-iot-net-component-server
# 心跳频率,单位:毫秒
heartbeat-interval: 30000
# ====================================
# 针对引入的 HTTP 组件的配置
# ====================================
http:
enabled: true # 启用HTTP组件
server-port: 8092 # HTTP组件服务端口
# ====================================
# 针对引入的 EMQX 组件的配置
# ====================================
emqx:
enabled: true # 启用EMQX组件
mqtt-host: 127.0.0.1 # MQTT服务器主机地址
mqtt-port: 1883 # MQTT服务器端口
mqtt-username: yudao # MQTT服务器用户名
mqtt-password: 123456 # MQTT服务器密码
mqtt-ssl: false # 是否启用SSL
mqtt-topics: # 订阅的主题列表
- "/sys/#"
auth-port: 8101 # 认证端口
# 日志配置
logging:
level:
cn.iocoder.yudao: INFO
root: INFO