From 4cefea68809b5ca38903c6254357ac985b974810 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E6=B5=A9=E6=B5=A9?= <1036606149@qq.com> Date: Tue, 25 Feb 2025 09:51:39 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90=E5=8A=9F=E8=83=BD=E5=AE=8C=E5=96=84?= =?UTF-8?q?=E3=80=91IoT:=20=E6=B7=BB=E5=8A=A0=20MQTT=20=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E5=A4=84=E7=90=86=E5=99=A8=EF=BC=8C=E9=87=8D=E6=9E=84=E8=AE=BE?= =?UTF-8?q?=E5=A4=87=E5=B1=9E=E6=80=A7=E5=92=8C=E4=BA=8B=E4=BB=B6=E4=B8=8A?= =?UTF-8?q?=E6=8A=A5=E9=80=BB=E8=BE=91=EF=BC=8C=E4=BC=98=E5=8C=96=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E5=A4=84=E7=90=86=E6=B5=81=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../IotDevicePropertySetVertxHandler.java | 2 +- .../IotPluginEmqxAutoConfiguration.java | 8 +- .../IotDeviceDownstreamHandlerImpl.java | 6 + .../upstream/IotDeviceUpstreamServer.java | 143 +------------ .../router/IotDeviceMqttMessageHandler.java | 194 ++++++++++++++++++ 5 files changed, 212 insertions(+), 141 deletions(-) create mode 100644 yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceMqttMessageHandler.java diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/downstream/router/IotDevicePropertySetVertxHandler.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/downstream/router/IotDevicePropertySetVertxHandler.java index c3a71ff806..ddcebccffb 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/downstream/router/IotDevicePropertySetVertxHandler.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/downstream/router/IotDevicePropertySetVertxHandler.java @@ -16,7 +16,7 @@ import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeC import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR; /** - * IoT 设备服务设置 Vertx Handler + * IoT 设置设备属性 Vertx Handler * * 芋道源码 */ diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/config/IotPluginEmqxAutoConfiguration.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/config/IotPluginEmqxAutoConfiguration.java index 4b9b104aa2..d504e5704f 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/config/IotPluginEmqxAutoConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/config/IotPluginEmqxAutoConfiguration.java @@ -1,9 +1,7 @@ package cn.iocoder.yudao.module.iot.plugin.emqx.config; import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi; -import cn.iocoder.yudao.module.iot.plugin.common.config.IotPluginCommonProperties; import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamHandler; -import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamServer; import cn.iocoder.yudao.module.iot.plugin.emqx.downstream.IotDeviceDownstreamHandlerImpl; import cn.iocoder.yudao.module.iot.plugin.emqx.upstream.IotDeviceUpstreamServer; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -21,10 +19,8 @@ public class IotPluginEmqxAutoConfiguration { @Bean(initMethod = "start", destroyMethod = "stop") public IotDeviceUpstreamServer deviceUpstreamServer(IotDeviceUpstreamApi deviceUpstreamApi, - IotPluginCommonProperties commonProperties, - IotPluginEmqxProperties emqxProperties, - IotDeviceDownstreamServer deviceDownstreamServer) { - return new IotDeviceUpstreamServer(commonProperties, emqxProperties, deviceUpstreamApi, deviceDownstreamServer); + IotPluginEmqxProperties emqxProperties) { + return new IotDeviceUpstreamServer(emqxProperties, deviceUpstreamApi); } @Bean diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/downstream/IotDeviceDownstreamHandlerImpl.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/downstream/IotDeviceDownstreamHandlerImpl.java index fbbfd7a5c8..b1a8eebbf4 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/downstream/IotDeviceDownstreamHandlerImpl.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/downstream/IotDeviceDownstreamHandlerImpl.java @@ -14,6 +14,9 @@ public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandle @Override public CommonResult invokeDeviceService(IotDeviceServiceInvokeReqDTO invokeReqDTO) { + // 设备服务调用 + // 请求Topic:/sys/${productKey}/${deviceName}/thing/service/${tsl.service.identifier} + // 响应Topic:/sys/${productKey}/${deviceName}/thing/service/${tsl.service.identifier}_reply return CommonResult.success(true); } @@ -24,6 +27,9 @@ public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandle @Override public CommonResult setDeviceProperty(IotDevicePropertySetReqDTO setReqDTO) { + // 设置设备属性 标准 JSON + // 请求Topic:/sys/${productKey}/${deviceName}/thing/service/property/set + // 响应Topic:/sys/${productKey}/${deviceName}/thing/service/property/set_reply return CommonResult.success(true); } diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java index 423f0d233b..baeddcb152 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java @@ -1,16 +1,11 @@ package cn.iocoder.yudao.module.iot.plugin.emqx.upstream; import cn.hutool.core.util.IdUtil; -import cn.hutool.json.JSONObject; -import cn.hutool.json.JSONUtil; import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi; -import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceEventReportReqDTO; -import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDevicePropertyReportReqDTO; -import cn.iocoder.yudao.module.iot.plugin.common.config.IotPluginCommonProperties; -import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamServer; -import cn.iocoder.yudao.module.iot.plugin.common.util.IotPluginCommonUtils; import cn.iocoder.yudao.module.iot.plugin.emqx.config.IotPluginEmqxProperties; import cn.iocoder.yudao.module.iot.plugin.emqx.upstream.router.IotDeviceAuthVertxHandler; +import cn.iocoder.yudao.module.iot.plugin.emqx.upstream.router.IotDeviceMqttMessageHandler; +import io.netty.handler.codec.mqtt.MqttQoS; import io.vertx.core.Vertx; import io.vertx.core.http.HttpServer; import io.vertx.ext.web.Router; @@ -19,47 +14,28 @@ import io.vertx.mqtt.MqttClient; import io.vertx.mqtt.MqttClientOptions; import lombok.extern.slf4j.Slf4j; -import java.time.LocalDateTime; - /** * IoT 设备下行服务端,接收来自 device 设备的请求,转发给 server 服务器 *

* 协议:HTTP、MQTT - * 参考:... * * @author haohao */ @Slf4j public class IotDeviceUpstreamServer { - // 设备上报属性 标准 JSON - // 请求Topic:/sys/${productKey}/${deviceName}/thing/event/property/post - // 响应Topic:/sys/${productKey}/${deviceName}/thing/event/property/post_reply - // 设备上报事件 标准 JSON - // 请求Topic:/sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post - // 响应Topic:/sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post_reply - - private static final String SYS_TOPIC_PREFIX = "/sys/"; - private static final String PROPERTY_POST_TOPIC = "/thing/event/property/post"; - private static final String EVENT_POST_TOPIC_PREFIX = "/thing/event/"; - private static final String EVENT_POST_TOPIC_SUFFIX = "/post"; - private static final int RECONNECT_DELAY = 5000; // 重连延迟时间(毫秒) - private static final int QOS_LEVEL = 1; private final Vertx vertx; private final HttpServer server; private final MqttClient client; private final IotPluginEmqxProperties emqxProperties; - private final IotDeviceUpstreamApi deviceUpstreamApi; + private final IotDeviceMqttMessageHandler mqttMessageHandler; - public IotDeviceUpstreamServer(IotPluginCommonProperties commonProperties, - IotPluginEmqxProperties emqxProperties, - IotDeviceUpstreamApi deviceUpstreamApi, - IotDeviceDownstreamServer deviceDownstreamServer) { + public IotDeviceUpstreamServer(IotPluginEmqxProperties emqxProperties, + IotDeviceUpstreamApi deviceUpstreamApi) { this.emqxProperties = emqxProperties; - this.deviceUpstreamApi = deviceUpstreamApi; + // 创建 Vertx 实例 this.vertx = Vertx.vertx(); // 创建 Router 实例 @@ -77,6 +53,7 @@ public class IotDeviceUpstreamServer { .setPassword(emqxProperties.getMqttPassword()) .setSsl(emqxProperties.isMqttSsl()); client = MqttClient.create(vertx, options); + this.mqttMessageHandler = new IotDeviceMqttMessageHandler(deviceUpstreamApi, client); } /** @@ -108,109 +85,7 @@ public class IotDeviceUpstreamServer { * 设置 MQTT 消息处理器 */ private void setupMessageHandler() { - client.publishHandler(message -> { - String topic = message.topicName(); - String payload = message.payload().toString(); - log.info("[messageHandler][接收到消息][topic: {}][payload: {}]", topic, payload); - - try { - handleMessage(topic, payload); - } catch (Exception e) { - log.error("[messageHandler][处理消息失败][topic: {}][payload: {}]", topic, payload, e); - } - }); - } - - /** - * 处理 MQTT 消息 - */ - private void handleMessage(String topic, String payload) { - // 校验前缀 - if (!topic.startsWith(SYS_TOPIC_PREFIX)) { - log.warn("[handleMessage][未知的消息类型][topic: {}]", topic); - return; - } - - // 处理设备属性上报消息 - if (topic.endsWith(PROPERTY_POST_TOPIC)) { - log.info("[handleMessage][接收到设备属性上报][topic: {}]", topic); - handlePropertyPost(topic, payload); - return; - } - - // 处理设备事件上报消息 - if (topic.contains(EVENT_POST_TOPIC_PREFIX) && topic.endsWith(EVENT_POST_TOPIC_SUFFIX)) { - log.info("[handleMessage][接收到设备事件上报][topic: {}]", topic); - handleEventPost(topic, payload); - return; - } - - // 未知消息类型 - log.warn("[handleMessage][未知的消息类型][topic: {}]", topic); - } - - /** - * 处理设备属性上报 - */ - private void handlePropertyPost(String topic, String payload) { - // /sys/${productKey}/${deviceName}/thing/event/property/post - // 解析消息内容 - JSONObject jsonObject = JSONUtil.parseObj(payload); - String[] topicParts = topic.split("/"); - - // 构建设备属性上报请求对象 - IotDevicePropertyReportReqDTO reportReqDTO = buildPropertyReportDTO(jsonObject, topicParts); - - // 调用上游 API 处理设备上报数据 - deviceUpstreamApi.reportDeviceProperty(reportReqDTO); - log.info("[handlePropertyPost][处理设备上行消息成功][topic: {}][reportReqDTO: {}]", - topic, JSONUtil.toJsonStr(reportReqDTO)); - } - - /** - * 处理设备事件上报 - */ - private void handleEventPost(String topic, String payload) { - // /sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post - // 解析消息内容 - JSONObject jsonObject = JSONUtil.parseObj(payload); - String[] topicParts = topic.split("/"); - - // 构建设备事件上报请求对象 - IotDeviceEventReportReqDTO reportReqDTO = buildEventReportDTO(jsonObject, topicParts); - - // 调用上游 API 处理设备上报数据 - deviceUpstreamApi.reportDeviceEvent(reportReqDTO); - log.info("[handleEventPost][处理设备上行消息成功][topic: {}][reportReqDTO: {}]", - topic, JSONUtil.toJsonStr(reportReqDTO)); - } - - /** - * 构建设备属性上报请求对象 - */ - private IotDevicePropertyReportReqDTO buildPropertyReportDTO(JSONObject jsonObject, - String[] topicParts) { - return ((IotDevicePropertyReportReqDTO) new IotDevicePropertyReportReqDTO() - .setRequestId(jsonObject.getStr("id")) - .setProcessId(IotPluginCommonUtils.getProcessId()) - .setReportTime(LocalDateTime.now()) - .setProductKey(topicParts[2]) - .setDeviceName(topicParts[3])) - .setProperties(jsonObject.getJSONObject("params")); - } - - /** - * 构建设备事件上报请求对象 - */ - private IotDeviceEventReportReqDTO buildEventReportDTO(JSONObject jsonObject, String[] topicParts) { - return ((IotDeviceEventReportReqDTO) new IotDeviceEventReportReqDTO() - .setRequestId(jsonObject.getStr("id")) - .setProcessId(IotPluginCommonUtils.getProcessId()) - .setReportTime(LocalDateTime.now()) - .setProductKey(topicParts[2]) - .setDeviceName(topicParts[3])) - .setIdentifier(topicParts[4]) - .setParams(jsonObject.getJSONObject("params")); + client.publishHandler(mqttMessageHandler::handle); } /** @@ -244,7 +119,7 @@ public class IotDeviceUpstreamServer { private void subscribeToTopics() { String[] topics = emqxProperties.getMqttTopics().split(","); for (String topic : topics) { - client.subscribe(topic, QOS_LEVEL) + client.subscribe(topic, MqttQoS.AT_LEAST_ONCE.value()) .onSuccess(v -> log.info("[subscribeToTopics][成功订阅主题: {}]", topic)) .onFailure(err -> log.error("[subscribeToTopics][订阅主题失败: {}]", topic, err)); } diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceMqttMessageHandler.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceMqttMessageHandler.java new file mode 100644 index 0000000000..6b99d781a4 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceMqttMessageHandler.java @@ -0,0 +1,194 @@ +package cn.iocoder.yudao.module.iot.plugin.emqx.upstream.router; + +import cn.hutool.json.JSONObject; +import cn.hutool.json.JSONUtil; +import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi; +import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceEventReportReqDTO; +import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDevicePropertyReportReqDTO; +import cn.iocoder.yudao.module.iot.plugin.common.util.IotPluginCommonUtils; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.vertx.core.buffer.Buffer; +import io.vertx.mqtt.MqttClient; +import io.vertx.mqtt.messages.MqttPublishMessage; +import lombok.extern.slf4j.Slf4j; + +import java.time.LocalDateTime; + +/** + * IoT 设备 MQTT 消息处理器 + *

+ * 参考: + *

+ * "..."> + */ +@Slf4j +public class IotDeviceMqttMessageHandler { + + // 设备上报属性 标准 JSON + // 请求Topic:/sys/${productKey}/${deviceName}/thing/event/property/post + // 响应Topic:/sys/${productKey}/${deviceName}/thing/event/property/post_reply + // 设备上报事件 标准 JSON + // 请求Topic:/sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post + // 响应Topic:/sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post_reply + + private static final String SYS_TOPIC_PREFIX = "/sys/"; + private static final String PROPERTY_POST_TOPIC = "/thing/event/property/post"; + private static final String EVENT_POST_TOPIC_PREFIX = "/thing/event/"; + private static final String EVENT_POST_TOPIC_SUFFIX = "/post"; + + private final IotDeviceUpstreamApi deviceUpstreamApi; + private final MqttClient mqttClient; + + public IotDeviceMqttMessageHandler(IotDeviceUpstreamApi deviceUpstreamApi, MqttClient mqttClient) { + this.deviceUpstreamApi = deviceUpstreamApi; + this.mqttClient = mqttClient; + } + + public void handle(MqttPublishMessage message) { + String topic = message.topicName(); + String payload = message.payload().toString(); + log.info("[messageHandler][接收到消息][topic: {}][payload: {}]", topic, payload); + + try { + handleMessage(topic, payload); + } catch (Exception e) { + log.error("[messageHandler][处理消息失败][topic: {}][payload: {}]", topic, payload, e); + } + } + + private void handleMessage(String topic, String payload) { + // 校验前缀 + if (!topic.startsWith(SYS_TOPIC_PREFIX)) { + log.warn("[handleMessage][未知的消息类型][topic: {}]", topic); + return; + } + + // 处理设备属性上报消息 + if (topic.endsWith(PROPERTY_POST_TOPIC)) { + log.info("[handleMessage][接收到设备属性上报][topic: {}]", topic); + handlePropertyPost(topic, payload); + return; + } + + // 处理设备事件上报消息 + if (topic.contains(EVENT_POST_TOPIC_PREFIX) && topic.endsWith(EVENT_POST_TOPIC_SUFFIX)) { + log.info("[handleMessage][接收到设备事件上报][topic: {}]", topic); + handleEventPost(topic, payload); + return; + } + + // 未知消息类型 + log.warn("[handleMessage][未知的消息类型][topic: {}]", topic); + } + + /** + * 处理设备属性上报消息 + * + * @param topic 主题 + * @param payload 消息内容 + */ + private void handlePropertyPost(String topic, String payload) { + // 解析消息内容 + JSONObject jsonObject = JSONUtil.parseObj(payload); + String[] topicParts = topic.split("/"); + + // 构建设备属性上报请求对象 + IotDevicePropertyReportReqDTO reportReqDTO = buildPropertyReportDTO(jsonObject, topicParts); + + // 调用上游 API 处理设备上报数据 + deviceUpstreamApi.reportDeviceProperty(reportReqDTO); + log.info("[handlePropertyPost][处理设备上行消息成功][topic: {}][reportReqDTO: {}]", + topic, JSONUtil.toJsonStr(reportReqDTO)); + + // 发送响应消息 + String replyTopic = topic + "_reply"; + JSONObject response = new JSONObject() + .set("id", jsonObject.getStr("id")) + .set("code", 200) + .set("data", new JSONObject()) + .set("message", "success") + .set("method", "thing.event.property.post"); + + mqttClient.publish(replyTopic, + Buffer.buffer(response.toString()), + MqttQoS.AT_LEAST_ONCE, + false, + false); + log.info("[handlePropertyPost][发送响应消息成功][topic: {}][response: {}]", + replyTopic, response.toString()); + } + + /** + * 处理设备事件上报消息 + * + * @param topic 主题 + * @param payload 消息内容 + */ + private void handleEventPost(String topic, String payload) { + // 解析消息内容 + JSONObject jsonObject = JSONUtil.parseObj(payload); + String[] topicParts = topic.split("/"); + + // 构建设备事件上报请求对象 + IotDeviceEventReportReqDTO reportReqDTO = buildEventReportDTO(jsonObject, topicParts); + + // 调用上游 API 处理设备上报数据 + deviceUpstreamApi.reportDeviceEvent(reportReqDTO); + log.info("[handleEventPost][处理设备上行消息成功][topic: {}][reportReqDTO: {}]", + topic, JSONUtil.toJsonStr(reportReqDTO)); + + // 发送响应消息 + String replyTopic = topic + "_reply"; + String eventIdentifier = topicParts[6]; // 从 topic 中获取事件标识符 + JSONObject response = new JSONObject() + .set("id", jsonObject.getStr("id")) + .set("code", 200) + .set("data", new JSONObject()) + .set("message", "success") + .set("method", "thing.event." + eventIdentifier + ".post"); + + mqttClient.publish(replyTopic, + Buffer.buffer(response.toString()), + MqttQoS.AT_LEAST_ONCE, + false, + false); + log.info("[handleEventPost][发送响应消息成功][topic: {}][response: {}]", + replyTopic, response.toString()); + } + + /** + * 构建设备属性上报请求对象 + * + * @param jsonObject 消息内容 + * @param topicParts 主题部分 + * @return 设备属性上报请求对象 + */ + private IotDevicePropertyReportReqDTO buildPropertyReportDTO(JSONObject jsonObject, + String[] topicParts) { + return ((IotDevicePropertyReportReqDTO) new IotDevicePropertyReportReqDTO() + .setRequestId(jsonObject.getStr("id")) + .setProcessId(IotPluginCommonUtils.getProcessId()) + .setReportTime(LocalDateTime.now()) + .setProductKey(topicParts[2]) + .setDeviceName(topicParts[3])) + .setProperties(jsonObject.getJSONObject("params")); + } + + /** + * 构建设备事件上报请求对象 + * + * @param jsonObject 消息内容 + * @param topicParts 主题部分 + * @return 设备事件上报请求对象 + */ + private IotDeviceEventReportReqDTO buildEventReportDTO(JSONObject jsonObject, String[] topicParts) { + return ((IotDeviceEventReportReqDTO) new IotDeviceEventReportReqDTO() + .setRequestId(jsonObject.getStr("id")) + .setProcessId(IotPluginCommonUtils.getProcessId()) + .setReportTime(LocalDateTime.now()) + .setProductKey(topicParts[2]) + .setDeviceName(topicParts[3])) + .setIdentifier(topicParts[4]) + .setParams(jsonObject.getJSONObject("params")); + } +} \ No newline at end of file