From 81739186c971fdc508cf5cd115e5be49a26f38dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E6=B5=A9=E6=B5=A9?= <1036606149@qq.com> Date: Sat, 15 Mar 2025 17:56:45 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90=E5=8A=9F=E8=83=BD=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E3=80=91IoT=EF=BC=9A=E9=87=8D=E6=9E=84=E4=B8=8A=E8=A1=8C?= =?UTF-8?q?=E8=AF=B7=E6=B1=82=E5=A4=84=E7=90=86=E9=80=BB=E8=BE=91=EF=BC=8C?= =?UTF-8?q?=E5=90=88=E5=B9=B6=E5=B1=9E=E6=80=A7=E5=92=8C=E4=BA=8B=E4=BB=B6?= =?UTF-8?q?=E4=B8=8A=E6=8A=A5=E5=A4=84=E7=90=86=EF=BC=8C=E7=AE=80=E5=8C=96?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=E7=BB=93=E6=9E=84=EF=BC=8C=E5=88=A0=E9=99=A4?= =?UTF-8?q?=E5=86=97=E4=BD=99=E5=A4=84=E7=90=86=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- yudao-module-iot/yudao-module-iot-biz/pom.xml | 11 -- .../IotKafkaMQDataBridgeExecute.java | 3 +- .../iot/plugin/emqx/config/IotEmqxPlugin.java | 9 +- .../IotDeviceDownstreamHandlerImpl.java | 1 + .../router/IotDeviceMqttMessageHandler.java | 1 + .../upstream/IotDeviceUpstreamServer.java | 13 +- .../IotDeviceEventReportVertxHandler.java | 111 ----------- .../IotDevicePropertyReportVertxHandler.java | 131 ------------- .../router/IotDeviceUpstreamVertxHandler.java | 185 ++++++++++++++++++ 9 files changed, 201 insertions(+), 264 deletions(-) delete mode 100644 yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-http/src/main/java/cn/iocoder/yudao/module/iot/plugin/http/upstream/router/IotDeviceEventReportVertxHandler.java delete mode 100644 yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-http/src/main/java/cn/iocoder/yudao/module/iot/plugin/http/upstream/router/IotDevicePropertyReportVertxHandler.java create mode 100644 yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-http/src/main/java/cn/iocoder/yudao/module/iot/plugin/http/upstream/router/IotDeviceUpstreamVertxHandler.java diff --git a/yudao-module-iot/yudao-module-iot-biz/pom.xml b/yudao-module-iot/yudao-module-iot-biz/pom.xml index 1b897c5d7f..d398aa7d81 100644 --- a/yudao-module-iot/yudao-module-iot-biz/pom.xml +++ b/yudao-module-iot/yudao-module-iot-biz/pom.xml @@ -64,17 +64,6 @@ yudao-spring-boot-starter-excel - - - - - - - - - - - org.apache.rocketmq diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java index 3b7f99bf42..08dfec3338 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java @@ -10,7 +10,6 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.stereotype.Component; import java.time.Duration; import java.time.LocalDateTime; @@ -24,7 +23,7 @@ import java.util.concurrent.TimeUnit; * @author HUIHUI */ @ConditionalOnClass(name = "org.springframework.kafka.core.KafkaTemplate") -@Component +//@Component @Slf4j public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute> { diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/config/IotEmqxPlugin.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/config/IotEmqxPlugin.java index af2e568628..74a49c4f19 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/config/IotEmqxPlugin.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/config/IotEmqxPlugin.java @@ -7,10 +7,13 @@ import org.pf4j.spring.SpringPlugin; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.AnnotationConfigApplicationContext; -// TODO @芋艿:完善注释 - /** - * 负责插件的启动和停止 + * EMQX 插件实现类 + * + * 基于 PF4J 插件框架,实现 EMQX 消息中间件的集成 + * 负责插件的生命周期管理,包括启动、停止和应用上下文的创建 + * + * @author 芋道源码 */ @Slf4j public class IotEmqxPlugin extends SpringPlugin { diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/downstream/IotDeviceDownstreamHandlerImpl.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/downstream/IotDeviceDownstreamHandlerImpl.java index aed677c49e..977f0869c7 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/downstream/IotDeviceDownstreamHandlerImpl.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/downstream/IotDeviceDownstreamHandlerImpl.java @@ -27,6 +27,7 @@ public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandle private static final String SYS_TOPIC_PREFIX = "/sys/"; // TODO @haohao:讨论,感觉 mqtt 和 http,可以做个相对统一的格式哈。 + // 回复 都使用 Alink 格式,方便后续扩展。 // 设备服务调用 标准 JSON // 请求Topic:/sys/${productKey}/${deviceName}/thing/service/${tsl.service.identifier} // 响应Topic:/sys/${productKey}/${deviceName}/thing/service/${tsl.service.identifier}_reply diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceMqttMessageHandler.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceMqttMessageHandler.java index 4af6877bfd..b92868582c 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceMqttMessageHandler.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceMqttMessageHandler.java @@ -31,6 +31,7 @@ import java.util.Map; public class IotDeviceMqttMessageHandler { // TODO @haohao:讨论,感觉 mqtt 和 http,可以做个相对统一的格式哈。 + // 回复 都使用 Alink 格式,方便后续扩展。 // 设备上报属性 标准 JSON // 请求 Topic:/sys/${productKey}/${deviceName}/thing/event/property/post // 响应 Topic:/sys/${productKey}/${deviceName}/thing/event/property/post_reply diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-http/src/main/java/cn/iocoder/yudao/module/iot/plugin/http/upstream/IotDeviceUpstreamServer.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-http/src/main/java/cn/iocoder/yudao/module/iot/plugin/http/upstream/IotDeviceUpstreamServer.java index 42da951a24..67129a4d1c 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-http/src/main/java/cn/iocoder/yudao/module/iot/plugin/http/upstream/IotDeviceUpstreamServer.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-http/src/main/java/cn/iocoder/yudao/module/iot/plugin/http/upstream/IotDeviceUpstreamServer.java @@ -2,8 +2,7 @@ package cn.iocoder.yudao.module.iot.plugin.http.upstream; import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi; import cn.iocoder.yudao.module.iot.plugin.http.config.IotPluginHttpProperties; -import cn.iocoder.yudao.module.iot.plugin.http.upstream.router.IotDeviceEventReportVertxHandler; -import cn.iocoder.yudao.module.iot.plugin.http.upstream.router.IotDevicePropertyReportVertxHandler; +import cn.iocoder.yudao.module.iot.plugin.http.upstream.router.IotDeviceUpstreamVertxHandler; import io.vertx.core.Vertx; import io.vertx.core.http.HttpServer; import io.vertx.ext.web.Router; @@ -32,10 +31,12 @@ public class IotDeviceUpstreamServer { // 创建 Router 实例 Router router = Router.router(vertx); router.route().handler(BodyHandler.create()); // 处理 Body - router.post(IotDevicePropertyReportVertxHandler.PATH) - .handler(new IotDevicePropertyReportVertxHandler(deviceUpstreamApi)); - router.post(IotDeviceEventReportVertxHandler.PATH) - .handler(new IotDeviceEventReportVertxHandler(deviceUpstreamApi)); + + // 使用统一的 Handler 处理所有上行请求 + IotDeviceUpstreamVertxHandler upstreamHandler = new IotDeviceUpstreamVertxHandler(deviceUpstreamApi); + router.post(IotDeviceUpstreamVertxHandler.PROPERTY_PATH).handler(upstreamHandler); + router.post(IotDeviceUpstreamVertxHandler.EVENT_PATH).handler(upstreamHandler); + // 创建 HttpServer 实例 this.server = vertx.createHttpServer().requestHandler(router); } diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-http/src/main/java/cn/iocoder/yudao/module/iot/plugin/http/upstream/router/IotDeviceEventReportVertxHandler.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-http/src/main/java/cn/iocoder/yudao/module/iot/plugin/http/upstream/router/IotDeviceEventReportVertxHandler.java deleted file mode 100644 index e4d1a73814..0000000000 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-http/src/main/java/cn/iocoder/yudao/module/iot/plugin/http/upstream/router/IotDeviceEventReportVertxHandler.java +++ /dev/null @@ -1,111 +0,0 @@ -package cn.iocoder.yudao.module.iot.plugin.http.upstream.router; - -import cn.hutool.core.util.IdUtil; -import cn.hutool.core.util.ObjUtil; -import cn.iocoder.yudao.framework.common.pojo.CommonResult; -import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi; -import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceEventReportReqDTO; -import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceStateUpdateReqDTO; -import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum; -import cn.iocoder.yudao.module.iot.plugin.common.pojo.IotStandardResponse; -import cn.iocoder.yudao.module.iot.plugin.common.util.IotPluginCommonUtils; -import io.vertx.core.Handler; -import io.vertx.core.json.JsonObject; -import io.vertx.ext.web.RoutingContext; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -import java.time.LocalDateTime; -import java.util.HashMap; -import java.util.Map; - -import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.BAD_REQUEST; -import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR; - -/** - * IoT 设备事件上报的 Vert.x Handler - */ -@RequiredArgsConstructor -@Slf4j -public class IotDeviceEventReportVertxHandler implements Handler { - - public static final String PATH = "/sys/:productKey/:deviceName/thing/event/:identifier/post"; - private static final String VERSION = "1.0"; - private static final String EVENT_METHOD_PREFIX = "thing.event."; - private static final String EVENT_METHOD_SUFFIX = ".post"; - - private final IotDeviceUpstreamApi deviceUpstreamApi; - - @Override - @SuppressWarnings("unchecked") - public void handle(RoutingContext routingContext) { - // 1. 解析参数 - IotDeviceEventReportReqDTO reportReqDTO; - String identifier = null; - String requestId = IdUtil.fastSimpleUUID(); - try { - String productKey = routingContext.pathParam("productKey"); - String deviceName = routingContext.pathParam("deviceName"); - identifier = routingContext.pathParam("identifier"); - JsonObject body = routingContext.body().asJsonObject(); - requestId = ObjUtil.defaultIfBlank(body.getString("id"), requestId); - - // 按照标准JSON格式处理事件参数 - Map params; - - // 优先使用params字段,符合标准 - if (body.getJsonObject("params") != null) { - params = body.getJsonObject("params").getMap(); - } else { - // 兼容旧格式 - params = new HashMap<>(); - } - - reportReqDTO = ((IotDeviceEventReportReqDTO) new IotDeviceEventReportReqDTO().setRequestId(requestId) - .setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now()) - .setProductKey(productKey).setDeviceName(deviceName)) - .setIdentifier(identifier).setParams(params); - } catch (Exception e) { - log.error("[handle][路径参数({}) 解析参数失败]", routingContext.pathParams(), e); - // 使用IotStandardResponse实体类返回错误 - String method = identifier != null ? EVENT_METHOD_PREFIX + identifier + EVENT_METHOD_SUFFIX - : "thing.event.unknown.post"; - IotStandardResponse errorResponse = IotStandardResponse.error( - requestId, method, BAD_REQUEST.getCode(), BAD_REQUEST.getMsg()); - IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse); - return; - } - - try { - // 2. 设备上线 - deviceUpstreamApi.updateDeviceState( - ((IotDeviceStateUpdateReqDTO) new IotDeviceStateUpdateReqDTO().setRequestId(IdUtil.fastSimpleUUID()) - .setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now()) - .setProductKey(reportReqDTO.getProductKey()).setDeviceName(reportReqDTO.getDeviceName())) - .setState(IotDeviceStateEnum.ONLINE.getState())); - - // 3.1 事件上报 - CommonResult result = deviceUpstreamApi.reportDeviceEvent(reportReqDTO); - - // 3.2 返回结果 - 使用IotStandardResponse实体类 - String method = EVENT_METHOD_PREFIX + reportReqDTO.getIdentifier() + EVENT_METHOD_SUFFIX; - IotStandardResponse response; - if (result.isSuccess()) { - response = IotStandardResponse.success(reportReqDTO.getRequestId(), method, result.getData()); - } else { - response = IotStandardResponse.error( - reportReqDTO.getRequestId(), method, result.getCode(), result.getMsg()); - } - IotPluginCommonUtils.writeJsonResponse(routingContext, response); - } catch (Exception e) { - log.error("[handle][请求参数({}) 事件上报异常]", reportReqDTO, e); - - // 构建错误响应 - 使用IotStandardResponse实体类 - String method = EVENT_METHOD_PREFIX + reportReqDTO.getIdentifier() + EVENT_METHOD_SUFFIX; - IotStandardResponse errorResponse = IotStandardResponse.error( - reportReqDTO.getRequestId(), method, - INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg()); - IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse); - } - } -} diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-http/src/main/java/cn/iocoder/yudao/module/iot/plugin/http/upstream/router/IotDevicePropertyReportVertxHandler.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-http/src/main/java/cn/iocoder/yudao/module/iot/plugin/http/upstream/router/IotDevicePropertyReportVertxHandler.java deleted file mode 100644 index be3e0017ad..0000000000 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-http/src/main/java/cn/iocoder/yudao/module/iot/plugin/http/upstream/router/IotDevicePropertyReportVertxHandler.java +++ /dev/null @@ -1,131 +0,0 @@ -package cn.iocoder.yudao.module.iot.plugin.http.upstream.router; - -import cn.hutool.core.util.IdUtil; -import cn.hutool.core.util.ObjUtil; -import cn.iocoder.yudao.framework.common.pojo.CommonResult; -import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi; -import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDevicePropertyReportReqDTO; -import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceStateUpdateReqDTO; -import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum; -import cn.iocoder.yudao.module.iot.plugin.common.pojo.IotStandardResponse; -import cn.iocoder.yudao.module.iot.plugin.common.util.IotPluginCommonUtils; -import io.vertx.core.Handler; -import io.vertx.core.json.JsonObject; -import io.vertx.ext.web.RoutingContext; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -import java.time.LocalDateTime; -import java.util.HashMap; -import java.util.Map; - -import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.BAD_REQUEST; -import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR; - -// TODO @芋艿:【待定 005】要不要简化成,解析后,统一处理?只有一个 Handler!!! -/** - * IoT 设备属性上报的 Vert.x Handler - * - * @author haohao - */ -@RequiredArgsConstructor -@Slf4j -public class IotDevicePropertyReportVertxHandler implements Handler { - - public static final String PATH = "/sys/:productKey/:deviceName/thing/event/property/post"; - private static final String VERSION = "1.0"; - private static final String METHOD = "thing.event.property.post"; - - private final IotDeviceUpstreamApi deviceUpstreamApi; - - @Override - @SuppressWarnings("unchecked") - public void handle(RoutingContext routingContext) { - // 1. 解析参数 - IotDevicePropertyReportReqDTO reportReqDTO; - String requestId = IdUtil.fastSimpleUUID(); - try { - String productKey = routingContext.pathParam("productKey"); - String deviceName = routingContext.pathParam("deviceName"); - JsonObject body = routingContext.body().asJsonObject(); - requestId = ObjUtil.defaultIfBlank(body.getString("id"), requestId); - - // 按照标准JSON格式处理属性数据 - Map properties = new HashMap<>(); - - // 优先使用params字段,符合标准 - Map params = body.getJsonObject("params") != null ? body.getJsonObject("params").getMap() - : null; - - if (params != null) { - // 将标准格式的params转换为平台需要的properties格式 - for (Map.Entry entry : params.entrySet()) { - String key = entry.getKey(); - Object valueObj = entry.getValue(); - - // 如果是复杂结构(包含value和time) - if (valueObj instanceof Map) { - Map valueMap = (Map) valueObj; - if (valueMap.containsKey("value")) { - properties.put(key, valueMap.get("value")); - } else { - properties.put(key, valueObj); - } - } else { - properties.put(key, valueObj); - } - } - } else { - // 兼容旧格式,直接使用properties字段 - properties = body.getJsonObject("properties") != null ? body.getJsonObject("properties").getMap() - : new HashMap<>(); - } - - reportReqDTO = ((IotDevicePropertyReportReqDTO) new IotDevicePropertyReportReqDTO().setRequestId(requestId) - .setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now()) - .setProductKey(productKey).setDeviceName(deviceName)) - .setProperties(properties); - } catch (Exception e) { - log.error("[handle][路径参数({}) 解析参数失败]", routingContext.pathParams(), e); - // 使用IotStandardResponse实体类返回错误 - IotStandardResponse errorResponse = IotStandardResponse.error( - requestId, METHOD, BAD_REQUEST.getCode(), BAD_REQUEST.getMsg()); - IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse); - return; - } - - // TODO @芋艿:secret 校验。目前的想法: - // 方案一:请求的时候,带上 secret 参数,然后进行校验,减少请求的频次。不过可能要看下 mqtt 能不能复用! - // 方案二:本地有设备信息的缓存,异步刷新。这样可能 mqtt 的校验,和 http 校验都容易适配。 - - try { - // 2. 设备上线 - deviceUpstreamApi.updateDeviceState( - ((IotDeviceStateUpdateReqDTO) new IotDeviceStateUpdateReqDTO().setRequestId(IdUtil.fastSimpleUUID()) - .setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now()) - .setProductKey(reportReqDTO.getProductKey()).setDeviceName(reportReqDTO.getDeviceName())) - .setState(IotDeviceStateEnum.ONLINE.getState())); - - // 3.1 属性上报 - CommonResult result = deviceUpstreamApi.reportDeviceProperty(reportReqDTO); - - // 3.2 返回结果 - 使用IotStandardResponse实体类 - IotStandardResponse response; - if (result.isSuccess()) { - response = IotStandardResponse.success(reportReqDTO.getRequestId(), METHOD, result.getData()); - } else { - response = IotStandardResponse.error( - reportReqDTO.getRequestId(), METHOD, result.getCode(), result.getMsg()); - } - IotPluginCommonUtils.writeJsonResponse(routingContext, response); - } catch (Exception e) { - log.error("[handle][请求参数({}) 属性上报异常]", reportReqDTO, e); - - // 构建错误响应 - 使用IotStandardResponse实体类 - IotStandardResponse errorResponse = IotStandardResponse.error( - reportReqDTO.getRequestId(), METHOD, - INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg()); - IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse); - } - } -} diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-http/src/main/java/cn/iocoder/yudao/module/iot/plugin/http/upstream/router/IotDeviceUpstreamVertxHandler.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-http/src/main/java/cn/iocoder/yudao/module/iot/plugin/http/upstream/router/IotDeviceUpstreamVertxHandler.java new file mode 100644 index 0000000000..ce250f41e1 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-http/src/main/java/cn/iocoder/yudao/module/iot/plugin/http/upstream/router/IotDeviceUpstreamVertxHandler.java @@ -0,0 +1,185 @@ +package cn.iocoder.yudao.module.iot.plugin.http.upstream.router; + +import cn.hutool.core.util.IdUtil; +import cn.hutool.core.util.ObjUtil; +import cn.iocoder.yudao.framework.common.pojo.CommonResult; +import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi; +import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceEventReportReqDTO; +import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDevicePropertyReportReqDTO; +import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceStateUpdateReqDTO; +import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum; +import cn.iocoder.yudao.module.iot.plugin.common.pojo.IotStandardResponse; +import cn.iocoder.yudao.module.iot.plugin.common.util.IotPluginCommonUtils; +import io.vertx.core.Handler; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.RoutingContext; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.Map; + +import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.BAD_REQUEST; +import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR; + +/** + * IoT 设备上行统一处理的 Vert.x Handler + *

+ * 统一处理设备属性上报和事件上报的请求 + * + * @author haohao + */ +@RequiredArgsConstructor +@Slf4j +public class IotDeviceUpstreamVertxHandler implements Handler { + + // 属性上报路径 + public static final String PROPERTY_PATH = "/sys/:productKey/:deviceName/thing/event/property/post"; + // 事件上报路径 + public static final String EVENT_PATH = "/sys/:productKey/:deviceName/thing/event/:identifier/post"; + + private static final String PROPERTY_METHOD = "thing.event.property.post"; + private static final String EVENT_METHOD_PREFIX = "thing.event."; + private static final String EVENT_METHOD_SUFFIX = ".post"; + + private final IotDeviceUpstreamApi deviceUpstreamApi; + + @Override + public void handle(RoutingContext routingContext) { + String path = routingContext.request().path(); + String requestId = IdUtil.fastSimpleUUID(); + + try { + // 1. 解析通用参数 + String productKey = routingContext.pathParam("productKey"); + String deviceName = routingContext.pathParam("deviceName"); + JsonObject body = routingContext.body().asJsonObject(); + requestId = ObjUtil.defaultIfBlank(body.getString("id"), requestId); + + // 2. 根据路径模式处理不同类型的请求 + CommonResult result; + String method; + + if (path.matches(".*/thing/event/property/post")) { + // 处理属性上报 + IotDevicePropertyReportReqDTO reportReqDTO = parsePropertyReportRequest(productKey, deviceName, requestId, body); + + // 设备上线 + updateDeviceState(reportReqDTO.getProductKey(), reportReqDTO.getDeviceName()); + + // 属性上报 + result = deviceUpstreamApi.reportDeviceProperty(reportReqDTO); + method = PROPERTY_METHOD; + } else if (path.matches(".*/thing/event/.+/post")) { + // 处理事件上报 + String identifier = routingContext.pathParam("identifier"); + IotDeviceEventReportReqDTO reportReqDTO = parseEventReportRequest(productKey, deviceName, identifier, requestId, body); + + // 设备上线 + updateDeviceState(reportReqDTO.getProductKey(), reportReqDTO.getDeviceName()); + + // 事件上报 + result = deviceUpstreamApi.reportDeviceEvent(reportReqDTO); + method = EVENT_METHOD_PREFIX + identifier + EVENT_METHOD_SUFFIX; + } else { + // 不支持的请求路径 + IotStandardResponse errorResponse = IotStandardResponse.error(requestId, "unknown", BAD_REQUEST.getCode(), "不支持的请求路径"); + IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse); + return; + } + + // 3. 返回标准响应 + IotStandardResponse response; + if (result.isSuccess()) { + response = IotStandardResponse.success(requestId, method, result.getData()); + } else { + response = IotStandardResponse.error(requestId, method, result.getCode(), result.getMsg()); + } + IotPluginCommonUtils.writeJsonResponse(routingContext, response); + + } catch (Exception e) { + log.error("[handle][处理上行请求异常] path={}", path, e); + + // 构建错误响应 + String method = path.contains("/property/") ? PROPERTY_METHOD : EVENT_METHOD_PREFIX + (routingContext.pathParams().containsKey("identifier") ? routingContext.pathParam("identifier") : "unknown") + EVENT_METHOD_SUFFIX; + + IotStandardResponse errorResponse = IotStandardResponse.error(requestId, method, INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg()); + IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse); + } + } + + /** + * 更新设备状态 + * + * @param productKey 产品Key + * @param deviceName 设备名称 + */ + private void updateDeviceState(String productKey, String deviceName) { + deviceUpstreamApi.updateDeviceState(((IotDeviceStateUpdateReqDTO) new IotDeviceStateUpdateReqDTO().setRequestId(IdUtil.fastSimpleUUID()).setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now()).setProductKey(productKey).setDeviceName(deviceName)).setState(IotDeviceStateEnum.ONLINE.getState())); + } + + /** + * 解析属性上报请求 + * + * @param productKey 产品Key + * @param deviceName 设备名称 + * @param requestId 请求ID + * @param body 请求体 + * @return 属性上报请求DTO + */ + @SuppressWarnings("unchecked") + private IotDevicePropertyReportReqDTO parsePropertyReportRequest(String productKey, String deviceName, String requestId, JsonObject body) { + + // 按照标准JSON格式处理属性数据 + Map properties = new HashMap<>(); + + // 优先使用params字段,符合标准 + Map params = body.getJsonObject("params") != null ? body.getJsonObject("params").getMap() : null; + + if (params != null) { + // 将标准格式的params转换为平台需要的properties格式 + for (Map.Entry entry : params.entrySet()) { + String key = entry.getKey(); + Object valueObj = entry.getValue(); + + // 如果是复杂结构(包含value和time) + if (valueObj instanceof Map) { + Map valueMap = (Map) valueObj; + properties.put(key, valueMap.getOrDefault("value", valueObj)); + } else { + properties.put(key, valueObj); + } + } + } + + return ((IotDevicePropertyReportReqDTO) new IotDevicePropertyReportReqDTO().setRequestId(requestId).setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now()).setProductKey(productKey).setDeviceName(deviceName)).setProperties(properties); + } + + /** + * 解析事件上报请求 + * + * @param productKey 产品Key + * @param deviceName 设备名称 + * @param identifier 事件标识符 + * @param requestId 请求ID + * @param body 请求体 + * @return 事件上报请求DTO + */ + @SuppressWarnings("unchecked") + private IotDeviceEventReportReqDTO parseEventReportRequest(String productKey, String deviceName, String identifier, String requestId, JsonObject body) { + + // 按照标准JSON格式处理事件参数 + Map params; + + // 优先使用params字段,符合标准 + if (body.getJsonObject("params") != null) { + params = body.getJsonObject("params").getMap(); + } else { + // 兼容旧格式 + params = new HashMap<>(); + } + + return ((IotDeviceEventReportReqDTO) new IotDeviceEventReportReqDTO().setRequestId(requestId).setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now()).setProductKey(productKey).setDeviceName(deviceName)).setIdentifier(identifier).setParams(params); + } +} \ No newline at end of file