【功能完善】IoT: 添加 MQTT 消息处理器,重构设备属性和事件上报逻辑,优化消息处理流程

This commit is contained in:
安浩浩 2025-02-25 09:51:39 +08:00
parent 4746281df9
commit 4cefea6880
5 changed files with 212 additions and 141 deletions

View File

@ -16,7 +16,7 @@ import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeC
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR; import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR;
/** /**
* IoT 备服务设 Vertx Handler * IoT 设备属性 Vertx Handler
* *
* 芋道源码 * 芋道源码
*/ */

View File

@ -1,9 +1,7 @@
package cn.iocoder.yudao.module.iot.plugin.emqx.config; package cn.iocoder.yudao.module.iot.plugin.emqx.config;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi; import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.plugin.common.config.IotPluginCommonProperties;
import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamHandler; import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamHandler;
import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamServer;
import cn.iocoder.yudao.module.iot.plugin.emqx.downstream.IotDeviceDownstreamHandlerImpl; import cn.iocoder.yudao.module.iot.plugin.emqx.downstream.IotDeviceDownstreamHandlerImpl;
import cn.iocoder.yudao.module.iot.plugin.emqx.upstream.IotDeviceUpstreamServer; import cn.iocoder.yudao.module.iot.plugin.emqx.upstream.IotDeviceUpstreamServer;
import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties;
@ -21,10 +19,8 @@ public class IotPluginEmqxAutoConfiguration {
@Bean(initMethod = "start", destroyMethod = "stop") @Bean(initMethod = "start", destroyMethod = "stop")
public IotDeviceUpstreamServer deviceUpstreamServer(IotDeviceUpstreamApi deviceUpstreamApi, public IotDeviceUpstreamServer deviceUpstreamServer(IotDeviceUpstreamApi deviceUpstreamApi,
IotPluginCommonProperties commonProperties, IotPluginEmqxProperties emqxProperties) {
IotPluginEmqxProperties emqxProperties, return new IotDeviceUpstreamServer(emqxProperties, deviceUpstreamApi);
IotDeviceDownstreamServer deviceDownstreamServer) {
return new IotDeviceUpstreamServer(commonProperties, emqxProperties, deviceUpstreamApi, deviceDownstreamServer);
} }
@Bean @Bean

View File

@ -14,6 +14,9 @@ public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandle
@Override @Override
public CommonResult<Boolean> invokeDeviceService(IotDeviceServiceInvokeReqDTO invokeReqDTO) { public CommonResult<Boolean> invokeDeviceService(IotDeviceServiceInvokeReqDTO invokeReqDTO) {
// 设备服务调用
// 请求Topic/sys/${productKey}/${deviceName}/thing/service/${tsl.service.identifier}
// 响应Topic/sys/${productKey}/${deviceName}/thing/service/${tsl.service.identifier}_reply
return CommonResult.success(true); return CommonResult.success(true);
} }
@ -24,6 +27,9 @@ public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandle
@Override @Override
public CommonResult<Boolean> setDeviceProperty(IotDevicePropertySetReqDTO setReqDTO) { public CommonResult<Boolean> setDeviceProperty(IotDevicePropertySetReqDTO setReqDTO) {
// 设置设备属性 标准 JSON
// 请求Topic/sys/${productKey}/${deviceName}/thing/service/property/set
// 响应Topic/sys/${productKey}/${deviceName}/thing/service/property/set_reply
return CommonResult.success(true); return CommonResult.success(true);
} }

View File

@ -1,16 +1,11 @@
package cn.iocoder.yudao.module.iot.plugin.emqx.upstream; package cn.iocoder.yudao.module.iot.plugin.emqx.upstream;
import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.IdUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi; import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceEventReportReqDTO;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDevicePropertyReportReqDTO;
import cn.iocoder.yudao.module.iot.plugin.common.config.IotPluginCommonProperties;
import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamServer;
import cn.iocoder.yudao.module.iot.plugin.common.util.IotPluginCommonUtils;
import cn.iocoder.yudao.module.iot.plugin.emqx.config.IotPluginEmqxProperties; import cn.iocoder.yudao.module.iot.plugin.emqx.config.IotPluginEmqxProperties;
import cn.iocoder.yudao.module.iot.plugin.emqx.upstream.router.IotDeviceAuthVertxHandler; import cn.iocoder.yudao.module.iot.plugin.emqx.upstream.router.IotDeviceAuthVertxHandler;
import cn.iocoder.yudao.module.iot.plugin.emqx.upstream.router.IotDeviceMqttMessageHandler;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Vertx; import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer; import io.vertx.core.http.HttpServer;
import io.vertx.ext.web.Router; import io.vertx.ext.web.Router;
@ -19,47 +14,28 @@ import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions; import io.vertx.mqtt.MqttClientOptions;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
/** /**
* IoT 设备下行服务端接收来自 device 设备的请求转发给 server 服务器 * IoT 设备下行服务端接收来自 device 设备的请求转发给 server 服务器
* <p> * <p>
* 协议HTTPMQTT * 协议HTTPMQTT
* 参考<a href=
* "https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services?spm=a2c4g.11186623.0.0.97a72915vRck44#section-g4j-5zg-12b">...</a>
* *
* @author haohao * @author haohao
*/ */
@Slf4j @Slf4j
public class IotDeviceUpstreamServer { public class IotDeviceUpstreamServer {
// 设备上报属性 标准 JSON
// 请求Topic/sys/${productKey}/${deviceName}/thing/event/property/post
// 响应Topic/sys/${productKey}/${deviceName}/thing/event/property/post_reply
// 设备上报事件 标准 JSON
// 请求Topic/sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post
// 响应Topic/sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post_reply
private static final String SYS_TOPIC_PREFIX = "/sys/";
private static final String PROPERTY_POST_TOPIC = "/thing/event/property/post";
private static final String EVENT_POST_TOPIC_PREFIX = "/thing/event/";
private static final String EVENT_POST_TOPIC_SUFFIX = "/post";
private static final int RECONNECT_DELAY = 5000; // 重连延迟时间(毫秒) private static final int RECONNECT_DELAY = 5000; // 重连延迟时间(毫秒)
private static final int QOS_LEVEL = 1;
private final Vertx vertx; private final Vertx vertx;
private final HttpServer server; private final HttpServer server;
private final MqttClient client; private final MqttClient client;
private final IotPluginEmqxProperties emqxProperties; private final IotPluginEmqxProperties emqxProperties;
private final IotDeviceUpstreamApi deviceUpstreamApi; private final IotDeviceMqttMessageHandler mqttMessageHandler;
public IotDeviceUpstreamServer(IotPluginCommonProperties commonProperties, public IotDeviceUpstreamServer(IotPluginEmqxProperties emqxProperties,
IotPluginEmqxProperties emqxProperties, IotDeviceUpstreamApi deviceUpstreamApi) {
IotDeviceUpstreamApi deviceUpstreamApi,
IotDeviceDownstreamServer deviceDownstreamServer) {
this.emqxProperties = emqxProperties; this.emqxProperties = emqxProperties;
this.deviceUpstreamApi = deviceUpstreamApi;
// 创建 Vertx 实例 // 创建 Vertx 实例
this.vertx = Vertx.vertx(); this.vertx = Vertx.vertx();
// 创建 Router 实例 // 创建 Router 实例
@ -77,6 +53,7 @@ public class IotDeviceUpstreamServer {
.setPassword(emqxProperties.getMqttPassword()) .setPassword(emqxProperties.getMqttPassword())
.setSsl(emqxProperties.isMqttSsl()); .setSsl(emqxProperties.isMqttSsl());
client = MqttClient.create(vertx, options); client = MqttClient.create(vertx, options);
this.mqttMessageHandler = new IotDeviceMqttMessageHandler(deviceUpstreamApi, client);
} }
/** /**
@ -108,109 +85,7 @@ public class IotDeviceUpstreamServer {
* 设置 MQTT 消息处理器 * 设置 MQTT 消息处理器
*/ */
private void setupMessageHandler() { private void setupMessageHandler() {
client.publishHandler(message -> { client.publishHandler(mqttMessageHandler::handle);
String topic = message.topicName();
String payload = message.payload().toString();
log.info("[messageHandler][接收到消息][topic: {}][payload: {}]", topic, payload);
try {
handleMessage(topic, payload);
} catch (Exception e) {
log.error("[messageHandler][处理消息失败][topic: {}][payload: {}]", topic, payload, e);
}
});
}
/**
* 处理 MQTT 消息
*/
private void handleMessage(String topic, String payload) {
// 校验前缀
if (!topic.startsWith(SYS_TOPIC_PREFIX)) {
log.warn("[handleMessage][未知的消息类型][topic: {}]", topic);
return;
}
// 处理设备属性上报消息
if (topic.endsWith(PROPERTY_POST_TOPIC)) {
log.info("[handleMessage][接收到设备属性上报][topic: {}]", topic);
handlePropertyPost(topic, payload);
return;
}
// 处理设备事件上报消息
if (topic.contains(EVENT_POST_TOPIC_PREFIX) && topic.endsWith(EVENT_POST_TOPIC_SUFFIX)) {
log.info("[handleMessage][接收到设备事件上报][topic: {}]", topic);
handleEventPost(topic, payload);
return;
}
// 未知消息类型
log.warn("[handleMessage][未知的消息类型][topic: {}]", topic);
}
/**
* 处理设备属性上报
*/
private void handlePropertyPost(String topic, String payload) {
// /sys/${productKey}/${deviceName}/thing/event/property/post
// 解析消息内容
JSONObject jsonObject = JSONUtil.parseObj(payload);
String[] topicParts = topic.split("/");
// 构建设备属性上报请求对象
IotDevicePropertyReportReqDTO reportReqDTO = buildPropertyReportDTO(jsonObject, topicParts);
// 调用上游 API 处理设备上报数据
deviceUpstreamApi.reportDeviceProperty(reportReqDTO);
log.info("[handlePropertyPost][处理设备上行消息成功][topic: {}][reportReqDTO: {}]",
topic, JSONUtil.toJsonStr(reportReqDTO));
}
/**
* 处理设备事件上报
*/
private void handleEventPost(String topic, String payload) {
// /sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post
// 解析消息内容
JSONObject jsonObject = JSONUtil.parseObj(payload);
String[] topicParts = topic.split("/");
// 构建设备事件上报请求对象
IotDeviceEventReportReqDTO reportReqDTO = buildEventReportDTO(jsonObject, topicParts);
// 调用上游 API 处理设备上报数据
deviceUpstreamApi.reportDeviceEvent(reportReqDTO);
log.info("[handleEventPost][处理设备上行消息成功][topic: {}][reportReqDTO: {}]",
topic, JSONUtil.toJsonStr(reportReqDTO));
}
/**
* 构建设备属性上报请求对象
*/
private IotDevicePropertyReportReqDTO buildPropertyReportDTO(JSONObject jsonObject,
String[] topicParts) {
return ((IotDevicePropertyReportReqDTO) new IotDevicePropertyReportReqDTO()
.setRequestId(jsonObject.getStr("id"))
.setProcessId(IotPluginCommonUtils.getProcessId())
.setReportTime(LocalDateTime.now())
.setProductKey(topicParts[2])
.setDeviceName(topicParts[3]))
.setProperties(jsonObject.getJSONObject("params"));
}
/**
* 构建设备事件上报请求对象
*/
private IotDeviceEventReportReqDTO buildEventReportDTO(JSONObject jsonObject, String[] topicParts) {
return ((IotDeviceEventReportReqDTO) new IotDeviceEventReportReqDTO()
.setRequestId(jsonObject.getStr("id"))
.setProcessId(IotPluginCommonUtils.getProcessId())
.setReportTime(LocalDateTime.now())
.setProductKey(topicParts[2])
.setDeviceName(topicParts[3]))
.setIdentifier(topicParts[4])
.setParams(jsonObject.getJSONObject("params"));
} }
/** /**
@ -244,7 +119,7 @@ public class IotDeviceUpstreamServer {
private void subscribeToTopics() { private void subscribeToTopics() {
String[] topics = emqxProperties.getMqttTopics().split(","); String[] topics = emqxProperties.getMqttTopics().split(",");
for (String topic : topics) { for (String topic : topics) {
client.subscribe(topic, QOS_LEVEL) client.subscribe(topic, MqttQoS.AT_LEAST_ONCE.value())
.onSuccess(v -> log.info("[subscribeToTopics][成功订阅主题: {}]", topic)) .onSuccess(v -> log.info("[subscribeToTopics][成功订阅主题: {}]", topic))
.onFailure(err -> log.error("[subscribeToTopics][订阅主题失败: {}]", topic, err)); .onFailure(err -> log.error("[subscribeToTopics][订阅主题失败: {}]", topic, err));
} }

View File

@ -0,0 +1,194 @@
package cn.iocoder.yudao.module.iot.plugin.emqx.upstream.router;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceEventReportReqDTO;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDevicePropertyReportReqDTO;
import cn.iocoder.yudao.module.iot.plugin.common.util.IotPluginCommonUtils;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.buffer.Buffer;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.messages.MqttPublishMessage;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
/**
* IoT 设备 MQTT 消息处理器
* <p>
* 参考
* <p>
* "<a href="https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services?spm=a2c4g.11186623.0.0.97a72915vRck44#section-g4j-5zg-12b">...</a>">
*/
@Slf4j
public class IotDeviceMqttMessageHandler {
// 设备上报属性 标准 JSON
// 请求Topic/sys/${productKey}/${deviceName}/thing/event/property/post
// 响应Topic/sys/${productKey}/${deviceName}/thing/event/property/post_reply
// 设备上报事件 标准 JSON
// 请求Topic/sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post
// 响应Topic/sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post_reply
private static final String SYS_TOPIC_PREFIX = "/sys/";
private static final String PROPERTY_POST_TOPIC = "/thing/event/property/post";
private static final String EVENT_POST_TOPIC_PREFIX = "/thing/event/";
private static final String EVENT_POST_TOPIC_SUFFIX = "/post";
private final IotDeviceUpstreamApi deviceUpstreamApi;
private final MqttClient mqttClient;
public IotDeviceMqttMessageHandler(IotDeviceUpstreamApi deviceUpstreamApi, MqttClient mqttClient) {
this.deviceUpstreamApi = deviceUpstreamApi;
this.mqttClient = mqttClient;
}
public void handle(MqttPublishMessage message) {
String topic = message.topicName();
String payload = message.payload().toString();
log.info("[messageHandler][接收到消息][topic: {}][payload: {}]", topic, payload);
try {
handleMessage(topic, payload);
} catch (Exception e) {
log.error("[messageHandler][处理消息失败][topic: {}][payload: {}]", topic, payload, e);
}
}
private void handleMessage(String topic, String payload) {
// 校验前缀
if (!topic.startsWith(SYS_TOPIC_PREFIX)) {
log.warn("[handleMessage][未知的消息类型][topic: {}]", topic);
return;
}
// 处理设备属性上报消息
if (topic.endsWith(PROPERTY_POST_TOPIC)) {
log.info("[handleMessage][接收到设备属性上报][topic: {}]", topic);
handlePropertyPost(topic, payload);
return;
}
// 处理设备事件上报消息
if (topic.contains(EVENT_POST_TOPIC_PREFIX) && topic.endsWith(EVENT_POST_TOPIC_SUFFIX)) {
log.info("[handleMessage][接收到设备事件上报][topic: {}]", topic);
handleEventPost(topic, payload);
return;
}
// 未知消息类型
log.warn("[handleMessage][未知的消息类型][topic: {}]", topic);
}
/**
* 处理设备属性上报消息
*
* @param topic 主题
* @param payload 消息内容
*/
private void handlePropertyPost(String topic, String payload) {
// 解析消息内容
JSONObject jsonObject = JSONUtil.parseObj(payload);
String[] topicParts = topic.split("/");
// 构建设备属性上报请求对象
IotDevicePropertyReportReqDTO reportReqDTO = buildPropertyReportDTO(jsonObject, topicParts);
// 调用上游 API 处理设备上报数据
deviceUpstreamApi.reportDeviceProperty(reportReqDTO);
log.info("[handlePropertyPost][处理设备上行消息成功][topic: {}][reportReqDTO: {}]",
topic, JSONUtil.toJsonStr(reportReqDTO));
// 发送响应消息
String replyTopic = topic + "_reply";
JSONObject response = new JSONObject()
.set("id", jsonObject.getStr("id"))
.set("code", 200)
.set("data", new JSONObject())
.set("message", "success")
.set("method", "thing.event.property.post");
mqttClient.publish(replyTopic,
Buffer.buffer(response.toString()),
MqttQoS.AT_LEAST_ONCE,
false,
false);
log.info("[handlePropertyPost][发送响应消息成功][topic: {}][response: {}]",
replyTopic, response.toString());
}
/**
* 处理设备事件上报消息
*
* @param topic 主题
* @param payload 消息内容
*/
private void handleEventPost(String topic, String payload) {
// 解析消息内容
JSONObject jsonObject = JSONUtil.parseObj(payload);
String[] topicParts = topic.split("/");
// 构建设备事件上报请求对象
IotDeviceEventReportReqDTO reportReqDTO = buildEventReportDTO(jsonObject, topicParts);
// 调用上游 API 处理设备上报数据
deviceUpstreamApi.reportDeviceEvent(reportReqDTO);
log.info("[handleEventPost][处理设备上行消息成功][topic: {}][reportReqDTO: {}]",
topic, JSONUtil.toJsonStr(reportReqDTO));
// 发送响应消息
String replyTopic = topic + "_reply";
String eventIdentifier = topicParts[6]; // topic 中获取事件标识符
JSONObject response = new JSONObject()
.set("id", jsonObject.getStr("id"))
.set("code", 200)
.set("data", new JSONObject())
.set("message", "success")
.set("method", "thing.event." + eventIdentifier + ".post");
mqttClient.publish(replyTopic,
Buffer.buffer(response.toString()),
MqttQoS.AT_LEAST_ONCE,
false,
false);
log.info("[handleEventPost][发送响应消息成功][topic: {}][response: {}]",
replyTopic, response.toString());
}
/**
* 构建设备属性上报请求对象
*
* @param jsonObject 消息内容
* @param topicParts 主题部分
* @return 设备属性上报请求对象
*/
private IotDevicePropertyReportReqDTO buildPropertyReportDTO(JSONObject jsonObject,
String[] topicParts) {
return ((IotDevicePropertyReportReqDTO) new IotDevicePropertyReportReqDTO()
.setRequestId(jsonObject.getStr("id"))
.setProcessId(IotPluginCommonUtils.getProcessId())
.setReportTime(LocalDateTime.now())
.setProductKey(topicParts[2])
.setDeviceName(topicParts[3]))
.setProperties(jsonObject.getJSONObject("params"));
}
/**
* 构建设备事件上报请求对象
*
* @param jsonObject 消息内容
* @param topicParts 主题部分
* @return 设备事件上报请求对象
*/
private IotDeviceEventReportReqDTO buildEventReportDTO(JSONObject jsonObject, String[] topicParts) {
return ((IotDeviceEventReportReqDTO) new IotDeviceEventReportReqDTO()
.setRequestId(jsonObject.getStr("id"))
.setProcessId(IotPluginCommonUtils.getProcessId())
.setReportTime(LocalDateTime.now())
.setProductKey(topicParts[2])
.setDeviceName(topicParts[3]))
.setIdentifier(topicParts[4])
.setParams(jsonObject.getJSONObject("params"));
}
}