reactor:【IoT 物联网】重构 MQTT 客户端连接和订阅主题流程,更新配置属性

This commit is contained in:
haohao 2025-06-13 12:30:25 +08:00
parent 569eef4a74
commit 2737ffa116
7 changed files with 308 additions and 127 deletions

View File

@ -1,5 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.config;
import cn.hutool.core.util.StrUtil;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
@ -175,7 +176,7 @@ public class IotGatewayProperties {
* @return MQTT 客户端 ID
*/
public String getMqttClientId() {
if (cn.hutool.core.util.StrUtil.isBlank(mqttClientId)) {
if (StrUtil.isBlank(mqttClientId)) {
mqttClientId = "iot-gateway-mqtt-" + System.currentTimeMillis();
}
return mqttClientId;

View File

@ -1,6 +1,5 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
@ -45,13 +44,13 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber<IotDevi
@Override
public void onMessage(IotDeviceMessage message) {
log.info("[onMessage][接收到下行消息][messageId: {}][method: {}][deviceId: {}]",
log.debug("[onMessage][接收到下行消息, messageId: {}, method: {}, deviceId: {}]",
message.getId(), message.getMethod(), message.getDeviceId());
try {
// 1. 校验
String method = message.getMethod();
if (method == null) {
log.warn("[onMessage][消息方法为空][messageId: {}][deviceId: {}]",
log.warn("[onMessage][消息方法为空, messageId: {}, deviceId: {}]",
message.getId(), message.getDeviceId());
return;
}
@ -59,20 +58,9 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber<IotDevi
// 2. 处理下行消息
downstreamHandler.handle(message);
} catch (Exception e) {
log.error("[onMessage][处理下行消息失败][messageId: {}][method: {}][deviceId: {}]",
log.error("[onMessage][处理下行消息失败, messageId: {}, method: {}, deviceId: {}]",
message.getId(), message.getMethod(), message.getDeviceId(), e);
}
}
/**
* 判断是否为上行消息
*
* @param method 消息方法
* @return 是否为上行消息
*/
private boolean isUpstreamMessage(String method) {
IotDeviceMessageMethodEnum methodEnum = IotDeviceMessageMethodEnum.of(method);
return methodEnum != null && methodEnum.getUpstream();
}
}

View File

@ -21,7 +21,6 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* IoT 网关 MQTT 协议接收设备上行消息
@ -61,10 +60,10 @@ public class IotMqttUpstreamProtocol {
@PostConstruct
public void start() {
if (isRunning) {
log.warn("[start][MQTT 统一协议服务已经在运行中,请勿重复启动]");
log.warn("[start][MQTT 协议服务已经在运行中,请勿重复启动]");
return;
}
log.info("[start][开始启动 MQTT 统一协议服务]");
log.info("[start][启动 MQTT 协议服务]");
try {
// 1. 创建共享的 Vertx 实例
@ -77,9 +76,9 @@ public class IotMqttUpstreamProtocol {
startMqttClient();
isRunning = true;
log.info("[start][MQTT 统一协议服务启动完成]");
log.info("[start][MQTT 协议服务启动完成]");
} catch (Exception e) {
log.error("[start][MQTT 统一协议服务启动失败]", e);
log.error("[start][MQTT 协议服务启动失败]", e);
// 启动失败时清理资源
stop();
throw e;
@ -89,10 +88,10 @@ public class IotMqttUpstreamProtocol {
@PreDestroy
public void stop() {
if (!isRunning) {
log.warn("[stop][MQTT 统一协议服务已经停止,无需再次停止]");
log.warn("[stop][MQTT 协议服务已经停止,无需再次停止]");
return;
}
log.info("[stop][开始停止 MQTT 统一协议服务]");
log.info("[stop][停止 MQTT 协议服务]");
// 1. 停止 MQTT 客户端
stopMqttClient();
@ -104,28 +103,28 @@ public class IotMqttUpstreamProtocol {
if (vertx != null) {
try {
vertx.close();
log.info("[stop][Vertx 实例已关闭]");
log.debug("[stop][Vertx 实例已关闭]");
} catch (Exception e) {
log.warn("[stop][关闭 Vertx 实例失败]", e);
}
}
isRunning = false;
log.info("[stop][MQTT 统一协议服务已停止]");
log.info("[stop][MQTT 协议服务已停止]");
}
/**
* 启动 HTTP 认证服务
*/
private void startHttpAuthServer() {
log.info("[startHttpAuthServer][开始启动 HTTP 认证服务]");
log.info("[startHttpAuthServer][启动 HTTP 认证服务]");
// 创建路由
Router router = Router.router(vertx);
router.route().handler(BodyHandler.create());
// 创建认证处理器
IotMqttHttpAuthHandler authHandler = new IotMqttHttpAuthHandler();
IotMqttHttpAuthHandler authHandler = new IotMqttHttpAuthHandler(this);
router.post(IotMqttTopicUtils.MQTT_AUTH_AUTHENTICATE_PATH).handler(authHandler::authenticate);
router.post(IotMqttTopicUtils.MQTT_AUTH_CONNECTED_PATH).handler(authHandler::connected);
router.post(IotMqttTopicUtils.MQTT_AUTH_DISCONNECTED_PATH).handler(authHandler::disconnected);
@ -137,7 +136,7 @@ public class IotMqttUpstreamProtocol {
.requestHandler(router)
.listen(authPort)
.result();
log.info("[startHttpAuthServer][HTTP 认证服务启动成功,端口:{}]", authPort);
log.info("[startHttpAuthServer][HTTP 认证服务启动成功, 端口: {}]", authPort);
} catch (Exception e) {
log.error("[startHttpAuthServer][HTTP 认证服务启动失败]", e);
throw e;
@ -163,23 +162,24 @@ public class IotMqttUpstreamProtocol {
* 启动 MQTT 客户端
*/
private void startMqttClient() {
log.info("[startMqttClient][开始启动 MQTT 客户端]");
log.info("[startMqttClient][启动 MQTT 客户端]");
try {
// 1. 初始化消息处理器
this.upstreamHandler = new IotMqttUpstreamHandler(this);
// 2. 创建 MQTT 客户端
log.info("[startMqttClient][使用 MQTT 客户端 ID: {}]", emqxProperties.getMqttClientId());
createMqttClient();
MqttClientOptions options = new MqttClientOptions()
.setClientId(emqxProperties.getMqttClientId())
.setUsername(emqxProperties.getMqttUsername())
.setPassword(emqxProperties.getMqttPassword())
.setSsl(emqxProperties.getMqttSsl());
this.mqttClient = MqttClient.create(vertx, options);
// 3. 连接 MQTT Broker
// 3. 连接 MQTT Broker异步连接不会抛出异常
connectMqtt();
log.info("[startMqttClient][MQTT 客户端启动完成,正在异步连接中...]");
} catch (Exception e) {
log.error("[startMqttClient][MQTT 客户端启动失败]", e);
throw new RuntimeException("MQTT 客户端启动失败", e);
}
}
/**
@ -195,7 +195,7 @@ public class IotMqttUpstreamProtocol {
mqttClient.unsubscribe(topic);
log.debug("[stopMqttClient][取消订阅主题: {}]", topic);
} catch (Exception e) {
log.warn("[stopMqttClient][取消订阅主题异常: {}]", topic, e);
log.warn("[stopMqttClient][取消订阅主题({})异常]", topic, e);
}
}
}
@ -216,43 +216,107 @@ public class IotMqttUpstreamProtocol {
* 连接 MQTT Broker 并订阅主题
*/
private void connectMqtt() {
connectMqtt(false);
}
/**
* 连接 MQTT Broker 并订阅主题
*
* @param isReconnect 是否为重连
*/
private void connectMqtt(boolean isReconnect) {
// 1. 参数校验
String host = emqxProperties.getMqttHost();
Integer port = emqxProperties.getMqttPort();
if (StrUtil.isBlank(host)) {
log.error("[connectMqtt][MQTT Host 为空,无法连接]");
log.error("[connectMqtt][MQTT Host 为空, 无法连接]");
throw new IllegalArgumentException("MQTT Host 不能为空");
}
if (port == null || port <= 0) {
log.error("[connectMqtt][MQTT Port 无效:{}]", port);
log.error("[connectMqtt][MQTT Port({}) 无效]", port);
throw new IllegalArgumentException("MQTT Port 必须为正整数");
}
log.info("[connectMqtt][开始连接 MQTT Broker][host: {}][port: {}]", host, port);
// 2. 连接
try {
mqttClient.connect(port, host)
.toCompletionStage()
.toCompletableFuture()
.get(emqxProperties.getConnectTimeoutSeconds(), TimeUnit.SECONDS);
log.info("[connectMqtt][MQTT 客户端连接成功][host: {}][port: {}]", host, port);
if (isReconnect) {
log.info("[connectMqtt][开始重连 MQTT Broker, host: {}, port: {}]", host, port);
// 重连时重新创建客户端实例
createMqttClient();
} else {
log.info("[connectMqtt][开始连接 MQTT Broker, host: {}, port: {}]", host, port);
}
// 2. 异步连接
mqttClient.connect(port, host, connectResult -> {
if (connectResult.succeeded()) {
if (isReconnect) {
log.info("[connectMqtt][MQTT 客户端重连成功, host: {}, port: {}]", host, port);
} else {
log.info("[connectMqtt][MQTT 客户端连接成功, host: {}, port: {}]", host, port);
}
// 3. 设置处理器
// 3.1 设置断开重连监听器
mqttClient.closeHandler(closeEvent -> {
log.warn("[closeHandler][MQTT 连接已断开,准备重连]");
reconnectWithDelay();
});
// 3.2 设置消息处理器
mqttClient.publishHandler(upstreamHandler::handle);
log.debug("[setupMessageHandler][MQTT 消息处理器设置完成]");
setupMqttHandlers();
// 4. 订阅主题
subscribeToTopics();
} catch (Exception e) {
log.error("[connectMqtt][连接 MQTT Broker 失败][host: {}][port: {}]", host, port, e);
reconnectWithDelay(); // 连接失败时也要尝试重连
throw new RuntimeException("MQTT 客户端启动失败", e);
} else {
log.error("[connectMqtt][连接 MQTT Broker 失败, host: {}, port: {}, isReconnect: {}]",
host, port, isReconnect, connectResult.cause());
if (!isReconnect) {
// 首次连接失败时也要尝试重连
log.warn("[connectMqtt][首次连接失败,将开始重连机制]");
reconnectWithDelay();
} else {
// 重连失败时继续尝试重连
reconnectWithDelay();
}
}
});
}
/**
* 创建 MQTT 客户端
*/
private void createMqttClient() {
log.debug("[createMqttClient][创建 MQTT 客户端, clientId: {}]", emqxProperties.getMqttClientId());
MqttClientOptions options = new MqttClientOptions()
.setClientId(emqxProperties.getMqttClientId())
.setUsername(emqxProperties.getMqttUsername())
.setPassword(emqxProperties.getMqttPassword())
.setSsl(emqxProperties.getMqttSsl());
this.mqttClient = MqttClient.create(vertx, options);
}
/**
* 设置 MQTT 处理器
*/
private void setupMqttHandlers() {
if (mqttClient == null) {
log.warn("[setupMqttHandlers][MQTT 客户端为空,跳过处理器设置]");
return;
}
// 设置断开重连监听器
mqttClient.closeHandler(closeEvent -> {
log.warn("[closeHandler][MQTT 连接已断开, 准备重连]");
reconnectWithDelay();
});
// 设置异常处理器
mqttClient.exceptionHandler(exception -> {
log.error("[exceptionHandler][MQTT 客户端异常]", exception);
});
// 设置消息处理器
if (upstreamHandler != null) {
mqttClient.publishHandler(upstreamHandler::handle);
log.debug("[setupMqttHandlers][MQTT 消息处理器设置完成]");
} else {
log.warn("[setupMqttHandlers][上行消息处理器为空,跳过设置]");
}
}
@ -261,15 +325,43 @@ public class IotMqttUpstreamProtocol {
*/
private void subscribeToTopics() {
List<String> topicList = emqxProperties.getMqttTopics();
int qos = emqxProperties.getMqttQos();
if (CollUtil.isEmpty(topicList)) {
log.warn("[subscribeToTopics][订阅主题列表为空, 跳过订阅]");
return;
}
if (mqttClient == null || !mqttClient.isConnected()) {
log.warn("[subscribeToTopics][MQTT 客户端未连接, 跳过订阅]");
return;
}
int qos = emqxProperties.getMqttQos();
log.info("[subscribeToTopics][开始订阅主题, 共 {} 个, QoS: {}]", topicList.size(), qos);
int[] successCount = {0}; // 使用数组以便在 lambda 中修改
int[] failCount = {0};
log.info("[subscribeToTopics][开始订阅主题,共 {} 个]", topicList.size());
for (String topic : topicList) {
mqttClient.subscribe(topic, qos, subscribeResult -> {
if (subscribeResult.succeeded()) {
log.info("[subscribeToTopics][订阅主题成功: {}][QoS: {}]", topic, qos);
successCount[0]++;
log.debug("[subscribeToTopics][订阅主题成功, topic: {}, qos: {}]", topic, qos);
// 当所有主题都处理完成时记录汇总日志
if (successCount[0] + failCount[0] == topicList.size()) {
log.info("[subscribeToTopics][主题订阅完成, 成功: {}, 失败: {}, 总计: {}]",
successCount[0], failCount[0], topicList.size());
}
} else {
log.error("[subscribeToTopics][订阅主题失败: {}]", topic, subscribeResult.cause());
failCount[0]++;
log.error("[subscribeToTopics][订阅主题失败, topic: {}, qos: {}, 原因: {}]",
topic, qos, subscribeResult.cause().getMessage(), subscribeResult.cause());
// 当所有主题都处理完成时记录汇总日志
if (successCount[0] + failCount[0] == topicList.size()) {
log.info("[subscribeToTopics][主题订阅完成, 成功: {}, 失败: {}, 总计: {}]",
successCount[0], failCount[0], topicList.size());
}
}
});
}
@ -281,15 +373,21 @@ public class IotMqttUpstreamProtocol {
private void reconnectWithDelay() {
long delay = emqxProperties.getReconnectDelayMs();
vertx.setTimer(delay, timerId -> {
if (!isRunning || (mqttClient != null && mqttClient.isConnected())) {
if (!isRunning) {
log.debug("[reconnectWithDelay][服务已停止, 取消重连]");
return;
}
log.info("[reconnectWithDelay][开始重连 MQTT Broker延迟 {} 毫秒]", delay);
// 检查连接状态如果已连接则无需重连
if (mqttClient != null && mqttClient.isConnected()) {
log.debug("[reconnectWithDelay][MQTT 客户端已连接, 无需重连]");
return;
}
log.info("[reconnectWithDelay][开始重连 MQTT Broker, 延迟: {} ms]", delay);
try {
connectMqtt();
connectMqtt(true); // 标记为重连
} catch (Exception e) {
log.error("[reconnectWithDelay][重连失败,将继续尝试重连]", e);
reconnectWithDelay(); // 失败后继续尝试
log.error("[reconnectWithDelay][重连失败, 将继续尝试]", e);
// 重连失败时不需要重复调用因为 connectMqtt(true) 内部已经处理了重连逻辑
}
});
}
@ -302,7 +400,7 @@ public class IotMqttUpstreamProtocol {
*/
public void publishMessage(String topic, String payload) {
if (mqttClient == null || !mqttClient.isConnected()) {
log.warn("[publishMessage][MQTT 客户端未连接,无法发布消息][topic: {}]", topic);
log.warn("[publishMessage][MQTT 客户端未连接, 无法发布消息到 topic({})]", topic);
return;
}
MqttQoS qos = MqttQoS.valueOf(emqxProperties.getMqttQos());

View File

@ -41,14 +41,14 @@ public class IotMqttDownstreamHandler {
// 1. 获取设备信息使用缓存
IotDeviceRespDTO deviceInfo = deviceService.getDeviceFromCache(message.getDeviceId());
if (deviceInfo == null) {
log.warn("[handle][设备信息不存在][deviceId: {}]", message.getDeviceId());
log.warn("[handle][设备信息不存在, deviceId: {}]", message.getDeviceId());
return;
}
// 2. 根据方法构建主题
String topic = buildTopicByMethod(message.getMethod(), deviceInfo.getProductKey(), deviceInfo.getDeviceName());
if (StrUtil.isBlank(topic)) {
log.warn("[handle][未知的消息方法{}]", message.getMethod());
log.warn("[handle][未知的消息方法: {}]", message.getMethod());
return;
}
@ -57,7 +57,7 @@ public class IotMqttDownstreamHandler {
// 4. 发布消息
protocol.publishMessage(topic, payload.toString());
log.info("[handle][发布下行消息成功][method: {}][topic: {}]", message.getMethod(), topic);
log.debug("[handle][发布下行消息成功, method: {}, topic: {}]", message.getMethod(), topic);
}
/**

View File

@ -8,6 +8,7 @@ import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
@ -45,15 +46,28 @@ public class IotMqttHttpAuthHandler {
*/
private static final int INTERNAL_ERROR_STATUS_CODE = 500;
/**
* MQTT 协议实例用于获取服务器ID
*/
private final IotMqttUpstreamProtocol protocol;
/**
* 构造器
*
* @param protocol MQTT 协议实例
*/
public IotMqttHttpAuthHandler(IotMqttUpstreamProtocol protocol) {
this.protocol = protocol;
}
/**
* EMQX 认证接口
*/
public void authenticate(RoutingContext context) {
try {
// 解析请求体
JsonObject body = context.body().asJsonObject();
JsonObject body = parseRequestBody(context);
if (body == null) {
sendErrorResponse(context, 400, "请求体不能为空");
return;
}
@ -61,34 +75,23 @@ public class IotMqttHttpAuthHandler {
String username = body.getString("username");
String password = body.getString("password");
log.info("[authenticate][EMQX 设备认证请求][clientId: {}][username: {}]", clientid, username);
log.debug("[authenticate][EMQX 设备认证, clientId: {}, username: {}]", clientid, username);
// 参数校验
if (StrUtil.isEmpty(clientid) || StrUtil.isEmpty(username) || StrUtil.isEmpty(password)) {
log.warn("[authenticate][认证参数不完整][clientId: {}][username: {}]", clientid, username);
sendErrorResponse(context, BAD_REQUEST_STATUS_CODE, "认证参数不完整");
if (!validateAuthParams(context, clientid, username, password)) {
return;
}
// 执行设备认证
IotDeviceCommonApi deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
CommonResult<Boolean> result = deviceApi.authDevice(new IotDeviceAuthReqDTO()
.setClientId(clientid)
.setUsername(username)
.setPassword(password));
result.checkError();
if (!BooleanUtil.isTrue(result.getData())) {
log.warn("[authenticate][设备认证失败][clientId: {}][username: {}]", clientid, username);
sendErrorResponse(context, UNAUTHORIZED_STATUS_CODE, DEVICE_AUTH_FAIL.getMsg());
if (!performDeviceAuth(context, clientid, username, password)) {
return;
}
log.info("[authenticate][设备认证成功][clientId: {}][username: {}]", clientid, username);
log.debug("[authenticate][设备认证成功, clientId: {}, username: {}]", clientid, username);
sendSuccessResponse(context, "认证成功");
} catch (Exception e) {
log.error("[authenticate][设备认证异常]", e);
log.error("[authenticate][设备认证异常, 详细信息: {}]", e.getMessage(), e);
sendErrorResponse(context, INTERNAL_ERROR_STATUS_CODE, "认证服务异常");
}
}
@ -99,9 +102,8 @@ public class IotMqttHttpAuthHandler {
public void connected(RoutingContext context) {
try {
// 解析请求体
JsonObject body = context.body().asJsonObject();
JsonObject body = parseRequestBody(context);
if (body == null) {
sendErrorResponse(context, BAD_REQUEST_STATUS_CODE, "请求体不能为空");
return;
}
@ -109,13 +111,14 @@ public class IotMqttHttpAuthHandler {
String username = body.getString("username");
Long timestamp = body.getLong("timestamp");
log.info("[connected][设备连接事件][clientId: {}][username: {}]", clientid, username);
log.debug("[connected][设备连接, clientId: {}, username: {}, timestamp: {}]",
clientid, username, timestamp);
handleDeviceStateChange(username, true);
handleDeviceStateChange(username, true, "设备连接");
sendSuccessResponse(context, "处理成功");
} catch (Exception e) {
log.error("[connected][处理设备连接事件失败]", e);
log.error("[connected][处理设备连接事件失败, 详细信息: {}]", e.getMessage(), e);
sendErrorResponse(context, INTERNAL_ERROR_STATUS_CODE, "处理失败");
}
}
@ -126,9 +129,8 @@ public class IotMqttHttpAuthHandler {
public void disconnected(RoutingContext context) {
try {
// 解析请求体
JsonObject body = context.body().asJsonObject();
JsonObject body = parseRequestBody(context);
if (body == null) {
sendErrorResponse(context, BAD_REQUEST_STATUS_CODE, "请求体不能为空");
return;
}
@ -137,58 +139,144 @@ public class IotMqttHttpAuthHandler {
String reason = body.getString("reason");
Long timestamp = body.getLong("timestamp");
log.info("[disconnected][设备断开连接事件][clientId: {}][username: {}][reason: {}]",
clientid, username, reason);
log.debug("[disconnected][设备断开连接, clientId: {}, username: {}, reason: {}, timestamp: {}]",
clientid, username, reason, timestamp);
handleDeviceStateChange(username, false);
handleDeviceStateChange(username, false, "设备断开连接,原因:" + reason);
sendSuccessResponse(context, "处理成功");
} catch (Exception e) {
log.error("[disconnected][处理设备断开连接事件失败]", e);
log.error("[disconnected][处理设备断开连接事件失败, 详细信息: {}]", e.getMessage(), e);
sendErrorResponse(context, INTERNAL_ERROR_STATUS_CODE, "处理失败");
}
}
/**
* 解析请求体
*
* @param context 路由上下文
* @return 请求体JSON对象解析失败时返回null
*/
private JsonObject parseRequestBody(RoutingContext context) {
try {
JsonObject body = context.body().asJsonObject();
if (body == null) {
log.warn("[parseRequestBody][请求体为空]");
sendErrorResponse(context, BAD_REQUEST_STATUS_CODE, "请求体不能为空");
return null;
}
return body;
} catch (Exception e) {
log.error("[parseRequestBody][解析请求体失败]", e);
sendErrorResponse(context, BAD_REQUEST_STATUS_CODE, "请求体格式错误");
return null;
}
}
/**
* 验证认证参数
*
* @param context 路由上下文
* @param clientid 客户端ID
* @param username 用户名
* @param password 密码
* @return 验证是否通过
*/
private boolean validateAuthParams(RoutingContext context, String clientid, String username, String password) {
if (StrUtil.hasEmpty(clientid, username, password)) {
log.warn("[validateAuthParams][认证参数不完整, clientId: {}, username: {}, password: {}]",
clientid, username, StrUtil.isNotEmpty(password) ? "***" : "");
sendErrorResponse(context, BAD_REQUEST_STATUS_CODE, "认证参数不完整");
return false;
}
return true;
}
/**
* 执行设备认证
*
* @param context 路由上下文
* @param clientid 客户端ID
* @param username 用户名
* @param password 密码
* @return 认证是否成功
*/
private boolean performDeviceAuth(RoutingContext context, String clientid, String username, String password) {
try {
IotDeviceCommonApi deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
CommonResult<Boolean> result = deviceApi.authDevice(new IotDeviceAuthReqDTO()
.setClientId(clientid)
.setUsername(username)
.setPassword(password));
result.checkError();
if (!BooleanUtil.isTrue(result.getData())) {
log.warn("[performDeviceAuth][设备认证失败, clientId: {}, username: {}]", clientid, username);
sendErrorResponse(context, UNAUTHORIZED_STATUS_CODE, DEVICE_AUTH_FAIL.getMsg());
return false;
}
return true;
} catch (Exception e) {
log.error("[performDeviceAuth][设备认证异常, clientId: {}, username: {}]", clientid, username, e);
sendErrorResponse(context, INTERNAL_ERROR_STATUS_CODE, "认证服务异常");
return false;
}
}
/**
* 处理设备状态变化
*
* @param username 用户名
* @param online 是否在线
* @param actionDesc 操作描述
*/
private void handleDeviceStateChange(String username, boolean online) {
private void handleDeviceStateChange(String username, boolean online, String actionDesc) {
if (StrUtil.isEmpty(username) || "undefined".equals(username)) {
log.warn("[handleDeviceStateChange][用户名为空或未定义][username: {}]", username);
log.warn("[handleDeviceStateChange][用户名为空或'undefined', username: {}, action: {}]",
username, actionDesc);
return;
}
// 解析设备信息
IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(username);
if (deviceInfo == null) {
log.warn("[handleDeviceStateChange][无法解析设备信息][username: {}]", username);
log.warn("[handleDeviceStateChange][无法从 username({}) 解析设备信息, action: {}]",
username, actionDesc);
return;
}
try {
// 获取服务器 ID
String serverId = "mqtt_auth_gateway";
// 从协议实例获取服务器 ID
String serverId = protocol.getServerId();
if (StrUtil.isEmpty(serverId)) {
log.error("[handleDeviceStateChange][获取服务器ID失败, username: {}, action: {}]",
username, actionDesc);
return;
}
// 构建设备状态消息
IotDeviceMessageService deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
IotDeviceMessage message;
if (online) {
message = IotDeviceMessage.buildStateOnline();
log.info("[handleDeviceStateChange][发送设备上线消息成功][username: {}]", username);
log.debug("[handleDeviceStateChange][发送设备上线消息, username: {}, serverId: {}]",
username, serverId);
} else {
message = IotDeviceMessage.buildStateOffline();
log.info("[handleDeviceStateChange][发送设备下线消息成功][username: {}]", username);
log.debug("[handleDeviceStateChange][发送设备下线消息, username: {}, serverId: {}]",
username, serverId);
}
// 发送消息到消息总线
deviceMessageService.sendDeviceMessage(message,
deviceInfo.getProductKey(), deviceInfo.getDeviceName(), serverId);
log.info("[handleDeviceStateChange][{}处理成功, productKey: {}, deviceName: {}, serverId: {}]",
actionDesc, deviceInfo.getProductKey(), deviceInfo.getDeviceName(), serverId);
} catch (Exception e) {
log.error("[handleDeviceStateChange][发送设备状态消息失败][username: {}][online: {}]",
username, online, e);
log.error("[handleDeviceStateChange][发送设备状态消息失败, username: {}, online: {}, action: {}]",
username, online, actionDesc, e);
}
}

View File

@ -9,6 +9,8 @@ import io.vertx.mqtt.messages.MqttPublishMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.Assert;
import java.util.Arrays;
/**
* IoT 网关 MQTT 上行消息处理器
*
@ -32,12 +34,12 @@ public class IotMqttUpstreamHandler {
String topic = message.topicName();
byte[] payload = message.payload().getBytes();
log.debug("[handle][收到 MQTT 消息][topic: {}]", topic);
log.debug("[handle][收到 MQTT 消息, topic: {}]", topic);
try {
// 1. 前置校验
if (StrUtil.isBlank(topic)) {
log.warn("[validateInput][主题为空,忽略消息]");
log.warn("[handle][主题为空, 忽略消息]");
return;
}
// 注意payload 可以为空
@ -45,7 +47,7 @@ public class IotMqttUpstreamHandler {
// 2. 识别并验证消息类型
String messageType = getMessageType(topic);
Assert.notNull(messageType, String.format("未知的消息类型, topic(%s)", topic));
log.info("[handle][接收到{}][topic: {}]", messageType, topic);
log.debug("[handle][接收到上行消息({}), topic: {}]", messageType, topic);
// 3. 解析主题获取 productKey deviceName
String[] topicParts = topic.split("/");
@ -72,7 +74,7 @@ public class IotMqttUpstreamHandler {
deviceMessageService.sendDeviceMessage(deviceMessage, productKey, deviceName, serverId);
// 6. 记录成功日志
log.info("[handle][处理{}成功,已转发到 MQ][topic: {}]", messageType, topic);
log.debug("[handle][处理上行消息({})成功, topic: {}]", messageType, topic);
} catch (Exception e) {
log.error("[handle][处理 MQTT 消息失败][topic: {}][payload: {}]", topic, new String(payload), e);
}
@ -86,10 +88,13 @@ public class IotMqttUpstreamHandler {
*/
private String getMessageType(String topic) {
String[] topicParts = topic.split("/");
if (topicParts.length < 7) {
return null;
// 约定topic 4 个部分开始为消息类型
// 例如/sys/{productKey}/{deviceName}/thing/property/post ->
// thing/property/post
if (topicParts.length > 4) {
return String.join("/", Arrays.copyOfRange(topicParts, 4, topicParts.length));
}
return topicParts[3];
return topicParts[topicParts.length - 1];
}
}

View File

@ -37,6 +37,7 @@ yudao:
mqtt-port: 1883 # MQTT Broker 端口
mqtt-username: admin # MQTT 用户名
mqtt-password: public # MQTT 密码
mqtt-client-id: iot-gateway-mqtt # MQTT 客户端 ID
mqtt-ssl: false # 是否开启 SSL
mqtt-topics:
- "/sys/#" # 系统主题