【代码评审】IoT:mqtt 协议的接入

This commit is contained in:
YunaiV 2025-03-16 20:43:11 +08:00
parent 86e4379e62
commit 0bdd000226
13 changed files with 61 additions and 100 deletions

View File

@ -42,6 +42,11 @@
</dependency>
<!-- DB 相关 -->
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
</dependency>
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-spring-boot-starter-mybatis</artifactId>
@ -107,12 +112,6 @@
<version>24.1.2</version>
</dependency>
<!-- TDengine 相关依赖 -->
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -46,7 +46,6 @@ public class IotDeviceConfigSetVertxHandler implements Handler<RoutingContext> {
.setConfig(config);
} catch (Exception e) {
log.error("[handle][路径参数({}) 解析参数失败]", routingContext.pathParams(), e);
// 使用IotStandardResponse实体类返回错误
IotStandardResponse errorResponse = IotStandardResponse.error(
null, METHOD, BAD_REQUEST.getCode(), BAD_REQUEST.getMsg());
IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);
@ -57,18 +56,16 @@ public class IotDeviceConfigSetVertxHandler implements Handler<RoutingContext> {
try {
CommonResult<Boolean> result = deviceDownstreamHandler.setDeviceConfig(reqDTO);
// 使用IotStandardResponse实体类返回结果
// 3. 响应结果
IotStandardResponse response;
if (result.isSuccess()) {
response = IotStandardResponse.success(reqDTO.getRequestId(), METHOD, result.getData());
} else {
response = IotStandardResponse.error(
reqDTO.getRequestId(), METHOD, result.getCode(), result.getMsg());
response = IotStandardResponse.error(reqDTO.getRequestId(), METHOD, result.getCode(), result.getMsg());
}
IotPluginCommonUtils.writeJsonResponse(routingContext, response);
} catch (Exception e) {
log.error("[handle][请求参数({}) 配置设置异常]", reqDTO, e);
// 使用IotStandardResponse实体类返回错误
IotStandardResponse errorResponse = IotStandardResponse.error(
reqDTO.getRequestId(), METHOD, INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg());
IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);

View File

@ -51,7 +51,6 @@ public class IotDeviceOtaUpgradeVertxHandler implements Handler<RoutingContext>
.setInformation(information);
} catch (Exception e) {
log.error("[handle][路径参数({}) 解析参数失败]", routingContext.pathParams(), e);
// 使用IotStandardResponse实体类返回错误
IotStandardResponse errorResponse = IotStandardResponse.error(
null, METHOD, BAD_REQUEST.getCode(), BAD_REQUEST.getMsg());
IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);
@ -62,18 +61,16 @@ public class IotDeviceOtaUpgradeVertxHandler implements Handler<RoutingContext>
try {
CommonResult<Boolean> result = deviceDownstreamHandler.upgradeDeviceOta(reqDTO);
// 使用IotStandardResponse实体类返回结果
// 3. 响应结果
IotStandardResponse response;
if (result.isSuccess()) {
response = IotStandardResponse.success(reqDTO.getRequestId(), METHOD, result.getData());
} else {
response = IotStandardResponse.error(
reqDTO.getRequestId(), METHOD, result.getCode(), result.getMsg());
response = IotStandardResponse.error(reqDTO.getRequestId(), METHOD, result.getCode(), result.getMsg());
}
IotPluginCommonUtils.writeJsonResponse(routingContext, response);
} catch (Exception e) {
log.error("[handle][请求参数({}) OTA 升级异常]", reqDTO, e);
// 使用IotStandardResponse实体类返回错误
IotStandardResponse errorResponse = IotStandardResponse.error(
reqDTO.getRequestId(), METHOD, INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg());
IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);

View File

@ -46,7 +46,6 @@ public class IotDevicePropertyGetVertxHandler implements Handler<RoutingContext>
.setIdentifiers(identifiers);
} catch (Exception e) {
log.error("[handle][路径参数({}) 解析参数失败]", routingContext.pathParams(), e);
// 使用IotStandardResponse实体类返回错误
IotStandardResponse errorResponse = IotStandardResponse.error(
null, METHOD, BAD_REQUEST.getCode(), BAD_REQUEST.getMsg());
IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);
@ -57,18 +56,16 @@ public class IotDevicePropertyGetVertxHandler implements Handler<RoutingContext>
try {
CommonResult<Boolean> result = deviceDownstreamHandler.getDeviceProperty(reqDTO);
// 使用IotStandardResponse实体类返回结果
// 3. 响应结果
IotStandardResponse response;
if (result.isSuccess()) {
response = IotStandardResponse.success(reqDTO.getRequestId(), METHOD, result.getData());
} else {
response = IotStandardResponse.error(
reqDTO.getRequestId(), METHOD, result.getCode(), result.getMsg());
response = IotStandardResponse.error(reqDTO.getRequestId(), METHOD, result.getCode(), result.getMsg());
}
IotPluginCommonUtils.writeJsonResponse(routingContext, response);
} catch (Exception e) {
log.error("[handle][请求参数({}) 属性获取异常]", reqDTO, e);
// 使用IotStandardResponse实体类返回错误
IotStandardResponse errorResponse = IotStandardResponse.error(
reqDTO.getRequestId(), METHOD, INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg());
IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);

View File

@ -46,7 +46,6 @@ public class IotDevicePropertySetVertxHandler implements Handler<RoutingContext>
.setProperties(properties);
} catch (Exception e) {
log.error("[handle][路径参数({}) 解析参数失败]", routingContext.pathParams(), e);
// 使用IotStandardResponse实体类返回错误
IotStandardResponse errorResponse = IotStandardResponse.error(
null, METHOD, BAD_REQUEST.getCode(), BAD_REQUEST.getMsg());
IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);
@ -57,18 +56,16 @@ public class IotDevicePropertySetVertxHandler implements Handler<RoutingContext>
try {
CommonResult<Boolean> result = deviceDownstreamHandler.setDeviceProperty(reqDTO);
// 使用IotStandardResponse实体类返回结果
// 3. 响应结果
IotStandardResponse response;
if (result.isSuccess()) {
response = IotStandardResponse.success(reqDTO.getRequestId(), METHOD, result.getData());
} else {
response = IotStandardResponse.error(
reqDTO.getRequestId(), METHOD, result.getCode(), result.getMsg());
response = IotStandardResponse.error(reqDTO.getRequestId(), METHOD, result.getCode(), result.getMsg());
}
IotPluginCommonUtils.writeJsonResponse(routingContext, response);
} catch (Exception e) {
log.error("[handle][请求参数({}) 属性设置异常]", reqDTO, e);
// 使用IotStandardResponse实体类返回错误
IotStandardResponse errorResponse = IotStandardResponse.error(
reqDTO.getRequestId(), METHOD, INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg());
IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);

View File

@ -48,7 +48,6 @@ public class IotDeviceServiceInvokeVertxHandler implements Handler<RoutingContex
.setIdentifier(identifier).setParams(params);
} catch (Exception e) {
log.error("[handle][路径参数({}) 解析参数失败]", routingContext.pathParams(), e);
// 使用IotStandardResponse实体类返回错误
String method = METHOD_PREFIX + routingContext.pathParam("identifier") + METHOD_SUFFIX;
IotStandardResponse errorResponse = IotStandardResponse.error(
null, method, BAD_REQUEST.getCode(), BAD_REQUEST.getMsg());
@ -60,19 +59,17 @@ public class IotDeviceServiceInvokeVertxHandler implements Handler<RoutingContex
try {
CommonResult<Boolean> result = deviceDownstreamHandler.invokeDeviceService(reqDTO);
// 使用IotStandardResponse实体类返回结果
// 3. 响应结果
String method = METHOD_PREFIX + reqDTO.getIdentifier() + METHOD_SUFFIX;
IotStandardResponse response;
if (result.isSuccess()) {
response = IotStandardResponse.success(reqDTO.getRequestId(), method, result.getData());
} else {
response = IotStandardResponse.error(
reqDTO.getRequestId(), method, result.getCode(), result.getMsg());
response = IotStandardResponse.error(reqDTO.getRequestId(), method, result.getCode(), result.getMsg());
}
IotPluginCommonUtils.writeJsonResponse(routingContext, response);
} catch (Exception e) {
log.error("[handle][请求参数({}) 服务调用异常]", reqDTO, e);
// 使用IotStandardResponse实体类返回错误
String method = METHOD_PREFIX + reqDTO.getIdentifier() + METHOD_SUFFIX;
IotStandardResponse errorResponse = IotStandardResponse.error(
reqDTO.getRequestId(), method, INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg());

View File

@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.iot.plugin.common.pojo;
import lombok.Data;
import lombok.experimental.Accessors;
// TODO @芋艿1后续考虑要不要叫 Iot 网关之类的 Response2包名 pojo
/**
* IoT 标准协议响应实体类
* <p>

View File

@ -10,10 +10,9 @@ import org.springframework.context.annotation.AnnotationConfigApplicationContext
/**
* EMQX 插件实现类
*
* 基于 PF4J 插件框架实现 EMQX 消息中间件的集成
* 负责插件的生命周期管理包括启动停止和应用上下文的创建
* 基于 PF4J 插件框架实现 EMQX 消息中间件的集成负责插件的生命周期管理包括启动停止和应用上下文的创建
*
* @author 芋道源码
* @author haohao
*/
@Slf4j
public class IotEmqxPlugin extends SpringPlugin {
@ -26,7 +25,6 @@ public class IotEmqxPlugin extends SpringPlugin {
public void start() {
log.info("[EmqxPlugin][EmqxPlugin 插件启动开始...]");
try {
log.info("[EmqxPlugin][EmqxPlugin 插件启动成功...]");
} catch (Exception e) {
log.error("[EmqxPlugin][EmqxPlugin 插件开启动异常...]", e);
@ -52,6 +50,7 @@ public class IotEmqxPlugin extends SpringPlugin {
// 继续使用插件自己的 ClassLoader 以加载插件内部的类
pluginContext.setClassLoader(getWrapper().getPluginClassLoader());
// 扫描当前插件的自动配置包
// TODO @芋艿是不是要配置下包
pluginContext.scan("cn.iocoder.yudao.module.iot.plugin.emqx.config");
pluginContext.refresh();
return pluginContext;

View File

@ -17,7 +17,6 @@ import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.MQTT_TOPIC_IL
/**
* EMQX 插件的 {@link IotDeviceDownstreamHandler} 实现类
* <p>
*
* @author 芋道源码
*/
@ -26,8 +25,7 @@ public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandle
private static final String SYS_TOPIC_PREFIX = "/sys/";
// TODO @haohao讨论感觉 mqtt http可以做个相对统一的格式哈
// 回复 都使用 Alink 格式方便后续扩展
// TODO @haohao讨论感觉 mqtt http可以做个相对统一的格式哈回复 都使用 Alink 格式方便后续扩展
// 设备服务调用 标准 JSON
// 请求Topic/sys/${productKey}/${deviceName}/thing/service/${tsl.service.identifier}
// 响应Topic/sys/${productKey}/${deviceName}/thing/service/${tsl.service.identifier}_reply
@ -62,11 +60,8 @@ public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandle
try {
// 构建请求主题
String topic = buildServiceTopic(reqDTO.getProductKey(), reqDTO.getDeviceName(), reqDTO.getIdentifier());
// 生成请求ID如果没有提供
String requestId = reqDTO.getRequestId() != null ? reqDTO.getRequestId() : generateRequestId();
// 构建请求消息
String requestId = reqDTO.getRequestId() != null ? reqDTO.getRequestId() : generateRequestId();
JSONObject request = buildServiceRequest(requestId, reqDTO.getIdentifier(), reqDTO.getParams());
// 发送消息
@ -98,11 +93,8 @@ public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandle
try {
// 构建请求主题
String topic = buildPropertySetTopic(reqDTO.getProductKey(), reqDTO.getDeviceName());
// 生成请求ID如果没有提供
String requestId = reqDTO.getRequestId() != null ? reqDTO.getRequestId() : generateRequestId();
// 构建请求消息
String requestId = reqDTO.getRequestId() != null ? reqDTO.getRequestId() : generateRequestId();
JSONObject request = buildPropertySetRequest(requestId, reqDTO.getProperties());
// 发送消息
@ -163,7 +155,7 @@ public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandle
}
/**
* 发布MQTT消息
* 发布 MQTT 消息
*/
private void publishMessage(String topic, JSONObject payload) {
mqttClient.publish(

View File

@ -13,13 +13,12 @@ import lombok.extern.slf4j.Slf4j;
import java.util.Collections;
/**
* IoT Emqx 连接认证的 Vert.x Handler
* <a href=
* "https://docs.emqx.com/zh/emqx/latest/access-control/authn/http.html">MQTT
* HTTP</a>
* IoT EMQX 连接认证的 Vert.x Handler
*
* <a href="https://docs.emqx.com/zh/emqx/latest/access-control/authn/http.html">EMQX HTTP</a>
*
* 注意该处理器需要返回特定格式{"result": "allow"} {"result": "deny"}
* 以符合 EMQX 认证插件的要求因此不使用 IotStandardResponse 实体类
* 以符合 EMQX 认证插件的要求因此不使用 IotStandardResponse 实体类
*
* @author haohao
*/

View File

@ -1,5 +1,6 @@
package cn.iocoder.yudao.module.iot.plugin.emqx.upstream.router;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
@ -21,26 +22,20 @@ import java.util.Map;
/**
* IoT 设备 MQTT 消息处理器
* <p>
* 参考
* <p>
* "<a href=
* "https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services?spm=a2c4g.11186623.0.0.97a72915vRck44#section-g4j-5zg-12b">...</a>">
*
* 参考"<a href="https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services">设备属性、事件、服务</a>">
*/
@Slf4j
public class IotDeviceMqttMessageHandler {
// TODO @haohao讨论感觉 mqtt http可以做个相对统一的格式哈
// 回复 都使用 Alink 格式方便后续扩展
// TODO @haohao讨论感觉 mqtt http可以做个相对统一的格式哈回复 都使用 Alink 格式方便后续扩展
// 设备上报属性 标准 JSON
// 请求 Topic/sys/${productKey}/${deviceName}/thing/event/property/post
// 响应 Topic/sys/${productKey}/${deviceName}/thing/event/property/post_reply
// 设备上报事件 标准 JSON
// 请求
// Topic/sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post
// 响应
// Topic/sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post_reply
// 请求 Topic/sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post
// 响应 Topic/sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post_reply
private static final String SYS_TOPIC_PREFIX = "/sys/";
private static final String PROPERTY_POST_TOPIC = "/thing/event/property/post";
@ -70,7 +65,7 @@ public class IotDeviceMqttMessageHandler {
log.info("[messageHandler][接收到消息][topic: {}][payload: {}]", topic, payload);
try {
if (payload == null || payload.isEmpty()) {
if (StrUtil.isEmpty(payload)) {
log.warn("[messageHandler][消息内容为空][topic: {}]", topic);
return;
}
@ -214,27 +209,20 @@ public class IotDeviceMqttMessageHandler {
* @param topic 原始主题
* @param jsonObject 原始消息JSON对象
* @param method 响应方法
* @param customData 自定义数据可为null
* @param customData 自定义数据可为 null
*/
private void sendResponse(String topic, JSONObject jsonObject, String method, Object customData) {
String replyTopic = topic + REPLY_SUFFIX;
// 使用IotStandardResponse实体类构建响应
// 响应结果
IotStandardResponse response = IotStandardResponse.success(
jsonObject.getStr("id"),
method,
customData);
jsonObject.getStr("id"), method, customData);
try {
mqttClient.publish(replyTopic,
Buffer.buffer(JsonUtils.toJsonString(response)),
MqttQoS.AT_LEAST_ONCE,
false,
false);
mqttClient.publish(replyTopic, Buffer.buffer(JsonUtils.toJsonString(response)),
MqttQoS.AT_LEAST_ONCE, false, false);
log.info("[sendResponse][发送响应消息成功][topic: {}]", replyTopic);
} catch (Exception e) {
log.error("[sendResponse][发送响应消息失败][topic: {}][response: {}]",
replyTopic, response, e);
log.error("[sendResponse][发送响应消息失败][topic: {}][response: {}]", replyTopic, response, e);
}
}
@ -304,4 +292,5 @@ public class IotDeviceMqttMessageHandler {
return reportReqDTO;
}
}

View File

@ -16,13 +16,12 @@ import java.time.LocalDateTime;
import java.util.Collections;
/**
* IoT Emqx Webhook 事件处理的 Vert.x Handler
* IoT EMQX Webhook 事件处理的 Vert.x Handler
*
* <a href=
* "https://docs.emqx.com/zh/emqx/latest/data-integration/webhook.html">EMQXWebhook</a>
* <a href="https://docs.emqx.com/zh/emqx/latest/data-integration/webhook.html">EMQX Webhook</a>
*
* 注意该处理器需要返回特定格式{"result": "success"} {"result": "error"}
* 以符合 EMQX Webhook 插件的要求因此不使用 IotStandardResponse 实体类
* 以符合 EMQX Webhook 插件的要求因此不使用 IotStandardResponse 实体类
*
* @author haohao
*/
@ -137,7 +136,7 @@ public class IotDeviceWebhookVertxHandler implements Handler<RoutingContext> {
* 解析用户名格式为 deviceName&productKey
*
* @param username 用户名
* @return 解析结果[0] deviceName[1] productKey解析失败返回 null
* @return 解析结果[0] deviceName[1] productKey解析失败返回 null
*/
private String[] parseUsername(String username) {
if (StrUtil.isEmpty(username)) {

View File

@ -34,9 +34,13 @@ import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeC
@Slf4j
public class IotDeviceUpstreamVertxHandler implements Handler<RoutingContext> {
// 属性上报路径
/**
* 属性上报路径
*/
public static final String PROPERTY_PATH = "/sys/:productKey/:deviceName/thing/event/property/post";
// 事件上报路径
/**
* 事件上报路径
*/
public static final String EVENT_PATH = "/sys/:productKey/:deviceName/thing/event/:identifier/post";
private static final String PROPERTY_METHOD = "thing.event.property.post";
@ -60,7 +64,6 @@ public class IotDeviceUpstreamVertxHandler implements Handler<RoutingContext> {
// 2. 根据路径模式处理不同类型的请求
CommonResult<Boolean> result;
String method;
if (path.matches(".*/thing/event/property/post")) {
// 处理属性上报
IotDevicePropertyReportReqDTO reportReqDTO = parsePropertyReportRequest(productKey, deviceName, requestId, body);
@ -97,13 +100,13 @@ public class IotDeviceUpstreamVertxHandler implements Handler<RoutingContext> {
response = IotStandardResponse.error(requestId, method, result.getCode(), result.getMsg());
}
IotPluginCommonUtils.writeJsonResponse(routingContext, response);
} catch (Exception e) {
log.error("[handle][处理上行请求异常] path={}", path, e);
// 构建错误响应
String method = path.contains("/property/") ? PROPERTY_METHOD : EVENT_METHOD_PREFIX + (routingContext.pathParams().containsKey("identifier") ? routingContext.pathParam("identifier") : "unknown") + EVENT_METHOD_SUFFIX;
String method = path.contains("/property/") ? PROPERTY_METHOD
: EVENT_METHOD_PREFIX + (routingContext.pathParams().containsKey("identifier")
? routingContext.pathParam("identifier")
: "unknown") + EVENT_METHOD_SUFFIX;
IotStandardResponse errorResponse = IotStandardResponse.error(requestId, method, INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg());
IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);
}
@ -130,20 +133,16 @@ public class IotDeviceUpstreamVertxHandler implements Handler<RoutingContext> {
*/
@SuppressWarnings("unchecked")
private IotDevicePropertyReportReqDTO parsePropertyReportRequest(String productKey, String deviceName, String requestId, JsonObject body) {
// 按照标准JSON格式处理属性数据
// 按照标准 JSON 格式处理属性数据
Map<String, Object> properties = new HashMap<>();
// 优先使用params字段符合标准
// 优先使用 params 字段符合标准
Map<String, Object> params = body.getJsonObject("params") != null ? body.getJsonObject("params").getMap() : null;
if (params != null) {
// 将标准格式的params转换为平台需要的properties格式
// 将标准格式的 params 转换为平台需要的 properties 格式
for (Map.Entry<String, Object> entry : params.entrySet()) {
String key = entry.getKey();
Object valueObj = entry.getValue();
// 如果是复杂结构包含value和time
// 如果是复杂结构包含 value time
if (valueObj instanceof Map) {
Map<String, Object> valueMap = (Map<String, Object>) valueObj;
properties.put(key, valueMap.getOrDefault("value", valueObj));
@ -153,6 +152,7 @@ public class IotDeviceUpstreamVertxHandler implements Handler<RoutingContext> {
}
}
// 构建属性上报请求 DTO
return ((IotDevicePropertyReportReqDTO) new IotDevicePropertyReportReqDTO().setRequestId(requestId).setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now()).setProductKey(productKey).setDeviceName(deviceName)).setProperties(properties);
}
@ -166,12 +166,9 @@ public class IotDeviceUpstreamVertxHandler implements Handler<RoutingContext> {
* @param body 请求体
* @return 事件上报请求DTO
*/
@SuppressWarnings("unchecked")
private IotDeviceEventReportReqDTO parseEventReportRequest(String productKey, String deviceName, String identifier, String requestId, JsonObject body) {
// 按照标准JSON格式处理事件参数
Map<String, Object> params;
// 优先使用params字段符合标准
if (body.getJsonObject("params") != null) {
params = body.getJsonObject("params").getMap();
@ -180,6 +177,7 @@ public class IotDeviceUpstreamVertxHandler implements Handler<RoutingContext> {
params = new HashMap<>();
}
// 构建事件上报请求 DTO
return ((IotDeviceEventReportReqDTO) new IotDeviceEventReportReqDTO().setRequestId(requestId).setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now()).setProductKey(productKey).setDeviceName(deviceName)).setIdentifier(identifier).setParams(params);
}
}