reactor:【IoT 物联网】阅读 thingsboard 后,增加 IotDeviceMessage 优化的 TODO

This commit is contained in:
YunaiV 2025-06-07 18:03:40 +08:00
parent a0a26c3d64
commit 40a9242691
3 changed files with 11 additions and 77 deletions

View File

@ -41,6 +41,7 @@ public class RocketMQIotMessageBus implements IotMessageBus {
@Override @Override
public void post(String topic, Object message) { public void post(String topic, Object message) {
// TODO @芋艿需要 orderly
SendResult result = rocketMQTemplate.syncSend(topic, JsonUtils.toJsonString(message)); SendResult result = rocketMQTemplate.syncSend(topic, JsonUtils.toJsonString(message));
log.info("[post][topic({}) 发送消息({}) result({})]", topic, message, result); log.info("[post][topic({}) 发送消息({}) result({})]", topic, message, result);
} }

View File

@ -32,11 +32,13 @@ public class IotDeviceMessage {
*/ */
public static final String MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC = MESSAGE_BUS_DEVICE_MESSAGE_TOPIC + "_%s"; public static final String MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC = MESSAGE_BUS_DEVICE_MESSAGE_TOPIC + "_%s";
// TODO @芋艿thingsboard 对应 id全部由后端生成由于追溯是不是调整下
/** /**
* 消息编号 * 消息编号
*/ */
private String messageId; private String messageId;
// TODO @芋艿thingsboard 是使用 deviceId
/** /**
* 设备信息 * 设备信息
*/ */
@ -46,6 +48,7 @@ public class IotDeviceMessage {
*/ */
private String deviceName; private String deviceName;
// TODO @芋艿thingsboard 只定义了 type相当于 type + identifier 结合TbMsgType
/** /**
* 消息类型 * 消息类型
* *
@ -59,6 +62,8 @@ public class IotDeviceMessage {
*/ */
private String identifier; private String identifier;
// TODO @芋艿thingsboard 只有 data 字段没有 code 字段
// TODO @芋艿要不提前序列化成字符串类似 thingsboard data 字段
/** /**
* 请求参数 * 请求参数
* *

View File

@ -4,7 +4,6 @@ import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil; import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil; import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer; import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum; import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum;
@ -41,16 +40,6 @@ public class IotHttpUpstreamHandler implements Handler<RoutingContext> {
+ IotDeviceTopicEnum.EVENT_POST_TOPIC_PREFIX.getTopic() + ":identifier" + IotDeviceTopicEnum.EVENT_POST_TOPIC_PREFIX.getTopic() + ":identifier"
+ IotDeviceTopicEnum.EVENT_POST_TOPIC_SUFFIX.getTopic(); + IotDeviceTopicEnum.EVENT_POST_TOPIC_SUFFIX.getTopic();
/**
* 事件上报方法前缀
*/
private static final String EVENT_METHOD_PREFIX = "thing.event.";
/**
* 事件上报方法后缀
*/
private static final String EVENT_METHOD_SUFFIX = ".post";
private final IotHttpUpstreamProtocol protocol; private final IotHttpUpstreamProtocol protocol;
private final IotDeviceMessageProducer deviceMessageProducer; private final IotDeviceMessageProducer deviceMessageProducer;
@ -64,39 +53,21 @@ public class IotHttpUpstreamHandler implements Handler<RoutingContext> {
public void handle(RoutingContext context) { public void handle(RoutingContext context) {
String path = context.request().path(); String path = context.request().path();
// 1. 解析通用参数 // 1. 解析通用参数
Map<String, String> params = parseCommonParams(context); String productKey = context.pathParam("productKey");
String productKey = params.get("productKey"); String deviceName = context.pathParam("deviceName");
String deviceName = params.get("deviceName");
JsonObject body = context.body().asJsonObject(); JsonObject body = context.body().asJsonObject();
// 2. 根据路径模式处理不同类型的请求 // 2. 根据路径模式处理不同类型的请求
if (isPropertyPostPath(path)) { if (isPropertyPostPath(path)) {
// 处理属性上报 // 处理属性上报
handlePropertyPost(context, productKey, deviceName, body); handlePropertyPost(context, productKey, deviceName, body);
return; } else if (isEventPostPath(path)) {
}
if (isEventPostPath(path)) {
// 处理事件上报 // 处理事件上报
String identifier = context.pathParam("identifier"); String identifier = context.pathParam("identifier");
handleEventPost(context, productKey, deviceName, identifier, body); handleEventPost(context, productKey, deviceName, identifier, body);
return;
} }
} }
/**
* 解析通用参数
*
* @param routingContext 路由上下文
* @return 参数映射
*/
private Map<String, String> parseCommonParams(RoutingContext routingContext) {
Map<String, String> params = MapUtil.newHashMap();
params.put("productKey", routingContext.pathParam("productKey"));
params.put("deviceName", routingContext.pathParam("deviceName"));
return params;
}
/** /**
* 判断是否是属性上报路径 * 判断是否是属性上报路径
* *
@ -134,8 +105,8 @@ public class IotHttpUpstreamHandler implements Handler<RoutingContext> {
// 1.2 发送消息 // 1.2 发送消息
deviceMessageProducer.sendDeviceMessage(message); deviceMessageProducer.sendDeviceMessage(message);
// 2. 返回响应 // // 2. 返回响应
sendResponse(routingContext, null); // sendResponse(routingContext, null);
} }
/** /**
@ -155,49 +126,6 @@ public class IotHttpUpstreamHandler implements Handler<RoutingContext> {
// //
// // 事件上报 // // 事件上报
// CommonResult<Boolean> result = deviceUpstreamApi.reportDeviceEvent(reportReqDTO); // CommonResult<Boolean> result = deviceUpstreamApi.reportDeviceEvent(reportReqDTO);
//
// // 返回响应
// sendResponse(routingContext, requestId, method, result);
}
/**
* 发送响应
*
* @param routingContext 路由上下文
* @param result 结果
*/
private void sendResponse(RoutingContext routingContext,
CommonResult<Boolean> result) {
// // TODO @芋艿后续再优化
// IotStandardResponse response;
// if (result == null ) {
// response = IotStandardResponse.success(requestId, method, null);
// } else if (result.isSuccess()) {
// response = IotStandardResponse.success(requestId, method, result.getData());
// } else {
// response = IotStandardResponse.error(requestId, method, result.getCode(), result.getMsg());
// }
// IotNetComponentCommonUtils.writeJsonResponse(routingContext, response);
}
/**
* 从路径确定方法名
*
* @param path 路径
* @param routingContext 路由上下文
* @return 方法名
*/
private String determineMethodFromPath(String path, RoutingContext routingContext) {
if (StrUtil.contains(path, "/property/")) {
return null;
}
return EVENT_METHOD_PREFIX
+ (routingContext.pathParams().containsKey("identifier")
? routingContext.pathParam("identifier")
: "unknown")
+
EVENT_METHOD_SUFFIX;
} }
// TODO @芋艿这块在看看 // TODO @芋艿这块在看看