From 4746281df96b59c38d5f33ef940cc1b228414eb1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E6=B5=A9=E6=B5=A9?= <1036606149@qq.com> Date: Tue, 25 Feb 2025 08:50:02 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90=E5=8A=9F=E8=83=BD=E5=AE=8C=E5=96=84?= =?UTF-8?q?=E3=80=91IoT:=20=E6=9B=B4=E6=96=B0=20MQTT=20=E4=B8=BB=E9=A2=98?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=EF=BC=8C=E9=87=8D=E6=9E=84=E8=AE=BE=E5=A4=87?= =?UTF-8?q?=E5=B1=9E=E6=80=A7=E5=92=8C=E4=BA=8B=E4=BB=B6=E4=B8=8A=E6=8A=A5?= =?UTF-8?q?=E5=A4=84=E7=90=86=E9=80=BB=E8=BE=91=EF=BC=8C=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E5=A4=84=E7=90=86=E6=B5=81=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../upstream/IotDeviceUpstreamServer.java | 86 ++++++++++++++++--- .../src/main/resources/application.yml | 2 +- 2 files changed, 74 insertions(+), 14 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java index 195e5b0009..423f0d233b 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java @@ -4,6 +4,7 @@ import cn.hutool.core.util.IdUtil; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi; +import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceEventReportReqDTO; import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDevicePropertyReportReqDTO; import cn.iocoder.yudao.module.iot.plugin.common.config.IotPluginCommonProperties; import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamServer; @@ -24,13 +25,26 @@ import java.time.LocalDateTime; * IoT 设备下行服务端,接收来自 device 设备的请求,转发给 server 服务器 *
* 协议:HTTP、MQTT + * 参考:... * * @author haohao */ @Slf4j public class IotDeviceUpstreamServer { - private static final String PROPERTY_POST_TOPIC = "/event/property/post"; + // 设备上报属性 标准 JSON + // 请求Topic:/sys/${productKey}/${deviceName}/thing/event/property/post + // 响应Topic:/sys/${productKey}/${deviceName}/thing/event/property/post_reply + // 设备上报事件 标准 JSON + // 请求Topic:/sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post + // 响应Topic:/sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post_reply + + private static final String SYS_TOPIC_PREFIX = "/sys/"; + private static final String PROPERTY_POST_TOPIC = "/thing/event/property/post"; + private static final String EVENT_POST_TOPIC_PREFIX = "/thing/event/"; + private static final String EVENT_POST_TOPIC_SUFFIX = "/post"; + private static final int RECONNECT_DELAY = 5000; // 重连延迟时间(毫秒) private static final int QOS_LEVEL = 1; @@ -111,26 +125,41 @@ public class IotDeviceUpstreamServer { * 处理 MQTT 消息 */ private void handleMessage(String topic, String payload) { - // 处理设备属性上报消息 - if (topic.contains(PROPERTY_POST_TOPIC)) { - handlePropertyPost(topic, payload); + // 校验前缀 + if (!topic.startsWith(SYS_TOPIC_PREFIX)) { + log.warn("[handleMessage][未知的消息类型][topic: {}]", topic); + return; } + + // 处理设备属性上报消息 + if (topic.endsWith(PROPERTY_POST_TOPIC)) { + log.info("[handleMessage][接收到设备属性上报][topic: {}]", topic); + handlePropertyPost(topic, payload); + return; + } + + // 处理设备事件上报消息 + if (topic.contains(EVENT_POST_TOPIC_PREFIX) && topic.endsWith(EVENT_POST_TOPIC_SUFFIX)) { + log.info("[handleMessage][接收到设备事件上报][topic: {}]", topic); + handleEventPost(topic, payload); + return; + } + + // 未知消息类型 + log.warn("[handleMessage][未知的消息类型][topic: {}]", topic); } /** * 处理设备属性上报 */ private void handlePropertyPost(String topic, String payload) { + // /sys/${productKey}/${deviceName}/thing/event/property/post // 解析消息内容 JSONObject jsonObject = JSONUtil.parseObj(payload); String[] topicParts = topic.split("/"); // 构建设备属性上报请求对象 - IotDevicePropertyReportReqDTO reportReqDTO = buildPropertyReportDTO( - jsonObject, - topicParts[1], // productKey - topicParts[2] // deviceName - ); + IotDevicePropertyReportReqDTO reportReqDTO = buildPropertyReportDTO(jsonObject, topicParts); // 调用上游 API 处理设备上报数据 deviceUpstreamApi.reportDeviceProperty(reportReqDTO); @@ -138,21 +167,52 @@ public class IotDeviceUpstreamServer { topic, JSONUtil.toJsonStr(reportReqDTO)); } + /** + * 处理设备事件上报 + */ + private void handleEventPost(String topic, String payload) { + // /sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post + // 解析消息内容 + JSONObject jsonObject = JSONUtil.parseObj(payload); + String[] topicParts = topic.split("/"); + + // 构建设备事件上报请求对象 + IotDeviceEventReportReqDTO reportReqDTO = buildEventReportDTO(jsonObject, topicParts); + + // 调用上游 API 处理设备上报数据 + deviceUpstreamApi.reportDeviceEvent(reportReqDTO); + log.info("[handleEventPost][处理设备上行消息成功][topic: {}][reportReqDTO: {}]", + topic, JSONUtil.toJsonStr(reportReqDTO)); + } + /** * 构建设备属性上报请求对象 */ private IotDevicePropertyReportReqDTO buildPropertyReportDTO(JSONObject jsonObject, - String productKey, - String deviceName) { + String[] topicParts) { return ((IotDevicePropertyReportReqDTO) new IotDevicePropertyReportReqDTO() .setRequestId(jsonObject.getStr("id")) .setProcessId(IotPluginCommonUtils.getProcessId()) .setReportTime(LocalDateTime.now()) - .setProductKey(productKey) - .setDeviceName(deviceName)) + .setProductKey(topicParts[2]) + .setDeviceName(topicParts[3])) .setProperties(jsonObject.getJSONObject("params")); } + /** + * 构建设备事件上报请求对象 + */ + private IotDeviceEventReportReqDTO buildEventReportDTO(JSONObject jsonObject, String[] topicParts) { + return ((IotDeviceEventReportReqDTO) new IotDeviceEventReportReqDTO() + .setRequestId(jsonObject.getStr("id")) + .setProcessId(IotPluginCommonUtils.getProcessId()) + .setReportTime(LocalDateTime.now()) + .setProductKey(topicParts[2]) + .setDeviceName(topicParts[3])) + .setIdentifier(topicParts[4]) + .setParams(jsonObject.getJSONObject("params")); + } + /** * 重连 MQTT 客户端 */ diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/resources/application.yml b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/resources/application.yml index eb9f9f6a96..9343d3614e 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/resources/application.yml +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/resources/application.yml @@ -15,5 +15,5 @@ yudao: mqtt-ssl: false mqtt-username: yudao mqtt-password: 123456 - mqtt-topics: "/+/#" + mqtt-topics: "/sys/#" auth-port: 8101