diff --git a/yudao-module-iot/yudao-module-iot-biz/pom.xml b/yudao-module-iot/yudao-module-iot-biz/pom.xml index fe8e34ec38..8701d8f26e 100644 --- a/yudao-module-iot/yudao-module-iot-biz/pom.xml +++ b/yudao-module-iot/yudao-module-iot-biz/pom.xml @@ -123,16 +123,16 @@ - - cn.iocoder.boot - yudao-module-iot-net-component-http - ${revision} - - - cn.iocoder.boot - yudao-module-iot-net-component-emqx - ${revision} - + + + + + + + + + + diff --git a/yudao-module-iot/yudao-module-iot-net-components/pom.xml b/yudao-module-iot/yudao-module-iot-net-components/pom.xml index cd8f39a966..6147006f50 100644 --- a/yudao-module-iot/yudao-module-iot-net-components/pom.xml +++ b/yudao-module-iot/yudao-module-iot-net-components/pom.xml @@ -1,7 +1,6 @@ + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> yudao-module-iot cn.iocoder.boot @@ -21,6 +20,7 @@ yudao-module-iot-net-component-core yudao-module-iot-net-component-http yudao-module-iot-net-component-emqx + yudao-module-iot-net-component-server \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/java/cn/iocoder/yudao/module/iot/net/component/core/config/IotNetComponentCommonProperties.java b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/java/cn/iocoder/yudao/module/iot/net/component/core/config/IotNetComponentCommonProperties.java index 6f1df82a1b..a4fb09e609 100644 --- a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/java/cn/iocoder/yudao/module/iot/net/component/core/config/IotNetComponentCommonProperties.java +++ b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/src/main/java/cn/iocoder/yudao/module/iot/net/component/core/config/IotNetComponentCommonProperties.java @@ -31,26 +31,28 @@ public class IotNetComponentCommonProperties { /** * 网络组件消息转发配置 */ - private ForwardMessage forwardMessage = new ForwardMessage(); + // private ForwardMessage forwardMessage = new ForwardMessage(); /** * 消息转发配置 */ - @Data - public static class ForwardMessage { + /* + * @Data + * public static class ForwardMessage { + * + * /** + * 是否转发所有设备消息到 EMQX + *

+ * 默认为 true 开启 + */ + // private boolean forwardAllDeviceMessageToEmqx = true; - /** - * 是否转发所有设备消息到 EMQX - *

- * 默认为 true 开启 - */ - private boolean forwardAllDeviceMessageToEmqx = true; - - /** - * 是否转发所有设备消息到 HTTP - *

- * 默认为 false 关闭 - */ - private boolean forwardAllDeviceMessageToHttp = false; - } + /** + * 是否转发所有设备消息到 HTTP + *

+ * 默认为 false 关闭 + */ + // private boolean forwardAllDeviceMessageToHttp = false; + // } + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/pom.xml b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/pom.xml new file mode 100644 index 0000000000..4c2a612205 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/pom.xml @@ -0,0 +1,75 @@ + + + 4.0.0 + + cn.iocoder.boot + yudao-module-iot-net-components + ${revision} + + + yudao-module-iot-net-component-server + jar + + ${project.artifactId} + + IoT 网络组件的独立运行服务,用于聚合和启动多个网络组件实例。 + + + + + + org.springframework.boot + spring-boot-starter + + + + + org.springframework.boot + spring-boot-starter-web + + + + + cn.iocoder.boot + yudao-module-iot-net-component-core + ${revision} + + + + + cn.iocoder.boot + yudao-module-iot-net-component-http + ${revision} + + + + + cn.iocoder.boot + yudao-module-iot-net-component-emqx + ${revision} + + + + + + + ${project.artifactId} + + + + org.springframework.boot + spring-boot-maven-plugin + ${spring.boot.version} + + + + repackage + + + + + + + + \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/NetComponentServerApplication.java b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/NetComponentServerApplication.java new file mode 100644 index 0000000000..0d40edb725 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/NetComponentServerApplication.java @@ -0,0 +1,18 @@ +package cn.iocoder.yudao.module.iot.net.component.server; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * IoT 网络组件聚合启动服务 + * + * @author haohao + */ +@SpringBootApplication(scanBasePackages = {"${yudao.info.base-package}.module.iot.net.component"}) +public class NetComponentServerApplication { + + public static void main(String[] args) { + SpringApplication.run(NetComponentServerApplication.class, args); + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/config/IotNetComponentServerConfiguration.java b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/config/IotNetComponentServerConfiguration.java new file mode 100644 index 0000000000..513e8693ef --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/config/IotNetComponentServerConfiguration.java @@ -0,0 +1,118 @@ +package cn.iocoder.yudao.module.iot.net.component.server.config; + +import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi; +import cn.iocoder.yudao.module.iot.net.component.core.downstream.IotDeviceDownstreamHandler; +import cn.iocoder.yudao.module.iot.net.component.server.downstream.IotComponentDownstreamHandlerImpl; +import cn.iocoder.yudao.module.iot.net.component.server.downstream.IotComponentDownstreamServer; +import cn.iocoder.yudao.module.iot.net.component.server.heartbeat.IotComponentHeartbeatJob; +import cn.iocoder.yudao.module.iot.net.component.server.upstream.IotComponentUpstreamClient; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.boot.web.client.RestTemplateBuilder; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.web.client.RestTemplate; + +/** + * IoT 网络组件服务器配置类 + * + * @author haohao + */ +@Configuration +@EnableConfigurationProperties(IotNetComponentServerProperties.class) +@EnableScheduling +public class IotNetComponentServerConfiguration { + + /** + * 配置 RestTemplate + * + * @param properties 配置 + * @return RestTemplate + */ + @Bean + public RestTemplate restTemplate(IotNetComponentServerProperties properties) { + return new RestTemplateBuilder() + .connectTimeout(properties.getUpstreamConnectTimeout()) + .readTimeout(properties.getUpstreamReadTimeout()) + .build(); + } + + /** + * 配置设备上行客户端 + * + * @param properties 配置 + * @param restTemplate RestTemplate + * @return 上行客户端 + */ + @Bean + @Primary + public IotDeviceUpstreamApi deviceUpstreamApi(IotNetComponentServerProperties properties, + RestTemplate restTemplate) { + return new IotComponentUpstreamClient(properties, restTemplate); + } + + /** + * 配置设备下行处理器 + * + * @return 下行处理器 + */ + @Bean + @Primary + public IotDeviceDownstreamHandler deviceDownstreamHandler() { + return new IotComponentDownstreamHandlerImpl(); + } + + /** + * 配置下行服务器 + * + * @param properties 配置 + * @param downstreamHandler 下行处理器 + * @return 下行服务器 + */ + @Bean(initMethod = "start", destroyMethod = "stop") + public IotComponentDownstreamServer deviceDownstreamServer(IotNetComponentServerProperties properties, + @org.springframework.beans.factory.annotation.Qualifier("deviceDownstreamHandler") IotDeviceDownstreamHandler downstreamHandler) { + return new IotComponentDownstreamServer(properties, downstreamHandler); + } + + /** + * 配置心跳任务 + * + * @param deviceUpstreamApi 上行接口 + * @param downstreamServer 下行服务器 + * @param properties 配置 + * @return 心跳任务 + */ + @Bean(initMethod = "init", destroyMethod = "stop") + public IotComponentHeartbeatJob heartbeatJob(IotDeviceUpstreamApi deviceUpstreamApi, + IotComponentDownstreamServer downstreamServer, + IotNetComponentServerProperties properties) { + return new IotComponentHeartbeatJob(deviceUpstreamApi, downstreamServer, properties); + } + + /** + * 配置默认的设备上行客户端,避免在独立运行模式下的循环依赖问题 + * + * @return 设备上行客户端 + */ + @Bean + @ConditionalOnMissingBean(name = "serverDeviceUpstreamClient") + public Object serverDeviceUpstreamClient() { + // 返回一个空对象,避免找不到类的问题 + return new Object(); + } + + /** + * 配置默认的组件实例注册客户端 + * + * @return 插件实例注册客户端 + */ + @Bean + @ConditionalOnMissingBean(name = "serverPluginInstanceRegistryClient") + public Object serverPluginInstanceRegistryClient() { + // 返回一个空对象,避免找不到类的问题 + return new Object(); + } +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/config/IotNetComponentServerProperties.java b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/config/IotNetComponentServerProperties.java new file mode 100644 index 0000000000..7b641debda --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/config/IotNetComponentServerProperties.java @@ -0,0 +1,56 @@ +package cn.iocoder.yudao.module.iot.net.component.server.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.validation.annotation.Validated; + +import java.time.Duration; + +/** + * IoT 网络组件服务配置属性 + * + * @author haohao + */ +@ConfigurationProperties(prefix = "yudao.iot.component.server") +@Validated +@Data +public class IotNetComponentServerProperties { + + /** + * 上行 URL,用于向主应用程序上报数据 + *

+ * 默认:http://127.0.0.1:48080 + */ + private String upstreamUrl = "http://127.0.0.1:48080"; + + /** + * 上行连接超时时间 + */ + private Duration upstreamConnectTimeout = Duration.ofSeconds(30); + + /** + * 上行读取超时时间 + */ + private Duration upstreamReadTimeout = Duration.ofSeconds(30); + + /** + * 下行服务端口,用于接收主应用程序的请求 + *

+ * 默认:18888 + */ + private Integer downstreamPort = 18888; + + /** + * 组件服务器唯一标识 + *

+ * 默认:yudao-module-iot-net-component-server + */ + private String serverKey = "yudao-module-iot-net-component-server"; + + /** + * 心跳发送频率,单位:毫秒 + *

+ * 默认:30 秒 + */ + private Long heartbeatInterval = 30000L; +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/controller/HealthController.java b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/controller/HealthController.java new file mode 100644 index 0000000000..e30da459ea --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/controller/HealthController.java @@ -0,0 +1,32 @@ +package cn.iocoder.yudao.module.iot.net.component.server.controller; + +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.HashMap; +import java.util.Map; + +/** + * 健康检查接口 + * + * @author haohao + */ +@RestController +@RequestMapping("/health") +public class HealthController { + + /** + * 健康检查接口 + * + * @return 返回服务状态信息 + */ + @GetMapping("/status") + public Map status() { + Map result = new HashMap<>(); + result.put("status", "UP"); + result.put("message", "IoT 网络组件服务运行正常"); + result.put("timestamp", System.currentTimeMillis()); + return result; + } +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/downstream/IotComponentDownstreamHandlerImpl.java b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/downstream/IotComponentDownstreamHandlerImpl.java new file mode 100644 index 0000000000..c6509ada10 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/downstream/IotComponentDownstreamHandlerImpl.java @@ -0,0 +1,65 @@ +package cn.iocoder.yudao.module.iot.net.component.server.downstream; + +import cn.iocoder.yudao.framework.common.pojo.CommonResult; +import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.*; +import cn.iocoder.yudao.module.iot.net.component.core.downstream.IotDeviceDownstreamHandler; +import lombok.extern.slf4j.Slf4j; + +import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.SUCCESS; + +/** + * 网络组件下行处理器实现 + *

+ * 处理来自主程序的设备控制指令 + * + * @author haohao + */ +@Slf4j +public class IotComponentDownstreamHandlerImpl implements IotDeviceDownstreamHandler { + + @Override + public CommonResult invokeDeviceService(IotDeviceServiceInvokeReqDTO invokeReqDTO) { + log.info("[invokeDeviceService][收到服务调用请求:{}]", invokeReqDTO); + // 在这里处理服务调用,可以根据设备类型转发到对应的处理器 + // 如 MQTT 设备、HTTP 设备等的具体实现 + + // 这里仅作为示例,实际应根据接入的组件进行转发 + return CommonResult.success(true); + } + + @Override + public CommonResult getDeviceProperty(IotDevicePropertyGetReqDTO getReqDTO) { + log.info("[getDeviceProperty][收到属性获取请求:{}]", getReqDTO); + // 在这里处理属性获取请求 + + // 这里仅作为示例,实际应根据接入的组件进行转发 + return CommonResult.success(true); + } + + @Override + public CommonResult setDeviceProperty(IotDevicePropertySetReqDTO setReqDTO) { + log.info("[setDeviceProperty][收到属性设置请求:{}]", setReqDTO); + // 在这里处理属性设置请求 + + // 这里仅作为示例,实际应根据接入的组件进行转发 + return CommonResult.success(true); + } + + @Override + public CommonResult setDeviceConfig(IotDeviceConfigSetReqDTO setReqDTO) { + log.info("[setDeviceConfig][收到配置设置请求:{}]", setReqDTO); + // 在这里处理配置设置请求 + + // 这里仅作为示例,实际应根据接入的组件进行转发 + return CommonResult.success(true); + } + + @Override + public CommonResult upgradeDeviceOta(IotDeviceOtaUpgradeReqDTO upgradeReqDTO) { + log.info("[upgradeDeviceOta][收到OTA升级请求:{}]", upgradeReqDTO); + // 在这里处理OTA升级请求 + + // 这里仅作为示例,实际应根据接入的组件进行转发 + return CommonResult.success(true); + } +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/downstream/IotComponentDownstreamServer.java b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/downstream/IotComponentDownstreamServer.java new file mode 100644 index 0000000000..388a50bdfb --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/downstream/IotComponentDownstreamServer.java @@ -0,0 +1,310 @@ +package cn.iocoder.yudao.module.iot.net.component.server.downstream; + +import cn.hutool.core.util.IdUtil; +import cn.iocoder.yudao.framework.common.pojo.CommonResult; +import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.*; +import cn.iocoder.yudao.module.iot.net.component.core.downstream.IotDeviceDownstreamHandler; +import cn.iocoder.yudao.module.iot.net.component.server.config.IotNetComponentServerProperties; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpServer; +import io.vertx.core.json.Json; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.Router; +import io.vertx.ext.web.RoutingContext; +import io.vertx.ext.web.handler.BodyHandler; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; +import java.util.Map; + +/** + * 组件下行服务器,接收来自主程序的控制指令 + * + * @author haohao + */ +@Slf4j +public class IotComponentDownstreamServer { + + public static final String SERVICE_INVOKE_PATH = "/sys/:productKey/:deviceName/thing/service/:identifier"; + public static final String PROPERTY_SET_PATH = "/sys/:productKey/:deviceName/thing/service/property/set"; + public static final String PROPERTY_GET_PATH = "/sys/:productKey/:deviceName/thing/service/property/get"; + public static final String CONFIG_SET_PATH = "/sys/:productKey/:deviceName/thing/service/config/set"; + public static final String OTA_UPGRADE_PATH = "/sys/:productKey/:deviceName/thing/service/ota/upgrade"; + + private final Vertx vertx; + private final HttpServer server; + private final IotNetComponentServerProperties properties; + private final IotDeviceDownstreamHandler downstreamHandler; + + public IotComponentDownstreamServer(IotNetComponentServerProperties properties, + IotDeviceDownstreamHandler downstreamHandler) { + this.properties = properties; + this.downstreamHandler = downstreamHandler; + // 创建 Vertx 实例 + this.vertx = Vertx.vertx(); + // 创建 Router 实例 + Router router = Router.router(vertx); + router.route().handler(BodyHandler.create()); // 处理 Body + + // 服务调用路由 + router.post(SERVICE_INVOKE_PATH).handler(this::handleServiceInvoke); + // 属性设置路由 + router.post(PROPERTY_SET_PATH).handler(this::handlePropertySet); + // 属性获取路由 + router.post(PROPERTY_GET_PATH).handler(this::handlePropertyGet); + // 配置设置路由 + router.post(CONFIG_SET_PATH).handler(this::handleConfigSet); + // OTA 升级路由 + router.post(OTA_UPGRADE_PATH).handler(this::handleOtaUpgrade); + + // 创建 HttpServer 实例 + this.server = vertx.createHttpServer().requestHandler(router); + } + + /** + * 启动服务器 + */ + public void start() { + log.info("[start][开始启动下行服务器]"); + server.listen(properties.getDownstreamPort()) + .toCompletionStage() + .toCompletableFuture() + .join(); + log.info("[start][下行服务器启动完成,端口({})]", server.actualPort()); + } + + /** + * 停止服务器 + */ + public void stop() { + log.info("[stop][开始关闭下行服务器]"); + try { + // 关闭 HTTP 服务器 + if (server != null) { + server.close() + .toCompletionStage() + .toCompletableFuture() + .join(); + } + + // 关闭 Vertx 实例 + if (vertx != null) { + vertx.close() + .toCompletionStage() + .toCompletableFuture() + .join(); + } + log.info("[stop][下行服务器关闭完成]"); + } catch (Exception e) { + log.error("[stop][下行服务器关闭异常]", e); + throw new RuntimeException(e); + } + } + + /** + * 获取服务器端口 + * + * @return 端口号 + */ + public int getPort() { + return server.actualPort(); + } + + /** + * 处理服务调用请求 + */ + private void handleServiceInvoke(RoutingContext ctx) { + try { + // 解析路径参数 + String productKey = ctx.pathParam("productKey"); + String deviceName = ctx.pathParam("deviceName"); + String identifier = ctx.pathParam("identifier"); + + // 解析请求体 + JsonObject body = ctx.body().asJsonObject(); + String requestId = body.getString("requestId", IdUtil.fastSimpleUUID()); + Object params = body.getMap().get("params"); + + // 创建请求对象 + IotDeviceServiceInvokeReqDTO reqDTO = new IotDeviceServiceInvokeReqDTO(); + reqDTO.setRequestId(requestId); + reqDTO.setProductKey(productKey); + reqDTO.setDeviceName(deviceName); + reqDTO.setIdentifier(identifier); + reqDTO.setParams((Map) params); + + // 调用处理器 + CommonResult result = downstreamHandler.invokeDeviceService(reqDTO); + + // 响应结果 + ctx.response() + .putHeader("Content-Type", "application/json") + .end(Json.encode(result)); + } catch (Exception e) { + log.error("[handleServiceInvoke][处理服务调用请求失败]", e); + ctx.response() + .setStatusCode(500) + .putHeader("Content-Type", "application/json") + .end(Json.encode(CommonResult.error(500, "处理服务调用请求失败:" + e.getMessage()))); + } + } + + /** + * 处理属性设置请求 + */ + private void handlePropertySet(RoutingContext ctx) { + try { + // 解析路径参数 + String productKey = ctx.pathParam("productKey"); + String deviceName = ctx.pathParam("deviceName"); + + // 解析请求体 + JsonObject body = ctx.body().asJsonObject(); + String requestId = body.getString("requestId", IdUtil.fastSimpleUUID()); + Object properties = body.getMap().get("properties"); + + // 创建请求对象 + IotDevicePropertySetReqDTO reqDTO = new IotDevicePropertySetReqDTO(); + reqDTO.setRequestId(requestId); + reqDTO.setProductKey(productKey); + reqDTO.setDeviceName(deviceName); + reqDTO.setProperties((Map) properties); + + // 调用处理器 + CommonResult result = downstreamHandler.setDeviceProperty(reqDTO); + + // 响应结果 + ctx.response() + .putHeader("Content-Type", "application/json") + .end(Json.encode(result)); + } catch (Exception e) { + log.error("[handlePropertySet][处理属性设置请求失败]", e); + ctx.response() + .setStatusCode(500) + .putHeader("Content-Type", "application/json") + .end(Json.encode(CommonResult.error(500, "处理属性设置请求失败:" + e.getMessage()))); + } + } + + /** + * 处理属性获取请求 + */ + private void handlePropertyGet(RoutingContext ctx) { + try { + // 解析路径参数 + String productKey = ctx.pathParam("productKey"); + String deviceName = ctx.pathParam("deviceName"); + + // 解析请求体 + JsonObject body = ctx.body().asJsonObject(); + String requestId = body.getString("requestId", IdUtil.fastSimpleUUID()); + Object identifiers = body.getMap().get("identifiers"); + + // 创建请求对象 + IotDevicePropertyGetReqDTO reqDTO = new IotDevicePropertyGetReqDTO(); + reqDTO.setRequestId(requestId); + reqDTO.setProductKey(productKey); + reqDTO.setDeviceName(deviceName); + reqDTO.setIdentifiers((List) identifiers); + + // 调用处理器 + CommonResult result = downstreamHandler.getDeviceProperty(reqDTO); + + // 响应结果 + ctx.response() + .putHeader("Content-Type", "application/json") + .end(Json.encode(result)); + } catch (Exception e) { + log.error("[handlePropertyGet][处理属性获取请求失败]", e); + ctx.response() + .setStatusCode(500) + .putHeader("Content-Type", "application/json") + .end(Json.encode(CommonResult.error(500, "处理属性获取请求失败:" + e.getMessage()))); + } + } + + /** + * 处理配置设置请求 + */ + private void handleConfigSet(RoutingContext ctx) { + try { + // 解析路径参数 + String productKey = ctx.pathParam("productKey"); + String deviceName = ctx.pathParam("deviceName"); + + // 解析请求体 + JsonObject body = ctx.body().asJsonObject(); + String requestId = body.getString("requestId", IdUtil.fastSimpleUUID()); + Object config = body.getMap().get("config"); + + // 创建请求对象 + IotDeviceConfigSetReqDTO reqDTO = new IotDeviceConfigSetReqDTO(); + reqDTO.setRequestId(requestId); + reqDTO.setProductKey(productKey); + reqDTO.setDeviceName(deviceName); + reqDTO.setConfig((Map) config); + + // 调用处理器 + CommonResult result = downstreamHandler.setDeviceConfig(reqDTO); + + // 响应结果 + ctx.response() + .putHeader("Content-Type", "application/json") + .end(Json.encode(result)); + } catch (Exception e) { + log.error("[handleConfigSet][处理配置设置请求失败]", e); + ctx.response() + .setStatusCode(500) + .putHeader("Content-Type", "application/json") + .end(Json.encode(CommonResult.error(500, "处理配置设置请求失败:" + e.getMessage()))); + } + } + + /** + * 处理 OTA 升级请求 + */ + private void handleOtaUpgrade(RoutingContext ctx) { + try { + // 解析路径参数 + String productKey = ctx.pathParam("productKey"); + String deviceName = ctx.pathParam("deviceName"); + + // 解析请求体 + JsonObject body = ctx.body().asJsonObject(); + String requestId = body.getString("requestId", IdUtil.fastSimpleUUID()); + Object data = body.getMap().get("data"); + + // 创建请求对象 + IotDeviceOtaUpgradeReqDTO reqDTO = new IotDeviceOtaUpgradeReqDTO(); + reqDTO.setRequestId(requestId); + reqDTO.setProductKey(productKey); + reqDTO.setDeviceName(deviceName); + + // 数据采用 IotDeviceOtaUpgradeReqDTO.build 方法转换 + if (data instanceof Map) { + IotDeviceOtaUpgradeReqDTO builtDTO = IotDeviceOtaUpgradeReqDTO.build((Map) data); + reqDTO.setFirmwareId(builtDTO.getFirmwareId()); + reqDTO.setVersion(builtDTO.getVersion()); + reqDTO.setSignMethod(builtDTO.getSignMethod()); + reqDTO.setFileSign(builtDTO.getFileSign()); + reqDTO.setFileSize(builtDTO.getFileSize()); + reqDTO.setFileUrl(builtDTO.getFileUrl()); + reqDTO.setInformation(builtDTO.getInformation()); + } + + // 调用处理器 + CommonResult result = downstreamHandler.upgradeDeviceOta(reqDTO); + + // 响应结果 + ctx.response() + .putHeader("Content-Type", "application/json") + .end(Json.encode(result)); + } catch (Exception e) { + log.error("[handleOtaUpgrade][处理OTA升级请求失败]", e); + ctx.response() + .setStatusCode(500) + .putHeader("Content-Type", "application/json") + .end(Json.encode(CommonResult.error(500, "处理OTA升级请求失败:" + e.getMessage()))); + } + } +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/heartbeat/IotComponentHeartbeatJob.java b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/heartbeat/IotComponentHeartbeatJob.java new file mode 100644 index 0000000000..a76d72b43c --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/heartbeat/IotComponentHeartbeatJob.java @@ -0,0 +1,100 @@ +package cn.iocoder.yudao.module.iot.net.component.server.heartbeat; + +import cn.hutool.system.SystemUtil; +import cn.iocoder.yudao.framework.common.pojo.CommonResult; +import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi; +import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotPluginInstanceHeartbeatReqDTO; +import cn.iocoder.yudao.module.iot.net.component.server.config.IotNetComponentServerProperties; +import cn.iocoder.yudao.module.iot.net.component.server.downstream.IotComponentDownstreamServer; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Scheduled; + +import java.time.LocalDateTime; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.lang.ProcessHandle; + +/** + * IoT 组件心跳任务 + *

+ * 定期向主程序发送心跳,报告组件服务状态 + * + * @author haohao + */ +@Slf4j +public class IotComponentHeartbeatJob { + + private final IotDeviceUpstreamApi deviceUpstreamApi; + private final IotComponentDownstreamServer downstreamServer; + private final IotNetComponentServerProperties properties; + private ScheduledExecutorService executorService; + + public IotComponentHeartbeatJob(IotDeviceUpstreamApi deviceUpstreamApi, + IotComponentDownstreamServer downstreamServer, + IotNetComponentServerProperties properties) { + this.deviceUpstreamApi = deviceUpstreamApi; + this.downstreamServer = downstreamServer; + this.properties = properties; + } + + /** + * 初始化心跳任务 + */ + public void init() { + log.info("[init][开始初始化心跳任务]"); + // 创建一个单线程的调度线程池 + executorService = new ScheduledThreadPoolExecutor(1); + // 延迟 5 秒后开始执行,避免服务刚启动就发送心跳 + executorService.scheduleAtFixedRate(this::sendHeartbeat, + 5000, properties.getHeartbeatInterval(), TimeUnit.MILLISECONDS); + log.info("[init][心跳任务初始化完成]"); + } + + /** + * 停止心跳任务 + */ + public void stop() { + log.info("[stop][开始停止心跳任务]"); + if (executorService != null) { + executorService.shutdown(); + executorService = null; + } + log.info("[stop][心跳任务已停止]"); + } + + /** + * 发送心跳 + */ + private void sendHeartbeat() { + try { + // 创建心跳请求 + IotPluginInstanceHeartbeatReqDTO heartbeatReqDTO = new IotPluginInstanceHeartbeatReqDTO(); + // 设置插件标识 + heartbeatReqDTO.setPluginKey(properties.getServerKey()); + // 设置进程ID + heartbeatReqDTO.setProcessId(String.valueOf(ProcessHandle.current().pid())); + // 设置IP和端口 + try { + String hostIp = SystemUtil.getHostInfo().getAddress(); + heartbeatReqDTO.setHostIp(hostIp); + heartbeatReqDTO.setDownstreamPort(downstreamServer.getPort()); + } catch (Exception e) { + log.warn("[sendHeartbeat][获取本地主机信息异常]", e); + } + // 设置在线状态 + heartbeatReqDTO.setOnline(true); + + // 发送心跳 + CommonResult result = deviceUpstreamApi.heartbeatPluginInstance(heartbeatReqDTO); + if (result != null && result.isSuccess()) { + log.debug("[sendHeartbeat][发送心跳成功:{}]", heartbeatReqDTO); + } else { + log.error("[sendHeartbeat][发送心跳失败:{}, 结果:{}]", heartbeatReqDTO, result); + } + } catch (Exception e) { + log.error("[sendHeartbeat][发送心跳异常]", e); + } + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/upstream/IotComponentUpstreamClient.java b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/upstream/IotComponentUpstreamClient.java new file mode 100644 index 0000000000..f39c1d0a35 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/java/cn/iocoder/yudao/module/iot/net/component/server/upstream/IotComponentUpstreamClient.java @@ -0,0 +1,90 @@ +package cn.iocoder.yudao.module.iot.net.component.server.upstream; + +import cn.iocoder.yudao.framework.common.pojo.CommonResult; +import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi; +import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.*; +import cn.iocoder.yudao.module.iot.net.component.server.config.IotNetComponentServerProperties; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.client.RestTemplate; + +import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR; + +/** + * 组件上行客户端,用于向主程序上报设备数据 + *

+ * 通过 HTTP 调用远程的 IotDeviceUpstreamApi 接口 + * + * @author haohao + */ +@RequiredArgsConstructor +@Slf4j +public class IotComponentUpstreamClient implements IotDeviceUpstreamApi { + + public static final String URL_PREFIX = "/rpc-api/iot/device/upstream"; + + private final IotNetComponentServerProperties properties; + + private final RestTemplate restTemplate; + + @Override + public CommonResult updateDeviceState(IotDeviceStateUpdateReqDTO updateReqDTO) { + String url = properties.getUpstreamUrl() + URL_PREFIX + "/update-state"; + return doPost(url, updateReqDTO); + } + + @Override + public CommonResult reportDeviceEvent(IotDeviceEventReportReqDTO reportReqDTO) { + String url = properties.getUpstreamUrl() + URL_PREFIX + "/report-event"; + return doPost(url, reportReqDTO); + } + + @Override + public CommonResult registerDevice(IotDeviceRegisterReqDTO registerReqDTO) { + String url = properties.getUpstreamUrl() + URL_PREFIX + "/register-device"; + return doPost(url, registerReqDTO); + } + + @Override + public CommonResult registerSubDevice(IotDeviceRegisterSubReqDTO registerReqDTO) { + String url = properties.getUpstreamUrl() + URL_PREFIX + "/register-sub-device"; + return doPost(url, registerReqDTO); + } + + @Override + public CommonResult addDeviceTopology(IotDeviceTopologyAddReqDTO addReqDTO) { + String url = properties.getUpstreamUrl() + URL_PREFIX + "/add-device-topology"; + return doPost(url, addReqDTO); + } + + @Override + public CommonResult authenticateEmqxConnection(IotDeviceEmqxAuthReqDTO authReqDTO) { + String url = properties.getUpstreamUrl() + URL_PREFIX + "/authenticate-emqx-connection"; + return doPost(url, authReqDTO); + } + + @Override + public CommonResult reportDeviceProperty(IotDevicePropertyReportReqDTO reportReqDTO) { + String url = properties.getUpstreamUrl() + URL_PREFIX + "/report-property"; + return doPost(url, reportReqDTO); + } + + @Override + public CommonResult heartbeatPluginInstance(IotPluginInstanceHeartbeatReqDTO heartbeatReqDTO) { + String url = properties.getUpstreamUrl() + URL_PREFIX + "/heartbeat-plugin-instance"; + return doPost(url, heartbeatReqDTO); + } + + @SuppressWarnings("unchecked") + private CommonResult doPost(String url, T requestBody) { + try { + CommonResult result = restTemplate.postForObject(url, requestBody, + (Class>) (Class) CommonResult.class); + log.info("[doPost][url({}) requestBody({}) result({})]", url, requestBody, result); + return result; + } catch (Exception e) { + log.error("[doPost][url({}) requestBody({}) 发生异常]", url, requestBody, e); + return CommonResult.error(INTERNAL_SERVER_ERROR); + } + } +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/resources/application.yml b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/resources/application.yml new file mode 100644 index 0000000000..ccaa7000a5 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/resources/application.yml @@ -0,0 +1,64 @@ +# 服务器配置 +server: + port: 18080 # 修改端口,避免与主应用的8080端口冲突 + +# Spring 配置 +spring: + application: + name: iot-component-server + # 允许循环引用 + main: + allow-circular-references: true + allow-bean-definition-overriding: true + +# Yudao 配置 +yudao: + info: + base-package: cn.iocoder.yudao # 主项目包路径,确保正确 + iot: + component: + # 这里可以覆盖或添加 component-core 中的通用配置 + instance-heartbeat-timeout: 30000 # 心跳超时时间 + + # 网络组件服务器专用配置 + server: + # 上行通信配置,用于向主程序上报数据 + upstream-url: http://127.0.0.1:48080 # 主程序 API 地址 + upstream-connect-timeout: 30s # 连接超时 + upstream-read-timeout: 30s # 读取超时 + + # 下行通信配置,用于接收主程序的控制指令 + downstream-port: 18888 # 下行服务器端口 + + # 组件服务唯一标识 + server-key: yudao-module-iot-net-component-server + + # 心跳频率,单位:毫秒 + heartbeat-interval: 30000 + + # ==================================== + # 针对引入的 HTTP 组件的配置 + # ==================================== + http: + enabled: true # 启用HTTP组件 + server-port: 8092 # HTTP组件服务端口 + + # ==================================== + # 针对引入的 EMQX 组件的配置 + # ==================================== + emqx: + enabled: true # 启用EMQX组件 + mqtt-host: 127.0.0.1 # MQTT服务器主机地址 + mqtt-port: 1883 # MQTT服务器端口 + mqtt-username: yudao # MQTT服务器用户名 + mqtt-password: 123456 # MQTT服务器密码 + mqtt-ssl: false # 是否启用SSL + mqtt-topics: # 订阅的主题列表 + - "/sys/#" + auth-port: 8101 # 认证端口 + +# 日志配置 +logging: + level: + cn.iocoder.yudao: INFO + root: INFO \ No newline at end of file