diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java index 195e5b0009..423f0d233b 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java @@ -4,6 +4,7 @@ import cn.hutool.core.util.IdUtil; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi; +import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceEventReportReqDTO; import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDevicePropertyReportReqDTO; import cn.iocoder.yudao.module.iot.plugin.common.config.IotPluginCommonProperties; import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamServer; @@ -24,13 +25,26 @@ import java.time.LocalDateTime; * IoT 设备下行服务端,接收来自 device 设备的请求,转发给 server 服务器 *

* 协议:HTTP、MQTT + * 参考:... * * @author haohao */ @Slf4j public class IotDeviceUpstreamServer { - private static final String PROPERTY_POST_TOPIC = "/event/property/post"; + // 设备上报属性 标准 JSON + // 请求Topic:/sys/${productKey}/${deviceName}/thing/event/property/post + // 响应Topic:/sys/${productKey}/${deviceName}/thing/event/property/post_reply + // 设备上报事件 标准 JSON + // 请求Topic:/sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post + // 响应Topic:/sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post_reply + + private static final String SYS_TOPIC_PREFIX = "/sys/"; + private static final String PROPERTY_POST_TOPIC = "/thing/event/property/post"; + private static final String EVENT_POST_TOPIC_PREFIX = "/thing/event/"; + private static final String EVENT_POST_TOPIC_SUFFIX = "/post"; + private static final int RECONNECT_DELAY = 5000; // 重连延迟时间(毫秒) private static final int QOS_LEVEL = 1; @@ -111,26 +125,41 @@ public class IotDeviceUpstreamServer { * 处理 MQTT 消息 */ private void handleMessage(String topic, String payload) { - // 处理设备属性上报消息 - if (topic.contains(PROPERTY_POST_TOPIC)) { - handlePropertyPost(topic, payload); + // 校验前缀 + if (!topic.startsWith(SYS_TOPIC_PREFIX)) { + log.warn("[handleMessage][未知的消息类型][topic: {}]", topic); + return; } + + // 处理设备属性上报消息 + if (topic.endsWith(PROPERTY_POST_TOPIC)) { + log.info("[handleMessage][接收到设备属性上报][topic: {}]", topic); + handlePropertyPost(topic, payload); + return; + } + + // 处理设备事件上报消息 + if (topic.contains(EVENT_POST_TOPIC_PREFIX) && topic.endsWith(EVENT_POST_TOPIC_SUFFIX)) { + log.info("[handleMessage][接收到设备事件上报][topic: {}]", topic); + handleEventPost(topic, payload); + return; + } + + // 未知消息类型 + log.warn("[handleMessage][未知的消息类型][topic: {}]", topic); } /** * 处理设备属性上报 */ private void handlePropertyPost(String topic, String payload) { + // /sys/${productKey}/${deviceName}/thing/event/property/post // 解析消息内容 JSONObject jsonObject = JSONUtil.parseObj(payload); String[] topicParts = topic.split("/"); // 构建设备属性上报请求对象 - IotDevicePropertyReportReqDTO reportReqDTO = buildPropertyReportDTO( - jsonObject, - topicParts[1], // productKey - topicParts[2] // deviceName - ); + IotDevicePropertyReportReqDTO reportReqDTO = buildPropertyReportDTO(jsonObject, topicParts); // 调用上游 API 处理设备上报数据 deviceUpstreamApi.reportDeviceProperty(reportReqDTO); @@ -138,21 +167,52 @@ public class IotDeviceUpstreamServer { topic, JSONUtil.toJsonStr(reportReqDTO)); } + /** + * 处理设备事件上报 + */ + private void handleEventPost(String topic, String payload) { + // /sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post + // 解析消息内容 + JSONObject jsonObject = JSONUtil.parseObj(payload); + String[] topicParts = topic.split("/"); + + // 构建设备事件上报请求对象 + IotDeviceEventReportReqDTO reportReqDTO = buildEventReportDTO(jsonObject, topicParts); + + // 调用上游 API 处理设备上报数据 + deviceUpstreamApi.reportDeviceEvent(reportReqDTO); + log.info("[handleEventPost][处理设备上行消息成功][topic: {}][reportReqDTO: {}]", + topic, JSONUtil.toJsonStr(reportReqDTO)); + } + /** * 构建设备属性上报请求对象 */ private IotDevicePropertyReportReqDTO buildPropertyReportDTO(JSONObject jsonObject, - String productKey, - String deviceName) { + String[] topicParts) { return ((IotDevicePropertyReportReqDTO) new IotDevicePropertyReportReqDTO() .setRequestId(jsonObject.getStr("id")) .setProcessId(IotPluginCommonUtils.getProcessId()) .setReportTime(LocalDateTime.now()) - .setProductKey(productKey) - .setDeviceName(deviceName)) + .setProductKey(topicParts[2]) + .setDeviceName(topicParts[3])) .setProperties(jsonObject.getJSONObject("params")); } + /** + * 构建设备事件上报请求对象 + */ + private IotDeviceEventReportReqDTO buildEventReportDTO(JSONObject jsonObject, String[] topicParts) { + return ((IotDeviceEventReportReqDTO) new IotDeviceEventReportReqDTO() + .setRequestId(jsonObject.getStr("id")) + .setProcessId(IotPluginCommonUtils.getProcessId()) + .setReportTime(LocalDateTime.now()) + .setProductKey(topicParts[2]) + .setDeviceName(topicParts[3])) + .setIdentifier(topicParts[4]) + .setParams(jsonObject.getJSONObject("params")); + } + /** * 重连 MQTT 客户端 */ diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/resources/application.yml b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/resources/application.yml index eb9f9f6a96..9343d3614e 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/resources/application.yml +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/resources/application.yml @@ -15,5 +15,5 @@ yudao: mqtt-ssl: false mqtt-username: yudao mqtt-password: 123456 - mqtt-topics: "/+/#" + mqtt-topics: "/sys/#" auth-port: 8101