review:【IoT 物联网】mqtt 协议的 review

This commit is contained in:
YunaiV 2025-06-12 22:04:18 +08:00
parent 7b10b59541
commit d1b2fb41ae
6 changed files with 41 additions and 34 deletions

View File

@ -44,9 +44,6 @@ public class IotGatewayConfiguration {
@Slf4j @Slf4j
public static class MqttProtocolConfiguration { public static class MqttProtocolConfiguration {
/**
* MQTT 统一协议集成上行协议和HTTP认证协议
*/
@Bean @Bean
public IotMqttUpstreamProtocol iotMqttUnifiedProtocol(IotGatewayProperties gatewayProperties) { public IotMqttUpstreamProtocol iotMqttUnifiedProtocol(IotGatewayProperties gatewayProperties) {
return new IotMqttUpstreamProtocol(gatewayProperties.getProtocol().getEmqx()); return new IotMqttUpstreamProtocol(gatewayProperties.getProtocol().getEmqx());

View File

@ -11,7 +11,7 @@ import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler; import io.vertx.ext.web.handler.BodyHandler;
import jakarta.annotation.PostConstruct; import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy; import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
/** /**
@ -19,7 +19,6 @@ import lombok.extern.slf4j.Slf4j;
* *
* @author 芋道源码 * @author 芋道源码
*/ */
@RequiredArgsConstructor
@Slf4j @Slf4j
public class IotHttpUpstreamProtocol extends AbstractVerticle { public class IotHttpUpstreamProtocol extends AbstractVerticle {
@ -27,6 +26,14 @@ public class IotHttpUpstreamProtocol extends AbstractVerticle {
private HttpServer httpServer; private HttpServer httpServer;
@Getter
private final String serverId;
public IotHttpUpstreamProtocol(IotGatewayProperties.HttpProperties httpProperties) {
this.httpProperties = httpProperties;
this.serverId = IotDeviceMessageUtils.generateServerId(httpProperties.getServerPort());
}
@Override @Override
@PostConstruct @PostConstruct
public void start() { public void start() {
@ -67,8 +74,4 @@ public class IotHttpUpstreamProtocol extends AbstractVerticle {
} }
} }
public String getServerId() {
return IotDeviceMessageUtils.generateServerId(httpProperties.getServerPort());
}
} }

View File

@ -61,6 +61,7 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber<IotDevi
// 过滤上行消息下行订阅者只处理下行消息 // 过滤上行消息下行订阅者只处理下行消息
if (isUpstreamMessage(method)) { if (isUpstreamMessage(method)) {
// TODO @haohao打个 erroor log按道理不会发生
log.debug("[onMessage][忽略上行消息][method: {}][messageId: {}]", method, message.getId()); log.debug("[onMessage][忽略上行消息][method: {}][messageId: {}]", method, message.getId());
return; return;
} }
@ -149,6 +150,7 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber<IotDevi
return null; return null;
} }
// TODO @haohao按道理说这里的应该是通过 encodeMessage
/** /**
* 构建下行消息载荷 * 构建下行消息载荷
* *

View File

@ -1,7 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt; package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ObjUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
@ -18,34 +17,39 @@ import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions; import io.vertx.mqtt.MqttClientOptions;
import jakarta.annotation.PostConstruct; import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy; import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
// TODO @haohao看看有没多余的 log可以不打噢
// TODO @haohao有没多余的注释可以去掉减少 ai 保持简洁
/** /**
* IoT 网关 MQTT 统一协议 * IoT 网关 MQTT 统一协议
* <p> * <p>
* 集成了 MQTT 上行协议和 HTTP 认证协议的功能 * 1. MQTT 客户端连接 EMQX消费处理设备上行和下行消息
* 1. MQTT 客户端连接 EMQX处理设备上行和下行消息 * 2. HTTP 认证服务 EMQX 提供设备认证连接断开接口
* 2. HTTP 认证服务 EMQX 提供设备认证接口
* *
* @author 芋道源码 * @author 芋道源码
*/ */
@Slf4j @Slf4j
public class IotMqttUpstreamProtocol { public class IotMqttUpstreamProtocol {
// TODO @haohao是不是也丢到配置里
/** /**
* 默认 QoS 级别 - 至少一次 * 默认 QoS 级别 - 至少一次
*/ */
private static final MqttQoS DEFAULT_QOS = MqttQoS.AT_LEAST_ONCE; private static final MqttQoS DEFAULT_QOS = MqttQoS.AT_LEAST_ONCE;
// TODO @haohao这个也是
/** /**
* 连接超时时间 * 连接超时时间
*/ */
private static final int CONNECT_TIMEOUT_SECONDS = 10; private static final int CONNECT_TIMEOUT_SECONDS = 10;
// TODO @haohao重连也是
/** /**
* 重连延迟时间毫秒 * 重连延迟时间毫秒
*/ */
@ -53,15 +57,18 @@ public class IotMqttUpstreamProtocol {
private final IotGatewayProperties.EmqxProperties emqxProperties; private final IotGatewayProperties.EmqxProperties emqxProperties;
// 共享资源
private Vertx vertx; private Vertx vertx;
@Getter
private final String serverId;
// MQTT 客户端相关 // MQTT 客户端相关
private MqttClient mqttClient; private MqttClient mqttClient;
private IotMqttUpstreamHandler upstreamHandler; private IotMqttUpstreamHandler upstreamHandler;
// HTTP 认证服务相关 // HTTP 认证服务相关
private HttpServer httpAuthServer; private HttpServer httpAuthServer;
// TODO @haohaoauthHandler 可以 local
private IotMqttHttpAuthHandler authHandler; private IotMqttHttpAuthHandler authHandler;
/** /**
@ -69,11 +76,9 @@ public class IotMqttUpstreamProtocol {
*/ */
private volatile boolean isRunning = false; private volatile boolean isRunning = false;
/**
* 构造函数
*/
public IotMqttUpstreamProtocol(IotGatewayProperties.EmqxProperties emqxProperties) { public IotMqttUpstreamProtocol(IotGatewayProperties.EmqxProperties emqxProperties) {
this.emqxProperties = emqxProperties; this.emqxProperties = emqxProperties;
this.serverId = IotDeviceMessageUtils.generateServerId(emqxProperties.getMqttPort());
} }
@PostConstruct @PostConstruct
@ -98,6 +103,7 @@ public class IotMqttUpstreamProtocol {
isRunning = true; isRunning = true;
log.info("[start][MQTT 统一协议服务启动完成]"); log.info("[start][MQTT 统一协议服务启动完成]");
} catch (Exception e) { } catch (Exception e) {
// TODO @haohao失败是不是直接 System.exit
log.error("[start][MQTT 统一协议服务启动失败]", e); log.error("[start][MQTT 统一协议服务启动失败]", e);
// 启动失败时清理资源 // 启动失败时清理资源
stop(); stop();
@ -169,6 +175,7 @@ public class IotMqttUpstreamProtocol {
* 停止 HTTP 认证服务 * 停止 HTTP 认证服务
*/ */
private void stopHttpAuthServer() { private void stopHttpAuthServer() {
// TODO @haohao一些 if return 最好搞下
if (httpAuthServer != null) { if (httpAuthServer != null) {
try { try {
httpAuthServer.close().result(); httpAuthServer.close().result();
@ -195,8 +202,7 @@ public class IotMqttUpstreamProtocol {
.setClientId(emqxProperties.getMqttClientId()) .setClientId(emqxProperties.getMqttClientId())
.setUsername(emqxProperties.getMqttUsername()) .setUsername(emqxProperties.getMqttUsername())
.setPassword(emqxProperties.getMqttPassword()) .setPassword(emqxProperties.getMqttPassword())
.setSsl(ObjUtil.defaultIfNull(emqxProperties.getMqttSsl(), false)); .setSsl(emqxProperties.getMqttSsl());
this.mqttClient = MqttClient.create(vertx, options); this.mqttClient = MqttClient.create(vertx, options);
// 连接 MQTT Broker // 连接 MQTT Broker
@ -256,6 +262,7 @@ public class IotMqttUpstreamProtocol {
.toCompletionStage() .toCompletionStage()
.toCompletableFuture() .toCompletableFuture()
.thenAccept(connAck -> { .thenAccept(connAck -> {
// TODO @haohao是不是可以连接完然后在执行里面不用 通过 thenAccept
log.info("[connectMqtt][MQTT 客户端连接成功][host: {}][port: {}]", host, port); log.info("[connectMqtt][MQTT 客户端连接成功][host: {}][port: {}]", host, port);
// 设置断开重连监听器 // 设置断开重连监听器
mqttClient.closeHandler(closeEvent -> { mqttClient.closeHandler(closeEvent -> {
@ -268,6 +275,7 @@ public class IotMqttUpstreamProtocol {
subscribeToTopics(); subscribeToTopics();
}) })
.exceptionally(error -> { .exceptionally(error -> {
// TODO @haohao这里的异常是不是不用重连哈因为直接就退出了然后有 closeHandler 监听重连了
log.error("[connectMqtt][连接 MQTT Broker 失败][host: {}][port: {}]", host, port, error); log.error("[connectMqtt][连接 MQTT Broker 失败][host: {}][port: {}]", host, port, error);
// 连接失败时也要尝试重连 // 连接失败时也要尝试重连
reconnectWithDelay(); reconnectWithDelay();
@ -297,16 +305,10 @@ public class IotMqttUpstreamProtocol {
*/ */
private void subscribeToTopics() { private void subscribeToTopics() {
List<String> topicList = emqxProperties.getMqttTopics(); List<String> topicList = emqxProperties.getMqttTopics();
// @NotEmpty 注解已保证 topicList 不为空无需重复校验
log.info("[subscribeToTopics][开始订阅主题,共 {} 个]", topicList.size()); log.info("[subscribeToTopics][开始订阅主题,共 {} 个]", topicList.size());
for (String topic : topicList) { for (String topic : topicList) {
if (StrUtil.isBlank(topic)) {
log.warn("[subscribeToTopics][跳过空主题]");
continue;
}
mqttClient.subscribe(topic, DEFAULT_QOS.value(), subscribeResult -> { mqttClient.subscribe(topic, DEFAULT_QOS.value(), subscribeResult -> {
if (subscribeResult.succeeded()) { if (subscribeResult.succeeded()) {
log.info("[subscribeToTopics][订阅主题成功: {}][QoS: {}]", topic, DEFAULT_QOS.value()); log.info("[subscribeToTopics][订阅主题成功: {}][QoS: {}]", topic, DEFAULT_QOS.value());
@ -322,6 +324,7 @@ public class IotMqttUpstreamProtocol {
*/ */
private void reconnectWithDelay() { private void reconnectWithDelay() {
vertx.setTimer(RECONNECT_DELAY_MS, timerId -> { vertx.setTimer(RECONNECT_DELAY_MS, timerId -> {
// TODO @haohaoif return括号少一些
if (isRunning && (mqttClient == null || !mqttClient.isConnected())) { if (isRunning && (mqttClient == null || !mqttClient.isConnected())) {
log.info("[reconnectWithDelay][开始重连 MQTT Broker延迟 {} 毫秒]", RECONNECT_DELAY_MS); log.info("[reconnectWithDelay][开始重连 MQTT Broker延迟 {} 毫秒]", RECONNECT_DELAY_MS);
try { try {
@ -350,11 +353,4 @@ public class IotMqttUpstreamProtocol {
} }
} }
/**
* 获取服务器 ID
*/
public String getServerId() {
return IotDeviceMessageUtils.generateServerId(emqxProperties.getMqttPort());
}
} }

View File

@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
// TODO @haohao是不是不用基类哈
/** /**
* IoT 网关 MQTT 协议的处理器抽象基类 * IoT 网关 MQTT 协议的处理器抽象基类
* <p> * <p>

View File

@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router;
import cn.hutool.extra.spring.SpringUtil; import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum; import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.mqtt.messages.MqttPublishMessage; import io.vertx.mqtt.messages.MqttPublishMessage;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -20,9 +21,10 @@ import java.nio.charset.StandardCharsets;
public class IotMqttUpstreamHandler extends IotMqttAbstractHandler { public class IotMqttUpstreamHandler extends IotMqttAbstractHandler {
private final IotDeviceMessageService deviceMessageService; private final IotDeviceMessageService deviceMessageService;
private final String serverId; private final String serverId;
public IotMqttUpstreamHandler(cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol protocol) { public IotMqttUpstreamHandler(IotMqttUpstreamProtocol protocol) {
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class); this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
this.serverId = protocol.getServerId(); this.serverId = protocol.getServerId();
} }
@ -32,6 +34,7 @@ public class IotMqttUpstreamHandler extends IotMqttAbstractHandler {
*/ */
public void handle(MqttPublishMessage message) { public void handle(MqttPublishMessage message) {
String topic = message.topicName(); String topic = message.topicName();
// TODO @haohao message.payload().getBytes();
String payload = message.payload().toString(StandardCharsets.UTF_8); String payload = message.payload().toString(StandardCharsets.UTF_8);
log.debug("[handle][收到 MQTT 消息][topic: {}]", topic); log.debug("[handle][收到 MQTT 消息][topic: {}]", topic);
@ -44,6 +47,7 @@ public class IotMqttUpstreamHandler extends IotMqttAbstractHandler {
// 1. 识别并验证消息类型 // 1. 识别并验证消息类型
String messageType = getMessageType(topic); String messageType = getMessageType(topic);
if (messageType == null) { if (messageType == null) {
// TODO @haohaolog 是不是把 payload 也打印下哈
log.warn("[doHandle][未知的消息类型][topic: {}]", topic); log.warn("[doHandle][未知的消息类型][topic: {}]", topic);
return; return;
} }
@ -56,9 +60,11 @@ public class IotMqttUpstreamHandler extends IotMqttAbstractHandler {
* 处理消息的统一逻辑 * 处理消息的统一逻辑
*/ */
private void processMessage(String topic, String payload, String messageType) { private void processMessage(String topic, String payload, String messageType) {
// TODO @haohaomessageType 解析是不是作用不大哈
log.info("[processMessage][接收到{}][topic: {}]", messageType, topic); log.info("[processMessage][接收到{}][topic: {}]", messageType, topic);
// 解析主题获取设备信息 // 解析主题获取设备信息
// TODO @haohao不一定是 7 个哈阿里云 topic 有点差异的可以考虑解析到 topicParts[2]topicParts[3] topic
String[] topicParts = parseTopic(topic); String[] topicParts = parseTopic(topic);
if (topicParts == null) { if (topicParts == null) {
return; return;
@ -66,19 +72,21 @@ public class IotMqttUpstreamHandler extends IotMqttAbstractHandler {
String productKey = topicParts[2]; String productKey = topicParts[2];
String deviceName = topicParts[3]; String deviceName = topicParts[3];
// TODO @haohao解析不到可以打个 error log
// 解码消息 // 解码消息
byte[] messageBytes = payload.getBytes(StandardCharsets.UTF_8); byte[] messageBytes = payload.getBytes(StandardCharsets.UTF_8);
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage( IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(
messageBytes, productKey, deviceName); messageBytes, productKey, deviceName);
// 发送消息到队列需要补充设备信息 // 发送消息到队列
deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId); deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId);
// 记录成功日志 // 记录成功日志
log.info("[processMessage][处理{}成功][topic: {}]", messageType, topic); log.info("[processMessage][处理{}成功][topic: {}]", messageType, topic);
} }
// TODO @haohao合并下处理不搞成每个 topic 一个处理
/** /**
* 识别消息类型 * 识别消息类型
* *