fix:【IoT 物联网】修复设备属性上报和配置设置主题,优化 MQTT 消息处理逻辑,更新相关配置文件
This commit is contained in:
parent
c658ac69c0
commit
a4e80d45fe
|
@ -144,12 +144,13 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
|
|||
|
||||
// 3. 回复消息。前提:非 _reply 消息,并且非禁用回复的消息
|
||||
if (IotDeviceMessageUtils.isReplyMessage(message)
|
||||
|| IotDeviceMessageMethodEnum.isReplyDisabled(message.getMethod())
|
||||
|| StrUtil.isEmpty(message.getServerId())) {
|
||||
|| IotDeviceMessageMethodEnum.isReplyDisabled(message.getMethod())
|
||||
|| StrUtil.isEmpty(message.getServerId())) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
IotDeviceMessage replyMessage = IotDeviceMessage.replyOf(message.getRequestId(), message.getMethod(), replyData,
|
||||
IotDeviceMessage replyMessage = IotDeviceMessage.replyOf(message.getRequestId(), message.getMethod(),
|
||||
replyData,
|
||||
serviceException != null ? serviceException.getCode() : null,
|
||||
serviceException != null ? serviceException.getMessage() : null);
|
||||
sendDeviceMessage(replyMessage, device, message.getServerId());
|
||||
|
@ -175,7 +176,8 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
|
|||
}
|
||||
|
||||
// 属性上报
|
||||
if (Objects.equal(message.getMethod(), IotDeviceMessageMethodEnum.PROPERTY_REPORT.getMethod())) {
|
||||
if (Objects.equal(message.getMethod(), IotDeviceMessageMethodEnum.PROPERTY_REPORT.getMethod()) ||
|
||||
Objects.equal(message.getMethod(), IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod())) {
|
||||
devicePropertyService.saveDeviceProperty(device, message);
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ public enum IotDeviceMessageMethodEnum implements ArrayValuable<String> {
|
|||
|
||||
// ========== 设备属性 ==========
|
||||
// 可参考 https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services
|
||||
PROPERTY_POST("thing.property.post", true),
|
||||
PROPERTY_REPORT("thing.property.report", true),
|
||||
|
||||
PROPERTY_SET("thing.property.set", false),
|
||||
|
@ -33,7 +34,8 @@ public enum IotDeviceMessageMethodEnum implements ArrayValuable<String> {
|
|||
|
||||
;
|
||||
|
||||
public static final String[] ARRAYS = Arrays.stream(values()).map(IotDeviceMessageMethodEnum::getMethod).toArray(String[]::new);
|
||||
public static final String[] ARRAYS = Arrays.stream(values()).map(IotDeviceMessageMethodEnum::getMethod)
|
||||
.toArray(String[]::new);
|
||||
|
||||
/**
|
||||
* 不进行 reply 回复的方法集合
|
||||
|
|
|
@ -127,6 +127,10 @@ public class IotGatewayProperties {
|
|||
* MQTT 是否开启 SSL
|
||||
*/
|
||||
private Boolean mqttSsl;
|
||||
/**
|
||||
* MQTT客户端 ID
|
||||
*/
|
||||
private String mqttClientId;
|
||||
/**
|
||||
* MQTT 主题
|
||||
*/
|
||||
|
|
|
@ -14,52 +14,40 @@ import lombok.RequiredArgsConstructor;
|
|||
@Getter
|
||||
public enum IotDeviceTopicEnum {
|
||||
|
||||
// TODO @haohao:SYS_TOPIC_PREFIX、SERVICE_TOPIC_PREFIX、REPLY_SUFFIX 类似这种,要不搞成这个里面的静态变量?不是枚举值
|
||||
/**
|
||||
* 系统主题前缀
|
||||
*/
|
||||
SYS_TOPIC_PREFIX("/sys/", "系统主题前缀"),
|
||||
|
||||
/**
|
||||
* 服务调用主题前缀
|
||||
*/
|
||||
SERVICE_TOPIC_PREFIX("/thing/service/", "服务调用主题前缀"),
|
||||
|
||||
// TODO @haohao:注释时,中英文之间,有个空格;
|
||||
/**
|
||||
* 设备属性设置主题
|
||||
* 请求 Topic:/sys/${productKey}/${deviceName}/thing/service/property/set
|
||||
* 响应 Topic:/sys/${productKey}/${deviceName}/thing/service/property/set_reply
|
||||
* 请求 Topic:/sys/${productKey}/${deviceName}/thing/property/set
|
||||
* 响应 Topic:/sys/${productKey}/${deviceName}/thing/property/set_reply
|
||||
*/
|
||||
PROPERTY_SET_TOPIC("/thing/service/property/set", "设备属性设置主题"),
|
||||
PROPERTY_SET_TOPIC("/thing/property/set", "设备属性设置主题"),
|
||||
|
||||
/**
|
||||
* 设备属性获取主题
|
||||
* 请求 Topic:/sys/${productKey}/${deviceName}/thing/service/property/get
|
||||
* 响应 Topic:/sys/${productKey}/${deviceName}/thing/service/property/get_reply
|
||||
* 请求 Topic:/sys/${productKey}/${deviceName}/thing/property/get
|
||||
* 响应 Topic:/sys/${productKey}/${deviceName}/thing/property/get_reply
|
||||
*/
|
||||
PROPERTY_GET_TOPIC("/thing/service/property/get", "设备属性获取主题"),
|
||||
PROPERTY_GET_TOPIC("/thing/property/get", "设备属性获取主题"),
|
||||
|
||||
/**
|
||||
* 设备配置设置主题
|
||||
* 请求 Topic:/sys/${productKey}/${deviceName}/thing/service/config/set
|
||||
* 响应 Topic:/sys/${productKey}/${deviceName}/thing/service/config/set_reply
|
||||
* 请求 Topic:/sys/${productKey}/${deviceName}/thing/config/set
|
||||
* 响应 Topic:/sys/${productKey}/${deviceName}/thing/config/set_reply
|
||||
*/
|
||||
CONFIG_SET_TOPIC("/thing/service/config/set", "设备配置设置主题"),
|
||||
CONFIG_SET_TOPIC("/thing/config/set", "设备配置设置主题"),
|
||||
|
||||
/**
|
||||
* 设备OTA升级主题
|
||||
* 请求 Topic:/sys/${productKey}/${deviceName}/thing/service/ota/upgrade
|
||||
* 响应 Topic:/sys/${productKey}/${deviceName}/thing/service/ota/upgrade_reply
|
||||
* 设备 OTA 升级主题
|
||||
* 请求 Topic:/sys/${productKey}/${deviceName}/thing/ota/upgrade
|
||||
* 响应 Topic:/sys/${productKey}/${deviceName}/thing/ota/upgrade_reply
|
||||
*/
|
||||
OTA_UPGRADE_TOPIC("/thing/service/ota/upgrade", "设备OTA升级主题"),
|
||||
OTA_UPGRADE_TOPIC("/thing/ota/upgrade", "设备 OTA 升级主题"),
|
||||
|
||||
/**
|
||||
* 设备属性上报主题
|
||||
* 请求 Topic:/sys/${productKey}/${deviceName}/thing/event/property/post
|
||||
* 响应 Topic:/sys/${productKey}/${deviceName}/thing/event/property/post_reply
|
||||
*/
|
||||
PROPERTY_POST_TOPIC("/thing/event/property/post", "设备属性上报主题"),
|
||||
PROPERTY_POST_TOPIC("/thing/property/post", "设备属性上报主题"),
|
||||
|
||||
/**
|
||||
* 设备事件上报主题前缀
|
||||
|
@ -69,95 +57,194 @@ public enum IotDeviceTopicEnum {
|
|||
/**
|
||||
* 设备事件上报主题后缀
|
||||
*/
|
||||
EVENT_POST_TOPIC_SUFFIX("/post", "设备事件上报主题后缀"),
|
||||
EVENT_POST_TOPIC_SUFFIX("/post", "设备事件上报主题后缀");
|
||||
|
||||
// ========== 静态常量 ==========
|
||||
|
||||
/**
|
||||
* 系统主题前缀
|
||||
*/
|
||||
public static final String SYS_TOPIC_PREFIX = "/sys/";
|
||||
|
||||
/**
|
||||
* 服务调用主题前缀
|
||||
*/
|
||||
public static final String SERVICE_TOPIC_PREFIX = "/thing/";
|
||||
|
||||
/**
|
||||
* 响应主题后缀
|
||||
*/
|
||||
REPLY_SUFFIX("_reply", "响应主题后缀");
|
||||
public static final String REPLY_SUFFIX = "_reply";
|
||||
|
||||
// ========== 方法常量 ==========
|
||||
|
||||
/**
|
||||
* 服务方法前缀
|
||||
*/
|
||||
public static final String SERVICE_METHOD_PREFIX = "thing.";
|
||||
|
||||
/**
|
||||
* 属性服务方法前缀
|
||||
*/
|
||||
public static final String PROPERTY_SERVICE_METHOD_PREFIX = "thing.property.";
|
||||
|
||||
/**
|
||||
* 配置服务方法前缀
|
||||
*/
|
||||
public static final String CONFIG_SERVICE_METHOD_PREFIX = "thing.config.";
|
||||
|
||||
/**
|
||||
* OTA 服务方法前缀
|
||||
*/
|
||||
public static final String OTA_SERVICE_METHOD_PREFIX = "thing.ota.";
|
||||
|
||||
/**
|
||||
* 属性设置方法
|
||||
*/
|
||||
public static final String PROPERTY_SET_METHOD = "thing.property.set";
|
||||
|
||||
/**
|
||||
* 属性获取方法
|
||||
*/
|
||||
public static final String PROPERTY_GET_METHOD = "thing.property.get";
|
||||
|
||||
// ========== 主题匹配常量 ==========
|
||||
|
||||
/**
|
||||
* 事件上报主题模式
|
||||
*/
|
||||
public static final String EVENT_POST_TOPIC_PATTERN = "/thing/event/";
|
||||
|
||||
/**
|
||||
* 主题后缀:post
|
||||
*/
|
||||
public static final String POST_SUFFIX = "/post";
|
||||
|
||||
/**
|
||||
* 属性上报主题后缀
|
||||
*/
|
||||
public static final String PROPERTY_POST_SUFFIX = "/thing/property/post";
|
||||
|
||||
/**
|
||||
* 属性设置响应主题包含
|
||||
*/
|
||||
public static final String PROPERTY_SET_TOPIC_CONTAINS = "/thing/property/set";
|
||||
|
||||
/**
|
||||
* 属性获取响应主题包含
|
||||
*/
|
||||
public static final String PROPERTY_GET_TOPIC_CONTAINS = "/thing/property/get";
|
||||
|
||||
// ========== MQTT 认证路径常量 ==========
|
||||
|
||||
/**
|
||||
* MQTT 认证路径
|
||||
*/
|
||||
public static final String MQTT_AUTH_AUTHENTICATE_PATH = "/mqtt/auth/authenticate";
|
||||
|
||||
/**
|
||||
* MQTT 连接事件路径
|
||||
*/
|
||||
public static final String MQTT_AUTH_CONNECTED_PATH = "/mqtt/auth/connected";
|
||||
|
||||
/**
|
||||
* MQTT 断开事件路径
|
||||
*/
|
||||
public static final String MQTT_AUTH_DISCONNECTED_PATH = "/mqtt/auth/disconnected";
|
||||
|
||||
private final String topic;
|
||||
private final String description;
|
||||
|
||||
// ========== 工具方法 ==========
|
||||
|
||||
/**
|
||||
* 构建设备主题前缀
|
||||
*
|
||||
* @param productKey 产品 Key
|
||||
* @param deviceName 设备名称
|
||||
* @return 设备主题前缀:/sys/{productKey}/{deviceName}
|
||||
*/
|
||||
private static String buildDeviceTopicPrefix(String productKey, String deviceName) {
|
||||
return SYS_TOPIC_PREFIX + productKey + "/" + deviceName;
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建设备服务调用主题
|
||||
*
|
||||
* @param productKey 产品Key
|
||||
* @param productKey 产品 Key
|
||||
* @param deviceName 设备名称
|
||||
* @param serviceIdentifier 服务标识符
|
||||
* @return 完整的主题路径
|
||||
*/
|
||||
public static String buildServiceTopic(String productKey, String deviceName, String serviceIdentifier) {
|
||||
// TODO @haohao:貌似 SYS_TOPIC_PREFIX.getTopic() + productKey + "/" + deviceName 是统一的;
|
||||
return SYS_TOPIC_PREFIX.getTopic() + productKey + "/" + deviceName +
|
||||
SERVICE_TOPIC_PREFIX.getTopic() + serviceIdentifier;
|
||||
return buildDeviceTopicPrefix(productKey, deviceName) + SERVICE_TOPIC_PREFIX + serviceIdentifier;
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建设备属性设置主题
|
||||
*
|
||||
* @param productKey 产品Key
|
||||
* @param productKey 产品 Key
|
||||
* @param deviceName 设备名称
|
||||
* @return 完整的主题路径
|
||||
*/
|
||||
public static String buildPropertySetTopic(String productKey, String deviceName) {
|
||||
return SYS_TOPIC_PREFIX.getTopic() + productKey + "/" + deviceName + PROPERTY_SET_TOPIC.getTopic();
|
||||
return buildDeviceTopicPrefix(productKey, deviceName) + PROPERTY_SET_TOPIC.getTopic();
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建设备属性获取主题
|
||||
*
|
||||
* @param productKey 产品Key
|
||||
* @param productKey 产品 Key
|
||||
* @param deviceName 设备名称
|
||||
* @return 完整的主题路径
|
||||
*/
|
||||
public static String buildPropertyGetTopic(String productKey, String deviceName) {
|
||||
return SYS_TOPIC_PREFIX.getTopic() + productKey + "/" + deviceName + PROPERTY_GET_TOPIC.getTopic();
|
||||
return buildDeviceTopicPrefix(productKey, deviceName) + PROPERTY_GET_TOPIC.getTopic();
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建设备配置设置主题
|
||||
*
|
||||
* @param productKey 产品Key
|
||||
* @param productKey 产品 Key
|
||||
* @param deviceName 设备名称
|
||||
* @return 完整的主题路径
|
||||
*/
|
||||
public static String buildConfigSetTopic(String productKey, String deviceName) {
|
||||
return SYS_TOPIC_PREFIX.getTopic() + productKey + "/" + deviceName + CONFIG_SET_TOPIC.getTopic();
|
||||
return buildDeviceTopicPrefix(productKey, deviceName) + CONFIG_SET_TOPIC.getTopic();
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建设备 OTA 升级主题
|
||||
*
|
||||
* @param productKey 产品Key
|
||||
* @param productKey 产品 Key
|
||||
* @param deviceName 设备名称
|
||||
* @return 完整的主题路径
|
||||
*/
|
||||
public static String buildOtaUpgradeTopic(String productKey, String deviceName) {
|
||||
return SYS_TOPIC_PREFIX.getTopic() + productKey + "/" + deviceName + OTA_UPGRADE_TOPIC.getTopic();
|
||||
return buildDeviceTopicPrefix(productKey, deviceName) + OTA_UPGRADE_TOPIC.getTopic();
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建设备属性上报主题
|
||||
*
|
||||
* @param productKey 产品Key
|
||||
* @param productKey 产品 Key
|
||||
* @param deviceName 设备名称
|
||||
* @return 完整的主题路径
|
||||
*/
|
||||
public static String buildPropertyPostTopic(String productKey, String deviceName) {
|
||||
return SYS_TOPIC_PREFIX.getTopic() + productKey + "/" + deviceName + PROPERTY_POST_TOPIC.getTopic();
|
||||
return buildDeviceTopicPrefix(productKey, deviceName) + PROPERTY_POST_TOPIC.getTopic();
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建设备事件上报主题
|
||||
*
|
||||
* @param productKey 产品Key
|
||||
* @param productKey 产品 Key
|
||||
* @param deviceName 设备名称
|
||||
* @param eventIdentifier 事件标识符
|
||||
* @return 完整的主题路径
|
||||
*/
|
||||
public static String buildEventPostTopic(String productKey, String deviceName, String eventIdentifier) {
|
||||
return SYS_TOPIC_PREFIX.getTopic() + productKey + "/" + deviceName +
|
||||
return buildDeviceTopicPrefix(productKey, deviceName) +
|
||||
EVENT_POST_TOPIC_PREFIX.getTopic() + eventIdentifier + EVENT_POST_TOPIC_SUFFIX.getTopic();
|
||||
}
|
||||
|
||||
|
@ -168,7 +255,7 @@ public enum IotDeviceTopicEnum {
|
|||
* @return 响应主题
|
||||
*/
|
||||
public static String getReplyTopic(String requestTopic) {
|
||||
return requestTopic + REPLY_SUFFIX.getTopic();
|
||||
return requestTopic + REPLY_SUFFIX;
|
||||
}
|
||||
|
||||
}
|
|
@ -105,33 +105,33 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber<IotDevi
|
|||
}
|
||||
|
||||
// 属性相关操作
|
||||
if (method.startsWith("thing.service.property.")) {
|
||||
if ("thing.service.property.set".equals(method)) {
|
||||
if (method.startsWith(IotDeviceTopicEnum.PROPERTY_SERVICE_METHOD_PREFIX)) {
|
||||
if (IotDeviceTopicEnum.PROPERTY_SET_METHOD.equals(method)) {
|
||||
return IotDeviceTopicEnum.buildPropertySetTopic(productKey, deviceName);
|
||||
} else if ("thing.service.property.get".equals(method)) {
|
||||
} else if (IotDeviceTopicEnum.PROPERTY_GET_METHOD.equals(method)) {
|
||||
return IotDeviceTopicEnum.buildPropertyGetTopic(productKey, deviceName);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
// 配置设置操作
|
||||
if (method.startsWith("thing.service.config.")) {
|
||||
if (method.startsWith(IotDeviceTopicEnum.CONFIG_SERVICE_METHOD_PREFIX)) {
|
||||
return IotDeviceTopicEnum.buildConfigSetTopic(productKey, deviceName);
|
||||
}
|
||||
|
||||
// OTA 升级操作
|
||||
if (method.startsWith("thing.service.ota.")) {
|
||||
if (method.startsWith(IotDeviceTopicEnum.OTA_SERVICE_METHOD_PREFIX)) {
|
||||
return IotDeviceTopicEnum.buildOtaUpgradeTopic(productKey, deviceName);
|
||||
}
|
||||
|
||||
// 一般服务调用操作
|
||||
if (method.startsWith("thing.service.")) {
|
||||
// 排除属性、配置、OTA相关的服务调用
|
||||
if (method.startsWith(IotDeviceTopicEnum.SERVICE_METHOD_PREFIX)) {
|
||||
// 排除属性、配置、OTA 相关的服务调用
|
||||
if (method.contains("property") || method.contains("config") || method.contains("ota")) {
|
||||
return null; // 已在上面处理
|
||||
}
|
||||
// 从方法中提取服务标识符
|
||||
String serviceIdentifier = method.substring("thing.service.".length());
|
||||
String serviceIdentifier = method.substring(IotDeviceTopicEnum.SERVICE_METHOD_PREFIX.length());
|
||||
return IotDeviceTopicEnum.buildServiceTopic(productKey, deviceName, serviceIdentifier);
|
||||
}
|
||||
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import cn.hutool.core.util.ObjUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
|
||||
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
|
||||
import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttHttpAuthHandler;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttUpstreamHandler;
|
||||
import io.netty.handler.codec.mqtt.MqttQoS;
|
||||
|
@ -137,9 +137,9 @@ public class IotMqttUpstreamProtocol {
|
|||
this.authHandler = new IotMqttHttpAuthHandler();
|
||||
|
||||
// 添加认证路由
|
||||
router.post("/mqtt/auth/authenticate").handler(authHandler::authenticate);
|
||||
router.post("/mqtt/auth/connected").handler(authHandler::connected);
|
||||
router.post("/mqtt/auth/disconnected").handler(authHandler::disconnected);
|
||||
router.post(IotDeviceTopicEnum.MQTT_AUTH_AUTHENTICATE_PATH).handler(authHandler::authenticate);
|
||||
router.post(IotDeviceTopicEnum.MQTT_AUTH_CONNECTED_PATH).handler(authHandler::connected);
|
||||
router.post(IotDeviceTopicEnum.MQTT_AUTH_DISCONNECTED_PATH).handler(authHandler::disconnected);
|
||||
|
||||
// 启动 HTTP 服务器
|
||||
int authPort = emqxProperties.getHttpAuthPort();
|
||||
|
@ -176,11 +176,13 @@ public class IotMqttUpstreamProtocol {
|
|||
log.info("[startMqttClient][开始启动 MQTT 客户端]");
|
||||
|
||||
// 初始化消息处理器
|
||||
this.upstreamHandler = new IotMqttUpstreamHandler();
|
||||
this.upstreamHandler = new IotMqttUpstreamHandler(this);
|
||||
|
||||
// 创建 MQTT 客户端
|
||||
log.info("[startMqttClient][使用 MQTT 客户端 ID: {}]", emqxProperties.getMqttClientId());
|
||||
|
||||
MqttClientOptions options = new MqttClientOptions()
|
||||
.setClientId("yudao-iot-gateway-" + IdUtil.fastSimpleUUID())
|
||||
.setClientId(emqxProperties.getMqttClientId())
|
||||
.setUsername(emqxProperties.getMqttUsername())
|
||||
.setPassword(emqxProperties.getMqttPassword())
|
||||
.setSsl(ObjUtil.defaultIfNull(emqxProperties.getMqttSsl(), false));
|
||||
|
|
|
@ -4,6 +4,7 @@ import cn.hutool.core.util.StrUtil;
|
|||
import cn.hutool.extra.spring.SpringUtil;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
|
||||
import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import io.vertx.mqtt.messages.MqttPublishMessage;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
@ -22,10 +23,12 @@ public class IotMqttUpstreamHandler {
|
|||
|
||||
private final IotDeviceMessageProducer deviceMessageProducer;
|
||||
private final IotDeviceMessageService deviceMessageService;
|
||||
private final String serverId;
|
||||
|
||||
public IotMqttUpstreamHandler() {
|
||||
public IotMqttUpstreamHandler(cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol protocol) {
|
||||
this.deviceMessageProducer = SpringUtil.getBean(IotDeviceMessageProducer.class);
|
||||
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
|
||||
this.serverId = protocol.getServerId();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -92,8 +95,8 @@ public class IotMqttUpstreamHandler {
|
|||
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(
|
||||
messageBytes, productKey, deviceName);
|
||||
|
||||
// 发送消息到队列
|
||||
deviceMessageProducer.sendDeviceMessage(message);
|
||||
// 发送消息到队列(需要补充设备信息)
|
||||
deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId);
|
||||
|
||||
// 记录成功日志
|
||||
log.info("[processMessage][处理{}成功][topic: {}]", messageType, topic);
|
||||
|
@ -106,28 +109,111 @@ public class IotMqttUpstreamHandler {
|
|||
* @return 消息类型描述,如果不支持返回 null
|
||||
*/
|
||||
private String getMessageType(String topic) {
|
||||
// 设备事件上报: /sys/{productKey}/{deviceName}/thing/event/{eventIdentifier}/post
|
||||
if (topic.contains("/thing/event/") && topic.endsWith("/post")) {
|
||||
if (StrUtil.isBlank(topic)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// 按优先级匹配主题类型,避免误匹配
|
||||
|
||||
// 1. 设备属性上报: /sys/{productKey}/{deviceName}/thing/event/property/post
|
||||
if (isPropertyPostTopic(topic)) {
|
||||
return IotDeviceTopicEnum.PROPERTY_POST_TOPIC.getDescription();
|
||||
}
|
||||
|
||||
// 2. 设备事件上报: /sys/{productKey}/{deviceName}/thing/event/{eventIdentifier}/post
|
||||
if (isEventPostTopic(topic)) {
|
||||
return "设备事件上报";
|
||||
}
|
||||
|
||||
// 设备属性操作: /sys/{productKey}/{deviceName}/thing/property/post
|
||||
// 或属性响应: /sys/{productKey}/{deviceName}/thing/service/property/set_reply
|
||||
if (topic.endsWith("/thing/property/post") ||
|
||||
topic.contains("/thing/service/property/set") ||
|
||||
topic.contains("/thing/service/property/get")) {
|
||||
return "设备属性操作";
|
||||
// 3. 设备属性设置响应: /sys/{productKey}/{deviceName}/thing/service/property/set_reply
|
||||
if (isPropertySetReplyTopic(topic)) {
|
||||
return "设备属性设置响应";
|
||||
}
|
||||
|
||||
// 设备服务调用: /sys/{productKey}/{deviceName}/thing/service/{serviceIdentifier}
|
||||
if (topic.contains("/thing/service/") && !topic.contains("/property/")) {
|
||||
return "设备服务调用";
|
||||
// 4. 设备属性获取响应: /sys/{productKey}/{deviceName}/thing/service/property/get_reply
|
||||
if (isPropertyGetReplyTopic(topic)) {
|
||||
return "设备属性获取响应";
|
||||
}
|
||||
|
||||
// 5. 设备配置设置响应: /sys/{productKey}/{deviceName}/thing/service/config/set_reply
|
||||
if (isConfigSetReplyTopic(topic)) {
|
||||
return IotDeviceTopicEnum.CONFIG_SET_TOPIC.getDescription() + "响应";
|
||||
}
|
||||
|
||||
// 6. 设备 OTA 升级响应:
|
||||
// /sys/{productKey}/{deviceName}/thing/service/ota/upgrade_reply
|
||||
if (isOtaUpgradeReplyTopic(topic)) {
|
||||
return IotDeviceTopicEnum.OTA_UPGRADE_TOPIC.getDescription() + "响应";
|
||||
}
|
||||
|
||||
// 7. 其他服务调用响应: 通用服务调用响应
|
||||
if (isServiceReplyTopic(topic)) {
|
||||
return "设备服务调用响应";
|
||||
}
|
||||
|
||||
// 不支持的消息类型
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否为属性上报主题
|
||||
*/
|
||||
private boolean isPropertyPostTopic(String topic) {
|
||||
return topic.contains(IotDeviceTopicEnum.PROPERTY_POST_TOPIC.getTopic());
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否为事件上报主题
|
||||
*/
|
||||
private boolean isEventPostTopic(String topic) {
|
||||
return topic.contains(IotDeviceTopicEnum.EVENT_POST_TOPIC_PREFIX.getTopic())
|
||||
&& topic.endsWith(IotDeviceTopicEnum.EVENT_POST_TOPIC_SUFFIX.getTopic())
|
||||
&& !topic.contains("property"); // 排除属性上报主题
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否为属性设置响应主题
|
||||
*/
|
||||
private boolean isPropertySetReplyTopic(String topic) {
|
||||
return topic.contains(IotDeviceTopicEnum.PROPERTY_SET_TOPIC.getTopic())
|
||||
&& topic.endsWith(IotDeviceTopicEnum.REPLY_SUFFIX);
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否为属性获取响应主题
|
||||
*/
|
||||
private boolean isPropertyGetReplyTopic(String topic) {
|
||||
return topic.contains(IotDeviceTopicEnum.PROPERTY_GET_TOPIC.getTopic())
|
||||
&& topic.endsWith(IotDeviceTopicEnum.REPLY_SUFFIX);
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否为配置设置响应主题
|
||||
*/
|
||||
private boolean isConfigSetReplyTopic(String topic) {
|
||||
return topic.contains(IotDeviceTopicEnum.CONFIG_SET_TOPIC.getTopic())
|
||||
&& topic.endsWith(IotDeviceTopicEnum.REPLY_SUFFIX);
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否为 OTA 升级响应主题
|
||||
*/
|
||||
private boolean isOtaUpgradeReplyTopic(String topic) {
|
||||
return topic.contains(IotDeviceTopicEnum.OTA_UPGRADE_TOPIC.getTopic())
|
||||
&& topic.endsWith(IotDeviceTopicEnum.REPLY_SUFFIX);
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否为服务调用响应主题(排除已处理的特殊服务)
|
||||
*/
|
||||
private boolean isServiceReplyTopic(String topic) {
|
||||
return topic.contains(IotDeviceTopicEnum.SERVICE_TOPIC_PREFIX)
|
||||
&& topic.endsWith(IotDeviceTopicEnum.REPLY_SUFFIX)
|
||||
&& !topic.contains("property")
|
||||
&& !topic.contains("config")
|
||||
&& !topic.contains("ota");
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析主题,获取主题各部分
|
||||
*
|
||||
|
|
|
@ -39,9 +39,7 @@ yudao:
|
|||
mqtt-password: public # MQTT 密码
|
||||
mqtt-ssl: false # 是否开启 SSL
|
||||
mqtt-topics:
|
||||
- "/sys/#" # 系统主题(设备上报)
|
||||
- "/ota/#" # OTA 升级主题
|
||||
- "/config/#" # 配置主题
|
||||
- "/sys/#" # 系统主题
|
||||
|
||||
# 消息总线配置
|
||||
message-bus:
|
||||
|
|
|
@ -40,9 +40,7 @@ yudao:
|
|||
enabled: true
|
||||
mqtt-ssl: false
|
||||
mqtt-topics:
|
||||
- "/sys/#" # 系统主题(设备上报)
|
||||
- "/ota/#" # OTA 升级主题
|
||||
- "/config/#" # 配置主题
|
||||
- "/sys/#" # 系统主题
|
||||
|
||||
# 消息总线配置
|
||||
message-bus:
|
||||
|
|
Loading…
Reference in New Issue