feat:【IoT 物联网】更新设备消息处理逻辑,重构 MQTT 下行消息处理器,优化主题构建和消息发布流程

This commit is contained in:
haohao 2025-06-13 11:51:05 +08:00
parent b06556da2d
commit 569eef4a74
13 changed files with 344 additions and 749 deletions

View File

@ -22,7 +22,7 @@ import java.time.LocalDateTime;
/**
* 针对 {@link IotDeviceMessage} 的业务处理器调用 method 对应的逻辑例如说
* 1. {@link IotDeviceMessageMethodEnum#PROPERTY_REPORT} 属性上报时记录设备属性
* 1. {@link IotDeviceMessageMethodEnum#PROPERTY_POST} 属性上报时记录设备属性
*
* @author alwayssuper
*/

View File

@ -126,7 +126,7 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
@Override
public void handleUpstreamDeviceMessage(IotDeviceMessage message, IotDeviceDO device) {
// 1. 理消息
// 1. 理消息
Object replyData = null;
ServiceException serviceException = null;
try {
@ -175,8 +175,7 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
}
// 属性上报
if (Objects.equal(message.getMethod(), IotDeviceMessageMethodEnum.PROPERTY_REPORT.getMethod()) ||
Objects.equal(message.getMethod(), IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod())) {
if (Objects.equal(message.getMethod(), IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod())) {
devicePropertyService.saveDeviceProperty(device, message);
return null;
}

View File

@ -23,15 +23,13 @@ public enum IotDeviceMessageMethodEnum implements ArrayValuable<String> {
STATE_OFFLINE("thing.state.offline", true),
// ========== 设备属性 ==========
// 可参考 https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services
// TODO @haohao使用 report
// 可参考
// https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services
PROPERTY_POST("thing.property.post", true),
PROPERTY_REPORT("thing.property.report", true),
PROPERTY_SET("thing.property.set", false),
PROPERTY_GET("thing.property.get", false),
//
// ========== 设备事件 ==========
EVENT_POST("thing.event.post", true),
;

View File

@ -25,9 +25,9 @@ public class IotDeviceMessage {
public static final String MESSAGE_BUS_DEVICE_MESSAGE_TOPIC = "iot_device_message";
/**
* 消息总线设备消息 Topic iot-biz 发送给 iot-gateway 的某个 server(protocol) 进行消费
* 消息总线设备消息 Topic iot-biz 发送给 iot-gateway 的某个 "server"(protocol) 进行消费
*
* 其中%s 就是该server(protocol) 的标识
* 其中%s 就是该"server"(protocol) 的标识
*/
public static final String MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC = MESSAGE_BUS_DEVICE_MESSAGE_TOPIC + "_%s";
@ -92,7 +92,7 @@ public class IotDeviceMessage {
*/
private String msg;
// ========== 基础方法只传递codec编解码字段 ==========
// ========== 基础方法只传递"codec编解码字段" ==========
public static IotDeviceMessage requestOf(String method) {
return requestOf(null, method, null);

View File

@ -45,7 +45,7 @@ public class IotGatewayConfiguration {
public static class MqttProtocolConfiguration {
@Bean
public IotMqttUpstreamProtocol iotMqttUnifiedProtocol(IotGatewayProperties gatewayProperties) {
public IotMqttUpstreamProtocol iotMqttUpstreamProtocol(IotGatewayProperties gatewayProperties) {
return new IotMqttUpstreamProtocol(gatewayProperties.getProtocol().getEmqx());
}

View File

@ -134,7 +134,7 @@ public class IotGatewayProperties {
private String mqttPassword;
/**
* MQTT 是否开启 SSL默认false
* MQTT 客户端的 SSL 开关
*/
@NotNull(message = "MQTT 是否开启 SSL 不能为空")
private Boolean mqttSsl = false;
@ -145,11 +145,30 @@ public class IotGatewayProperties {
private String mqttClientId;
/**
* MQTT 主题列表
* MQTT 订阅的主题
*/
@NotEmpty(message = "MQTT 主题不能为空")
private List<String> mqttTopics;
/**
* 默认 QoS 级别
* <p>
* 0 - 最多一次
* 1 - 至少一次
* 2 - 刚好一次
*/
private Integer mqttQos = 1;
/**
* 连接超时时间
*/
private Integer connectTimeoutSeconds = 10;
/**
* 重连延迟时间毫秒
*/
private Long reconnectDelayMs = 5000L;
/**
* 获取 MQTT 客户端 ID如果未配置则自动生成
*

View File

@ -1,262 +0,0 @@
package cn.iocoder.yudao.module.iot.gateway.enums;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
// TODO @haohao这个类我们是不是可以删除了哈
/**
* IoT 设备主题枚举
* <p>
* 用于统一管理 MQTT 协议中的主题常量基于 Alink 协议规范
*
* @author haohao
*/
@RequiredArgsConstructor
@Getter
public enum IotDeviceTopicEnum {
/**
* 设备属性设置主题
* 请求 Topic/sys/${productKey}/${deviceName}/thing/property/set
* 响应 Topic/sys/${productKey}/${deviceName}/thing/property/set_reply
*/
PROPERTY_SET_TOPIC("/thing/property/set", "设备属性设置主题"),
/**
* 设备属性获取主题
* 请求 Topic/sys/${productKey}/${deviceName}/thing/property/get
* 响应 Topic/sys/${productKey}/${deviceName}/thing/property/get_reply
*/
PROPERTY_GET_TOPIC("/thing/property/get", "设备属性获取主题"),
/**
* 设备配置设置主题
* 请求 Topic/sys/${productKey}/${deviceName}/thing/config/set
* 响应 Topic/sys/${productKey}/${deviceName}/thing/config/set_reply
*/
CONFIG_SET_TOPIC("/thing/config/set", "设备配置设置主题"),
/**
* 设备 OTA 升级主题
* 请求 Topic/sys/${productKey}/${deviceName}/thing/ota/upgrade
* 响应 Topic/sys/${productKey}/${deviceName}/thing/ota/upgrade_reply
*/
OTA_UPGRADE_TOPIC("/thing/ota/upgrade", "设备 OTA 升级主题"),
/**
* 设备属性上报主题
* 请求 Topic/sys/${productKey}/${deviceName}/thing/event/property/post
* 响应 Topic/sys/${productKey}/${deviceName}/thing/event/property/post_reply
*/
PROPERTY_POST_TOPIC("/thing/property/post", "设备属性上报主题"),
/**
* 设备事件上报主题前缀
*/
EVENT_POST_TOPIC_PREFIX("/thing/event/", "设备事件上报主题前缀"),
/**
* 设备事件上报主题后缀
*/
EVENT_POST_TOPIC_SUFFIX("/post", "设备事件上报主题后缀");
// ========== 静态常量 ==========
/**
* 系统主题前缀
*/
public static final String SYS_TOPIC_PREFIX = "/sys/";
/**
* 服务调用主题前缀
*/
public static final String SERVICE_TOPIC_PREFIX = "/thing/";
/**
* 响应主题后缀
*/
public static final String REPLY_SUFFIX = "_reply";
// ========== 方法常量 ==========
/**
* 服务方法前缀
*/
public static final String SERVICE_METHOD_PREFIX = "thing.";
/**
* 属性服务方法前缀
*/
public static final String PROPERTY_SERVICE_METHOD_PREFIX = "thing.property.";
/**
* 配置服务方法前缀
*/
public static final String CONFIG_SERVICE_METHOD_PREFIX = "thing.config.";
/**
* OTA 服务方法前缀
*/
public static final String OTA_SERVICE_METHOD_PREFIX = "thing.ota.";
/**
* 属性设置方法
*/
public static final String PROPERTY_SET_METHOD = "thing.property.set";
/**
* 属性获取方法
*/
public static final String PROPERTY_GET_METHOD = "thing.property.get";
// ========== 主题匹配常量 ==========
/**
* 事件上报主题模式
*/
public static final String EVENT_POST_TOPIC_PATTERN = "/thing/event/";
/**
* 主题后缀post
*/
public static final String POST_SUFFIX = "/post";
/**
* 属性上报主题后缀
*/
public static final String PROPERTY_POST_SUFFIX = "/thing/property/post";
/**
* 属性设置响应主题包含
*/
public static final String PROPERTY_SET_TOPIC_CONTAINS = "/thing/property/set";
/**
* 属性获取响应主题包含
*/
public static final String PROPERTY_GET_TOPIC_CONTAINS = "/thing/property/get";
// ========== MQTT 认证路径常量 ==========
/**
* MQTT 认证路径
*/
public static final String MQTT_AUTH_AUTHENTICATE_PATH = "/mqtt/auth/authenticate";
/**
* MQTT 连接事件路径
*/
public static final String MQTT_AUTH_CONNECTED_PATH = "/mqtt/auth/connected";
/**
* MQTT 断开事件路径
*/
public static final String MQTT_AUTH_DISCONNECTED_PATH = "/mqtt/auth/disconnected";
private final String topic;
private final String description;
// ========== 工具方法 ==========
/**
* 构建设备主题前缀
*
* @param productKey 产品 Key
* @param deviceName 设备名称
* @return 设备主题前缀/sys/{productKey}/{deviceName}
*/
private static String buildDeviceTopicPrefix(String productKey, String deviceName) {
return SYS_TOPIC_PREFIX + productKey + "/" + deviceName;
}
/**
* 构建设备服务调用主题
*
* @param productKey 产品 Key
* @param deviceName 设备名称
* @param serviceIdentifier 服务标识符
* @return 完整的主题路径
*/
public static String buildServiceTopic(String productKey, String deviceName, String serviceIdentifier) {
return buildDeviceTopicPrefix(productKey, deviceName) + SERVICE_TOPIC_PREFIX + serviceIdentifier;
}
/**
* 构建设备属性设置主题
*
* @param productKey 产品 Key
* @param deviceName 设备名称
* @return 完整的主题路径
*/
public static String buildPropertySetTopic(String productKey, String deviceName) {
return buildDeviceTopicPrefix(productKey, deviceName) + PROPERTY_SET_TOPIC.getTopic();
}
/**
* 构建设备属性获取主题
*
* @param productKey 产品 Key
* @param deviceName 设备名称
* @return 完整的主题路径
*/
public static String buildPropertyGetTopic(String productKey, String deviceName) {
return buildDeviceTopicPrefix(productKey, deviceName) + PROPERTY_GET_TOPIC.getTopic();
}
/**
* 构建设备配置设置主题
*
* @param productKey 产品 Key
* @param deviceName 设备名称
* @return 完整的主题路径
*/
public static String buildConfigSetTopic(String productKey, String deviceName) {
return buildDeviceTopicPrefix(productKey, deviceName) + CONFIG_SET_TOPIC.getTopic();
}
/**
* 构建设备 OTA 升级主题
*
* @param productKey 产品 Key
* @param deviceName 设备名称
* @return 完整的主题路径
*/
public static String buildOtaUpgradeTopic(String productKey, String deviceName) {
return buildDeviceTopicPrefix(productKey, deviceName) + OTA_UPGRADE_TOPIC.getTopic();
}
/**
* 构建设备属性上报主题
*
* @param productKey 产品 Key
* @param deviceName 设备名称
* @return 完整的主题路径
*/
public static String buildPropertyPostTopic(String productKey, String deviceName) {
return buildDeviceTopicPrefix(productKey, deviceName) + PROPERTY_POST_TOPIC.getTopic();
}
/**
* 构建设备事件上报主题
*
* @param productKey 产品 Key
* @param deviceName 设备名称
* @param eventIdentifier 事件标识符
* @return 完整的主题路径
*/
public static String buildEventPostTopic(String productKey, String deviceName, String eventIdentifier) {
return buildDeviceTopicPrefix(productKey, deviceName) +
EVENT_POST_TOPIC_PREFIX.getTopic() + eventIdentifier + EVENT_POST_TOPIC_SUFFIX.getTopic();
}
/**
* 获取响应主题
*
* @param requestTopic 请求主题
* @return 响应主题
*/
public static String getReplyTopic(String requestTopic) {
return requestTopic + REPLY_SUFFIX;
}
}

View File

@ -1,18 +1,12 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttDownstreamHandler;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
@ -20,15 +14,18 @@ import lombok.extern.slf4j.Slf4j;
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Slf4j
public class IotMqttDownstreamSubscriber implements IotMessageSubscriber<IotDeviceMessage> {
private final IotMqttUpstreamProtocol protocol;
private final IotMqttDownstreamHandler downstreamHandler;
private final IotMessageBus messageBus;
private final IotMqttUpstreamProtocol protocol;
@Resource
private IotDeviceService deviceService;
public IotMqttDownstreamSubscriber(IotMqttUpstreamProtocol protocol, IotMessageBus messageBus) {
this.protocol = protocol;
this.messageBus = messageBus;
this.downstreamHandler = new IotMqttDownstreamHandler(protocol);
}
@PostConstruct
public void init() {
@ -51,7 +48,7 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber<IotDevi
log.info("[onMessage][接收到下行消息][messageId: {}][method: {}][deviceId: {}]",
message.getId(), message.getMethod(), message.getDeviceId());
try {
// 根据消息方法处理不同的下行消息
// 1. 校验
String method = message.getMethod();
if (method == null) {
log.warn("[onMessage][消息方法为空][messageId: {}][deviceId: {}]",
@ -59,114 +56,14 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber<IotDevi
return;
}
// 过滤上行消息下行订阅者只处理下行消息
if (isUpstreamMessage(method)) {
// TODO @haohao打个 erroor log按道理不会发生
log.debug("[onMessage][忽略上行消息][method: {}][messageId: {}]", method, message.getId());
return;
}
// 处理下行消息
handleDownstreamMessage(message);
// 2. 处理下行消息
downstreamHandler.handle(message);
} catch (Exception e) {
log.error("[onMessage][处理下行消息失败][messageId: {}][method: {}][deviceId: {}]",
message.getId(), message.getMethod(), message.getDeviceId(), e);
}
}
/**
* 处理下行消息
*
* @param message 设备消息
*/
private void handleDownstreamMessage(IotDeviceMessage message) {
// 1. 获取设备信息使用缓存
IotDeviceRespDTO deviceInfo = deviceService.getDeviceFromCache(message.getDeviceId());
if (deviceInfo == null) {
log.warn("[handleDownstreamMessage][设备信息不存在][deviceId: {}]", message.getDeviceId());
return;
}
// 2. 根据方法构建主题
String topic = buildTopicByMethod(message.getMethod(), deviceInfo.getProductKey(), deviceInfo.getDeviceName());
if (StrUtil.isBlank(topic)) {
log.warn("[handleDownstreamMessage][未知的消息方法:{}]", message.getMethod());
return;
}
// 3. 构建载荷
JSONObject payload = buildDownstreamPayload(message, message.getMethod());
// 4. 发送消息
protocol.publishMessage(topic, payload.toString());
log.info("[handleDownstreamMessage][发送下行消息成功][method: {}][topic: {}]", message.getMethod(), topic);
}
/**
* 根据方法构建主题
*
* @param method 消息方法
* @param productKey 产品标识
* @param deviceName 设备名称
* @return 构建的主题如果方法不支持返回 null
*/
private String buildTopicByMethod(String method, String productKey, String deviceName) {
if (StrUtil.isBlank(method)) {
return null;
}
// 属性相关操作
if (method.startsWith(IotDeviceTopicEnum.PROPERTY_SERVICE_METHOD_PREFIX)) {
if (IotDeviceTopicEnum.PROPERTY_SET_METHOD.equals(method)) {
return IotDeviceTopicEnum.buildPropertySetTopic(productKey, deviceName);
} else if (IotDeviceTopicEnum.PROPERTY_GET_METHOD.equals(method)) {
return IotDeviceTopicEnum.buildPropertyGetTopic(productKey, deviceName);
}
return null;
}
// 配置设置操作
if (method.startsWith(IotDeviceTopicEnum.CONFIG_SERVICE_METHOD_PREFIX)) {
return IotDeviceTopicEnum.buildConfigSetTopic(productKey, deviceName);
}
// OTA 升级操作
if (method.startsWith(IotDeviceTopicEnum.OTA_SERVICE_METHOD_PREFIX)) {
return IotDeviceTopicEnum.buildOtaUpgradeTopic(productKey, deviceName);
}
// 一般服务调用操作
if (method.startsWith(IotDeviceTopicEnum.SERVICE_METHOD_PREFIX)) {
// 排除属性配置OTA 相关的服务调用
if (method.contains("property") || method.contains("config") || method.contains("ota")) {
return null; // 已在上面处理
}
// 从方法中提取服务标识符
String serviceIdentifier = method.substring(IotDeviceTopicEnum.SERVICE_METHOD_PREFIX.length());
return IotDeviceTopicEnum.buildServiceTopic(productKey, deviceName, serviceIdentifier);
}
// 不支持的方法
return null;
}
// TODO @haohao按道理说这里的应该是通过 encodeMessage
/**
* 构建下行消息载荷
*
* @param message 设备消息
* @param method 方法名
* @return JSON 载荷
*/
private JSONObject buildDownstreamPayload(IotDeviceMessage message, String method) {
JSONObject payload = new JSONObject();
payload.set("id", message.getId()); // 使用正确的消息ID字段
payload.set("version", "1.0");
payload.set("method", method);
payload.set("params", message.getData());
return payload;
}
/**
* 判断是否为上行消息
*

View File

@ -4,9 +4,9 @@ import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttHttpAuthHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttUpstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
@ -21,13 +21,10 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
// TODO @haohao看看有没多余的 log可以不打噢
// TODO @haohao有没多余的注释可以去掉减少 ai 保持简洁
/**
* IoT 网关 MQTT 统一协议
* IoT 网关 MQTT 协议接收设备上行消息
* <p>
* 1. MQTT 客户端连接 EMQX消费处理设备上行和下行消息
* 2. HTTP 认证服务 EMQX 提供设备认证连接断开接口
@ -37,24 +34,6 @@ import java.util.concurrent.TimeUnit;
@Slf4j
public class IotMqttUpstreamProtocol {
// TODO @haohao是不是也丢到配置里
/**
* 默认 QoS 级别 - 至少一次
*/
private static final MqttQoS DEFAULT_QOS = MqttQoS.AT_LEAST_ONCE;
// TODO @haohao这个也是
/**
* 连接超时时间
*/
private static final int CONNECT_TIMEOUT_SECONDS = 10;
// TODO @haohao重连也是
/**
* 重连延迟时间毫秒
*/
private static final long RECONNECT_DELAY_MS = 5000;
private final IotGatewayProperties.EmqxProperties emqxProperties;
private Vertx vertx;
@ -68,8 +47,6 @@ public class IotMqttUpstreamProtocol {
// HTTP 认证服务相关
private HttpServer httpAuthServer;
// TODO @haohaoauthHandler 可以 local
private IotMqttHttpAuthHandler authHandler;
/**
* 服务运行状态标志
@ -92,7 +69,6 @@ public class IotMqttUpstreamProtocol {
try {
// 1. 创建共享的 Vertx 实例
this.vertx = Vertx.vertx();
log.info("[start][共享 Vertx 实例创建成功]");
// 2. 启动 HTTP 认证服务
startHttpAuthServer();
@ -103,7 +79,6 @@ public class IotMqttUpstreamProtocol {
isRunning = true;
log.info("[start][MQTT 统一协议服务启动完成]");
} catch (Exception e) {
// TODO @haohao失败是不是直接 System.exit
log.error("[start][MQTT 统一协议服务启动失败]", e);
// 启动失败时清理资源
stop();
@ -150,12 +125,10 @@ public class IotMqttUpstreamProtocol {
router.route().handler(BodyHandler.create());
// 创建认证处理器
this.authHandler = new IotMqttHttpAuthHandler();
// 添加认证路由
router.post(IotDeviceTopicEnum.MQTT_AUTH_AUTHENTICATE_PATH).handler(authHandler::authenticate);
router.post(IotDeviceTopicEnum.MQTT_AUTH_CONNECTED_PATH).handler(authHandler::connected);
router.post(IotDeviceTopicEnum.MQTT_AUTH_DISCONNECTED_PATH).handler(authHandler::disconnected);
IotMqttHttpAuthHandler authHandler = new IotMqttHttpAuthHandler();
router.post(IotMqttTopicUtils.MQTT_AUTH_AUTHENTICATE_PATH).handler(authHandler::authenticate);
router.post(IotMqttTopicUtils.MQTT_AUTH_CONNECTED_PATH).handler(authHandler::connected);
router.post(IotMqttTopicUtils.MQTT_AUTH_DISCONNECTED_PATH).handler(authHandler::disconnected);
// 启动 HTTP 服务器
int authPort = emqxProperties.getHttpAuthPort();
@ -175,14 +148,14 @@ public class IotMqttUpstreamProtocol {
* 停止 HTTP 认证服务
*/
private void stopHttpAuthServer() {
// TODO @haohao一些 if return 最好搞下
if (httpAuthServer != null) {
try {
httpAuthServer.close().result();
log.info("[stopHttpAuthServer][HTTP 认证服务已停止]");
} catch (Exception e) {
log.error("[stopHttpAuthServer][HTTP 认证服务停止失败]", e);
}
if (httpAuthServer == null) {
return;
}
try {
httpAuthServer.close().result();
log.info("[stopHttpAuthServer][HTTP 认证服务已停止]");
} catch (Exception e) {
log.error("[stopHttpAuthServer][HTTP 认证服务停止失败]", e);
}
}
@ -192,10 +165,10 @@ public class IotMqttUpstreamProtocol {
private void startMqttClient() {
log.info("[startMqttClient][开始启动 MQTT 客户端]");
// 初始化消息处理器
// 1. 初始化消息处理器
this.upstreamHandler = new IotMqttUpstreamHandler(this);
// 创建 MQTT 客户端
// 2. 创建 MQTT 客户端
log.info("[startMqttClient][使用 MQTT 客户端 ID: {}]", emqxProperties.getMqttClientId());
MqttClientOptions options = new MqttClientOptions()
@ -205,7 +178,7 @@ public class IotMqttUpstreamProtocol {
.setSsl(emqxProperties.getMqttSsl());
this.mqttClient = MqttClient.create(vertx, options);
// 连接 MQTT Broker
// 3. 连接 MQTT Broker
connectMqtt();
}
@ -229,13 +202,13 @@ public class IotMqttUpstreamProtocol {
}
// 2. 关闭 MQTT 客户端
try {
if (mqttClient != null && mqttClient.isConnected()) {
if (mqttClient != null && mqttClient.isConnected()) {
try {
mqttClient.disconnect();
log.info("[stopMqttClient][MQTT 客户端已断开]");
} catch (Exception e) {
log.warn("[stopMqttClient][关闭 MQTT 客户端异常]", e);
}
} catch (Exception e) {
log.warn("[stopMqttClient][关闭 MQTT 客户端异常]", e);
}
}
@ -243,10 +216,9 @@ public class IotMqttUpstreamProtocol {
* 连接 MQTT Broker 并订阅主题
*/
private void connectMqtt() {
// 参数校验
// 1. 参数校验
String host = emqxProperties.getMqttHost();
Integer port = emqxProperties.getMqttPort();
if (StrUtil.isBlank(host)) {
log.error("[connectMqtt][MQTT Host 为空,无法连接]");
throw new IllegalArgumentException("MQTT Host 不能为空");
@ -255,63 +227,47 @@ public class IotMqttUpstreamProtocol {
log.error("[connectMqtt][MQTT Port 无效:{}]", port);
throw new IllegalArgumentException("MQTT Port 必须为正整数");
}
log.info("[connectMqtt][开始连接 MQTT Broker][host: {}][port: {}]", host, port);
CompletableFuture<Void> connectFuture = mqttClient.connect(port, host)
.toCompletionStage()
.toCompletableFuture()
.thenAccept(connAck -> {
// TODO @haohao是不是可以连接完然后在执行里面不用 通过 thenAccept
log.info("[connectMqtt][MQTT 客户端连接成功][host: {}][port: {}]", host, port);
// 设置断开重连监听器
mqttClient.closeHandler(closeEvent -> {
log.warn("[closeHandler][MQTT 连接已断开,准备重连]");
reconnectWithDelay();
});
// 设置消息处理器
setupMessageHandler();
// 订阅主题
subscribeToTopics();
})
.exceptionally(error -> {
// TODO @haohao这里的异常是不是不用重连哈因为直接就退出了然后有 closeHandler 监听重连了
log.error("[connectMqtt][连接 MQTT Broker 失败][host: {}][port: {}]", host, port, error);
// 连接失败时也要尝试重连
reconnectWithDelay();
return null;
});
// 等待连接完成
// 2. 连接
try {
connectFuture.get(CONNECT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
log.info("[connectMqtt][MQTT 客户端启动完成]");
mqttClient.connect(port, host)
.toCompletionStage()
.toCompletableFuture()
.get(emqxProperties.getConnectTimeoutSeconds(), TimeUnit.SECONDS);
log.info("[connectMqtt][MQTT 客户端连接成功][host: {}][port: {}]", host, port);
// 3. 设置处理器
// 3.1 设置断开重连监听器
mqttClient.closeHandler(closeEvent -> {
log.warn("[closeHandler][MQTT 连接已断开,准备重连]");
reconnectWithDelay();
});
// 3.2 设置消息处理器
mqttClient.publishHandler(upstreamHandler::handle);
log.debug("[setupMessageHandler][MQTT 消息处理器设置完成]");
// 4. 订阅主题
subscribeToTopics();
} catch (Exception e) {
log.error("[connectMqtt][MQTT 客户端启动失败]", e);
log.error("[connectMqtt][连接 MQTT Broker 失败][host: {}][port: {}]", host, port, e);
reconnectWithDelay(); // 连接失败时也要尝试重连
throw new RuntimeException("MQTT 客户端启动失败", e);
}
}
/**
* 设置 MQTT 消息处理器
*/
private void setupMessageHandler() {
mqttClient.publishHandler(upstreamHandler::handle);
log.debug("[setupMessageHandler][MQTT 消息处理器设置完成]");
}
/**
* 订阅设备上行消息主题
*/
private void subscribeToTopics() {
List<String> topicList = emqxProperties.getMqttTopics();
int qos = emqxProperties.getMqttQos();
log.info("[subscribeToTopics][开始订阅主题,共 {} 个]", topicList.size());
for (String topic : topicList) {
mqttClient.subscribe(topic, DEFAULT_QOS.value(), subscribeResult -> {
mqttClient.subscribe(topic, qos, subscribeResult -> {
if (subscribeResult.succeeded()) {
log.info("[subscribeToTopics][订阅主题成功: {}][QoS: {}]", topic, DEFAULT_QOS.value());
log.info("[subscribeToTopics][订阅主题成功: {}][QoS: {}]", topic, qos);
} else {
log.error("[subscribeToTopics][订阅主题失败: {}]", topic, subscribeResult.cause());
}
@ -323,17 +279,17 @@ public class IotMqttUpstreamProtocol {
* 延迟重连
*/
private void reconnectWithDelay() {
vertx.setTimer(RECONNECT_DELAY_MS, timerId -> {
// TODO @haohaoif return括号少一些
if (isRunning && (mqttClient == null || !mqttClient.isConnected())) {
log.info("[reconnectWithDelay][开始重连 MQTT Broker延迟 {} 毫秒]", RECONNECT_DELAY_MS);
try {
connectMqtt();
} catch (Exception e) {
log.error("[reconnectWithDelay][重连失败,将继续尝试重连]", e);
// 重连失败时继续尝试重连
reconnectWithDelay();
}
long delay = emqxProperties.getReconnectDelayMs();
vertx.setTimer(delay, timerId -> {
if (!isRunning || (mqttClient != null && mqttClient.isConnected())) {
return;
}
log.info("[reconnectWithDelay][开始重连 MQTT Broker延迟 {} 毫秒]", delay);
try {
connectMqtt();
} catch (Exception e) {
log.error("[reconnectWithDelay][重连失败,将继续尝试重连]", e);
reconnectWithDelay(); // 失败后继续尝试
}
});
}
@ -345,12 +301,12 @@ public class IotMqttUpstreamProtocol {
* @param payload 消息内容
*/
public void publishMessage(String topic, String payload) {
if (mqttClient != null && mqttClient.isConnected()) {
mqttClient.publish(topic, Buffer.buffer(payload), DEFAULT_QOS, false, false);
log.debug("[publishMessage][发布消息成功][topic: {}]", topic);
} else {
if (mqttClient == null || !mqttClient.isConnected()) {
log.warn("[publishMessage][MQTT 客户端未连接,无法发布消息][topic: {}]", topic);
return;
}
MqttQoS qos = MqttQoS.valueOf(emqxProperties.getMqttQos());
mqttClient.publish(topic, Buffer.buffer(payload), qos, false, false);
}
}

View File

@ -1,95 +0,0 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
// TODO @haohao是不是不用基类哈
/**
* IoT 网关 MQTT 协议的处理器抽象基类
* <p>
* 提供通用的异常处理参数校验等功能
*
* @author 芋道源码
*/
@Slf4j
public abstract class IotMqttAbstractHandler {
/**
* 处理 MQTT 消息的模板方法
*
* @param topic 主题
* @param payload 消息内容
*/
public final void handle(String topic, String payload) {
try {
// 1. 前置校验
if (!validateInput(topic, payload)) {
return;
}
// 2. 执行具体逻辑
doHandle(topic, payload);
} catch (Exception e) {
log.error("[handle][处理 MQTT 消息失败][topic: {}][payload: {}]", topic, payload, e);
handleException(topic, payload, e);
}
}
/**
* 具体的处理逻辑由子类实现
*
* @param topic 主题
* @param payload 消息内容
*/
protected abstract void doHandle(String topic, String payload);
/**
* 输入参数校验
*
* @param topic 主题
* @param payload 消息内容
* @return 校验是否通过
*/
protected boolean validateInput(String topic, String payload) {
if (StrUtil.isBlank(topic)) {
log.warn("[validateInput][主题为空,忽略消息]");
return false;
}
if (StrUtil.isBlank(payload)) {
log.warn("[validateInput][消息内容为空][topic: {}]", topic);
return false;
}
return true;
}
/**
* 异常处理
*
* @param topic 主题
* @param payload 消息内容
* @param e 异常
*/
protected void handleException(String topic, String payload, Exception e) {
// 默认实现记录错误日志
// 子类可以重写此方法添加特定的异常处理逻辑
log.error("[handleException][MQTT 消息处理异常][topic: {}]", topic, e);
}
/**
* 解析主题获取主题各部分
*
* @param topic 主题
* @return 主题各部分数组如果解析失败返回 null
*/
protected String[] parseTopic(String topic) {
String[] topicParts = topic.split("/");
if (topicParts.length < 7) {
log.warn("[parseTopic][主题格式不正确][topic: {}]", topic);
return null;
}
return topicParts;
}
}

View File

@ -0,0 +1,98 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.hutool.json.JSONObject;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 网关 MQTT 下行消息处理器
* <p>
* 从消息总线接收到下行消息然后发布到 MQTT Broker
*
* @author 芋道源码
*/
@Slf4j
public class IotMqttDownstreamHandler {
private final IotMqttUpstreamProtocol protocol;
private final IotDeviceService deviceService;
private final IotDeviceMessageService deviceMessageService;
public IotMqttDownstreamHandler(IotMqttUpstreamProtocol protocol) {
this.protocol = protocol;
this.deviceService = SpringUtil.getBean(IotDeviceService.class);
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
}
/**
* 处理下行消息
*
* @param message 设备消息
*/
public void handle(IotDeviceMessage message) {
// 1. 获取设备信息使用缓存
IotDeviceRespDTO deviceInfo = deviceService.getDeviceFromCache(message.getDeviceId());
if (deviceInfo == null) {
log.warn("[handle][设备信息不存在][deviceId: {}]", message.getDeviceId());
return;
}
// 2. 根据方法构建主题
String topic = buildTopicByMethod(message.getMethod(), deviceInfo.getProductKey(), deviceInfo.getDeviceName());
if (StrUtil.isBlank(topic)) {
log.warn("[handle][未知的消息方法:{}]", message.getMethod());
return;
}
// 3. 构建载荷
JSONObject payload = buildDownstreamPayload(message);
// 4. 发布消息
protocol.publishMessage(topic, payload.toString());
log.info("[handle][发布下行消息成功][method: {}][topic: {}]", message.getMethod(), topic);
}
/**
* 根据方法构建主题
*
* @param method 消息方法
* @param productKey 产品标识
* @param deviceName 设备名称
* @return 构建的主题如果方法不支持返回 null
*/
private String buildTopicByMethod(String method, String productKey, String deviceName) {
IotDeviceMessageMethodEnum methodEnum = IotDeviceMessageMethodEnum.of(method);
if (methodEnum == null) {
return null;
}
return switch (methodEnum) {
case PROPERTY_POST -> IotMqttTopicUtils.buildPropertyPostReplyTopic(productKey, deviceName);
case PROPERTY_SET -> IotMqttTopicUtils.buildPropertySetTopic(productKey, deviceName);
default -> null;
};
}
/**
* 构建下行消息载荷
*
* @param message 设备消息
* @return JSON 载荷
*/
private JSONObject buildDownstreamPayload(IotDeviceMessage message) {
// 使用 IotDeviceMessageService 进行消息编码
IotDeviceRespDTO device = deviceService.getDeviceFromCache(message.getDeviceId());
byte[] encodedBytes = deviceMessageService.encodeDeviceMessage(message, device.getProductKey(),
device.getDeviceName());
return new JSONObject(new String(encodedBytes));
}
}

View File

@ -1,27 +1,23 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.mqtt.messages.MqttPublishMessage;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
import org.springframework.util.Assert;
/**
* IoT 网关 MQTT 协议的上行处理器
* <p>
* 处理设备上行消息包括事件上报属性上报服务调用响应等
* IoT 网关 MQTT 上行消息处理器
*
* @author 芋道源码
*/
@Slf4j
public class IotMqttUpstreamHandler extends IotMqttAbstractHandler {
public class IotMqttUpstreamHandler {
private final IotDeviceMessageService deviceMessageService;
private final String serverId;
public IotMqttUpstreamHandler(IotMqttUpstreamProtocol protocol) {
@ -34,167 +30,66 @@ public class IotMqttUpstreamHandler extends IotMqttAbstractHandler {
*/
public void handle(MqttPublishMessage message) {
String topic = message.topicName();
// TODO @haohao message.payload().getBytes();
String payload = message.payload().toString(StandardCharsets.UTF_8);
byte[] payload = message.payload().getBytes();
log.debug("[handle][收到 MQTT 消息][topic: {}]", topic);
// 调用父类的 handle 方法父类会进行参数校验
handle(topic, payload);
}
@Override
protected void doHandle(String topic, String payload) {
// 1. 识别并验证消息类型
String messageType = getMessageType(topic);
if (messageType == null) {
// TODO @haohaolog 是不是把 payload 也打印下哈
log.warn("[doHandle][未知的消息类型][topic: {}]", topic);
return;
try {
// 1. 前置校验
if (StrUtil.isBlank(topic)) {
log.warn("[validateInput][主题为空,忽略消息]");
return;
}
// 注意payload 可以为空
// 2. 识别并验证消息类型
String messageType = getMessageType(topic);
Assert.notNull(messageType, String.format("未知的消息类型, topic(%s)", topic));
log.info("[handle][接收到{}][topic: {}]", messageType, topic);
// 3. 解析主题获取 productKey deviceName
String[] topicParts = topic.split("/");
if (topicParts.length < 4) {
log.warn("[handle][主题格式不正确,无法解析 productKey 和 deviceName][topic: {}]", topic);
return;
}
String productKey = topicParts[2];
String deviceName = topicParts[3];
if (StrUtil.isAllBlank(productKey, deviceName)) {
log.warn("[handle][主题中 productKey 或 deviceName 为空][topic: {}]", topic);
return;
}
// 4. 解码消息
IotDeviceMessage deviceMessage = deviceMessageService.decodeDeviceMessage(
payload, productKey, deviceName);
if (deviceMessage == null) {
log.warn("[handle][消息解码失败][topic: {}]", topic);
return;
}
// 5. 发送消息到队列
deviceMessageService.sendDeviceMessage(deviceMessage, productKey, deviceName, serverId);
// 6. 记录成功日志
log.info("[handle][处理{}成功,已转发到 MQ][topic: {}]", messageType, topic);
} catch (Exception e) {
log.error("[handle][处理 MQTT 消息失败][topic: {}][payload: {}]", topic, new String(payload), e);
}
// 2. 处理消息
processMessage(topic, payload, messageType);
}
/**
* 处理消息的统一逻辑
*/
private void processMessage(String topic, String payload, String messageType) {
// TODO @haohaomessageType 解析是不是作用不大哈
log.info("[processMessage][接收到{}][topic: {}]", messageType, topic);
// 解析主题获取设备信息
// TODO @haohao不一定是 7 个哈阿里云 topic 有点差异的可以考虑解析到 topicParts[2]topicParts[3] topic
String[] topicParts = parseTopic(topic);
if (topicParts == null) {
return;
}
String productKey = topicParts[2];
String deviceName = topicParts[3];
// TODO @haohao解析不到可以打个 error log
// 解码消息
byte[] messageBytes = payload.getBytes(StandardCharsets.UTF_8);
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(
messageBytes, productKey, deviceName);
// 发送消息到队列
deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId);
// 记录成功日志
log.info("[processMessage][处理{}成功][topic: {}]", messageType, topic);
}
// TODO @haohao合并下处理不搞成每个 topic 一个处理
/**
* 识别消息类型
* 从主题中获得消息类型
*
* @param topic 主题
* @return 消息类型描述如果不支持返回 null
* @return 消息类型
*/
private String getMessageType(String topic) {
// 此方法由 doHandle 调用topic 已经在父类中校验过无需重复校验
// 按优先级匹配主题类型避免误匹配
// 1. 设备属性上报: /sys/{productKey}/{deviceName}/thing/property/post
if (isPropertyPostTopic(topic)) {
return IotDeviceTopicEnum.PROPERTY_POST_TOPIC.getDescription();
String[] topicParts = topic.split("/");
if (topicParts.length < 7) {
return null;
}
// 2. 设备事件上报: /sys/{productKey}/{deviceName}/thing/{eventIdentifier}/post
if (isEventPostTopic(topic)) {
return "设备事件上报";
}
// 3. 设备属性设置响应: /sys/{productKey}/{deviceName}/thing/property/set_reply
if (isPropertySetReplyTopic(topic)) {
return "设备属性设置响应";
}
// 4. 设备属性获取响应: /sys/{productKey}/{deviceName}/thing/property/get_reply
if (isPropertyGetReplyTopic(topic)) {
return "设备属性获取响应";
}
// 5. 设备配置设置响应: /sys/{productKey}/{deviceName}/thing/config/set_reply
if (isConfigSetReplyTopic(topic)) {
return IotDeviceTopicEnum.CONFIG_SET_TOPIC.getDescription() + "响应";
}
// 6. 设备 OTA 升级响应:
// /sys/{productKey}/{deviceName}/thing/ota/upgrade_reply
if (isOtaUpgradeReplyTopic(topic)) {
return IotDeviceTopicEnum.OTA_UPGRADE_TOPIC.getDescription() + "响应";
}
// 7. 其他服务调用响应: 通用服务调用响应
if (isServiceReplyTopic(topic)) {
return "设备服务调用响应";
}
// 不支持的消息类型
return null;
}
/**
* 判断是否为属性上报主题
*/
private boolean isPropertyPostTopic(String topic) {
return topic.contains(IotDeviceTopicEnum.PROPERTY_POST_TOPIC.getTopic());
}
/**
* 判断是否为事件上报主题
*/
private boolean isEventPostTopic(String topic) {
return topic.contains(IotDeviceTopicEnum.EVENT_POST_TOPIC_PREFIX.getTopic())
&& topic.endsWith(IotDeviceTopicEnum.EVENT_POST_TOPIC_SUFFIX.getTopic())
&& !topic.contains("property"); // 排除属性上报主题
}
/**
* 判断是否为属性设置响应主题
*/
private boolean isPropertySetReplyTopic(String topic) {
return topic.contains(IotDeviceTopicEnum.PROPERTY_SET_TOPIC.getTopic())
&& topic.endsWith(IotDeviceTopicEnum.REPLY_SUFFIX);
}
/**
* 判断是否为属性获取响应主题
*/
private boolean isPropertyGetReplyTopic(String topic) {
return topic.contains(IotDeviceTopicEnum.PROPERTY_GET_TOPIC.getTopic())
&& topic.endsWith(IotDeviceTopicEnum.REPLY_SUFFIX);
}
/**
* 判断是否为配置设置响应主题
*/
private boolean isConfigSetReplyTopic(String topic) {
return topic.contains(IotDeviceTopicEnum.CONFIG_SET_TOPIC.getTopic())
&& topic.endsWith(IotDeviceTopicEnum.REPLY_SUFFIX);
}
/**
* 判断是否为 OTA 升级响应主题
*/
private boolean isOtaUpgradeReplyTopic(String topic) {
return topic.contains(IotDeviceTopicEnum.OTA_UPGRADE_TOPIC.getTopic())
&& topic.endsWith(IotDeviceTopicEnum.REPLY_SUFFIX);
}
/**
* 判断是否为服务调用响应主题排除已处理的特殊服务
*/
private boolean isServiceReplyTopic(String topic) {
return topic.contains(IotDeviceTopicEnum.SERVICE_TOPIC_PREFIX)
&& topic.endsWith(IotDeviceTopicEnum.REPLY_SUFFIX)
&& !topic.contains("property")
&& !topic.contains("config")
&& !topic.contains("ota");
return topicParts[3];
}
}

View File

@ -0,0 +1,90 @@
package cn.iocoder.yudao.module.iot.gateway.util;
/**
* IoT 网关 MQTT 主题工具类
* <p>
* 用于统一管理 MQTT 协议中的主题常量基于 Alink 协议规范
*
* @author 芋道源码
*/
public final class IotMqttTopicUtils {
// ========== 静态常量 ==========
/**
* 系统主题前缀
*/
private static final String SYS_TOPIC_PREFIX = "/sys/";
/**
* 服务调用主题前缀
*/
private static final String SERVICE_TOPIC_PREFIX = "/thing/";
// ========== MQTT 认证路径常量 ==========
/**
* MQTT 认证路径
*/
public static final String MQTT_AUTH_AUTHENTICATE_PATH = "/mqtt/auth/authenticate";
/**
* MQTT 连接事件路径
*/
public static final String MQTT_AUTH_CONNECTED_PATH = "/mqtt/auth/connected";
/**
* MQTT 断开事件路径
*/
public static final String MQTT_AUTH_DISCONNECTED_PATH = "/mqtt/auth/disconnected";
// ========== 工具方法 ==========
/**
* 构建设备主题前缀
*
* @param productKey 产品 Key
* @param deviceName 设备名称
* @return 设备主题前缀/sys/{productKey}/{deviceName}
*/
private static String buildDeviceTopicPrefix(String productKey, String deviceName) {
return SYS_TOPIC_PREFIX + productKey + "/" + deviceName;
}
/**
* 构建设备属性设置主题
*
* @param productKey 产品 Key
* @param deviceName 设备名称
* @return 完整的主题路径
*/
public static String buildPropertySetTopic(String productKey, String deviceName) {
return buildDeviceTopicPrefix(productKey, deviceName) + "/thing/property/set";
}
/**
* 构建设备属性上报回复主题
* <p>
* 当设备上报属性时会收到该主题的回复
*
* @param productKey 产品 Key
* @param deviceName 设备名称
* @return 完整的主题路径
*/
public static String buildPropertyPostReplyTopic(String productKey, String deviceName) {
return buildDeviceTopicPrefix(productKey, deviceName) + "/thing/property/post_reply";
}
/**
* 构建设备服务调用主题
*
* @param productKey 产品 Key
* @param deviceName 设备名称
* @param serviceIdentifier 服务标识符
* @return 完整的主题路径
*/
public static String buildServiceTopic(String productKey, String deviceName, String serviceIdentifier) {
return buildDeviceTopicPrefix(productKey, deviceName) + SERVICE_TOPIC_PREFIX + serviceIdentifier;
}
}