diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java index b09bd83705..398dbaa5dc 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java @@ -125,9 +125,8 @@ public class IotMqttUpstreamProtocol { // 创建认证处理器 IotMqttHttpAuthHandler authHandler = new IotMqttHttpAuthHandler(this); - router.post(IotMqttTopicUtils.MQTT_AUTH_AUTHENTICATE_PATH).handler(authHandler::authenticate); - router.post(IotMqttTopicUtils.MQTT_AUTH_CONNECTED_PATH).handler(authHandler::connected); - router.post(IotMqttTopicUtils.MQTT_AUTH_DISCONNECTED_PATH).handler(authHandler::disconnected); + router.post(IotMqttTopicUtils.MQTT_AUTH_PATH).handler(authHandler::handleAuth); + router.post(IotMqttTopicUtils.MQTT_EVENT_PATH).handler(authHandler::handleEvent); // 启动 HTTP 服务器 int authPort = emqxProperties.getHttpAuthPort(); @@ -338,8 +337,8 @@ public class IotMqttUpstreamProtocol { int qos = emqxProperties.getMqttQos(); log.info("[subscribeToTopics][开始订阅主题, 共 {} 个, QoS: {}]", topicList.size(), qos); - int[] successCount = {0}; // 使用数组以便在 lambda 中修改 - int[] failCount = {0}; + int[] successCount = { 0 }; // 使用数组以便在 lambda 中修改 + int[] failCount = { 0 }; for (String topic : topicList) { mqttClient.subscribe(topic, qos, subscribeResult -> { diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttHttpAuthHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttHttpAuthHandler.java index a24999c3cd..86e1c451b2 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttHttpAuthHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttHttpAuthHandler.java @@ -27,29 +27,34 @@ import static cn.iocoder.yudao.module.iot.gateway.enums.ErrorCodeConstants.DEVIC public class IotMqttHttpAuthHandler { /** - * 认证成功状态码 + * HTTP 成功状态码(EMQX 要求固定使用 200) */ private static final int SUCCESS_STATUS_CODE = 200; /** - * 参数错误状态码 + * 认证允许结果 */ - private static final int BAD_REQUEST_STATUS_CODE = 400; + private static final String RESULT_ALLOW = "allow"; /** - * 认证失败状态码 + * 认证拒绝结果 */ - private static final int UNAUTHORIZED_STATUS_CODE = 401; + private static final String RESULT_DENY = "deny"; /** - * 服务器错误状态码 + * 认证忽略结果 */ - private static final int INTERNAL_ERROR_STATUS_CODE = 500; + private static final String RESULT_IGNORE = "ignore"; /** - * MQTT 协议实例,用于获取服务器ID + * EMQX 事件类型常量 */ + private static final String EVENT_CLIENT_CONNECTED = "client.connected"; + private static final String EVENT_CLIENT_DISCONNECTED = "client.disconnected"; + private final IotMqttUpstreamProtocol protocol; + private final IotDeviceMessageService deviceMessageService; + private final IotDeviceCommonApi deviceApi; /** * 构造器 @@ -58,12 +63,14 @@ public class IotMqttHttpAuthHandler { */ public IotMqttHttpAuthHandler(IotMqttUpstreamProtocol protocol) { this.protocol = protocol; + this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class); + this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class); } /** * EMQX 认证接口 */ - public void authenticate(RoutingContext context) { + public void handleAuth(RoutingContext context) { try { // 解析请求体 JsonObject body = parseRequestBody(context); @@ -71,35 +78,41 @@ public class IotMqttHttpAuthHandler { return; } - String clientid = body.getString("clientid"); + String clientId = body.getString("clientid"); String username = body.getString("username"); String password = body.getString("password"); - log.debug("[authenticate][EMQX 设备认证, clientId: {}, username: {}]", clientid, username); + log.debug("[handleAuth][设备认证请求: clientId={}, username={}]", clientId, username); // 参数校验 - if (!validateAuthParams(context, clientid, username, password)) { + if (StrUtil.hasEmpty(clientId, username, password)) { + log.info("[handleAuth][认证参数不完整: clientId={}, username={}]", clientId, username); + sendAuthResponse(context, RESULT_DENY, false, "认证参数不完整"); return; } // 执行设备认证 - if (!performDeviceAuth(context, clientid, username, password)) { - return; + boolean authResult = performDeviceAuth(clientId, username, password); + if (authResult) { + log.info("[handleAuth][设备认证成功: {}]", username); + sendAuthResponse(context, RESULT_ALLOW, false, null); + } else { + log.info("[handleAuth][设备认证失败: {}]", username); + sendAuthResponse(context, RESULT_DENY, false, DEVICE_AUTH_FAIL.getMsg()); } - log.debug("[authenticate][设备认证成功, clientId: {}, username: {}]", clientid, username); - sendSuccessResponse(context, "认证成功"); - } catch (Exception e) { - log.error("[authenticate][设备认证异常, 详细信息: {}]", e.getMessage(), e); - sendErrorResponse(context, INTERNAL_ERROR_STATUS_CODE, "认证服务异常"); + log.error("[handleAuth][设备认证异常]", e); + sendAuthResponse(context, RESULT_IGNORE, false, "认证服务异常"); } } /** - * EMQX 客户端连接事件钩子 + * EMQX 统一事件处理接口 + * 根据 EMQX 官方 Webhook 设计,统一处理所有客户端事件 + * 支持的事件类型:client.connected、client.disconnected 等 */ - public void connected(RoutingContext context) { + public void handleEvent(RoutingContext context) { try { // 解析请求体 JsonObject body = parseRequestBody(context); @@ -107,48 +120,52 @@ public class IotMqttHttpAuthHandler { return; } - String clientid = body.getString("clientid"); + String event = body.getString("event"); String username = body.getString("username"); - Long timestamp = body.getLong("timestamp"); - log.debug("[connected][设备连接, clientId: {}, username: {}, timestamp: {}]", - clientid, username, timestamp); + log.debug("[handleEvent][收到事件: {} - {}]", event, username); - handleDeviceStateChange(username, true, "设备连接"); - sendSuccessResponse(context, "处理成功"); + // 根据事件类型进行分发处理 + switch (event) { + case EVENT_CLIENT_CONNECTED: + handleClientConnected(body); + break; + case EVENT_CLIENT_DISCONNECTED: + handleClientDisconnected(body); + break; + default: + log.debug("[handleEvent][忽略事件: {}]", event); + break; + } + + // EMQX Webhook 只需要200状态码,无需响应体 + context.response().setStatusCode(SUCCESS_STATUS_CODE).end(); } catch (Exception e) { - log.error("[connected][处理设备连接事件失败, 详细信息: {}]", e.getMessage(), e); - sendErrorResponse(context, INTERNAL_ERROR_STATUS_CODE, "处理失败"); + log.error("[handleEvent][事件处理失败]", e); + // 即使处理失败,也返回200避免EMQX重试 + context.response().setStatusCode(SUCCESS_STATUS_CODE).end(); } } /** - * EMQX 客户端断开连接事件钩子 + * 处理客户端连接事件 */ - public void disconnected(RoutingContext context) { - try { - // 解析请求体 - JsonObject body = parseRequestBody(context); - if (body == null) { - return; - } + private void handleClientConnected(JsonObject body) { + String username = body.getString("username"); + log.info("[handleClientConnected][设备上线: {}]", username); + handleDeviceStateChange(username, true); + } - String clientid = body.getString("clientid"); - String username = body.getString("username"); - String reason = body.getString("reason"); - Long timestamp = body.getLong("timestamp"); + /** + * 处理客户端断开连接事件 + */ + private void handleClientDisconnected(JsonObject body) { + String username = body.getString("username"); + String reason = body.getString("reason"); - log.debug("[disconnected][设备断开连接, clientId: {}, username: {}, reason: {}, timestamp: {}]", - clientid, username, reason, timestamp); - - handleDeviceStateChange(username, false, "设备断开连接,原因:" + reason); - sendSuccessResponse(context, "处理成功"); - - } catch (Exception e) { - log.error("[disconnected][处理设备断开连接事件失败, 详细信息: {}]", e.getMessage(), e); - sendErrorResponse(context, INTERNAL_ERROR_STATUS_CODE, "处理失败"); - } + log.info("[handleClientDisconnected][设备下线: {} ({})]", username, reason); + handleDeviceStateChange(username, false); } /** @@ -161,142 +178,113 @@ public class IotMqttHttpAuthHandler { try { JsonObject body = context.body().asJsonObject(); if (body == null) { - log.warn("[parseRequestBody][请求体为空]"); - sendErrorResponse(context, BAD_REQUEST_STATUS_CODE, "请求体不能为空"); + log.info("[parseRequestBody][请求体为空]"); + sendAuthResponse(context, RESULT_IGNORE, false, "请求体不能为空"); return null; } return body; } catch (Exception e) { log.error("[parseRequestBody][解析请求体失败]", e); - sendErrorResponse(context, BAD_REQUEST_STATUS_CODE, "请求体格式错误"); + sendAuthResponse(context, RESULT_IGNORE, false, "请求体格式错误"); return null; } } - /** - * 验证认证参数 - * - * @param context 路由上下文 - * @param clientid 客户端ID - * @param username 用户名 - * @param password 密码 - * @return 验证是否通过 - */ - private boolean validateAuthParams(RoutingContext context, String clientid, String username, String password) { - if (StrUtil.hasEmpty(clientid, username, password)) { - log.warn("[validateAuthParams][认证参数不完整, clientId: {}, username: {}, password: {}]", - clientid, username, StrUtil.isNotEmpty(password) ? "***" : "空"); - sendErrorResponse(context, BAD_REQUEST_STATUS_CODE, "认证参数不完整"); - return false; - } - return true; - } - /** * 执行设备认证 * - * @param context 路由上下文 - * @param clientid 客户端ID + * @param clientId 客户端ID * @param username 用户名 * @param password 密码 * @return 认证是否成功 */ - private boolean performDeviceAuth(RoutingContext context, String clientid, String username, String password) { + private boolean performDeviceAuth(String clientId, String username, String password) { try { - IotDeviceCommonApi deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class); CommonResult result = deviceApi.authDevice(new IotDeviceAuthReqDTO() - .setClientId(clientid) + .setClientId(clientId) .setUsername(username) .setPassword(password)); result.checkError(); - if (!BooleanUtil.isTrue(result.getData())) { - log.warn("[performDeviceAuth][设备认证失败, clientId: {}, username: {}]", clientid, username); - sendErrorResponse(context, UNAUTHORIZED_STATUS_CODE, DEVICE_AUTH_FAIL.getMsg()); - return false; - } - return true; + return BooleanUtil.isTrue(result.getData()); } catch (Exception e) { - log.error("[performDeviceAuth][设备认证异常, clientId: {}, username: {}]", clientid, username, e); - sendErrorResponse(context, INTERNAL_ERROR_STATUS_CODE, "认证服务异常"); - return false; + log.error("[performDeviceAuth][认证接口调用失败: {}]", username, e); + throw e; } } /** * 处理设备状态变化 * - * @param username 用户名 - * @param online 是否在线 - * @param actionDesc 操作描述 + * @param username 用户名 + * @param online 是否在线 */ - private void handleDeviceStateChange(String username, boolean online, String actionDesc) { + private void handleDeviceStateChange(String username, boolean online) { if (StrUtil.isEmpty(username) || "undefined".equals(username)) { - log.warn("[handleDeviceStateChange][用户名为空或'undefined', username: {}, action: {}]", - username, actionDesc); return; } // 解析设备信息 IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(username); if (deviceInfo == null) { - log.warn("[handleDeviceStateChange][无法从 username({}) 解析设备信息, action: {}]", - username, actionDesc); + log.debug("[handleDeviceStateChange][跳过非设备连接: {}]", username); return; } try { - // 从协议实例获取服务器 ID String serverId = protocol.getServerId(); if (StrUtil.isEmpty(serverId)) { - log.error("[handleDeviceStateChange][获取服务器ID失败, username: {}, action: {}]", - username, actionDesc); + log.error("[handleDeviceStateChange][获取服务器ID失败]"); return; } // 构建设备状态消息 - IotDeviceMessageService deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class); - IotDeviceMessage message; - if (online) { - message = IotDeviceMessage.buildStateOnline(); - log.debug("[handleDeviceStateChange][发送设备上线消息, username: {}, serverId: {}]", - username, serverId); - } else { - message = IotDeviceMessage.buildStateOffline(); - log.debug("[handleDeviceStateChange][发送设备下线消息, username: {}, serverId: {}]", - username, serverId); - } + IotDeviceMessage message = online ? IotDeviceMessage.buildStateOnline() + : IotDeviceMessage.buildStateOffline(); // 发送消息到消息总线 deviceMessageService.sendDeviceMessage(message, deviceInfo.getProductKey(), deviceInfo.getDeviceName(), serverId); - log.info("[handleDeviceStateChange][{}处理成功, productKey: {}, deviceName: {}, serverId: {}]", - actionDesc, deviceInfo.getProductKey(), deviceInfo.getDeviceName(), serverId); + log.info("[handleDeviceStateChange][设备状态更新: {}/{} -> {}]", + deviceInfo.getProductKey(), deviceInfo.getDeviceName(), + online ? "在线" : "离线"); } catch (Exception e) { - log.error("[handleDeviceStateChange][发送设备状态消息失败, username: {}, online: {}, action: {}]", - username, online, actionDesc, e); + log.error("[handleDeviceStateChange][发送设备状态消息失败: {}]", username, e); } } /** - * 发送成功响应 + * 发送 EMQX 认证响应 + * 根据 EMQX 官方文档要求,必须返回 JSON 格式响应 + * + * @param context 路由上下文 + * @param result 认证结果:allow、deny、ignore + * @param isSuperuser 是否超级用户 + * @param message 日志消息(仅用于日志记录,不返回给EMQX) */ - private void sendSuccessResponse(RoutingContext context, String message) { + private void sendAuthResponse(RoutingContext context, String result, boolean isSuperuser, String message) { + // 构建符合 EMQX 官方规范的响应 + JsonObject response = new JsonObject() + .put("result", result) + .put("is_superuser", isSuperuser); + + // 可以根据业务需求添加客户端属性 + // response.put("client_attrs", new JsonObject().put("role", "device")); + + // 可以添加认证过期时间(可选) + // response.put("expire_at", System.currentTimeMillis() / 1000 + 3600); + + // 记录详细的响应日志(message仅用于日志,不返回给EMQX) + if (StrUtil.isNotEmpty(message)) { + log.debug("[sendAuthResponse][响应详情: result={}, message={}]", result, message); + } + context.response() .setStatusCode(SUCCESS_STATUS_CODE) - .putHeader("Content-Type", "text/plain; charset=utf-8") - .end(message); + .putHeader("Content-Type", "application/json; charset=utf-8") + .end(response.encode()); } - /** - * 发送错误响应 - */ - private void sendErrorResponse(RoutingContext context, int statusCode, String message) { - context.response() - .setStatusCode(statusCode) - .putHeader("Content-Type", "text/plain; charset=utf-8") - .end(message); - } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/util/IotMqttTopicUtils.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/util/IotMqttTopicUtils.java index 8e25fb14e6..270e2717ab 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/util/IotMqttTopicUtils.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/util/IotMqttTopicUtils.java @@ -21,22 +21,26 @@ public final class IotMqttTopicUtils { */ private static final String SERVICE_TOPIC_PREFIX = "/thing/"; - // ========== MQTT 认证路径常量 ========== + // ========== MQTT HTTP 接口路径常量 ========== /** - * MQTT 认证路径 + * MQTT 认证接口路径 + * 对应 EMQX HTTP 认证插件的认证请求接口 */ - public static final String MQTT_AUTH_AUTHENTICATE_PATH = "/mqtt/auth/authenticate"; + public static final String MQTT_AUTH_PATH = "/mqtt/auth"; /** - * MQTT 连接事件路径 + * MQTT 统一事件处理接口路径 + * 对应 EMQX Webhook 的统一事件处理接口,支持所有客户端事件 + * 包括:client.connected、client.disconnected、message.publish 等 */ - public static final String MQTT_AUTH_CONNECTED_PATH = "/mqtt/auth/connected"; + public static final String MQTT_EVENT_PATH = "/mqtt/event"; /** - * MQTT 断开事件路径 + * MQTT 授权接口路径(预留) + * 对应 EMQX HTTP 授权插件的授权检查接口 */ - public static final String MQTT_AUTH_DISCONNECTED_PATH = "/mqtt/auth/disconnected"; + public static final String MQTT_AUTHZ_PATH = "/mqtt/authz"; // ========== 工具方法 ==========