【功能优化】IoT:重构上行请求处理逻辑,合并属性和事件上报处理,简化代码结构,删除冗余处理器

This commit is contained in:
安浩浩 2025-03-15 17:56:45 +08:00
parent 348c138749
commit 81739186c9
9 changed files with 201 additions and 264 deletions

View File

@ -64,17 +64,6 @@
<artifactId>yudao-spring-boot-starter-excel</artifactId>
</dependency>
<!-- TODO @haohao貌似不需要这个 -->
<!-- <dependency>-->
<!-- <groupId>io.vertx</groupId>-->
<!-- <artifactId>vertx-web</artifactId>-->
<!-- </dependency>-->
<!-- TODO @haohao貌似 biz 模块,不需要 MQTT -->
<!-- <dependency>-->
<!-- <groupId>org.eclipse.paho</groupId> &lt;!&ndash; MQTT &ndash;&gt;-->
<!-- <artifactId>org.eclipse.paho.client.mqttv3</artifactId>-->
<!-- </dependency>-->
<!-- 消息队列相关 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>

View File

@ -10,7 +10,6 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.time.LocalDateTime;
@ -24,7 +23,7 @@ import java.util.concurrent.TimeUnit;
* @author HUIHUI
*/
@ConditionalOnClass(name = "org.springframework.kafka.core.KafkaTemplate")
@Component
//@Component
@Slf4j
public class IotKafkaMQDataBridgeExecute extends
AbstractCacheableDataBridgeExecute<IotDataBridgeKafkaMQConfig, KafkaTemplate<String, String>> {

View File

@ -7,10 +7,13 @@ import org.pf4j.spring.SpringPlugin;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
// TODO @芋艿完善注释
/**
* 负责插件的启动和停止
* EMQX 插件实现类
*
* 基于 PF4J 插件框架实现 EMQX 消息中间件的集成
* 负责插件的生命周期管理包括启动停止和应用上下文的创建
*
* @author 芋道源码
*/
@Slf4j
public class IotEmqxPlugin extends SpringPlugin {

View File

@ -27,6 +27,7 @@ public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandle
private static final String SYS_TOPIC_PREFIX = "/sys/";
// TODO @haohao讨论感觉 mqtt http可以做个相对统一的格式哈
// 回复 都使用 Alink 格式方便后续扩展
// 设备服务调用 标准 JSON
// 请求Topic/sys/${productKey}/${deviceName}/thing/service/${tsl.service.identifier}
// 响应Topic/sys/${productKey}/${deviceName}/thing/service/${tsl.service.identifier}_reply

View File

@ -31,6 +31,7 @@ import java.util.Map;
public class IotDeviceMqttMessageHandler {
// TODO @haohao讨论感觉 mqtt http可以做个相对统一的格式哈
// 回复 都使用 Alink 格式方便后续扩展
// 设备上报属性 标准 JSON
// 请求 Topic/sys/${productKey}/${deviceName}/thing/event/property/post
// 响应 Topic/sys/${productKey}/${deviceName}/thing/event/property/post_reply

View File

@ -2,8 +2,7 @@ package cn.iocoder.yudao.module.iot.plugin.http.upstream;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.plugin.http.config.IotPluginHttpProperties;
import cn.iocoder.yudao.module.iot.plugin.http.upstream.router.IotDeviceEventReportVertxHandler;
import cn.iocoder.yudao.module.iot.plugin.http.upstream.router.IotDevicePropertyReportVertxHandler;
import cn.iocoder.yudao.module.iot.plugin.http.upstream.router.IotDeviceUpstreamVertxHandler;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.ext.web.Router;
@ -32,10 +31,12 @@ public class IotDeviceUpstreamServer {
// 创建 Router 实例
Router router = Router.router(vertx);
router.route().handler(BodyHandler.create()); // 处理 Body
router.post(IotDevicePropertyReportVertxHandler.PATH)
.handler(new IotDevicePropertyReportVertxHandler(deviceUpstreamApi));
router.post(IotDeviceEventReportVertxHandler.PATH)
.handler(new IotDeviceEventReportVertxHandler(deviceUpstreamApi));
// 使用统一的 Handler 处理所有上行请求
IotDeviceUpstreamVertxHandler upstreamHandler = new IotDeviceUpstreamVertxHandler(deviceUpstreamApi);
router.post(IotDeviceUpstreamVertxHandler.PROPERTY_PATH).handler(upstreamHandler);
router.post(IotDeviceUpstreamVertxHandler.EVENT_PATH).handler(upstreamHandler);
// 创建 HttpServer 实例
this.server = vertx.createHttpServer().requestHandler(router);
}

View File

@ -1,111 +0,0 @@
package cn.iocoder.yudao.module.iot.plugin.http.upstream.router;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.ObjUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceEventReportReqDTO;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceStateUpdateReqDTO;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum;
import cn.iocoder.yudao.module.iot.plugin.common.pojo.IotStandardResponse;
import cn.iocoder.yudao.module.iot.plugin.common.util.IotPluginCommonUtils;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.BAD_REQUEST;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR;
/**
* IoT 设备事件上报的 Vert.x Handler
*/
@RequiredArgsConstructor
@Slf4j
public class IotDeviceEventReportVertxHandler implements Handler<RoutingContext> {
public static final String PATH = "/sys/:productKey/:deviceName/thing/event/:identifier/post";
private static final String VERSION = "1.0";
private static final String EVENT_METHOD_PREFIX = "thing.event.";
private static final String EVENT_METHOD_SUFFIX = ".post";
private final IotDeviceUpstreamApi deviceUpstreamApi;
@Override
@SuppressWarnings("unchecked")
public void handle(RoutingContext routingContext) {
// 1. 解析参数
IotDeviceEventReportReqDTO reportReqDTO;
String identifier = null;
String requestId = IdUtil.fastSimpleUUID();
try {
String productKey = routingContext.pathParam("productKey");
String deviceName = routingContext.pathParam("deviceName");
identifier = routingContext.pathParam("identifier");
JsonObject body = routingContext.body().asJsonObject();
requestId = ObjUtil.defaultIfBlank(body.getString("id"), requestId);
// 按照标准JSON格式处理事件参数
Map<String, Object> params;
// 优先使用params字段符合标准
if (body.getJsonObject("params") != null) {
params = body.getJsonObject("params").getMap();
} else {
// 兼容旧格式
params = new HashMap<>();
}
reportReqDTO = ((IotDeviceEventReportReqDTO) new IotDeviceEventReportReqDTO().setRequestId(requestId)
.setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now())
.setProductKey(productKey).setDeviceName(deviceName))
.setIdentifier(identifier).setParams(params);
} catch (Exception e) {
log.error("[handle][路径参数({}) 解析参数失败]", routingContext.pathParams(), e);
// 使用IotStandardResponse实体类返回错误
String method = identifier != null ? EVENT_METHOD_PREFIX + identifier + EVENT_METHOD_SUFFIX
: "thing.event.unknown.post";
IotStandardResponse errorResponse = IotStandardResponse.error(
requestId, method, BAD_REQUEST.getCode(), BAD_REQUEST.getMsg());
IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);
return;
}
try {
// 2. 设备上线
deviceUpstreamApi.updateDeviceState(
((IotDeviceStateUpdateReqDTO) new IotDeviceStateUpdateReqDTO().setRequestId(IdUtil.fastSimpleUUID())
.setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now())
.setProductKey(reportReqDTO.getProductKey()).setDeviceName(reportReqDTO.getDeviceName()))
.setState(IotDeviceStateEnum.ONLINE.getState()));
// 3.1 事件上报
CommonResult<Boolean> result = deviceUpstreamApi.reportDeviceEvent(reportReqDTO);
// 3.2 返回结果 - 使用IotStandardResponse实体类
String method = EVENT_METHOD_PREFIX + reportReqDTO.getIdentifier() + EVENT_METHOD_SUFFIX;
IotStandardResponse response;
if (result.isSuccess()) {
response = IotStandardResponse.success(reportReqDTO.getRequestId(), method, result.getData());
} else {
response = IotStandardResponse.error(
reportReqDTO.getRequestId(), method, result.getCode(), result.getMsg());
}
IotPluginCommonUtils.writeJsonResponse(routingContext, response);
} catch (Exception e) {
log.error("[handle][请求参数({}) 事件上报异常]", reportReqDTO, e);
// 构建错误响应 - 使用IotStandardResponse实体类
String method = EVENT_METHOD_PREFIX + reportReqDTO.getIdentifier() + EVENT_METHOD_SUFFIX;
IotStandardResponse errorResponse = IotStandardResponse.error(
reportReqDTO.getRequestId(), method,
INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg());
IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);
}
}
}

View File

@ -1,131 +0,0 @@
package cn.iocoder.yudao.module.iot.plugin.http.upstream.router;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.ObjUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDevicePropertyReportReqDTO;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceStateUpdateReqDTO;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum;
import cn.iocoder.yudao.module.iot.plugin.common.pojo.IotStandardResponse;
import cn.iocoder.yudao.module.iot.plugin.common.util.IotPluginCommonUtils;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.BAD_REQUEST;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR;
// TODO @芋艿待定 005要不要简化成解析后统一处理只有一个 Handler
/**
* IoT 设备属性上报的 Vert.x Handler
*
* @author haohao
*/
@RequiredArgsConstructor
@Slf4j
public class IotDevicePropertyReportVertxHandler implements Handler<RoutingContext> {
public static final String PATH = "/sys/:productKey/:deviceName/thing/event/property/post";
private static final String VERSION = "1.0";
private static final String METHOD = "thing.event.property.post";
private final IotDeviceUpstreamApi deviceUpstreamApi;
@Override
@SuppressWarnings("unchecked")
public void handle(RoutingContext routingContext) {
// 1. 解析参数
IotDevicePropertyReportReqDTO reportReqDTO;
String requestId = IdUtil.fastSimpleUUID();
try {
String productKey = routingContext.pathParam("productKey");
String deviceName = routingContext.pathParam("deviceName");
JsonObject body = routingContext.body().asJsonObject();
requestId = ObjUtil.defaultIfBlank(body.getString("id"), requestId);
// 按照标准JSON格式处理属性数据
Map<String, Object> properties = new HashMap<>();
// 优先使用params字段符合标准
Map<String, Object> params = body.getJsonObject("params") != null ? body.getJsonObject("params").getMap()
: null;
if (params != null) {
// 将标准格式的params转换为平台需要的properties格式
for (Map.Entry<String, Object> entry : params.entrySet()) {
String key = entry.getKey();
Object valueObj = entry.getValue();
// 如果是复杂结构包含value和time
if (valueObj instanceof Map) {
Map<String, Object> valueMap = (Map<String, Object>) valueObj;
if (valueMap.containsKey("value")) {
properties.put(key, valueMap.get("value"));
} else {
properties.put(key, valueObj);
}
} else {
properties.put(key, valueObj);
}
}
} else {
// 兼容旧格式直接使用properties字段
properties = body.getJsonObject("properties") != null ? body.getJsonObject("properties").getMap()
: new HashMap<>();
}
reportReqDTO = ((IotDevicePropertyReportReqDTO) new IotDevicePropertyReportReqDTO().setRequestId(requestId)
.setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now())
.setProductKey(productKey).setDeviceName(deviceName))
.setProperties(properties);
} catch (Exception e) {
log.error("[handle][路径参数({}) 解析参数失败]", routingContext.pathParams(), e);
// 使用IotStandardResponse实体类返回错误
IotStandardResponse errorResponse = IotStandardResponse.error(
requestId, METHOD, BAD_REQUEST.getCode(), BAD_REQUEST.getMsg());
IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);
return;
}
// TODO @芋艿secret 校验目前的想法
// 方案一请求的时候带上 secret 参数然后进行校验减少请求的频次不过可能要看下 mqtt 能不能复用
// 方案二本地有设备信息的缓存异步刷新这样可能 mqtt 的校验 http 校验都容易适配
try {
// 2. 设备上线
deviceUpstreamApi.updateDeviceState(
((IotDeviceStateUpdateReqDTO) new IotDeviceStateUpdateReqDTO().setRequestId(IdUtil.fastSimpleUUID())
.setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now())
.setProductKey(reportReqDTO.getProductKey()).setDeviceName(reportReqDTO.getDeviceName()))
.setState(IotDeviceStateEnum.ONLINE.getState()));
// 3.1 属性上报
CommonResult<Boolean> result = deviceUpstreamApi.reportDeviceProperty(reportReqDTO);
// 3.2 返回结果 - 使用IotStandardResponse实体类
IotStandardResponse response;
if (result.isSuccess()) {
response = IotStandardResponse.success(reportReqDTO.getRequestId(), METHOD, result.getData());
} else {
response = IotStandardResponse.error(
reportReqDTO.getRequestId(), METHOD, result.getCode(), result.getMsg());
}
IotPluginCommonUtils.writeJsonResponse(routingContext, response);
} catch (Exception e) {
log.error("[handle][请求参数({}) 属性上报异常]", reportReqDTO, e);
// 构建错误响应 - 使用IotStandardResponse实体类
IotStandardResponse errorResponse = IotStandardResponse.error(
reportReqDTO.getRequestId(), METHOD,
INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg());
IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);
}
}
}

View File

@ -0,0 +1,185 @@
package cn.iocoder.yudao.module.iot.plugin.http.upstream.router;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.ObjUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceEventReportReqDTO;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDevicePropertyReportReqDTO;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceStateUpdateReqDTO;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum;
import cn.iocoder.yudao.module.iot.plugin.common.pojo.IotStandardResponse;
import cn.iocoder.yudao.module.iot.plugin.common.util.IotPluginCommonUtils;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.BAD_REQUEST;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR;
/**
* IoT 设备上行统一处理的 Vert.x Handler
* <p>
* 统一处理设备属性上报和事件上报的请求
*
* @author haohao
*/
@RequiredArgsConstructor
@Slf4j
public class IotDeviceUpstreamVertxHandler implements Handler<RoutingContext> {
// 属性上报路径
public static final String PROPERTY_PATH = "/sys/:productKey/:deviceName/thing/event/property/post";
// 事件上报路径
public static final String EVENT_PATH = "/sys/:productKey/:deviceName/thing/event/:identifier/post";
private static final String PROPERTY_METHOD = "thing.event.property.post";
private static final String EVENT_METHOD_PREFIX = "thing.event.";
private static final String EVENT_METHOD_SUFFIX = ".post";
private final IotDeviceUpstreamApi deviceUpstreamApi;
@Override
public void handle(RoutingContext routingContext) {
String path = routingContext.request().path();
String requestId = IdUtil.fastSimpleUUID();
try {
// 1. 解析通用参数
String productKey = routingContext.pathParam("productKey");
String deviceName = routingContext.pathParam("deviceName");
JsonObject body = routingContext.body().asJsonObject();
requestId = ObjUtil.defaultIfBlank(body.getString("id"), requestId);
// 2. 根据路径模式处理不同类型的请求
CommonResult<Boolean> result;
String method;
if (path.matches(".*/thing/event/property/post")) {
// 处理属性上报
IotDevicePropertyReportReqDTO reportReqDTO = parsePropertyReportRequest(productKey, deviceName, requestId, body);
// 设备上线
updateDeviceState(reportReqDTO.getProductKey(), reportReqDTO.getDeviceName());
// 属性上报
result = deviceUpstreamApi.reportDeviceProperty(reportReqDTO);
method = PROPERTY_METHOD;
} else if (path.matches(".*/thing/event/.+/post")) {
// 处理事件上报
String identifier = routingContext.pathParam("identifier");
IotDeviceEventReportReqDTO reportReqDTO = parseEventReportRequest(productKey, deviceName, identifier, requestId, body);
// 设备上线
updateDeviceState(reportReqDTO.getProductKey(), reportReqDTO.getDeviceName());
// 事件上报
result = deviceUpstreamApi.reportDeviceEvent(reportReqDTO);
method = EVENT_METHOD_PREFIX + identifier + EVENT_METHOD_SUFFIX;
} else {
// 不支持的请求路径
IotStandardResponse errorResponse = IotStandardResponse.error(requestId, "unknown", BAD_REQUEST.getCode(), "不支持的请求路径");
IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);
return;
}
// 3. 返回标准响应
IotStandardResponse response;
if (result.isSuccess()) {
response = IotStandardResponse.success(requestId, method, result.getData());
} else {
response = IotStandardResponse.error(requestId, method, result.getCode(), result.getMsg());
}
IotPluginCommonUtils.writeJsonResponse(routingContext, response);
} catch (Exception e) {
log.error("[handle][处理上行请求异常] path={}", path, e);
// 构建错误响应
String method = path.contains("/property/") ? PROPERTY_METHOD : EVENT_METHOD_PREFIX + (routingContext.pathParams().containsKey("identifier") ? routingContext.pathParam("identifier") : "unknown") + EVENT_METHOD_SUFFIX;
IotStandardResponse errorResponse = IotStandardResponse.error(requestId, method, INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg());
IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);
}
}
/**
* 更新设备状态
*
* @param productKey 产品Key
* @param deviceName 设备名称
*/
private void updateDeviceState(String productKey, String deviceName) {
deviceUpstreamApi.updateDeviceState(((IotDeviceStateUpdateReqDTO) new IotDeviceStateUpdateReqDTO().setRequestId(IdUtil.fastSimpleUUID()).setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now()).setProductKey(productKey).setDeviceName(deviceName)).setState(IotDeviceStateEnum.ONLINE.getState()));
}
/**
* 解析属性上报请求
*
* @param productKey 产品Key
* @param deviceName 设备名称
* @param requestId 请求ID
* @param body 请求体
* @return 属性上报请求DTO
*/
@SuppressWarnings("unchecked")
private IotDevicePropertyReportReqDTO parsePropertyReportRequest(String productKey, String deviceName, String requestId, JsonObject body) {
// 按照标准JSON格式处理属性数据
Map<String, Object> properties = new HashMap<>();
// 优先使用params字段符合标准
Map<String, Object> params = body.getJsonObject("params") != null ? body.getJsonObject("params").getMap() : null;
if (params != null) {
// 将标准格式的params转换为平台需要的properties格式
for (Map.Entry<String, Object> entry : params.entrySet()) {
String key = entry.getKey();
Object valueObj = entry.getValue();
// 如果是复杂结构包含value和time
if (valueObj instanceof Map) {
Map<String, Object> valueMap = (Map<String, Object>) valueObj;
properties.put(key, valueMap.getOrDefault("value", valueObj));
} else {
properties.put(key, valueObj);
}
}
}
return ((IotDevicePropertyReportReqDTO) new IotDevicePropertyReportReqDTO().setRequestId(requestId).setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now()).setProductKey(productKey).setDeviceName(deviceName)).setProperties(properties);
}
/**
* 解析事件上报请求
*
* @param productKey 产品Key
* @param deviceName 设备名称
* @param identifier 事件标识符
* @param requestId 请求ID
* @param body 请求体
* @return 事件上报请求DTO
*/
@SuppressWarnings("unchecked")
private IotDeviceEventReportReqDTO parseEventReportRequest(String productKey, String deviceName, String identifier, String requestId, JsonObject body) {
// 按照标准JSON格式处理事件参数
Map<String, Object> params;
// 优先使用params字段符合标准
if (body.getJsonObject("params") != null) {
params = body.getJsonObject("params").getMap();
} else {
// 兼容旧格式
params = new HashMap<>();
}
return ((IotDeviceEventReportReqDTO) new IotDeviceEventReportReqDTO().setRequestId(requestId).setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now()).setProductKey(productKey).setDeviceName(deviceName)).setIdentifier(identifier).setParams(params);
}
}