【功能完善】IoT:引入 IotStandardResponse 实体类,统一处理器的响应格式,优化错误处理逻辑

This commit is contained in:
安浩浩 2025-03-15 00:26:44 +08:00
parent 34453a3f70
commit 348c138749
12 changed files with 415 additions and 91 deletions

View File

@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.iot.plugin.common.downstream.router;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.IotDeviceConfigSetReqDTO;
import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamHandler;
import cn.iocoder.yudao.module.iot.plugin.common.pojo.IotStandardResponse;
import cn.iocoder.yudao.module.iot.plugin.common.util.IotPluginCommonUtils;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
@ -25,6 +26,7 @@ import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeC
public class IotDeviceConfigSetVertxHandler implements Handler<RoutingContext> {
public static final String PATH = "/sys/:productKey/:deviceName/thing/service/config/set";
public static final String METHOD = "thing.service.config.set";
private final IotDeviceDownstreamHandler deviceDownstreamHandler;
@ -44,17 +46,32 @@ public class IotDeviceConfigSetVertxHandler implements Handler<RoutingContext> {
.setConfig(config);
} catch (Exception e) {
log.error("[handle][路径参数({}) 解析参数失败]", routingContext.pathParams(), e);
IotPluginCommonUtils.writeJson(routingContext, CommonResult.error(BAD_REQUEST));
// 使用IotStandardResponse实体类返回错误
IotStandardResponse errorResponse = IotStandardResponse.error(
null, METHOD, BAD_REQUEST.getCode(), BAD_REQUEST.getMsg());
IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);
return;
}
// 2. 调用处理器
try {
CommonResult<Boolean> result = deviceDownstreamHandler.setDeviceConfig(reqDTO);
IotPluginCommonUtils.writeJson(routingContext, result);
// 使用IotStandardResponse实体类返回结果
IotStandardResponse response;
if (result.isSuccess()) {
response = IotStandardResponse.success(reqDTO.getRequestId(), METHOD, result.getData());
} else {
response = IotStandardResponse.error(
reqDTO.getRequestId(), METHOD, result.getCode(), result.getMsg());
}
IotPluginCommonUtils.writeJsonResponse(routingContext, response);
} catch (Exception e) {
log.error("[handle][请求参数({}) 配置设置异常]", reqDTO, e);
IotPluginCommonUtils.writeJson(routingContext, CommonResult.error(INTERNAL_SERVER_ERROR));
// 使用IotStandardResponse实体类返回错误
IotStandardResponse errorResponse = IotStandardResponse.error(
reqDTO.getRequestId(), METHOD, INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg());
IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);
}
}

View File

@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.iot.plugin.common.downstream.router;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.IotDeviceOtaUpgradeReqDTO;
import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamHandler;
import cn.iocoder.yudao.module.iot.plugin.common.pojo.IotStandardResponse;
import cn.iocoder.yudao.module.iot.plugin.common.util.IotPluginCommonUtils;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
@ -23,6 +24,7 @@ import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeC
public class IotDeviceOtaUpgradeVertxHandler implements Handler<RoutingContext> {
public static final String PATH = "/ota/:productKey/:deviceName/upgrade";
public static final String METHOD = "ota.device.upgrade";
private final IotDeviceDownstreamHandler deviceDownstreamHandler;
@ -49,17 +51,32 @@ public class IotDeviceOtaUpgradeVertxHandler implements Handler<RoutingContext>
.setInformation(information);
} catch (Exception e) {
log.error("[handle][路径参数({}) 解析参数失败]", routingContext.pathParams(), e);
IotPluginCommonUtils.writeJson(routingContext, CommonResult.error(BAD_REQUEST));
// 使用IotStandardResponse实体类返回错误
IotStandardResponse errorResponse = IotStandardResponse.error(
null, METHOD, BAD_REQUEST.getCode(), BAD_REQUEST.getMsg());
IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);
return;
}
// 2. 调用处理器
try {
CommonResult<Boolean> result = deviceDownstreamHandler.upgradeDeviceOta(reqDTO);
IotPluginCommonUtils.writeJson(routingContext, result);
// 使用IotStandardResponse实体类返回结果
IotStandardResponse response;
if (result.isSuccess()) {
response = IotStandardResponse.success(reqDTO.getRequestId(), METHOD, result.getData());
} else {
response = IotStandardResponse.error(
reqDTO.getRequestId(), METHOD, result.getCode(), result.getMsg());
}
IotPluginCommonUtils.writeJsonResponse(routingContext, response);
} catch (Exception e) {
log.error("[handle][请求参数({}) OTA 升级异常]", reqDTO, e);
IotPluginCommonUtils.writeJson(routingContext, CommonResult.error(INTERNAL_SERVER_ERROR));
// 使用IotStandardResponse实体类返回错误
IotStandardResponse errorResponse = IotStandardResponse.error(
reqDTO.getRequestId(), METHOD, INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg());
IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);
}
}
}

View File

@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.iot.plugin.common.downstream.router;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.IotDevicePropertyGetReqDTO;
import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamHandler;
import cn.iocoder.yudao.module.iot.plugin.common.pojo.IotStandardResponse;
import cn.iocoder.yudao.module.iot.plugin.common.util.IotPluginCommonUtils;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
@ -25,6 +26,7 @@ import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeC
public class IotDevicePropertyGetVertxHandler implements Handler<RoutingContext> {
public static final String PATH = "/sys/:productKey/:deviceName/thing/service/property/get";
public static final String METHOD = "thing.service.property.get";
private final IotDeviceDownstreamHandler deviceDownstreamHandler;
@ -44,17 +46,32 @@ public class IotDevicePropertyGetVertxHandler implements Handler<RoutingContext>
.setIdentifiers(identifiers);
} catch (Exception e) {
log.error("[handle][路径参数({}) 解析参数失败]", routingContext.pathParams(), e);
IotPluginCommonUtils.writeJson(routingContext, CommonResult.error(BAD_REQUEST));
// 使用IotStandardResponse实体类返回错误
IotStandardResponse errorResponse = IotStandardResponse.error(
null, METHOD, BAD_REQUEST.getCode(), BAD_REQUEST.getMsg());
IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);
return;
}
// 2. 调用处理器
try {
CommonResult<Boolean> result = deviceDownstreamHandler.getDeviceProperty(reqDTO);
IotPluginCommonUtils.writeJson(routingContext, result);
// 使用IotStandardResponse实体类返回结果
IotStandardResponse response;
if (result.isSuccess()) {
response = IotStandardResponse.success(reqDTO.getRequestId(), METHOD, result.getData());
} else {
response = IotStandardResponse.error(
reqDTO.getRequestId(), METHOD, result.getCode(), result.getMsg());
}
IotPluginCommonUtils.writeJsonResponse(routingContext, response);
} catch (Exception e) {
log.error("[handle][请求参数({}) 属性获取异常]", reqDTO, e);
IotPluginCommonUtils.writeJson(routingContext, CommonResult.error(INTERNAL_SERVER_ERROR));
// 使用IotStandardResponse实体类返回错误
IotStandardResponse errorResponse = IotStandardResponse.error(
reqDTO.getRequestId(), METHOD, INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg());
IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);
}
}

View File

@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.iot.plugin.common.downstream.router;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.IotDevicePropertySetReqDTO;
import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamHandler;
import cn.iocoder.yudao.module.iot.plugin.common.pojo.IotStandardResponse;
import cn.iocoder.yudao.module.iot.plugin.common.util.IotPluginCommonUtils;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
@ -25,6 +26,7 @@ import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeC
public class IotDevicePropertySetVertxHandler implements Handler<RoutingContext> {
public static final String PATH = "/sys/:productKey/:deviceName/thing/service/property/set";
public static final String METHOD = "thing.service.property.set";
private final IotDeviceDownstreamHandler deviceDownstreamHandler;
@ -44,17 +46,32 @@ public class IotDevicePropertySetVertxHandler implements Handler<RoutingContext>
.setProperties(properties);
} catch (Exception e) {
log.error("[handle][路径参数({}) 解析参数失败]", routingContext.pathParams(), e);
IotPluginCommonUtils.writeJson(routingContext, CommonResult.error(BAD_REQUEST));
// 使用IotStandardResponse实体类返回错误
IotStandardResponse errorResponse = IotStandardResponse.error(
null, METHOD, BAD_REQUEST.getCode(), BAD_REQUEST.getMsg());
IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);
return;
}
// 2. 调用处理器
try {
CommonResult<Boolean> result = deviceDownstreamHandler.setDeviceProperty(reqDTO);
IotPluginCommonUtils.writeJson(routingContext, result);
// 使用IotStandardResponse实体类返回结果
IotStandardResponse response;
if (result.isSuccess()) {
response = IotStandardResponse.success(reqDTO.getRequestId(), METHOD, result.getData());
} else {
response = IotStandardResponse.error(
reqDTO.getRequestId(), METHOD, result.getCode(), result.getMsg());
}
IotPluginCommonUtils.writeJsonResponse(routingContext, response);
} catch (Exception e) {
log.error("[handle][请求参数({}) 属性设置异常]", reqDTO, e);
IotPluginCommonUtils.writeJson(routingContext, CommonResult.error(INTERNAL_SERVER_ERROR));
// 使用IotStandardResponse实体类返回错误
IotStandardResponse errorResponse = IotStandardResponse.error(
reqDTO.getRequestId(), METHOD, INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg());
IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);
}
}

View File

@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.iot.plugin.common.downstream.router;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.IotDeviceServiceInvokeReqDTO;
import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamHandler;
import cn.iocoder.yudao.module.iot.plugin.common.pojo.IotStandardResponse;
import cn.iocoder.yudao.module.iot.plugin.common.util.IotPluginCommonUtils;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
@ -25,6 +26,8 @@ import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeC
public class IotDeviceServiceInvokeVertxHandler implements Handler<RoutingContext> {
public static final String PATH = "/sys/:productKey/:deviceName/thing/service/:identifier";
public static final String METHOD_PREFIX = "thing.service.";
public static final String METHOD_SUFFIX = "";
private final IotDeviceDownstreamHandler deviceDownstreamHandler;
@ -45,17 +48,35 @@ public class IotDeviceServiceInvokeVertxHandler implements Handler<RoutingContex
.setIdentifier(identifier).setParams(params);
} catch (Exception e) {
log.error("[handle][路径参数({}) 解析参数失败]", routingContext.pathParams(), e);
IotPluginCommonUtils.writeJson(routingContext, CommonResult.error(BAD_REQUEST));
// 使用IotStandardResponse实体类返回错误
String method = METHOD_PREFIX + routingContext.pathParam("identifier") + METHOD_SUFFIX;
IotStandardResponse errorResponse = IotStandardResponse.error(
null, method, BAD_REQUEST.getCode(), BAD_REQUEST.getMsg());
IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);
return;
}
// 2. 调用处理器
try {
CommonResult<Boolean> result = deviceDownstreamHandler.invokeDeviceService(reqDTO);
IotPluginCommonUtils.writeJson(routingContext, result);
// 使用IotStandardResponse实体类返回结果
String method = METHOD_PREFIX + reqDTO.getIdentifier() + METHOD_SUFFIX;
IotStandardResponse response;
if (result.isSuccess()) {
response = IotStandardResponse.success(reqDTO.getRequestId(), method, result.getData());
} else {
response = IotStandardResponse.error(
reqDTO.getRequestId(), method, result.getCode(), result.getMsg());
}
IotPluginCommonUtils.writeJsonResponse(routingContext, response);
} catch (Exception e) {
log.error("[handle][请求参数({}) 服务调用异常]", reqDTO, e);
IotPluginCommonUtils.writeJson(routingContext, CommonResult.error(INTERNAL_SERVER_ERROR));
// 使用IotStandardResponse实体类返回错误
String method = METHOD_PREFIX + reqDTO.getIdentifier() + METHOD_SUFFIX;
IotStandardResponse errorResponse = IotStandardResponse.error(
reqDTO.getRequestId(), method, INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg());
IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);
}
}

View File

@ -0,0 +1,94 @@
package cn.iocoder.yudao.module.iot.plugin.common.pojo;
import lombok.Data;
import lombok.experimental.Accessors;
/**
* IoT 标准协议响应实体类
* <p>
* 用于统一 MQTT HTTP 的响应格式
*
* @author haohao
*/
@Data
@Accessors(chain = true)
public class IotStandardResponse {
/**
* 消息ID
*/
private String id;
/**
* 状态码
*/
private Integer code;
/**
* 响应数据
*/
private Object data;
/**
* 响应消息
*/
private String message;
/**
* 方法名
*/
private String method;
/**
* 协议版本
*/
private String version;
/**
* 创建成功响应
*
* @param id 消息ID
* @param method 方法名
* @return 成功响应
*/
public static IotStandardResponse success(String id, String method) {
return success(id, method, null);
}
/**
* 创建成功响应
*
* @param id 消息ID
* @param method 方法名
* @param data 响应数据
* @return 成功响应
*/
public static IotStandardResponse success(String id, String method, Object data) {
return new IotStandardResponse()
.setId(id)
.setCode(200)
.setData(data)
.setMessage("success")
.setMethod(method)
.setVersion("1.0");
}
/**
* 创建错误响应
*
* @param id 消息ID
* @param method 方法名
* @param code 错误码
* @param message 错误消息
* @return 错误响应
*/
public static IotStandardResponse error(String id, String method, Integer code, String message) {
return new IotStandardResponse()
.setId(id)
.setCode(code)
.setData(null)
.setMessage(message)
.setMethod(method)
.setVersion("1.0");
}
}

View File

@ -3,8 +3,8 @@ package cn.iocoder.yudao.module.iot.plugin.common.util;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.system.SystemUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.plugin.common.pojo.IotStandardResponse;
import io.vertx.core.http.HttpHeaders;
import io.vertx.ext.web.RoutingContext;
import org.springframework.http.MediaType;
@ -12,7 +12,7 @@ import org.springframework.http.MediaType;
/**
* IoT 插件的通用工具类
*
* 芋道源码
* @author 芋道源码
*/
public class IotPluginCommonUtils {
@ -33,34 +33,43 @@ public class IotPluginCommonUtils {
SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID(), IdUtil.fastSimpleUUID());
}
@SuppressWarnings("deprecation")
public static void writeJson(RoutingContext routingContext, CommonResult<?> result) {
routingContext.response()
.setStatusCode(200)
.putHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE)
.end(JsonUtils.toJsonString(result));
}
@SuppressWarnings("deprecation")
public static void writeJson(RoutingContext routingContext, String result) {
routingContext.response()
.setStatusCode(200)
.putHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE)
.end(result);
}
/**
* 将对象转换为 JSON 字符串后写入响应
* 将对象转换为JSON字符串后写入HTTP响应
*
* @param routingContext 路由上下文
* @param data 数据对象
*/
@SuppressWarnings("deprecation")
public static void writeJson(RoutingContext routingContext, Object data) {
public static void writeJsonResponse(RoutingContext routingContext, Object data) {
routingContext.response()
.setStatusCode(200)
.putHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE)
.end(JsonUtils.toJsonString(data));
}
/**
* 生成标准JSON格式的响应并写入HTTP响应基于IotStandardResponse
* <p>
* 推荐使用此方法统一MQTT和HTTP的响应格式使用方式
*
* <pre>
* // 成功响应
* IotStandardResponse response = IotStandardResponse.success(requestId, method, data);
* IotPluginCommonUtils.writeJsonResponse(routingContext, response);
*
* // 错误响应
* IotStandardResponse errorResponse = IotStandardResponse.error(requestId, method, code, message);
* IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);
* </pre>
*
* @param routingContext 路由上下文
* @param response IotStandardResponse响应对象
*/
@SuppressWarnings("deprecation")
public static void writeJsonResponse(RoutingContext routingContext, IotStandardResponse response) {
routingContext.response()
.setStatusCode(200)
.putHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE)
.end(JsonUtils.toJsonString(response));
}
}

View File

@ -14,7 +14,12 @@ import java.util.Collections;
/**
* IoT Emqx 连接认证的 Vert.x Handler
* <a href="https://docs.emqx.com/zh/emqx/latest/access-control/authn/http.html">MQTT HTTP</a>
* <a href=
* "https://docs.emqx.com/zh/emqx/latest/access-control/authn/http.html">MQTT
* HTTP</a>
*
* 注意该处理器需要返回特定格式{"result": "allow"} {"result": "deny"}
* 以符合 EMQX 认证插件的要求因此不使用 IotStandardResponse 实体类
*
* @author haohao
*/
@ -43,15 +48,18 @@ public class IotDeviceAuthVertxHandler implements Handler<RoutingContext> {
// 调用认证 API
CommonResult<Boolean> authResult = deviceUpstreamApi.authenticateEmqxConnection(authReqDTO);
if (authResult.getCode() != 0 || !authResult.getData()) {
IotPluginCommonUtils.writeJson(routingContext, Collections.singletonMap("result", "deny"));
// 注意这里必须返回 {"result": "deny"} 格式以符合 EMQX 认证插件的要求
IotPluginCommonUtils.writeJsonResponse(routingContext, Collections.singletonMap("result", "deny"));
return;
}
// 响应结果
IotPluginCommonUtils.writeJson(routingContext, Collections.singletonMap("result", "allow"));
// 注意这里必须返回 {"result": "allow"} 格式以符合 EMQX 认证插件的要求
IotPluginCommonUtils.writeJsonResponse(routingContext, Collections.singletonMap("result", "allow"));
} catch (Exception e) {
log.error("[handle][EMQX 认证异常]", e);
IotPluginCommonUtils.writeJson(routingContext, Collections.singletonMap("result", "deny"));
// 注意这里必须返回 {"result": "deny"} 格式以符合 EMQX 认证插件的要求
IotPluginCommonUtils.writeJsonResponse(routingContext, Collections.singletonMap("result", "deny"));
}
}

View File

@ -2,9 +2,11 @@ package cn.iocoder.yudao.module.iot.plugin.emqx.upstream.router;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceEventReportReqDTO;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDevicePropertyReportReqDTO;
import cn.iocoder.yudao.module.iot.plugin.common.pojo.IotStandardResponse;
import cn.iocoder.yudao.module.iot.plugin.common.util.IotPluginCommonUtils;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.buffer.Buffer;
@ -14,6 +16,8 @@ import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
/**
* IoT 设备 MQTT 消息处理器
@ -32,16 +36,16 @@ public class IotDeviceMqttMessageHandler {
// 响应 Topic/sys/${productKey}/${deviceName}/thing/event/property/post_reply
// 设备上报事件 标准 JSON
// 请求 Topic/sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post
// 响应 Topic/sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post_reply
// 请求
// Topic/sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post
// 响应
// Topic/sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post_reply
private static final String SYS_TOPIC_PREFIX = "/sys/";
private static final String PROPERTY_POST_TOPIC = "/thing/event/property/post";
private static final String EVENT_POST_TOPIC_PREFIX = "/thing/event/";
private static final String EVENT_POST_TOPIC_SUFFIX = "/post";
private static final String REPLY_SUFFIX = "_reply";
private static final int SUCCESS_CODE = 200;
private static final String SUCCESS_MESSAGE = "success";
private static final String PROPERTY_METHOD = "thing.event.property.post";
private static final String EVENT_METHOD_PREFIX = "thing.event.";
private static final String EVENT_METHOD_SUFFIX = ".post";
@ -211,27 +215,25 @@ public class IotDeviceMqttMessageHandler {
* @param method 响应方法
* @param customData 自定义数据可为null
*/
private void sendResponse(String topic, JSONObject jsonObject, String method, JSONObject customData) {
private void sendResponse(String topic, JSONObject jsonObject, String method, Object customData) {
String replyTopic = topic + REPLY_SUFFIX;
JSONObject data = customData != null ? customData : new JSONObject();
JSONObject response = new JSONObject()
.set("id", jsonObject.getStr("id"))
.set("code", SUCCESS_CODE)
.set("data", data)
.set("message", SUCCESS_MESSAGE)
.set("method", method);
// 使用IotStandardResponse实体类构建响应
IotStandardResponse response = IotStandardResponse.success(
jsonObject.getStr("id"),
method,
customData);
try {
mqttClient.publish(replyTopic,
Buffer.buffer(response.toString()),
Buffer.buffer(JsonUtils.toJsonString(response)),
MqttQoS.AT_LEAST_ONCE,
false,
false);
log.info("[sendResponse][发送响应消息成功][topic: {}]", replyTopic);
} catch (Exception e) {
log.error("[sendResponse][发送响应消息失败][topic: {}][response: {}]",
replyTopic, response.toString(), e);
replyTopic, response, e);
}
}
@ -249,7 +251,29 @@ public class IotDeviceMqttMessageHandler {
reportReqDTO.setReportTime(LocalDateTime.now());
reportReqDTO.setProductKey(topicParts[2]);
reportReqDTO.setDeviceName(topicParts[3]);
reportReqDTO.setProperties(jsonObject.getJSONObject("params"));
// 只使用标准JSON格式处理属性数据
JSONObject params = jsonObject.getJSONObject("params");
if (params == null) {
log.warn("[buildPropertyReportDTO][消息格式不正确缺少params字段][jsonObject: {}]", jsonObject);
params = new JSONObject();
}
// 将标准格式的params转换为平台需要的properties格式
Map<String, Object> properties = new HashMap<>();
for (Map.Entry<String, Object> entry : params.entrySet()) {
String key = entry.getKey();
Object valueObj = entry.getValue();
// 如果是复杂结构包含value和time
if (valueObj instanceof JSONObject valueJson) {
properties.put(key, valueJson.getOrDefault("value", valueObj));
} else {
properties.put(key, valueObj);
}
}
reportReqDTO.setProperties(properties);
return reportReqDTO;
}
@ -268,7 +292,15 @@ public class IotDeviceMqttMessageHandler {
reportReqDTO.setProductKey(topicParts[2]);
reportReqDTO.setDeviceName(topicParts[3]);
reportReqDTO.setIdentifier(topicParts[6]);
reportReqDTO.setParams(jsonObject.getJSONObject("params"));
// 只使用标准JSON格式处理事件参数
JSONObject params = jsonObject.getJSONObject("params");
if (params == null) {
log.warn("[buildEventReportDTO][消息格式不正确缺少params字段][jsonObject: {}]", jsonObject);
params = new JSONObject();
}
reportReqDTO.setParams(params);
return reportReqDTO;
}
}

View File

@ -18,7 +18,11 @@ import java.util.Collections;
/**
* IoT Emqx Webhook 事件处理的 Vert.x Handler
*
* <a href="https://docs.emqx.com/zh/emqx/latest/data-integration/webhook.html">EMQXWebhook</a>
* <a href=
* "https://docs.emqx.com/zh/emqx/latest/data-integration/webhook.html">EMQXWebhook</a>
*
* 注意该处理器需要返回特定格式{"result": "success"} {"result": "error"}
* 以符合 EMQX Webhook 插件的要求因此不使用 IotStandardResponse 实体类
*
* @author haohao
*/
@ -54,10 +58,12 @@ public class IotDeviceWebhookVertxHandler implements Handler<RoutingContext> {
}
// 返回成功响应
IotPluginCommonUtils.writeJson(routingContext, Collections.singletonMap("result", "success"));
// 注意这里必须返回 {"result": "success"} 格式以符合 EMQX Webhook 插件的要求
IotPluginCommonUtils.writeJsonResponse(routingContext, Collections.singletonMap("result", "success"));
} catch (Exception e) {
log.error("[handle][处理 Webhook 事件异常]", e);
IotPluginCommonUtils.writeJson(routingContext, Collections.singletonMap("result", "error"));
// 注意这里必须返回 {"result": "error"} 格式以符合 EMQX Webhook 插件的要求
IotPluginCommonUtils.writeJsonResponse(routingContext, Collections.singletonMap("result", "error"));
}
}

View File

@ -7,6 +7,7 @@ import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceEventReportReqDTO;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceStateUpdateReqDTO;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum;
import cn.iocoder.yudao.module.iot.plugin.common.pojo.IotStandardResponse;
import cn.iocoder.yudao.module.iot.plugin.common.util.IotPluginCommonUtils;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
@ -15,6 +16,7 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.BAD_REQUEST;
@ -28,6 +30,9 @@ import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeC
public class IotDeviceEventReportVertxHandler implements Handler<RoutingContext> {
public static final String PATH = "/sys/:productKey/:deviceName/thing/event/:identifier/post";
private static final String VERSION = "1.0";
private static final String EVENT_METHOD_PREFIX = "thing.event.";
private static final String EVENT_METHOD_SUFFIX = ".post";
private final IotDeviceUpstreamApi deviceUpstreamApi;
@ -36,40 +41,71 @@ public class IotDeviceEventReportVertxHandler implements Handler<RoutingContext>
public void handle(RoutingContext routingContext) {
// 1. 解析参数
IotDeviceEventReportReqDTO reportReqDTO;
String identifier = null;
String requestId = IdUtil.fastSimpleUUID();
try {
String productKey = routingContext.pathParam("productKey");
String deviceName = routingContext.pathParam("deviceName");
String identifier = routingContext.pathParam("identifier");
identifier = routingContext.pathParam("identifier");
JsonObject body = routingContext.body().asJsonObject();
String id = ObjUtil.defaultIfBlank(body.getString("id"), IdUtil.fastSimpleUUID());
Map<String, Object> params = (Map<String, Object>) body.getMap().get("params");
reportReqDTO = ((IotDeviceEventReportReqDTO)
new IotDeviceEventReportReqDTO().setRequestId(id)
.setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now())
.setProductKey(productKey).setDeviceName(deviceName))
requestId = ObjUtil.defaultIfBlank(body.getString("id"), requestId);
// 按照标准JSON格式处理事件参数
Map<String, Object> params;
// 优先使用params字段符合标准
if (body.getJsonObject("params") != null) {
params = body.getJsonObject("params").getMap();
} else {
// 兼容旧格式
params = new HashMap<>();
}
reportReqDTO = ((IotDeviceEventReportReqDTO) new IotDeviceEventReportReqDTO().setRequestId(requestId)
.setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now())
.setProductKey(productKey).setDeviceName(deviceName))
.setIdentifier(identifier).setParams(params);
} catch (Exception e) {
log.error("[handle][路径参数({}) 解析参数失败]", routingContext.pathParams(), e);
IotPluginCommonUtils.writeJson(routingContext, CommonResult.error(BAD_REQUEST));
// 使用IotStandardResponse实体类返回错误
String method = identifier != null ? EVENT_METHOD_PREFIX + identifier + EVENT_METHOD_SUFFIX
: "thing.event.unknown.post";
IotStandardResponse errorResponse = IotStandardResponse.error(
requestId, method, BAD_REQUEST.getCode(), BAD_REQUEST.getMsg());
IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);
return;
}
try {
// 2. 设备上线
deviceUpstreamApi.updateDeviceState(((IotDeviceStateUpdateReqDTO)
new IotDeviceStateUpdateReqDTO().setRequestId(IdUtil.fastSimpleUUID())
deviceUpstreamApi.updateDeviceState(
((IotDeviceStateUpdateReqDTO) new IotDeviceStateUpdateReqDTO().setRequestId(IdUtil.fastSimpleUUID())
.setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now())
.setProductKey(reportReqDTO.getProductKey()).setDeviceName(reportReqDTO.getDeviceName()))
.setState(IotDeviceStateEnum.ONLINE.getState()));
.setState(IotDeviceStateEnum.ONLINE.getState()));
// 3.1 属性上报
// 3.1 事件上报
CommonResult<Boolean> result = deviceUpstreamApi.reportDeviceEvent(reportReqDTO);
// 3.2 返回结果
IotPluginCommonUtils.writeJson(routingContext, result);
// 3.2 返回结果 - 使用IotStandardResponse实体类
String method = EVENT_METHOD_PREFIX + reportReqDTO.getIdentifier() + EVENT_METHOD_SUFFIX;
IotStandardResponse response;
if (result.isSuccess()) {
response = IotStandardResponse.success(reportReqDTO.getRequestId(), method, result.getData());
} else {
response = IotStandardResponse.error(
reportReqDTO.getRequestId(), method, result.getCode(), result.getMsg());
}
IotPluginCommonUtils.writeJsonResponse(routingContext, response);
} catch (Exception e) {
log.error("[handle][请求参数({}) 时间上报异常]", reportReqDTO, e);
IotPluginCommonUtils.writeJson(routingContext, CommonResult.error(INTERNAL_SERVER_ERROR));
log.error("[handle][请求参数({}) 事件上报异常]", reportReqDTO, e);
// 构建错误响应 - 使用IotStandardResponse实体类
String method = EVENT_METHOD_PREFIX + reportReqDTO.getIdentifier() + EVENT_METHOD_SUFFIX;
IotStandardResponse errorResponse = IotStandardResponse.error(
reportReqDTO.getRequestId(), method,
INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg());
IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);
}
}
}

View File

@ -7,6 +7,7 @@ import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDevicePropertyReportReqDTO;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceStateUpdateReqDTO;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum;
import cn.iocoder.yudao.module.iot.plugin.common.pojo.IotStandardResponse;
import cn.iocoder.yudao.module.iot.plugin.common.util.IotPluginCommonUtils;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
@ -15,6 +16,7 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.BAD_REQUEST;
@ -31,6 +33,8 @@ import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeC
public class IotDevicePropertyReportVertxHandler implements Handler<RoutingContext> {
public static final String PATH = "/sys/:productKey/:deviceName/thing/event/property/post";
private static final String VERSION = "1.0";
private static final String METHOD = "thing.event.property.post";
private final IotDeviceUpstreamApi deviceUpstreamApi;
@ -39,43 +43,89 @@ public class IotDevicePropertyReportVertxHandler implements Handler<RoutingConte
public void handle(RoutingContext routingContext) {
// 1. 解析参数
IotDevicePropertyReportReqDTO reportReqDTO;
String requestId = IdUtil.fastSimpleUUID();
try {
String productKey = routingContext.pathParam("productKey");
String deviceName = routingContext.pathParam("deviceName");
JsonObject body = routingContext.body().asJsonObject();
String id = ObjUtil.defaultIfBlank(body.getString("id"), IdUtil.fastSimpleUUID());
Map<String, Object> properties = (Map<String, Object>) body.getMap().get("properties");
reportReqDTO = ((IotDevicePropertyReportReqDTO)
new IotDevicePropertyReportReqDTO().setRequestId(id)
.setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now())
.setProductKey(productKey).setDeviceName(deviceName))
requestId = ObjUtil.defaultIfBlank(body.getString("id"), requestId);
// 按照标准JSON格式处理属性数据
Map<String, Object> properties = new HashMap<>();
// 优先使用params字段符合标准
Map<String, Object> params = body.getJsonObject("params") != null ? body.getJsonObject("params").getMap()
: null;
if (params != null) {
// 将标准格式的params转换为平台需要的properties格式
for (Map.Entry<String, Object> entry : params.entrySet()) {
String key = entry.getKey();
Object valueObj = entry.getValue();
// 如果是复杂结构包含value和time
if (valueObj instanceof Map) {
Map<String, Object> valueMap = (Map<String, Object>) valueObj;
if (valueMap.containsKey("value")) {
properties.put(key, valueMap.get("value"));
} else {
properties.put(key, valueObj);
}
} else {
properties.put(key, valueObj);
}
}
} else {
// 兼容旧格式直接使用properties字段
properties = body.getJsonObject("properties") != null ? body.getJsonObject("properties").getMap()
: new HashMap<>();
}
reportReqDTO = ((IotDevicePropertyReportReqDTO) new IotDevicePropertyReportReqDTO().setRequestId(requestId)
.setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now())
.setProductKey(productKey).setDeviceName(deviceName))
.setProperties(properties);
} catch (Exception e) {
log.error("[handle][路径参数({}) 解析参数失败]", routingContext.pathParams(), e);
IotPluginCommonUtils.writeJson(routingContext, CommonResult.error(BAD_REQUEST));
// 使用IotStandardResponse实体类返回错误
IotStandardResponse errorResponse = IotStandardResponse.error(
requestId, METHOD, BAD_REQUEST.getCode(), BAD_REQUEST.getMsg());
IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);
return;
}
// TODO @芋艿secret 校验目前的想法
// 方案一请求的时候带上 secret 参数然后进行校验减少请求的频次不过可能要看下 mqtt 能不能复用
// 方案二本地有设备信息的缓存异步刷新这样可能 mqtt 的校验 http 校验都容易适配
// 方案一请求的时候带上 secret 参数然后进行校验减少请求的频次不过可能要看下 mqtt 能不能复用
// 方案二本地有设备信息的缓存异步刷新这样可能 mqtt 的校验 http 校验都容易适配
try {
// 2. 设备上线
deviceUpstreamApi.updateDeviceState(((IotDeviceStateUpdateReqDTO)
new IotDeviceStateUpdateReqDTO().setRequestId(IdUtil.fastSimpleUUID())
deviceUpstreamApi.updateDeviceState(
((IotDeviceStateUpdateReqDTO) new IotDeviceStateUpdateReqDTO().setRequestId(IdUtil.fastSimpleUUID())
.setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now())
.setProductKey(reportReqDTO.getProductKey()).setDeviceName(reportReqDTO.getDeviceName()))
.setState(IotDeviceStateEnum.ONLINE.getState()));
.setState(IotDeviceStateEnum.ONLINE.getState()));
// 3.1 属性上报
CommonResult<Boolean> result = deviceUpstreamApi.reportDeviceProperty(reportReqDTO);
// 3.2 返回结果
IotPluginCommonUtils.writeJson(routingContext, result);
// 3.2 返回结果 - 使用IotStandardResponse实体类
IotStandardResponse response;
if (result.isSuccess()) {
response = IotStandardResponse.success(reportReqDTO.getRequestId(), METHOD, result.getData());
} else {
response = IotStandardResponse.error(
reportReqDTO.getRequestId(), METHOD, result.getCode(), result.getMsg());
}
IotPluginCommonUtils.writeJsonResponse(routingContext, response);
} catch (Exception e) {
log.error("[handle][请求参数({}) 属性上报异常]", reportReqDTO, e);
IotPluginCommonUtils.writeJson(routingContext, CommonResult.error(INTERNAL_SERVER_ERROR));
// 构建错误响应 - 使用IotStandardResponse实体类
IotStandardResponse errorResponse = IotStandardResponse.error(
reportReqDTO.getRequestId(), METHOD,
INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg());
IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);
}
}
}