review:【IoT 物联网】MqTT 协议

This commit is contained in:
YunaiV 2025-06-13 23:13:29 +08:00
parent f1368d9e79
commit 7e49c90156
6 changed files with 68 additions and 104 deletions

View File

@ -23,12 +23,13 @@ public enum IotDeviceMessageMethodEnum implements ArrayValuable<String> {
STATE_OFFLINE("thing.state.offline", true),
// ========== 设备属性 ==========
// 可参考
// https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services
// 可参考https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services
PROPERTY_POST("thing.property.post", true),
PROPERTY_SET("thing.property.set", false),
// ========== 设备事件 ==========
EVENT_POST("thing.event.post", true),
;

View File

@ -105,6 +105,7 @@ public class IotGatewayProperties {
@NotNull(message = "是否开启不能为空")
private Boolean enabled;
// TODO @haohao是不是改成 httpPort不只认证目前看
/**
* HTTP 认证端口默认8090
*/
@ -115,42 +116,35 @@ public class IotGatewayProperties {
*/
@NotEmpty(message = "MQTT 服务器地址不能为空")
private String mqttHost;
/**
* MQTT 服务器端口默认1883
*/
@NotNull(message = "MQTT 服务器端口不能为空")
private Integer mqttPort = 1883;
/**
* MQTT 用户名
*/
@NotEmpty(message = "MQTT 用户名不能为空")
private String mqttUsername;
/**
* MQTT 密码
*/
@NotEmpty(message = "MQTT 密码不能为空")
private String mqttPassword;
/**
* MQTT 客户端的 SSL 开关
*/
@NotNull(message = "MQTT 是否开启 SSL 不能为空")
private Boolean mqttSsl = false;
/**
* MQTT 客户端 ID如果为空系统将自动生成
*/
private String mqttClientId;
/**
* MQTT 订阅的主题
*/
@NotEmpty(message = "MQTT 主题不能为空")
private List<String> mqttTopics;
private List<@NotEmpty(message = "MQTT 主题不能为空") String> mqttTopics;
/**
* 默认 QoS 级别
* <p>
@ -164,12 +158,12 @@ public class IotGatewayProperties {
* 连接超时时间
*/
private Integer connectTimeoutSeconds = 10;
/**
* 重连延迟时间毫秒
*/
private Long reconnectDelayMs = 5000L;
// TODO @haohao貌似可以通过配置文件 + el 表达式尽量还是配置文件
/**
* 获取 MQTT 客户端 ID如果未配置则自动生成
*

View File

@ -35,6 +35,11 @@ public class IotMqttUpstreamProtocol {
private final IotGatewayProperties.EmqxProperties emqxProperties;
/**
* 服务运行状态标志
*/
private volatile boolean isRunning = false;
private Vertx vertx;
@Getter
@ -47,11 +52,6 @@ public class IotMqttUpstreamProtocol {
// HTTP 认证服务相关
private HttpServer httpAuthServer;
/**
* 服务运行状态标志
*/
private volatile boolean isRunning = false;
public IotMqttUpstreamProtocol(IotGatewayProperties.EmqxProperties emqxProperties) {
this.emqxProperties = emqxProperties;
this.serverId = IotDeviceMessageUtils.generateServerId(emqxProperties.getMqttPort());
@ -66,13 +66,12 @@ public class IotMqttUpstreamProtocol {
log.info("[start][启动 MQTT 协议服务]");
try {
// 1. 创建共享的 Vertx 实例
this.vertx = Vertx.vertx();
// 2. 启动 HTTP 认证服务
// 1. 启动 HTTP 认证服务
startHttpAuthServer();
// 3. 启动 MQTT 客户端
// 2. 启动 MQTT 客户端
startMqttClient();
isRunning = true;
@ -119,16 +118,15 @@ public class IotMqttUpstreamProtocol {
private void startHttpAuthServer() {
log.info("[startHttpAuthServer][启动 HTTP 认证服务]");
// 创建路由
// 1.1 创建路由
Router router = Router.router(vertx);
router.route().handler(BodyHandler.create());
// 创建认证处理器
// 1.2 创建认证处理器
IotMqttHttpAuthHandler authHandler = new IotMqttHttpAuthHandler(this);
router.post(IotMqttTopicUtils.MQTT_AUTH_PATH).handler(authHandler::handleAuth);
router.post(IotMqttTopicUtils.MQTT_EVENT_PATH).handler(authHandler::handleEvent);
// 启动 HTTP 服务器
// 2. 启动 HTTP 服务器
int authPort = emqxProperties.getHttpAuthPort();
try {
httpAuthServer = vertx.createHttpServer()
@ -172,7 +170,7 @@ public class IotMqttUpstreamProtocol {
createMqttClient();
// 3. 连接 MQTT Broker异步连接不会抛出异常
connectMqtt();
connectMqtt(false);
log.info("[startMqttClient][MQTT 客户端启动完成,正在异步连接中...]");
} catch (Exception e) {
@ -211,13 +209,6 @@ public class IotMqttUpstreamProtocol {
}
}
/**
* 连接 MQTT Broker 并订阅主题
*/
private void connectMqtt() {
connectMqtt(false);
}
/**
* 连接 MQTT Broker 并订阅主题
*
@ -227,6 +218,7 @@ public class IotMqttUpstreamProtocol {
// 1. 参数校验
String host = emqxProperties.getMqttHost();
Integer port = emqxProperties.getMqttPort();
// TODO @haohao这些参数校验交给 validator
if (StrUtil.isBlank(host)) {
log.error("[connectMqtt][MQTT Host 为空, 无法连接]");
throw new IllegalArgumentException("MQTT Host 不能为空");
@ -246,6 +238,7 @@ public class IotMqttUpstreamProtocol {
// 2. 异步连接
mqttClient.connect(port, host, connectResult -> {
// TODO @haohaoif return减少括号哈
if (connectResult.succeeded()) {
if (isReconnect) {
log.info("[connectMqtt][MQTT 客户端重连成功, host: {}, port: {}]", host, port);
@ -253,16 +246,15 @@ public class IotMqttUpstreamProtocol {
log.info("[connectMqtt][MQTT 客户端连接成功, host: {}, port: {}]", host, port);
}
// 3. 设置处理器
// 设置处理器
setupMqttHandlers();
// 4. 订阅主题
// 订阅主题
subscribeToTopics();
} else {
log.error("[connectMqtt][连接 MQTT Broker 失败, host: {}, port: {}, isReconnect: {}]",
host, port, isReconnect, connectResult.cause());
// TODO @haohao体感上是不是首次必须连接成功类似 mysql首次要连接上然后后续可以重连
if (!isReconnect) {
// 首次连接失败时也要尝试重连
log.warn("[connectMqtt][首次连接失败,将开始重连机制]");
@ -279,14 +271,11 @@ public class IotMqttUpstreamProtocol {
* 创建 MQTT 客户端
*/
private void createMqttClient() {
log.debug("[createMqttClient][创建 MQTT 客户端, clientId: {}]", emqxProperties.getMqttClientId());
MqttClientOptions options = new MqttClientOptions()
.setClientId(emqxProperties.getMqttClientId())
.setUsername(emqxProperties.getMqttUsername())
.setPassword(emqxProperties.getMqttPassword())
.setSsl(emqxProperties.getMqttSsl());
this.mqttClient = MqttClient.create(vertx, options);
}
@ -294,6 +283,7 @@ public class IotMqttUpstreamProtocol {
* 设置 MQTT 处理器
*/
private void setupMqttHandlers() {
// TODO @haohaomqttClient 一定非空
if (mqttClient == null) {
log.warn("[setupMqttHandlers][MQTT 客户端为空,跳过处理器设置]");
return;
@ -311,6 +301,7 @@ public class IotMqttUpstreamProtocol {
});
// 设置消息处理器
// TODO @haohaoupstreamHandler 一定非空
if (upstreamHandler != null) {
mqttClient.publishHandler(upstreamHandler::handle);
log.debug("[setupMqttHandlers][MQTT 消息处理器设置完成]");
@ -328,7 +319,6 @@ public class IotMqttUpstreamProtocol {
log.warn("[subscribeToTopics][订阅主题列表为空, 跳过订阅]");
return;
}
if (mqttClient == null || !mqttClient.isConnected()) {
log.warn("[subscribeToTopics][MQTT 客户端未连接, 跳过订阅]");
return;
@ -337,10 +327,12 @@ public class IotMqttUpstreamProtocol {
int qos = emqxProperties.getMqttQos();
log.info("[subscribeToTopics][开始订阅主题, 共 {} 个, QoS: {}]", topicList.size(), qos);
// TODO @haohao使用 atomicinteger 会更合适
int[] successCount = { 0 }; // 使用数组以便在 lambda 中修改
int[] failCount = { 0 };
for (String topic : topicList) {
// TODO @haohaoMqttClient subscribe(Map<String, Integer> topics, 是不是更简洁哈
mqttClient.subscribe(topic, qos, subscribeResult -> {
if (subscribeResult.succeeded()) {
successCount[0]++;

View File

@ -15,7 +15,7 @@ import lombok.extern.slf4j.Slf4j;
/**
* IoT 网关 MQTT 下行消息处理器
* <p>
* 从消息总线接收到下行消息然后发布到 MQTT Broker
* 从消息总线接收到下行消息然后发布到 MQTT Broker从而被设备所接收
*
* @author 芋道源码
*/
@ -23,7 +23,9 @@ import lombok.extern.slf4j.Slf4j;
public class IotMqttDownstreamHandler {
private final IotMqttUpstreamProtocol protocol;
private final IotDeviceService deviceService;
private final IotDeviceMessageService deviceMessageService;
public IotMqttDownstreamHandler(IotMqttUpstreamProtocol protocol) {
@ -41,25 +43,24 @@ public class IotMqttDownstreamHandler {
// 1. 获取设备信息使用缓存
IotDeviceRespDTO deviceInfo = deviceService.getDeviceFromCache(message.getDeviceId());
if (deviceInfo == null) {
log.warn("[handle][设备信息不存在, deviceId: {}]", message.getDeviceId());
log.error("[handle][设备信息({})不存在]", message.getDeviceId());
return;
}
// 2. 根据方法构建主题
// 2.1 根据方法构建主题
String topic = buildTopicByMethod(message.getMethod(), deviceInfo.getProductKey(), deviceInfo.getDeviceName());
if (StrUtil.isBlank(topic)) {
log.warn("[handle][未知的消息方法: {}]", message.getMethod());
return;
}
// 3. 构建载荷
// 2.2 构建载荷
// TODO @haohao这里是不是 encode 就可以发拉因为本身就 json 化了
JSONObject payload = buildDownstreamPayload(message);
// 4. 发布消息
// 2.3 发布消息
protocol.publishMessage(topic, payload.toString());
log.debug("[handle][发布下行消息成功, method: {}, topic: {}]", message.getMethod(), topic);
}
// TODO @haohao这个是不是也可以计算IotDeviceMessageUtils isReplyMessage这样就直接生成了
/**
* 根据方法构建主题
*

View File

@ -35,12 +35,10 @@ public class IotMqttHttpAuthHandler {
* 认证允许结果
*/
private static final String RESULT_ALLOW = "allow";
/**
* 认证拒绝结果
*/
private static final String RESULT_DENY = "deny";
/**
* 认证忽略结果
*/
@ -53,14 +51,11 @@ public class IotMqttHttpAuthHandler {
private static final String EVENT_CLIENT_DISCONNECTED = "client.disconnected";
private final IotMqttUpstreamProtocol protocol;
private final IotDeviceMessageService deviceMessageService;
private final IotDeviceCommonApi deviceApi;
/**
* 构造器
*
* @param protocol MQTT 协议实例
*/
public IotMqttHttpAuthHandler(IotMqttUpstreamProtocol protocol) {
this.protocol = protocol;
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
@ -72,19 +67,15 @@ public class IotMqttHttpAuthHandler {
*/
public void handleAuth(RoutingContext context) {
try {
// 解析请求体
// 参数校验
JsonObject body = parseRequestBody(context);
if (body == null) {
return;
}
String clientId = body.getString("clientid");
String username = body.getString("username");
String password = body.getString("password");
log.debug("[handleAuth][设备认证请求: clientId={}, username={}]", clientId, username);
// 参数校验
if (StrUtil.hasEmpty(clientId, username, password)) {
log.info("[handleAuth][认证参数不完整: clientId={}, username={}]", clientId, username);
sendAuthResponse(context, RESULT_DENY, false, "认证参数不完整");
@ -94,13 +85,13 @@ public class IotMqttHttpAuthHandler {
// 执行设备认证
boolean authResult = performDeviceAuth(clientId, username, password);
if (authResult) {
// TODO @haohao是不是两条 info直接打认证结果authResult
log.info("[handleAuth][设备认证成功: {}]", username);
sendAuthResponse(context, RESULT_ALLOW, false, null);
} else {
log.info("[handleAuth][设备认证失败: {}]", username);
sendAuthResponse(context, RESULT_DENY, false, DEVICE_AUTH_FAIL.getMsg());
}
} catch (Exception e) {
log.error("[handleAuth][设备认证异常]", e);
sendAuthResponse(context, RESULT_IGNORE, false, "认证服务异常");
@ -119,10 +110,8 @@ public class IotMqttHttpAuthHandler {
if (body == null) {
return;
}
String event = body.getString("event");
String username = body.getString("username");
log.debug("[handleEvent][收到事件: {} - {}]", event, username);
// 根据事件类型进行分发处理
@ -140,8 +129,8 @@ public class IotMqttHttpAuthHandler {
// EMQX Webhook 只需要 200 状态码无需响应体
context.response().setStatusCode(SUCCESS_STATUS_CODE).end();
} catch (Exception e) {
// TODO @haohaobody 可以打印出来
log.error("[handleEvent][事件处理失败]", e);
// 即使处理失败也返回 200 避免EMQX重试
context.response().setStatusCode(SUCCESS_STATUS_CODE).end();
@ -163,7 +152,6 @@ public class IotMqttHttpAuthHandler {
private void handleClientDisconnected(JsonObject body) {
String username = body.getString("username");
String reason = body.getString("reason");
log.info("[handleClientDisconnected][设备下线: {} ({})]", username, reason);
handleDeviceStateChange(username, false);
}
@ -184,6 +172,7 @@ public class IotMqttHttpAuthHandler {
}
return body;
} catch (Exception e) {
// TODO @haohao最好把 body 打印出来
log.error("[parseRequestBody][解析请求体失败]", e);
sendAuthResponse(context, RESULT_IGNORE, false, "请求体格式错误");
return null;
@ -201,10 +190,7 @@ public class IotMqttHttpAuthHandler {
private boolean performDeviceAuth(String clientId, String username, String password) {
try {
CommonResult<Boolean> result = deviceApi.authDevice(new IotDeviceAuthReqDTO()
.setClientId(clientId)
.setUsername(username)
.setPassword(password));
.setClientId(clientId).setUsername(username).setPassword(password));
result.checkError();
return BooleanUtil.isTrue(result.getData());
} catch (Exception e) {
@ -220,11 +206,10 @@ public class IotMqttHttpAuthHandler {
* @param online 是否在线
*/
private void handleDeviceStateChange(String username, boolean online) {
// 解析设备信息
if (StrUtil.isEmpty(username) || "undefined".equals(username)) {
return;
}
// 解析设备信息
IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(username);
if (deviceInfo == null) {
log.debug("[handleDeviceStateChange][跳过非设备连接: {}]", username);
@ -232,6 +217,7 @@ public class IotMqttHttpAuthHandler {
}
try {
// TODO @haohaoserverId 获取非空可以忽略掉
String serverId = protocol.getServerId();
if (StrUtil.isEmpty(serverId)) {
log.error("[handleDeviceStateChange][获取服务器ID失败]");
@ -241,15 +227,14 @@ public class IotMqttHttpAuthHandler {
// 构建设备状态消息
IotDeviceMessage message = online ? IotDeviceMessage.buildStateOnline()
: IotDeviceMessage.buildStateOffline();
// 发送消息到消息总线
deviceMessageService.sendDeviceMessage(message,
deviceInfo.getProductKey(), deviceInfo.getDeviceName(), serverId);
// TODO @haohaoonline 不用翻译
log.info("[handleDeviceStateChange][设备状态更新: {}/{} -> {}]",
deviceInfo.getProductKey(), deviceInfo.getDeviceName(),
online ? "在线" : "离线");
} catch (Exception e) {
log.error("[handleDeviceStateChange][发送设备状态消息失败: {}]", username, e);
}
@ -277,10 +262,6 @@ public class IotMqttHttpAuthHandler {
// response.put("expire_at", System.currentTimeMillis() / 1000 + 3600);
// 记录详细的响应日志message仅用于日志不返回给EMQX
if (StrUtil.isNotEmpty(message)) {
log.debug("[sendAuthResponse][响应详情: result={}, message={}]", result, message);
}
context.response()
.setStatusCode(SUCCESS_STATUS_CODE)
.putHeader("Content-Type", "application/json; charset=utf-8")

View File

@ -20,6 +20,7 @@ import java.util.Arrays;
public class IotMqttUpstreamHandler {
private final IotDeviceMessageService deviceMessageService;
private final String serverId;
public IotMqttUpstreamHandler(IotMqttUpstreamProtocol protocol) {
@ -30,56 +31,50 @@ public class IotMqttUpstreamHandler {
/**
* 处理 MQTT 发布消息
*/
public void handle(MqttPublishMessage message) {
String topic = message.topicName();
byte[] payload = message.payload().getBytes();
log.debug("[handle][收到 MQTT 消息, topic: {}]", topic);
public void handle(MqttPublishMessage mqttMessage) {
String topic = mqttMessage.topicName();
byte[] payload = mqttMessage.payload().getBytes();
try {
// 1. 前置校验
if (StrUtil.isBlank(topic)) {
log.warn("[handle][主题为空, 忽略消息]");
return;
}
// 注意payload 可以为空
// 2. 识别并验证消息类型
// 2.1 识别并验证消息类型
String messageType = getMessageType(topic);
// TODO @haohao可以使用 hutool 它的字符串拼接更简单
Assert.notNull(messageType, String.format("未知的消息类型, topic(%s)", topic));
log.debug("[handle][接收到上行消息({}), topic: {}]", messageType, topic);
// 3. 解析主题获取 productKey deviceName
// 2.2 解析主题获取 productKey deviceName
// TODO @haohao体感 getMessageType 和下面 split是不是一次就 ok 1split 223 位置是 productKeydeviceName34 开始还是 method
String[] topicParts = topic.split("/");
if (topicParts.length < 4) {
log.warn("[handle][主题格式不正确,无法解析 productKey 和 deviceName][topic: {}]", topic);
log.warn("[handle][topic({}) 格式不正确,无法解析 productKey 和 deviceName]", topic);
return;
}
String productKey = topicParts[2];
String deviceName = topicParts[3];
// TODO @haohao是不是要判断部分为空就不行呀
if (StrUtil.isAllBlank(productKey, deviceName)) {
log.warn("[handle][主题中 productKey 或 deviceName 为空][topic: {}]", topic);
log.warn("[handle][topic({}) 格式不正确productKey 和 deviceName 部分为空]", topic);
return;
}
// 4. 解码消息
IotDeviceMessage deviceMessage = deviceMessageService.decodeDeviceMessage(
payload, productKey, deviceName);
if (deviceMessage == null) {
log.warn("[handle][消息解码失败][topic: {}]", topic);
// 3. 解码消息
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(payload, productKey, deviceName);
if (message == null) {
log.warn("[handle][topic({}) payload({}) 消息解码失败", topic, new String(payload));
return;
}
// 5. 发送消息到队列
deviceMessageService.sendDeviceMessage(deviceMessage, productKey, deviceName, serverId);
// 6. 记录成功日志
log.debug("[handle][处理上行消息({})成功, topic: {}]", messageType, topic);
// 4. 发送消息到队列
deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId);
} catch (Exception e) {
log.error("[handle][处理 MQTT 消息失败][topic: {}][payload: {}]", topic, new String(payload), e);
log.error("[handle][topic({}) payload({}) 处理异常]", topic, new String(payload), e);
}
}
// TODO @haohao是不是 getMethodFromTopic
/**
* 从主题中获得消息类型
*
@ -89,9 +84,9 @@ public class IotMqttUpstreamHandler {
private String getMessageType(String topic) {
String[] topicParts = topic.split("/");
// 约定topic 4 个部分开始为消息类型
// 例如/sys/{productKey}/{deviceName}/thing/property/post ->
// thing/property/post
// 例如/sys/{productKey}/{deviceName}/thing/property/post -> thing/property/post
if (topicParts.length > 4) {
// TODO @haohao是不是 subString 3 性能更好
return String.join("/", Arrays.copyOfRange(topicParts, 4, topicParts.length));
}
return topicParts[topicParts.length - 1];