From 40a92426916bd09a95cb906d7a03ae07e035f64f Mon Sep 17 00:00:00 2001 From: YunaiV Date: Sat, 7 Jun 2025 18:03:40 +0800 Subject: [PATCH] =?UTF-8?q?reactor=EF=BC=9A=E3=80=90IoT=20=E7=89=A9?= =?UTF-8?q?=E8=81=94=E7=BD=91=E3=80=91=E9=98=85=E8=AF=BB=20thingsboard=20?= =?UTF-8?q?=E5=90=8E=EF=BC=8C=E5=A2=9E=E5=8A=A0=20IotDeviceMessage=20?= =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=9A=84=20TODO?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/rocketmq/RocketMQIotMessageBus.java | 1 + .../iot/core/mq/message/IotDeviceMessage.java | 5 ++ .../http/router/IotHttpUpstreamHandler.java | 82 ++----------------- 3 files changed, 11 insertions(+), 77 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/rocketmq/RocketMQIotMessageBus.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/rocketmq/RocketMQIotMessageBus.java index 68d2ce9102..5d6d72af1c 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/rocketmq/RocketMQIotMessageBus.java +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/rocketmq/RocketMQIotMessageBus.java @@ -41,6 +41,7 @@ public class RocketMQIotMessageBus implements IotMessageBus { @Override public void post(String topic, Object message) { + // TODO @芋艿:需要 orderly! SendResult result = rocketMQTemplate.syncSend(topic, JsonUtils.toJsonString(message)); log.info("[post][topic({}) 发送消息({}) result({})]", topic, message, result); } diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/mq/message/IotDeviceMessage.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/mq/message/IotDeviceMessage.java index 22bd03ba36..ef3550bc72 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/mq/message/IotDeviceMessage.java +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/mq/message/IotDeviceMessage.java @@ -32,11 +32,13 @@ public class IotDeviceMessage { */ public static final String MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC = MESSAGE_BUS_DEVICE_MESSAGE_TOPIC + "_%s"; + // TODO @芋艿:thingsboard 对应 id,全部由后端生成,由于追溯;是不是调整下? /** * 消息编号 */ private String messageId; + // TODO @芋艿:thingsboard 是使用 deviceId /** * 设备信息 */ @@ -46,6 +48,7 @@ public class IotDeviceMessage { */ private String deviceName; + // TODO @芋艿:thingsboard 只定义了 type;相当于 type + identifier 结合!TbMsgType /** * 消息类型 * @@ -59,6 +62,8 @@ public class IotDeviceMessage { */ private String identifier; + // TODO @芋艿:thingsboard 只有 data 字段,没有 code 字段; + // TODO @芋艿:要不提前序列化成字符串???类似 thingsboard 的 data 字段 /** * 请求参数 * diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/router/IotHttpUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/router/IotHttpUpstreamHandler.java index aff8c0d3af..6becd773bb 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/router/IotHttpUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/router/IotHttpUpstreamHandler.java @@ -4,7 +4,6 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.extra.spring.SpringUtil; -import cn.iocoder.yudao.framework.common.pojo.CommonResult; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer; import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum; @@ -41,16 +40,6 @@ public class IotHttpUpstreamHandler implements Handler { + IotDeviceTopicEnum.EVENT_POST_TOPIC_PREFIX.getTopic() + ":identifier" + IotDeviceTopicEnum.EVENT_POST_TOPIC_SUFFIX.getTopic(); - /** - * 事件上报方法前缀 - */ - private static final String EVENT_METHOD_PREFIX = "thing.event."; - - /** - * 事件上报方法后缀 - */ - private static final String EVENT_METHOD_SUFFIX = ".post"; - private final IotHttpUpstreamProtocol protocol; private final IotDeviceMessageProducer deviceMessageProducer; @@ -64,39 +53,21 @@ public class IotHttpUpstreamHandler implements Handler { public void handle(RoutingContext context) { String path = context.request().path(); // 1. 解析通用参数 - Map params = parseCommonParams(context); - String productKey = params.get("productKey"); - String deviceName = params.get("deviceName"); + String productKey = context.pathParam("productKey"); + String deviceName = context.pathParam("deviceName"); JsonObject body = context.body().asJsonObject(); // 2. 根据路径模式处理不同类型的请求 if (isPropertyPostPath(path)) { // 处理属性上报 handlePropertyPost(context, productKey, deviceName, body); - return; - } - - if (isEventPostPath(path)) { + } else if (isEventPostPath(path)) { // 处理事件上报 String identifier = context.pathParam("identifier"); handleEventPost(context, productKey, deviceName, identifier, body); - return; } } - /** - * 解析通用参数 - * - * @param routingContext 路由上下文 - * @return 参数映射 - */ - private Map parseCommonParams(RoutingContext routingContext) { - Map params = MapUtil.newHashMap(); - params.put("productKey", routingContext.pathParam("productKey")); - params.put("deviceName", routingContext.pathParam("deviceName")); - return params; - } - /** * 判断是否是属性上报路径 * @@ -134,8 +105,8 @@ public class IotHttpUpstreamHandler implements Handler { // 1.2 发送消息 deviceMessageProducer.sendDeviceMessage(message); - // 2. 返回响应 - sendResponse(routingContext, null); +// // 2. 返回响应 +// sendResponse(routingContext, null); } /** @@ -155,49 +126,6 @@ public class IotHttpUpstreamHandler implements Handler { // // // 事件上报 // CommonResult result = deviceUpstreamApi.reportDeviceEvent(reportReqDTO); -// -// // 返回响应 -// sendResponse(routingContext, requestId, method, result); - } - - /** - * 发送响应 - * - * @param routingContext 路由上下文 - * @param result 结果 - */ - private void sendResponse(RoutingContext routingContext, - CommonResult result) { -// // TODO @芋艿:后续再优化 -// IotStandardResponse response; -// if (result == null ) { -// response = IotStandardResponse.success(requestId, method, null); -// } else if (result.isSuccess()) { -// response = IotStandardResponse.success(requestId, method, result.getData()); -// } else { -// response = IotStandardResponse.error(requestId, method, result.getCode(), result.getMsg()); -// } -// IotNetComponentCommonUtils.writeJsonResponse(routingContext, response); - } - - /** - * 从路径确定方法名 - * - * @param path 路径 - * @param routingContext 路由上下文 - * @return 方法名 - */ - private String determineMethodFromPath(String path, RoutingContext routingContext) { - if (StrUtil.contains(path, "/property/")) { - return null; - } - - return EVENT_METHOD_PREFIX - + (routingContext.pathParams().containsKey("identifier") - ? routingContext.pathParam("identifier") - : "unknown") - + - EVENT_METHOD_SUFFIX; } // TODO @芋艿:这块在看看