feat:【IoT 物联网】新增 MQTT 协议支持,包含上行和下行消息处理器,完善设备认证和属性、事件、服务处理逻辑,更新配置以启用 EMQX 组件

This commit is contained in:
haohao 2025-06-08 22:24:32 +08:00
parent d7c57f5023
commit f58cf282dd
12 changed files with 1157 additions and 3 deletions

View File

@ -41,6 +41,12 @@
<groupId>io.vertx</groupId>
<artifactId>vertx-web</artifactId>
</dependency>
<!-- MQTT 相关 -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-mqtt</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -1,9 +1,10 @@
package cn.iocoder.yudao.module.iot.gateway.config;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@ -35,4 +36,24 @@ public class IotGatewayConfiguration {
}
}
/**
* IoT 网关 MQTT 协议配置类
*/
@Configuration
@ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.emqx", name = "enabled", havingValue = "true")
@Slf4j
public static class MqttProtocolConfiguration {
@Bean
public IotMqttUpstreamProtocol iotMqttUpstreamProtocol(IotGatewayProperties gatewayProperties) {
return new IotMqttUpstreamProtocol(gatewayProperties.getProtocol().getEmqx());
}
@Bean
public IotMqttDownstreamSubscriber iotMqttDownstreamSubscriber(IotMqttUpstreamProtocol mqttUpstreamProtocol,
IotMessageBus messageBus) {
return new IotMqttDownstreamSubscriber(mqttUpstreamProtocol, messageBus);
}
}
}

View File

@ -0,0 +1,161 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 网关 MQTT 订阅者接收下行给设备的消息
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Slf4j
public class IotMqttDownstreamSubscriber implements IotMessageSubscriber<IotDeviceMessage> {
private final IotMqttUpstreamProtocol protocol;
private final IotMessageBus messageBus;
@PostConstruct
public void init() {
messageBus.register(this);
}
@Override
public String getTopic() {
return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(protocol.getServerId());
}
@Override
public String getGroup() {
// 保证点对点消费需要保证独立的 Group所以使用 Topic 作为 Group
return getTopic();
}
@Override
public void onMessage(IotDeviceMessage message) {
log.info("[onMessage][接收到下行消息:{}]", message);
try {
// 根据消息类型处理不同的下行消息
String messageType = message.getType();
switch (messageType) {
case "property":
handlePropertyMessage(message);
break;
case "service":
handleServiceMessage(message);
break;
case "config":
handleConfigMessage(message);
break;
case "ota":
handleOtaMessage(message);
break;
default:
log.warn("[onMessage][未知的消息类型:{}]", messageType);
break;
}
} catch (Exception e) {
log.error("[onMessage][处理下行消息失败:{}]", message, e);
}
}
/**
* 处理属性相关消息
*
* @param message 设备消息
*/
private void handlePropertyMessage(IotDeviceMessage message) {
String identifier = message.getIdentifier();
String productKey = message.getProductKey();
String deviceName = message.getDeviceName();
if ("set".equals(identifier)) {
// 属性设置
String topic = IotDeviceTopicEnum.buildPropertySetTopic(productKey, deviceName);
JSONObject payload = buildDownstreamPayload(message, "thing.service.property.set");
protocol.publishMessage(topic, payload.toString());
log.info("[handlePropertyMessage][发送属性设置消息][topic: {}]", topic);
} else if ("get".equals(identifier)) {
// 属性获取
String topic = IotDeviceTopicEnum.buildPropertyGetTopic(productKey, deviceName);
JSONObject payload = buildDownstreamPayload(message, "thing.service.property.get");
protocol.publishMessage(topic, payload.toString());
log.info("[handlePropertyMessage][发送属性获取消息][topic: {}]", topic);
} else {
log.warn("[handlePropertyMessage][未知的属性操作:{}]", identifier);
}
}
/**
* 处理服务调用消息
*
* @param message 设备消息
*/
private void handleServiceMessage(IotDeviceMessage message) {
String identifier = message.getIdentifier();
String productKey = message.getProductKey();
String deviceName = message.getDeviceName();
String topic = IotDeviceTopicEnum.buildServiceTopic(productKey, deviceName, identifier);
JSONObject payload = buildDownstreamPayload(message, "thing.service." + identifier);
protocol.publishMessage(topic, payload.toString());
log.info("[handleServiceMessage][发送服务调用消息][topic: {}]", topic);
}
/**
* 处理配置设置消息
*
* @param message 设备消息
*/
private void handleConfigMessage(IotDeviceMessage message) {
String productKey = message.getProductKey();
String deviceName = message.getDeviceName();
String topic = IotDeviceTopicEnum.buildConfigSetTopic(productKey, deviceName);
JSONObject payload = buildDownstreamPayload(message, "thing.service.config.set");
protocol.publishMessage(topic, payload.toString());
log.info("[handleConfigMessage][发送配置设置消息][topic: {}]", topic);
}
/**
* 处理 OTA 升级消息
*
* @param message 设备消息
*/
private void handleOtaMessage(IotDeviceMessage message) {
String productKey = message.getProductKey();
String deviceName = message.getDeviceName();
String topic = IotDeviceTopicEnum.buildOtaUpgradeTopic(productKey, deviceName);
JSONObject payload = buildDownstreamPayload(message, "thing.service.ota.upgrade");
protocol.publishMessage(topic, payload.toString());
log.info("[handleOtaMessage][发送 OTA 升级消息][topic: {}]", topic);
}
/**
* 构建下行消息载荷
*
* @param message 设备消息
* @param method 方法名
* @return JSON 载荷
*/
private JSONObject buildDownstreamPayload(IotDeviceMessage message, String method) {
JSONObject payload = new JSONObject();
payload.set("id", message.getMessageId());
payload.set("version", "1.0");
payload.set("method", method);
payload.set("params", message.getData());
return payload;
}
}

View File

@ -0,0 +1,267 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.ObjUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttAuthRouter;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttUpstreamRouter;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Vertx;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
* IoT 网关 MQTT 协议接收设备上行消息
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Slf4j
public class IotMqttUpstreamProtocol {
/**
* 默认 QoS 级别
*/
private static final MqttQoS DEFAULT_QOS = MqttQoS.AT_LEAST_ONCE;
private final IotGatewayProperties.EmqxProperties emqxProperties;
private Vertx vertx;
private MqttClient mqttClient;
private IotMqttUpstreamRouter messageRouter;
private IotMqttAuthRouter authRouter;
private IotDeviceMessageProducer deviceMessageProducer;
/**
* 服务运行状态标志
*/
private volatile boolean isRunning = false;
@PostConstruct
public void start() {
if (isRunning) {
log.warn("[start][MQTT 协议服务已经在运行中,请勿重复启动]");
return;
}
log.info("[start][开始启动 MQTT 协议服务]");
// 初始化组件
this.vertx = Vertx.vertx();
this.deviceMessageProducer = SpringUtil.getBean(IotDeviceMessageProducer.class);
this.messageRouter = new IotMqttUpstreamRouter(this);
this.authRouter = new IotMqttAuthRouter(this);
// 创建 MQTT 客户端
MqttClientOptions options = new MqttClientOptions()
.setClientId("yudao-iot-gateway-" + IdUtil.fastSimpleUUID())
.setUsername(emqxProperties.getMqttUsername())
.setPassword(emqxProperties.getMqttPassword())
.setSsl(ObjUtil.defaultIfNull(emqxProperties.getMqttSsl(), false));
this.mqttClient = MqttClient.create(vertx, options);
// 连接 MQTT Broker
connectMqtt();
}
@PreDestroy
public void stop() {
if (!isRunning) {
log.warn("[stop][MQTT 协议服务已经停止,无需再次停止]");
return;
}
log.info("[stop][开始停止 MQTT 协议服务]");
// 1. 取消 MQTT 主题订阅
if (mqttClient != null && mqttClient.isConnected()) {
List<String> topicList = emqxProperties.getMqttTopics();
if (CollUtil.isNotEmpty(topicList)) {
for (String topic : topicList) {
try {
mqttClient.unsubscribe(topic);
log.debug("[stop][取消订阅主题: {}]", topic);
} catch (Exception e) {
log.warn("[stop][取消订阅主题异常: {}]", topic, e);
}
}
}
}
// 2. 关闭 MQTT 客户端
try {
if (mqttClient != null && mqttClient.isConnected()) {
mqttClient.disconnect();
}
} catch (Exception e) {
log.warn("[stop][关闭 MQTT 客户端异常]", e);
}
// 3. 关闭 Vertx
try {
if (vertx != null) {
vertx.close();
}
} catch (Exception e) {
log.warn("[stop][关闭 Vertx 异常]", e);
}
// 4. 更新状态
isRunning = false;
log.info("[stop][MQTT 协议服务已停止]");
}
/**
* 连接 MQTT Broker 并订阅主题
*/
private void connectMqtt() {
// 检查必要的 MQTT 配置
String host = emqxProperties.getMqttHost();
Integer port = emqxProperties.getMqttPort();
if (StrUtil.isBlank(host)) {
String msg = "[connectMqtt][MQTT Host 为空,无法连接]";
log.error(msg);
return;
}
if (port == null) {
log.warn("[connectMqtt][MQTT Port 为 null使用默认端口 1883]");
port = 1883; // 默认 MQTT 端口
}
final Integer finalPort = port;
CompletableFuture<Void> connectFuture = mqttClient.connect(finalPort, host)
.toCompletionStage()
.toCompletableFuture()
.thenAccept(connAck -> {
log.info("[connectMqtt][MQTT 客户端连接成功]");
// 设置断开重连监听器
mqttClient.closeHandler(closeEvent -> {
log.warn("[closeHandler][MQTT 连接已断开,准备重连]");
reconnectWithDelay();
});
// 设置消息处理器
setupMessageHandler();
// 订阅主题
subscribeToTopics();
})
.exceptionally(error -> {
log.error("[connectMqtt][连接 MQTT Broker 失败]", error);
reconnectWithDelay();
return null;
});
// 等待连接完成
try {
connectFuture.get(10, TimeUnit.SECONDS);
isRunning = true;
log.info("[connectMqtt][MQTT 协议服务启动完成]");
} catch (Exception e) {
log.error("[connectMqtt][MQTT 协议服务启动失败]", e);
}
}
/**
* 设置 MQTT 消息处理器
*/
private void setupMessageHandler() {
mqttClient.publishHandler(messageRouter::route);
log.debug("[setupMessageHandler][MQTT 消息处理器设置完成]");
}
/**
* 订阅设备上行消息主题
*/
private void subscribeToTopics() {
List<String> topicList = emqxProperties.getMqttTopics();
if (CollUtil.isEmpty(topicList)) {
log.warn("[subscribeToTopics][未配置 MQTT 主题,使用默认主题]");
topicList = List.of("/sys/#"); // 默认订阅所有系统主题
}
for (String topic : topicList) {
if (StrUtil.isBlank(topic)) {
log.warn("[subscribeToTopics][跳过空主题]");
continue;
}
mqttClient.subscribe(topic, DEFAULT_QOS.value())
.onSuccess(ack -> log.info("[subscribeToTopics][订阅主题成功: {}]", topic))
.onFailure(err -> log.error("[subscribeToTopics][订阅主题失败: {}]", topic, err));
}
}
/**
* 重连 MQTT 客户端
*/
private void reconnectWithDelay() {
if (!isRunning) {
log.info("[reconnectWithDelay][服务已停止,不再尝试重连]");
return;
}
// 默认重连延迟 5
int reconnectDelayMs = 5000;
vertx.setTimer(reconnectDelayMs, id -> {
log.info("[reconnectWithDelay][开始重新连接 MQTT]");
connectMqtt();
});
}
/**
* 发布消息到 MQTT
*
* @param topic 主题
* @param payload 消息内容
*/
public void publishMessage(String topic, String payload) {
if (mqttClient == null || !mqttClient.isConnected()) {
log.warn("[publishMessage][MQTT 客户端未连接,无法发送消息][topic: {}]", topic);
return;
}
mqttClient.publish(topic, io.vertx.core.buffer.Buffer.buffer(payload), DEFAULT_QOS, false, false)
.onSuccess(v -> log.debug("[publishMessage][发送消息成功][topic: {}]", topic))
.onFailure(err -> log.error("[publishMessage][发送消息失败][topic: {}]", topic, err));
}
/**
* 获取服务器 ID
*
* @return 服务器 ID
*/
public String getServerId() {
return IotDeviceMessageUtils.generateServerId(emqxProperties.getMqttPort());
}
/**
* 获取 MQTT 客户端
*
* @return MQTT 客户端
*/
public MqttClient getMqttClient() {
return mqttClient;
}
/**
* 获取认证路由器
*
* @return 认证路由器
*/
public IotMqttAuthRouter getAuthRouter() {
return authRouter;
}
}

View File

@ -0,0 +1,38 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 网关 MQTT 协议的路由处理器抽象基类
* <p>
* 提供通用的处理方法所有 MQTT 消息处理器都应继承此类
*
* @author 芋道源码
*/
@Slf4j
public abstract class IotMqttAbstractHandler {
/**
* 处理 MQTT 消息
*
* @param topic 主题
* @param payload 消息内容
*/
public abstract void handle(String topic, String payload);
/**
* 解析主题获取主题各部分
*
* @param topic 主题
* @return 主题各部分数组如果解析失败返回 null
*/
protected String[] parseTopic(String topic) {
String[] topicParts = topic.split("/");
if (topicParts.length < 7) {
log.warn("[parseTopic][主题格式不正确][topic: {}]", topic);
return null;
}
return topicParts;
}
}

View File

@ -0,0 +1,146 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 网关 MQTT 认证路由器
* <p>
* 处理设备的 MQTT 连接认证和连接状态管理
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Slf4j
public class IotMqttAuthRouter {
private final IotMqttUpstreamProtocol protocol;
private final IotDeviceMessageProducer deviceMessageProducer;
private final IotDeviceTokenService deviceTokenService;
private final IotDeviceCommonApi deviceCommonApi;
public IotMqttAuthRouter(IotMqttUpstreamProtocol protocol) {
this.protocol = protocol;
this.deviceMessageProducer = SpringUtil.getBean(IotDeviceMessageProducer.class);
this.deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class);
this.deviceCommonApi = SpringUtil.getBean(IotDeviceCommonApi.class);
}
/**
* 处理设备认证
*
* @param clientId 客户端 ID
* @param username 用户名
* @param password 密码
* @return 认证结果
*/
public boolean authenticate(String clientId, String username, String password) {
try {
log.info("[authenticate][开始认证设备][clientId: {}][username: {}]", clientId, username);
// 1. 参数校验
if (StrUtil.isEmpty(clientId) || StrUtil.isEmpty(username) || StrUtil.isEmpty(password)) {
log.warn("[authenticate][认证参数不完整][clientId: {}][username: {}]", clientId, username);
return false;
}
// 2. 执行认证
CommonResult<Boolean> result = deviceCommonApi.authDevice(new IotDeviceAuthReqDTO()
.setClientId(clientId).setUsername(username).setPassword(password));
result.checkError();
if (!Boolean.TRUE.equals(result.getData())) {
log.warn("[authenticate][设备认证失败][clientId: {}][username: {}]", clientId, username);
return false;
}
log.info("[authenticate][设备认证成功][clientId: {}][username: {}]", clientId, username);
return true;
} catch (Exception e) {
log.error("[authenticate][设备认证异常][clientId: {}][username: {}]", clientId, username, e);
return false;
}
}
/**
* 处理设备连接事件
*
* @param clientId 客户端 ID
* @param username 用户名
*/
public void handleClientConnected(String clientId, String username) {
try {
log.info("[handleClientConnected][设备连接][clientId: {}][username: {}]", clientId, username);
// 解析设备信息并发送上线消息
handleDeviceStateChange(username, true);
} catch (Exception e) {
log.error("[handleClientConnected][处理设备连接事件异常][clientId: {}][username: {}]", clientId, username, e);
}
}
/**
* 处理设备断开连接事件
*
* @param clientId 客户端 ID
* @param username 用户名
*/
public void handleClientDisconnected(String clientId, String username) {
try {
log.info("[handleClientDisconnected][设备断开连接][clientId: {}][username: {}]", clientId, username);
// 解析设备信息并发送下线消息
handleDeviceStateChange(username, false);
} catch (Exception e) {
log.error("[handleClientDisconnected][处理设备断开连接事件异常][clientId: {}][username: {}]", clientId, username, e);
}
}
/**
* 处理设备状态变化
*
* @param username 用户名
* @param online 是否在线
*/
private void handleDeviceStateChange(String username, boolean online) {
// 解析设备信息
if (StrUtil.isEmpty(username) || "undefined".equals(username)) {
log.warn("[handleDeviceStateChange][用户名为空或未定义][username: {}]", username);
return;
}
IotDeviceAuthUtils.DeviceInfo deviceInfo = deviceTokenService.parseUsername(username);
if (deviceInfo == null) {
log.warn("[handleDeviceStateChange][无法解析设备信息][username: {}]", username);
return;
}
try {
// 发送设备状态消息
IotDeviceMessage message = IotDeviceMessage.of(
deviceInfo.getProductKey(), deviceInfo.getDeviceName(), protocol.getServerId());
if (online) {
message = message.ofStateOnline();
log.info("[handleDeviceStateChange][发送设备上线消息成功][username: {}]", username);
} else {
message = message.ofStateOffline();
log.info("[handleDeviceStateChange][发送设备下线消息成功][username: {}]", username);
}
deviceMessageProducer.sendDeviceMessage(message);
} catch (Exception e) {
log.error("[handleDeviceStateChange][发送设备状态消息失败][username: {}][online: {}]", username, online, e);
}
}
}

View File

@ -0,0 +1,120 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
/**
* IoT 网关 MQTT 事件处理器
* <p>
* 处理设备事件相关的 MQTT 消息
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Slf4j
public class IotMqttEventHandler extends IotMqttAbstractHandler {
private final IotMqttUpstreamProtocol protocol;
private final IotDeviceMessageProducer deviceMessageProducer;
@Override
public void handle(String topic, String payload) {
try {
log.info("[handle][接收到设备事件上报][topic: {}]", topic);
// 解析消息内容
JSONObject jsonObject = JSONUtil.parseObj(payload);
String[] topicParts = parseTopic(topic);
if (topicParts == null) {
return;
}
// 构建设备消息
String productKey = topicParts[2];
String deviceName = topicParts[3];
String eventIdentifier = getEventIdentifier(topicParts, topic);
if (eventIdentifier == null) {
return;
}
Map<String, Object> eventData = parseEventDataFromPayload(jsonObject);
IotDeviceMessage message = IotDeviceMessage.of(productKey, deviceName, protocol.getServerId());
// 设置事件消息类型和标识符
message.setType("event");
message.setIdentifier(eventIdentifier);
message.setData(eventData);
// 发送消息
deviceMessageProducer.sendDeviceMessage(message);
log.info("[handle][处理设备事件上报成功][topic: {}]", topic);
// 发送响应消息
String method = "thing.event." + eventIdentifier + ".post";
sendResponse(topic, jsonObject, method);
} catch (Exception e) {
log.error("[handle][处理设备事件上报失败][topic: {}][payload: {}]", topic, payload, e);
}
}
/**
* 从主题部分中获取事件标识符
*
* @param topicParts 主题各部分
* @param topic 原始主题用于日志
* @return 事件标识符如果获取失败返回 null
*/
private String getEventIdentifier(String[] topicParts, String topic) {
try {
return topicParts[6];
} catch (ArrayIndexOutOfBoundsException e) {
log.warn("[getEventIdentifier][无法从主题中获取事件标识符][topic: {}]", topic);
return null;
}
}
/**
* 从消息载荷解析事件数据
*
* @param jsonObject 消息 JSON 对象
* @return 事件数据映射
*/
private Map<String, Object> parseEventDataFromPayload(JSONObject jsonObject) {
JSONObject params = jsonObject.getJSONObject("params");
if (params == null) {
log.warn("[parseEventDataFromPayload][消息格式不正确,缺少 params 字段][jsonObject: {}]", jsonObject);
return Map.of();
}
return params;
}
/**
* 发送响应消息
*
* @param topic 原始主题
* @param jsonObject 原始消息 JSON 对象
* @param method 响应方法
*/
private void sendResponse(String topic, JSONObject jsonObject, String method) {
String replyTopic = IotDeviceTopicEnum.getReplyTopic(topic);
// 构建响应消息
JSONObject response = new JSONObject();
response.set("id", jsonObject.getStr("id"));
response.set("code", 200);
response.set("method", method);
response.set("data", new JSONObject());
// 发送响应
protocol.publishMessage(replyTopic, response.toString());
log.debug("[sendResponse][发送响应消息][topic: {}]", replyTopic);
}
}

View File

@ -0,0 +1,147 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
/**
* IoT 网关 MQTT 属性处理器
* <p>
* 处理设备属性相关的 MQTT 消息
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Slf4j
public class IotMqttPropertyHandler extends IotMqttAbstractHandler {
private final IotMqttUpstreamProtocol protocol;
private final IotDeviceMessageProducer deviceMessageProducer;
@Override
public void handle(String topic, String payload) {
if (topic.endsWith(IotDeviceTopicEnum.PROPERTY_POST_TOPIC.getTopic())) {
// 属性上报
handlePropertyPost(topic, payload);
} else if (topic.contains(IotDeviceTopicEnum.PROPERTY_SET_TOPIC.getTopic())) {
// 属性设置响应
handlePropertySetReply(topic, payload);
} else if (topic.contains(IotDeviceTopicEnum.PROPERTY_GET_TOPIC.getTopic())) {
// 属性获取响应
handlePropertyGetReply(topic, payload);
} else {
log.warn("[handle][未知的属性主题][topic: {}]", topic);
}
}
/**
* 处理设备属性上报消息
*
* @param topic 主题
* @param payload 消息内容
*/
private void handlePropertyPost(String topic, String payload) {
try {
log.info("[handlePropertyPost][接收到设备属性上报][topic: {}]", topic);
// 解析消息内容
JSONObject jsonObject = JSONUtil.parseObj(payload);
String[] topicParts = parseTopic(topic);
if (topicParts == null) {
return;
}
// 构建设备消息
String productKey = topicParts[2];
String deviceName = topicParts[3];
Map<String, Object> properties = parsePropertiesFromPayload(jsonObject);
IotDeviceMessage message = IotDeviceMessage.of(productKey, deviceName, protocol.getServerId())
.ofPropertyReport(properties);
// 发送消息
deviceMessageProducer.sendDeviceMessage(message);
log.info("[handlePropertyPost][处理设备属性上报成功][topic: {}]", topic);
// 发送响应消息
sendResponse(topic, jsonObject, "thing.event.property.post");
} catch (Exception e) {
log.error("[handlePropertyPost][处理设备属性上报失败][topic: {}][payload: {}]", topic, payload, e);
}
}
/**
* 处理属性设置响应消息
*
* @param topic 主题
* @param payload 消息内容
*/
private void handlePropertySetReply(String topic, String payload) {
try {
log.info("[handlePropertySetReply][接收到属性设置响应][topic: {}]", topic);
// TODO: 处理属性设置响应逻辑
} catch (Exception e) {
log.error("[handlePropertySetReply][处理属性设置响应失败][topic: {}][payload: {}]", topic, payload, e);
}
}
/**
* 处理属性获取响应消息
*
* @param topic 主题
* @param payload 消息内容
*/
private void handlePropertyGetReply(String topic, String payload) {
try {
log.info("[handlePropertyGetReply][接收到属性获取响应][topic: {}]", topic);
// TODO: 处理属性获取响应逻辑
} catch (Exception e) {
log.error("[handlePropertyGetReply][处理属性获取响应失败][topic: {}][payload: {}]", topic, payload, e);
}
}
/**
* 从消息载荷解析属性
*
* @param jsonObject 消息 JSON 对象
* @return 属性映射
*/
private Map<String, Object> parsePropertiesFromPayload(JSONObject jsonObject) {
JSONObject params = jsonObject.getJSONObject("params");
if (params == null) {
log.warn("[parsePropertiesFromPayload][消息格式不正确,缺少 params 字段][jsonObject: {}]", jsonObject);
return Map.of();
}
return params;
}
/**
* 发送响应消息
*
* @param topic 原始主题
* @param jsonObject 原始消息 JSON 对象
* @param method 响应方法
*/
private void sendResponse(String topic, JSONObject jsonObject, String method) {
String replyTopic = IotDeviceTopicEnum.getReplyTopic(topic);
// 构建响应消息
JSONObject response = new JSONObject();
response.set("id", jsonObject.getStr("id"));
response.set("code", 200);
response.set("method", method);
response.set("data", new JSONObject());
// 发送响应
protocol.publishMessage(replyTopic, response.toString());
log.debug("[sendResponse][发送响应消息][topic: {}]", replyTopic);
}
}

View File

@ -0,0 +1,121 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
/**
* IoT 网关 MQTT 服务处理器
* <p>
* 处理设备服务调用相关的 MQTT 消息
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Slf4j
public class IotMqttServiceHandler extends IotMqttAbstractHandler {
private final IotMqttUpstreamProtocol protocol;
private final IotDeviceMessageProducer deviceMessageProducer;
@Override
public void handle(String topic, String payload) {
try {
log.info("[handle][接收到设备服务调用响应][topic: {}]", topic);
// 解析消息内容
JSONObject jsonObject = JSONUtil.parseObj(payload);
String[] topicParts = parseTopic(topic);
if (topicParts == null) {
return;
}
// 构建设备消息
String productKey = topicParts[2];
String deviceName = topicParts[3];
String serviceIdentifier = getServiceIdentifier(topicParts, topic);
if (serviceIdentifier == null) {
return;
}
Map<String, Object> serviceData = parseServiceDataFromPayload(jsonObject);
IotDeviceMessage message = IotDeviceMessage.of(productKey, deviceName, protocol.getServerId());
// 设置服务消息类型和标识符
message.setType("service");
message.setIdentifier(serviceIdentifier);
message.setData(serviceData);
// 发送消息
deviceMessageProducer.sendDeviceMessage(message);
log.info("[handle][处理设备服务调用响应成功][topic: {}]", topic);
// 发送响应消息
String method = "thing.service." + serviceIdentifier;
sendResponse(topic, jsonObject, method);
} catch (Exception e) {
log.error("[handle][处理设备服务调用响应失败][topic: {}][payload: {}]", topic, payload, e);
}
}
/**
* 从主题部分中获取服务标识符
*
* @param topicParts 主题各部分
* @param topic 原始主题用于日志
* @return 服务标识符如果获取失败返回 null
*/
private String getServiceIdentifier(String[] topicParts, String topic) {
try {
// 服务主题格式/sys/{productKey}/{deviceName}/thing/service/{serviceIdentifier}
return topicParts[6];
} catch (ArrayIndexOutOfBoundsException e) {
log.warn("[getServiceIdentifier][无法从主题中获取服务标识符][topic: {}]", topic);
return null;
}
}
/**
* 从消息载荷解析服务数据
*
* @param jsonObject 消息 JSON 对象
* @return 服务数据映射
*/
private Map<String, Object> parseServiceDataFromPayload(JSONObject jsonObject) {
JSONObject params = jsonObject.getJSONObject("params");
if (params == null) {
log.warn("[parseServiceDataFromPayload][消息格式不正确,缺少 params 字段][jsonObject: {}]", jsonObject);
return Map.of();
}
return params;
}
/**
* 发送响应消息
*
* @param topic 原始主题
* @param jsonObject 原始消息 JSON 对象
* @param method 响应方法
*/
private void sendResponse(String topic, JSONObject jsonObject, String method) {
String replyTopic = IotDeviceTopicEnum.getReplyTopic(topic);
// 构建响应消息
JSONObject response = new JSONObject();
response.set("id", jsonObject.getStr("id"));
response.set("code", 200);
response.set("method", method);
response.set("data", new JSONObject());
// 发送响应
protocol.publishMessage(replyTopic, response.toString());
log.debug("[sendResponse][发送响应消息][topic: {}]", replyTopic);
}
}

View File

@ -0,0 +1,105 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
import io.vertx.mqtt.messages.MqttPublishMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 网关 MQTT 上行路由器
* <p>
* 根据消息主题路由到不同的处理器
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Slf4j
public class IotMqttUpstreamRouter {
private final IotMqttUpstreamProtocol protocol;
private final IotDeviceMessageProducer deviceMessageProducer;
// 处理器实例
private IotMqttPropertyHandler propertyHandler;
private IotMqttEventHandler eventHandler;
private IotMqttServiceHandler serviceHandler;
public IotMqttUpstreamRouter(IotMqttUpstreamProtocol protocol) {
this.protocol = protocol;
this.deviceMessageProducer = SpringUtil.getBean(IotDeviceMessageProducer.class);
// 初始化处理器
this.propertyHandler = new IotMqttPropertyHandler(protocol, deviceMessageProducer);
this.eventHandler = new IotMqttEventHandler(protocol, deviceMessageProducer);
this.serviceHandler = new IotMqttServiceHandler(protocol, deviceMessageProducer);
}
/**
* 路由 MQTT 消息
*
* @param message MQTT 发布消息
*/
public void route(MqttPublishMessage message) {
String topic = message.topicName();
String payload = message.payload().toString();
log.info("[route][接收到 MQTT 消息][topic: {}][payload: {}]", topic, payload);
try {
if (StrUtil.isEmpty(payload)) {
log.warn("[route][消息内容为空][topic: {}]", topic);
return;
}
// 根据主题路由到相应的处理器
if (isPropertyTopic(topic)) {
propertyHandler.handle(topic, payload);
} else if (isEventTopic(topic)) {
eventHandler.handle(topic, payload);
} else if (isServiceTopic(topic)) {
serviceHandler.handle(topic, payload);
} else {
log.warn("[route][未知的消息类型][topic: {}]", topic);
}
} catch (Exception e) {
log.error("[route][处理 MQTT 消息失败][topic: {}][payload: {}]", topic, payload, e);
}
}
/**
* 判断是否为属性相关主题
*
* @param topic 主题
* @return 是否为属性主题
*/
private boolean isPropertyTopic(String topic) {
return topic.endsWith(IotDeviceTopicEnum.PROPERTY_POST_TOPIC.getTopic()) ||
topic.contains(IotDeviceTopicEnum.PROPERTY_SET_TOPIC.getTopic()) ||
topic.contains(IotDeviceTopicEnum.PROPERTY_GET_TOPIC.getTopic());
}
/**
* 判断是否为事件相关主题
*
* @param topic 主题
* @return 是否为事件主题
*/
private boolean isEventTopic(String topic) {
return topic.contains(IotDeviceTopicEnum.EVENT_POST_TOPIC_PREFIX.getTopic()) &&
topic.endsWith(IotDeviceTopicEnum.EVENT_POST_TOPIC_SUFFIX.getTopic());
}
/**
* 判断是否为服务相关主题
*
* @param topic 主题
* @return 是否为服务主题
*/
private boolean isServiceTopic(String topic) {
return topic.contains(IotDeviceTopicEnum.SERVICE_TOPIC_PREFIX.getTopic()) &&
!isPropertyTopic(topic); // 排除属性相关的服务调用
}
}

View File

@ -0,0 +1,22 @@
/**
* MQTT 协议路由器包
* <p>
* 包含 MQTT 协议的所有路由处理器和抽象基类
* <ul>
* <li>{@link cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttAbstractHandler}
* - 抽象路由处理器基类</li>
* <li>{@link cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttUpstreamRouter}
* - 上行消息路由器</li>
* <li>{@link cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttAuthRouter}
* - 认证路由器</li>
* <li>{@link cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttPropertyHandler}
* - 属性处理器</li>
* <li>{@link cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttEventHandler}
* - 事件处理器</li>
* <li>{@link cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttServiceHandler}
* - 服务处理器</li>
* </ul>
*
* @author 芋道源码
*/
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router;

View File

@ -37,7 +37,7 @@ yudao:
# 针对引入的 EMQX 组件的配置
# ====================================
emqx:
enabled: false
enabled: true
mqtt-ssl: false
mqtt-topics:
- "/sys/#" # 系统主题(设备上报)