reactor:【IoT 物联网】重构 MQTT 认证和事件处理逻辑,符合 EMQX 官方规范

This commit is contained in:
haohao 2025-06-13 15:58:07 +08:00
parent 2737ffa116
commit f1368d9e79
3 changed files with 129 additions and 138 deletions

View File

@ -125,9 +125,8 @@ public class IotMqttUpstreamProtocol {
// 创建认证处理器
IotMqttHttpAuthHandler authHandler = new IotMqttHttpAuthHandler(this);
router.post(IotMqttTopicUtils.MQTT_AUTH_AUTHENTICATE_PATH).handler(authHandler::authenticate);
router.post(IotMqttTopicUtils.MQTT_AUTH_CONNECTED_PATH).handler(authHandler::connected);
router.post(IotMqttTopicUtils.MQTT_AUTH_DISCONNECTED_PATH).handler(authHandler::disconnected);
router.post(IotMqttTopicUtils.MQTT_AUTH_PATH).handler(authHandler::handleAuth);
router.post(IotMqttTopicUtils.MQTT_EVENT_PATH).handler(authHandler::handleEvent);
// 启动 HTTP 服务器
int authPort = emqxProperties.getHttpAuthPort();
@ -338,8 +337,8 @@ public class IotMqttUpstreamProtocol {
int qos = emqxProperties.getMqttQos();
log.info("[subscribeToTopics][开始订阅主题, 共 {} 个, QoS: {}]", topicList.size(), qos);
int[] successCount = {0}; // 使用数组以便在 lambda 中修改
int[] failCount = {0};
int[] successCount = { 0 }; // 使用数组以便在 lambda 中修改
int[] failCount = { 0 };
for (String topic : topicList) {
mqttClient.subscribe(topic, qos, subscribeResult -> {

View File

@ -27,29 +27,34 @@ import static cn.iocoder.yudao.module.iot.gateway.enums.ErrorCodeConstants.DEVIC
public class IotMqttHttpAuthHandler {
/**
* 认证成功状态码
* HTTP 成功状态码EMQX 要求固定使用 200
*/
private static final int SUCCESS_STATUS_CODE = 200;
/**
* 参数错误状态码
* 认证允许结果
*/
private static final int BAD_REQUEST_STATUS_CODE = 400;
private static final String RESULT_ALLOW = "allow";
/**
* 认证失败状态码
* 认证拒绝结果
*/
private static final int UNAUTHORIZED_STATUS_CODE = 401;
private static final String RESULT_DENY = "deny";
/**
* 服务器错误状态码
* 认证忽略结果
*/
private static final int INTERNAL_ERROR_STATUS_CODE = 500;
private static final String RESULT_IGNORE = "ignore";
/**
* MQTT 协议实例用于获取服务器ID
* EMQX 事件类型常量
*/
private static final String EVENT_CLIENT_CONNECTED = "client.connected";
private static final String EVENT_CLIENT_DISCONNECTED = "client.disconnected";
private final IotMqttUpstreamProtocol protocol;
private final IotDeviceMessageService deviceMessageService;
private final IotDeviceCommonApi deviceApi;
/**
* 构造器
@ -58,12 +63,14 @@ public class IotMqttHttpAuthHandler {
*/
public IotMqttHttpAuthHandler(IotMqttUpstreamProtocol protocol) {
this.protocol = protocol;
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
}
/**
* EMQX 认证接口
*/
public void authenticate(RoutingContext context) {
public void handleAuth(RoutingContext context) {
try {
// 解析请求体
JsonObject body = parseRequestBody(context);
@ -71,35 +78,41 @@ public class IotMqttHttpAuthHandler {
return;
}
String clientid = body.getString("clientid");
String clientId = body.getString("clientid");
String username = body.getString("username");
String password = body.getString("password");
log.debug("[authenticate][EMQX 设备认证, clientId: {}, username: {}]", clientid, username);
log.debug("[handleAuth][设备认证请求: clientId={}, username={}]", clientId, username);
// 参数校验
if (!validateAuthParams(context, clientid, username, password)) {
if (StrUtil.hasEmpty(clientId, username, password)) {
log.info("[handleAuth][认证参数不完整: clientId={}, username={}]", clientId, username);
sendAuthResponse(context, RESULT_DENY, false, "认证参数不完整");
return;
}
// 执行设备认证
if (!performDeviceAuth(context, clientid, username, password)) {
return;
boolean authResult = performDeviceAuth(clientId, username, password);
if (authResult) {
log.info("[handleAuth][设备认证成功: {}]", username);
sendAuthResponse(context, RESULT_ALLOW, false, null);
} else {
log.info("[handleAuth][设备认证失败: {}]", username);
sendAuthResponse(context, RESULT_DENY, false, DEVICE_AUTH_FAIL.getMsg());
}
log.debug("[authenticate][设备认证成功, clientId: {}, username: {}]", clientid, username);
sendSuccessResponse(context, "认证成功");
} catch (Exception e) {
log.error("[authenticate][设备认证异常, 详细信息: {}]", e.getMessage(), e);
sendErrorResponse(context, INTERNAL_ERROR_STATUS_CODE, "认证服务异常");
log.error("[handleAuth][设备认证异常]", e);
sendAuthResponse(context, RESULT_IGNORE, false, "认证服务异常");
}
}
/**
* EMQX 客户端连接事件钩子
* EMQX 统一事件处理接口
* 根据 EMQX 官方 Webhook 设计统一处理所有客户端事件
* 支持的事件类型client.connectedclient.disconnected
*/
public void connected(RoutingContext context) {
public void handleEvent(RoutingContext context) {
try {
// 解析请求体
JsonObject body = parseRequestBody(context);
@ -107,48 +120,52 @@ public class IotMqttHttpAuthHandler {
return;
}
String clientid = body.getString("clientid");
String event = body.getString("event");
String username = body.getString("username");
Long timestamp = body.getLong("timestamp");
log.debug("[connected][设备连接, clientId: {}, username: {}, timestamp: {}]",
clientid, username, timestamp);
log.debug("[handleEvent][收到事件: {} - {}]", event, username);
handleDeviceStateChange(username, true, "设备连接");
sendSuccessResponse(context, "处理成功");
// 根据事件类型进行分发处理
switch (event) {
case EVENT_CLIENT_CONNECTED:
handleClientConnected(body);
break;
case EVENT_CLIENT_DISCONNECTED:
handleClientDisconnected(body);
break;
default:
log.debug("[handleEvent][忽略事件: {}]", event);
break;
}
// EMQX Webhook 只需要200状态码无需响应体
context.response().setStatusCode(SUCCESS_STATUS_CODE).end();
} catch (Exception e) {
log.error("[connected][处理设备连接事件失败, 详细信息: {}]", e.getMessage(), e);
sendErrorResponse(context, INTERNAL_ERROR_STATUS_CODE, "处理失败");
log.error("[handleEvent][事件处理失败]", e);
// 即使处理失败也返回200避免EMQX重试
context.response().setStatusCode(SUCCESS_STATUS_CODE).end();
}
}
/**
* EMQX 客户端断开连接事件钩子
* 处理客户端连接事件
*/
public void disconnected(RoutingContext context) {
try {
// 解析请求体
JsonObject body = parseRequestBody(context);
if (body == null) {
return;
}
private void handleClientConnected(JsonObject body) {
String username = body.getString("username");
log.info("[handleClientConnected][设备上线: {}]", username);
handleDeviceStateChange(username, true);
}
String clientid = body.getString("clientid");
String username = body.getString("username");
String reason = body.getString("reason");
Long timestamp = body.getLong("timestamp");
/**
* 处理客户端断开连接事件
*/
private void handleClientDisconnected(JsonObject body) {
String username = body.getString("username");
String reason = body.getString("reason");
log.debug("[disconnected][设备断开连接, clientId: {}, username: {}, reason: {}, timestamp: {}]",
clientid, username, reason, timestamp);
handleDeviceStateChange(username, false, "设备断开连接,原因:" + reason);
sendSuccessResponse(context, "处理成功");
} catch (Exception e) {
log.error("[disconnected][处理设备断开连接事件失败, 详细信息: {}]", e.getMessage(), e);
sendErrorResponse(context, INTERNAL_ERROR_STATUS_CODE, "处理失败");
}
log.info("[handleClientDisconnected][设备下线: {} ({})]", username, reason);
handleDeviceStateChange(username, false);
}
/**
@ -161,142 +178,113 @@ public class IotMqttHttpAuthHandler {
try {
JsonObject body = context.body().asJsonObject();
if (body == null) {
log.warn("[parseRequestBody][请求体为空]");
sendErrorResponse(context, BAD_REQUEST_STATUS_CODE, "请求体不能为空");
log.info("[parseRequestBody][请求体为空]");
sendAuthResponse(context, RESULT_IGNORE, false, "请求体不能为空");
return null;
}
return body;
} catch (Exception e) {
log.error("[parseRequestBody][解析请求体失败]", e);
sendErrorResponse(context, BAD_REQUEST_STATUS_CODE, "请求体格式错误");
sendAuthResponse(context, RESULT_IGNORE, false, "请求体格式错误");
return null;
}
}
/**
* 验证认证参数
*
* @param context 路由上下文
* @param clientid 客户端ID
* @param username 用户名
* @param password 密码
* @return 验证是否通过
*/
private boolean validateAuthParams(RoutingContext context, String clientid, String username, String password) {
if (StrUtil.hasEmpty(clientid, username, password)) {
log.warn("[validateAuthParams][认证参数不完整, clientId: {}, username: {}, password: {}]",
clientid, username, StrUtil.isNotEmpty(password) ? "***" : "");
sendErrorResponse(context, BAD_REQUEST_STATUS_CODE, "认证参数不完整");
return false;
}
return true;
}
/**
* 执行设备认证
*
* @param context 路由上下文
* @param clientid 客户端ID
* @param clientId 客户端ID
* @param username 用户名
* @param password 密码
* @return 认证是否成功
*/
private boolean performDeviceAuth(RoutingContext context, String clientid, String username, String password) {
private boolean performDeviceAuth(String clientId, String username, String password) {
try {
IotDeviceCommonApi deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
CommonResult<Boolean> result = deviceApi.authDevice(new IotDeviceAuthReqDTO()
.setClientId(clientid)
.setClientId(clientId)
.setUsername(username)
.setPassword(password));
result.checkError();
if (!BooleanUtil.isTrue(result.getData())) {
log.warn("[performDeviceAuth][设备认证失败, clientId: {}, username: {}]", clientid, username);
sendErrorResponse(context, UNAUTHORIZED_STATUS_CODE, DEVICE_AUTH_FAIL.getMsg());
return false;
}
return true;
return BooleanUtil.isTrue(result.getData());
} catch (Exception e) {
log.error("[performDeviceAuth][设备认证异常, clientId: {}, username: {}]", clientid, username, e);
sendErrorResponse(context, INTERNAL_ERROR_STATUS_CODE, "认证服务异常");
return false;
log.error("[performDeviceAuth][认证接口调用失败: {}]", username, e);
throw e;
}
}
/**
* 处理设备状态变化
*
* @param username 用户名
* @param online 是否在线
* @param actionDesc 操作描述
* @param username 用户名
* @param online 是否在线
*/
private void handleDeviceStateChange(String username, boolean online, String actionDesc) {
private void handleDeviceStateChange(String username, boolean online) {
if (StrUtil.isEmpty(username) || "undefined".equals(username)) {
log.warn("[handleDeviceStateChange][用户名为空或'undefined', username: {}, action: {}]",
username, actionDesc);
return;
}
// 解析设备信息
IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(username);
if (deviceInfo == null) {
log.warn("[handleDeviceStateChange][无法从 username({}) 解析设备信息, action: {}]",
username, actionDesc);
log.debug("[handleDeviceStateChange][跳过非设备连接: {}]", username);
return;
}
try {
// 从协议实例获取服务器 ID
String serverId = protocol.getServerId();
if (StrUtil.isEmpty(serverId)) {
log.error("[handleDeviceStateChange][获取服务器ID失败, username: {}, action: {}]",
username, actionDesc);
log.error("[handleDeviceStateChange][获取服务器ID失败]");
return;
}
// 构建设备状态消息
IotDeviceMessageService deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
IotDeviceMessage message;
if (online) {
message = IotDeviceMessage.buildStateOnline();
log.debug("[handleDeviceStateChange][发送设备上线消息, username: {}, serverId: {}]",
username, serverId);
} else {
message = IotDeviceMessage.buildStateOffline();
log.debug("[handleDeviceStateChange][发送设备下线消息, username: {}, serverId: {}]",
username, serverId);
}
IotDeviceMessage message = online ? IotDeviceMessage.buildStateOnline()
: IotDeviceMessage.buildStateOffline();
// 发送消息到消息总线
deviceMessageService.sendDeviceMessage(message,
deviceInfo.getProductKey(), deviceInfo.getDeviceName(), serverId);
log.info("[handleDeviceStateChange][{}处理成功, productKey: {}, deviceName: {}, serverId: {}]",
actionDesc, deviceInfo.getProductKey(), deviceInfo.getDeviceName(), serverId);
log.info("[handleDeviceStateChange][设备状态更新: {}/{} -> {}]",
deviceInfo.getProductKey(), deviceInfo.getDeviceName(),
online ? "在线" : "离线");
} catch (Exception e) {
log.error("[handleDeviceStateChange][发送设备状态消息失败, username: {}, online: {}, action: {}]",
username, online, actionDesc, e);
log.error("[handleDeviceStateChange][发送设备状态消息失败: {}]", username, e);
}
}
/**
* 发送成功响应
* 发送 EMQX 认证响应
* 根据 EMQX 官方文档要求必须返回 JSON 格式响应
*
* @param context 路由上下文
* @param result 认证结果allowdenyignore
* @param isSuperuser 是否超级用户
* @param message 日志消息仅用于日志记录不返回给EMQX
*/
private void sendSuccessResponse(RoutingContext context, String message) {
private void sendAuthResponse(RoutingContext context, String result, boolean isSuperuser, String message) {
// 构建符合 EMQX 官方规范的响应
JsonObject response = new JsonObject()
.put("result", result)
.put("is_superuser", isSuperuser);
// 可以根据业务需求添加客户端属性
// response.put("client_attrs", new JsonObject().put("role", "device"));
// 可以添加认证过期时间可选
// response.put("expire_at", System.currentTimeMillis() / 1000 + 3600);
// 记录详细的响应日志message仅用于日志不返回给EMQX
if (StrUtil.isNotEmpty(message)) {
log.debug("[sendAuthResponse][响应详情: result={}, message={}]", result, message);
}
context.response()
.setStatusCode(SUCCESS_STATUS_CODE)
.putHeader("Content-Type", "text/plain; charset=utf-8")
.end(message);
.putHeader("Content-Type", "application/json; charset=utf-8")
.end(response.encode());
}
/**
* 发送错误响应
*/
private void sendErrorResponse(RoutingContext context, int statusCode, String message) {
context.response()
.setStatusCode(statusCode)
.putHeader("Content-Type", "text/plain; charset=utf-8")
.end(message);
}
}

View File

@ -21,22 +21,26 @@ public final class IotMqttTopicUtils {
*/
private static final String SERVICE_TOPIC_PREFIX = "/thing/";
// ========== MQTT 认证路径常量 ==========
// ========== MQTT HTTP 接口路径常量 ==========
/**
* MQTT 认证路径
* MQTT 认证接口路径
* 对应 EMQX HTTP 认证插件的认证请求接口
*/
public static final String MQTT_AUTH_AUTHENTICATE_PATH = "/mqtt/auth/authenticate";
public static final String MQTT_AUTH_PATH = "/mqtt/auth";
/**
* MQTT 连接事件路径
* MQTT 统一事件处理接口路径
* 对应 EMQX Webhook 的统一事件处理接口支持所有客户端事件
* 包括client.connectedclient.disconnectedmessage.publish
*/
public static final String MQTT_AUTH_CONNECTED_PATH = "/mqtt/auth/connected";
public static final String MQTT_EVENT_PATH = "/mqtt/event";
/**
* MQTT 断开事件路径
* MQTT 授权接口路径预留
* 对应 EMQX HTTP 授权插件的授权检查接口
*/
public static final String MQTT_AUTH_DISCONNECTED_PATH = "/mqtt/auth/disconnected";
public static final String MQTT_AUTHZ_PATH = "/mqtt/authz";
// ========== 工具方法 ==========