From 21472e8dfad464b41dea9bd7b1479c7877e488c9 Mon Sep 17 00:00:00 2001 From: haohao <1036606149@qq.com> Date: Wed, 11 Jun 2025 23:29:24 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E3=80=90IoT=20=E7=89=A9=E8=81=94?= =?UTF-8?q?=E7=BD=91=E3=80=91=E6=96=B0=E5=A2=9E=20MQTT=20HTTP=20=E8=AE=A4?= =?UTF-8?q?=E8=AF=81=E5=8D=8F=E8=AE=AE=EF=BC=8C=E4=BC=98=E5=8C=96=E8=AE=BE?= =?UTF-8?q?=E5=A4=87=E8=AE=A4=E8=AF=81=E9=80=BB=E8=BE=91=EF=BC=8C=E6=9B=B4?= =?UTF-8?q?=E6=96=B0=E9=85=8D=E7=BD=AE=E6=96=87=E4=BB=B6=E4=BB=A5=E6=94=AF?= =?UTF-8?q?=E6=8C=81=20HTTP=20=E8=AE=A4=E8=AF=81=E6=9C=8D=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../config/IotGatewayConfiguration.java | 32 ++- .../gateway/config/IotGatewayProperties.java | 8 +- .../mqtt/IotMqttDownstreamSubscriber.java | 22 +- .../mqtt/IotMqttHttpAuthProtocol.java | 96 +++++++++ .../mqtt/IotMqttUpstreamProtocol.http | 139 +++++++++++++ .../mqtt/IotMqttUpstreamProtocol.java | 17 -- .../mqtt/router/IotMqttAuthRouter.java | 149 -------------- .../mqtt/router/IotMqttEventHandler.java | 4 +- .../mqtt/router/IotMqttHttpAuthHandler.java | 192 ++++++++++++++++++ .../mqtt/router/IotMqttPropertyHandler.java | 8 +- .../mqtt/router/IotMqttServiceHandler.java | 4 +- .../mqtt/router/IotMqttUpstreamRouter.java | 2 +- .../src/main/resources/application-local.yaml | 14 +- 13 files changed, 487 insertions(+), 200 deletions(-) create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttHttpAuthProtocol.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.http delete mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttAuthRouter.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttHttpAuthHandler.java diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java index 8fff35a472..f88f1f9db1 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java @@ -3,6 +3,9 @@ package cn.iocoder.yudao.module.iot.gateway.config; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttDownstreamSubscriber; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttHttpAuthProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -42,17 +45,24 @@ public class IotGatewayConfiguration { @Slf4j public static class MqttProtocolConfiguration { - // TODO @haohao:临时注释,避免报错 -// @Bean -// public IotMqttUpstreamProtocol iotMqttUpstreamProtocol(IotGatewayProperties gatewayProperties) { -// return new IotMqttUpstreamProtocol(gatewayProperties.getProtocol().getEmqx()); -// } -// -// @Bean -// public IotMqttDownstreamSubscriber iotMqttDownstreamSubscriber(IotMqttUpstreamProtocol mqttUpstreamProtocol, -// IotMessageBus messageBus) { -// return new IotMqttDownstreamSubscriber(mqttUpstreamProtocol, messageBus); -// } + @Bean + public IotMqttUpstreamProtocol iotMqttUpstreamProtocol(IotGatewayProperties gatewayProperties) { + return new IotMqttUpstreamProtocol(gatewayProperties.getProtocol().getEmqx()); + } + + @Bean + public IotMqttDownstreamSubscriber iotMqttDownstreamSubscriber(IotMqttUpstreamProtocol mqttUpstreamProtocol, + IotMessageBus messageBus) { + return new IotMqttDownstreamSubscriber(mqttUpstreamProtocol, messageBus); + } + + /** + * MQTT HTTP 认证协议:提供 HTTP 认证接口供 EMQX 调用 + */ + @Bean + public IotMqttHttpAuthProtocol mqttHttpAuthProtocol(IotGatewayProperties gatewayProperties) { + return new IotMqttHttpAuthProtocol(gatewayProperties.getProtocol().getEmqx()); + } } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java index 46170d5c04..590d1e17e4 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java @@ -103,6 +103,10 @@ public class IotGatewayProperties { */ @NotNull(message = "是否开启不能为空") private Boolean enabled; + /** + * HTTP 认证端口 + */ + private Integer httpAuthPort = 8090; /** * MQTT 服务器地址 */ @@ -127,10 +131,6 @@ public class IotGatewayProperties { * MQTT 主题 */ private List mqttTopics; - /** - * 认证端口 - */ - private Integer authPort; } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java index 9102690ce5..8db8db0714 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java @@ -1,12 +1,14 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt; import cn.hutool.json.JSONObject; +import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceGetReqDTO; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum; -import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceCacheService; import jakarta.annotation.PostConstruct; import jakarta.annotation.Resource; import lombok.RequiredArgsConstructor; @@ -25,7 +27,7 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber 成功认证 +POST http://localhost:8090/mqtt/auth/authenticate +Content-Type: application/json + +{ + "clientid": "test_product.test_device", + "username": "test_device&test_product", + "password": "test_device_secret_hmac_password" +} + +### 2. MQTT 设备认证接口 => 参数不完整(clientid 为空) +POST http://localhost:8090/mqtt/auth/authenticate +Content-Type: application/json + +{ + "clientid": "", + "username": "test_device&test_product", + "password": "test_device_secret" +} + +### 3. MQTT 设备认证接口 => 参数不完整(username 为空) +POST http://localhost:8090/mqtt/auth/authenticate +Content-Type: application/json + +{ + "clientid": "test_product.test_device", + "username": "", + "password": "test_device_secret" +} + +### 4. MQTT 设备认证接口 => 参数不完整(password 为空) +POST http://localhost:8090/mqtt/auth/authenticate +Content-Type: application/json + +{ + "clientid": "test_product.test_device", + "username": "test_device&test_product", + "password": "" +} + +### 5. MQTT 设备认证接口 => 认证失败(错误的设备密钥) +POST http://localhost:8090/mqtt/auth/authenticate +Content-Type: application/json + +{ + "clientid": "test_product.test_device", + "username": "test_device&test_product", + "password": "wrong_password" +} + +### 6. EMQX 客户端连接事件钩子 => 设备上线 +POST http://localhost:8090/mqtt/auth/connected +Content-Type: application/json + +{ + "clientid": "test_product.test_device", + "username": "test_device&test_product", + "timestamp": 1703808000000, + "peername": "192.168.1.100:52036", + "sockname": "127.0.0.1:1883" +} + +### 7. EMQX 客户端连接事件钩子 => 用户名为空 +POST http://localhost:8090/mqtt/auth/connected +Content-Type: application/json + +{ + "clientid": "test_product.test_device", + "username": "", + "timestamp": 1703808000000, + "peername": "192.168.1.100:52036", + "sockname": "127.0.0.1:1883" +} + +### 8. EMQX 客户端断开连接事件钩子 => 设备下线(正常断开) +POST http://localhost:8090/mqtt/auth/disconnected +Content-Type: application/json + +{ + "clientid": "test_product.test_device", + "username": "test_device&test_product", + "reason": "normal", + "timestamp": 1703808060000 +} + +### 9. 使用自定义端口测试(如果配置了不同端口) +### 假设配置了端口 8091: +### POST http://localhost:8091/mqtt/auth/authenticate +### Content-Type: application/json +### +### { +### "clientid": "test_product.test_device", +### "username": "test_device&test_product", +### "password": "test_device_secret_hmac_password" +### } + +### EMQX 配置参考: +### authentication = [ +### { +### mechanism = http +### method = post +### url = "http://localhost:8090/mqtt/auth/authenticate" # 使用配置的端口 +### body = { +### clientid = "${clientid}" +### username = "${username}" +### password = "${password}" +### } +### headers = { +### "content-type" = "application/json" +### } +### } +### ] +### +### webhooks = [ +### { +### name = "client_connected" +### url = "http://localhost:8090/mqtt/auth/connected" # 使用配置的端口 +### events = ["client.connected"] +### }, +### { +### name = "client_disconnected" +### url = "http://localhost:8090/mqtt/auth/disconnected" # 使用配置的端口 +### events = ["client.disconnected"] +### } +### ] \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java index 5d1c581794..582bc28bd9 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java @@ -1,15 +1,11 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt; import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.ObjUtil; import cn.hutool.core.util.StrUtil; -import cn.hutool.extra.spring.SpringUtil; -import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer; import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttAuthRouter; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttUpstreamRouter; import io.netty.handler.codec.mqtt.MqttQoS; import io.vertx.core.Vertx; @@ -43,8 +39,6 @@ public class IotMqttUpstreamProtocol { private Vertx vertx; private MqttClient mqttClient; private IotMqttUpstreamRouter messageRouter; - private IotMqttAuthRouter authRouter; - private IotDeviceMessageProducer deviceMessageProducer; /** * 服务运行状态标志 @@ -61,9 +55,7 @@ public class IotMqttUpstreamProtocol { // 初始化组件 this.vertx = Vertx.vertx(); - this.deviceMessageProducer = SpringUtil.getBean(IotDeviceMessageProducer.class); this.messageRouter = new IotMqttUpstreamRouter(this); - this.authRouter = new IotMqttAuthRouter(this); // 创建 MQTT 客户端 MqttClientOptions options = new MqttClientOptions() @@ -255,13 +247,4 @@ public class IotMqttUpstreamProtocol { return mqttClient; } - /** - * 获取认证路由器 - * - * @return 认证路由器 - */ - public IotMqttAuthRouter getAuthRouter() { - return authRouter; - } - } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttAuthRouter.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttAuthRouter.java deleted file mode 100644 index fbb43e7ab7..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttAuthRouter.java +++ /dev/null @@ -1,149 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router; - -import cn.hutool.core.util.StrUtil; -import cn.hutool.extra.spring.SpringUtil; -import cn.iocoder.yudao.framework.common.pojo.CommonResult; -import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; -import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; -import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer; -import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; -import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService; -import cn.iocoder.yudao.module.iot.gateway.service.message.IotDeviceMessageService; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -/** - * IoT 网关 MQTT 认证路由器 - *

- * 处理设备的 MQTT 连接认证和连接状态管理 - * - * @author 芋道源码 - */ -@RequiredArgsConstructor -@Slf4j -public class IotMqttAuthRouter { - - private final IotMqttUpstreamProtocol protocol; - private final IotDeviceMessageProducer deviceMessageProducer; - private final IotDeviceTokenService deviceTokenService; - private final IotDeviceCommonApi deviceCommonApi; - private final IotDeviceMessageService deviceMessageService; - - public IotMqttAuthRouter(IotMqttUpstreamProtocol protocol) { - this.protocol = protocol; - this.deviceMessageProducer = SpringUtil.getBean(IotDeviceMessageProducer.class); - this.deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class); - this.deviceCommonApi = SpringUtil.getBean(IotDeviceCommonApi.class); - this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class); - } - - /** - * 处理设备认证 - * - * @param clientId 客户端 ID - * @param username 用户名 - * @param password 密码 - * @return 认证结果 - */ - public boolean authenticate(String clientId, String username, String password) { - try { - log.info("[authenticate][开始认证设备][clientId: {}][username: {}]", clientId, username); - - // 1. 参数校验 - if (StrUtil.isEmpty(clientId) || StrUtil.isEmpty(username) || StrUtil.isEmpty(password)) { - log.warn("[authenticate][认证参数不完整][clientId: {}][username: {}]", clientId, username); - return false; - } - - // 2. 执行认证 - CommonResult result = deviceCommonApi.authDevice(new IotDeviceAuthReqDTO() - .setClientId(clientId).setUsername(username).setPassword(password)); - result.checkError(); - if (!Boolean.TRUE.equals(result.getData())) { - log.warn("[authenticate][设备认证失败][clientId: {}][username: {}]", clientId, username); - return false; - } - - log.info("[authenticate][设备认证成功][clientId: {}][username: {}]", clientId, username); - return true; - } catch (Exception e) { - log.error("[authenticate][设备认证异常][clientId: {}][username: {}]", clientId, username, e); - return false; - } - } - - /** - * 处理设备连接事件 - * - * @param clientId 客户端 ID - * @param username 用户名 - */ - public void handleClientConnected(String clientId, String username) { - try { - log.info("[handleClientConnected][设备连接][clientId: {}][username: {}]", clientId, username); - - // 解析设备信息并发送上线消息 - handleDeviceStateChange(username, true); - } catch (Exception e) { - log.error("[handleClientConnected][处理设备连接事件异常][clientId: {}][username: {}]", clientId, username, e); - } - } - - /** - * 处理设备断开连接事件 - * - * @param clientId 客户端 ID - * @param username 用户名 - */ - public void handleClientDisconnected(String clientId, String username) { - try { - log.info("[handleClientDisconnected][设备断开连接][clientId: {}][username: {}]", clientId, username); - - // 解析设备信息并发送下线消息 - handleDeviceStateChange(username, false); - } catch (Exception e) { - log.error("[handleClientDisconnected][处理设备断开连接事件异常][clientId: {}][username: {}]", clientId, username, e); - } - } - - /** - * 处理设备状态变化 - * - * @param username 用户名 - * @param online 是否在线 - */ - private void handleDeviceStateChange(String username, boolean online) { - // 解析设备信息 - if (StrUtil.isEmpty(username) || "undefined".equals(username)) { - log.warn("[handleDeviceStateChange][用户名为空或未定义][username: {}]", username); - return; - } - - IotDeviceAuthUtils.DeviceInfo deviceInfo = deviceTokenService.parseUsername(username); - if (deviceInfo == null) { - log.warn("[handleDeviceStateChange][无法解析设备信息][username: {}]", username); - return; - } - - try { - // 使用 IotDeviceMessageService 构建设备状态消息 - IotDeviceMessage message; - if (online) { - message = deviceMessageService.buildDeviceMessageOfStateOnline( - deviceInfo.getProductKey(), deviceInfo.getDeviceName(), protocol.getServerId()); - log.info("[handleDeviceStateChange][发送设备上线消息成功][username: {}]", username); - } else { - message = deviceMessageService.buildDeviceMessageOfStateOffline( - deviceInfo.getProductKey(), deviceInfo.getDeviceName(), protocol.getServerId()); - log.info("[handleDeviceStateChange][发送设备下线消息成功][username: {}]", username); - } - - deviceMessageProducer.sendDeviceMessage(message); - } catch (Exception e) { - log.error("[handleDeviceStateChange][发送设备状态消息失败][username: {}][online: {}]", username, online, e); - } - } - -} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttEventHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttEventHandler.java index 40a38000be..6b74b9e4e3 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttEventHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttEventHandler.java @@ -6,7 +6,7 @@ import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer; import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; -import cn.iocoder.yudao.module.iot.gateway.service.message.IotDeviceMessageService; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -49,7 +49,7 @@ public class IotMqttEventHandler extends IotMqttAbstractHandler { // 使用 IotDeviceMessageService 解码消息 byte[] messageBytes = payload.getBytes(StandardCharsets.UTF_8); IotDeviceMessage message = deviceMessageService.decodeDeviceMessage( - messageBytes, productKey, deviceName, protocol.getServerId()); + messageBytes, productKey, deviceName); // 发送消息 deviceMessageProducer.sendDeviceMessage(message); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttHttpAuthHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttHttpAuthHandler.java new file mode 100644 index 0000000000..ce3cc83aea --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttHttpAuthHandler.java @@ -0,0 +1,192 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router; + +import cn.hutool.core.util.BooleanUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.extra.spring.SpringUtil; +import cn.iocoder.yudao.framework.common.pojo.CommonResult; +import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.RoutingContext; +import lombok.extern.slf4j.Slf4j; + +/** + * IoT 网关 MQTT HTTP 认证处理器 + *

+ * 处理 EMQX 的认证请求和事件钩子 + * + * @author 芋道源码 + */ +@Slf4j +public class IotMqttHttpAuthHandler { + + /** + * EMQX 认证接口 + */ + public void authenticate(RoutingContext context) { + try { + // 解析请求体 + JsonObject body = context.body().asJsonObject(); + if (body == null) { + sendErrorResponse(context, 400, "请求体不能为空"); + return; + } + + String clientid = body.getString("clientid"); + String username = body.getString("username"); + String password = body.getString("password"); + + log.info("[authenticate][EMQX 设备认证请求][clientId: {}][username: {}]", clientid, username); + + // 参数校验 + if (StrUtil.isEmpty(clientid) || StrUtil.isEmpty(username) || StrUtil.isEmpty(password)) { + log.warn("[authenticate][认证参数不完整][clientId: {}][username: {}]", clientid, username); + sendErrorResponse(context, 400, "认证参数不完整"); + return; + } + + // 执行设备认证 + IotDeviceCommonApi deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class); + CommonResult result = deviceApi.authDevice(new IotDeviceAuthReqDTO() + .setClientId(clientid) + .setUsername(username) + .setPassword(password)); + + result.checkError(); + if (!BooleanUtil.isTrue(result.getData())) { + log.warn("[authenticate][设备认证失败][clientId: {}][username: {}]", clientid, username); + sendErrorResponse(context, 401, "设备认证失败"); + return; + } + + log.info("[authenticate][设备认证成功][clientId: {}][username: {}]", clientid, username); + sendSuccessResponse(context, "认证成功"); + + } catch (Exception e) { + log.error("[authenticate][设备认证异常]", e); + sendErrorResponse(context, 500, "认证服务异常"); + } + } + + /** + * EMQX 客户端连接事件钩子 + */ + public void connected(RoutingContext context) { + try { + // 解析请求体 + JsonObject body = context.body().asJsonObject(); + if (body == null) { + sendErrorResponse(context, 400, "请求体不能为空"); + return; + } + + String clientid = body.getString("clientid"); + String username = body.getString("username"); + Long timestamp = body.getLong("timestamp"); + + log.info("[connected][设备连接事件][clientId: {}][username: {}]", clientid, username); + + handleDeviceStateChange(username, true); + sendSuccessResponse(context, "处理成功"); + + } catch (Exception e) { + log.error("[connected][处理设备连接事件失败]", e); + sendErrorResponse(context, 500, "处理失败"); + } + } + + /** + * EMQX 客户端断开连接事件钩子 + */ + public void disconnected(RoutingContext context) { + try { + // 解析请求体 + JsonObject body = context.body().asJsonObject(); + if (body == null) { + sendErrorResponse(context, 400, "请求体不能为空"); + return; + } + + String clientid = body.getString("clientid"); + String username = body.getString("username"); + String reason = body.getString("reason"); + Long timestamp = body.getLong("timestamp"); + + log.info("[disconnected][设备断开连接事件][clientId: {}][username: {}][reason: {}]", + clientid, username, reason); + + handleDeviceStateChange(username, false); + sendSuccessResponse(context, "处理成功"); + + } catch (Exception e) { + log.error("[disconnected][处理设备断开连接事件失败]", e); + sendErrorResponse(context, 500, "处理失败"); + } + } + + /** + * 处理设备状态变化 + * + * @param username 用户名 + * @param online 是否在线 + */ + private void handleDeviceStateChange(String username, boolean online) { + if (StrUtil.isEmpty(username) || "undefined".equals(username)) { + log.warn("[handleDeviceStateChange][用户名为空或未定义][username: {}]", username); + return; + } + + // 解析设备信息 + IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(username); + if (deviceInfo == null) { + log.warn("[handleDeviceStateChange][无法解析设备信息][username: {}]", username); + return; + } + + try { + // 获取服务器 ID + String serverId = "mqtt_auth_gateway"; + + // 构建设备状态消息 + IotDeviceMessageService deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class); + IotDeviceMessage message; + if (online) { + message = IotDeviceMessage.buildStateOnline(); + log.info("[handleDeviceStateChange][发送设备上线消息成功][username: {}]", username); + } else { + message = IotDeviceMessage.buildStateOffline(); + log.info("[handleDeviceStateChange][发送设备下线消息成功][username: {}]", username); + } + + // 发送消息到消息总线 + deviceMessageService.sendDeviceMessage(message, + deviceInfo.getProductKey(), deviceInfo.getDeviceName(), serverId); + } catch (Exception e) { + log.error("[handleDeviceStateChange][发送设备状态消息失败][username: {}][online: {}]", + username, online, e); + } + } + + /** + * 发送成功响应 + */ + private void sendSuccessResponse(RoutingContext context, String message) { + context.response() + .setStatusCode(200) + .putHeader("Content-Type", "text/plain; charset=utf-8") + .end(message); + } + + /** + * 发送错误响应 + */ + private void sendErrorResponse(RoutingContext context, int statusCode, String message) { + context.response() + .setStatusCode(statusCode) + .putHeader("Content-Type", "text/plain; charset=utf-8") + .end(message); + } +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttPropertyHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttPropertyHandler.java index bb00b4b8a9..64acb82964 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttPropertyHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttPropertyHandler.java @@ -6,7 +6,7 @@ import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer; import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; -import cn.iocoder.yudao.module.iot.gateway.service.message.IotDeviceMessageService; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -65,7 +65,7 @@ public class IotMqttPropertyHandler extends IotMqttAbstractHandler { // 使用 IotDeviceMessageService 解码消息 byte[] messageBytes = payload.getBytes(StandardCharsets.UTF_8); IotDeviceMessage message = deviceMessageService.decodeDeviceMessage( - messageBytes, productKey, deviceName, protocol.getServerId()); + messageBytes, productKey, deviceName); // 发送消息 deviceMessageProducer.sendDeviceMessage(message); @@ -100,7 +100,7 @@ public class IotMqttPropertyHandler extends IotMqttAbstractHandler { // 使用 IotDeviceMessageService 解码消息 byte[] messageBytes = payload.getBytes(StandardCharsets.UTF_8); IotDeviceMessage message = deviceMessageService.decodeDeviceMessage( - messageBytes, productKey, deviceName, protocol.getServerId()); + messageBytes, productKey, deviceName); // 发送消息 deviceMessageProducer.sendDeviceMessage(message); @@ -132,7 +132,7 @@ public class IotMqttPropertyHandler extends IotMqttAbstractHandler { // 使用 IotDeviceMessageService 解码消息 byte[] messageBytes = payload.getBytes(StandardCharsets.UTF_8); IotDeviceMessage message = deviceMessageService.decodeDeviceMessage( - messageBytes, productKey, deviceName, protocol.getServerId()); + messageBytes, productKey, deviceName); // 发送消息 deviceMessageProducer.sendDeviceMessage(message); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttServiceHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttServiceHandler.java index a63cf84f64..85947e75d3 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttServiceHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttServiceHandler.java @@ -6,7 +6,7 @@ import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer; import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; -import cn.iocoder.yudao.module.iot.gateway.service.message.IotDeviceMessageService; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -49,7 +49,7 @@ public class IotMqttServiceHandler extends IotMqttAbstractHandler { // 使用 IotDeviceMessageService 解码消息 byte[] messageBytes = payload.getBytes(StandardCharsets.UTF_8); IotDeviceMessage message = deviceMessageService.decodeDeviceMessage( - messageBytes, productKey, deviceName, protocol.getServerId()); + messageBytes, productKey, deviceName); // 发送消息 deviceMessageProducer.sendDeviceMessage(message); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamRouter.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamRouter.java index 70e5a31b18..4b10ed0bc5 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamRouter.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamRouter.java @@ -5,7 +5,7 @@ import cn.hutool.extra.spring.SpringUtil; import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer; import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; -import cn.iocoder.yudao.module.iot.gateway.service.message.IotDeviceMessageService; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.vertx.mqtt.messages.MqttPublishMessage; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application-local.yaml b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application-local.yaml index 282434315b..ae9381c60e 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application-local.yaml +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application-local.yaml @@ -1,5 +1,4 @@ -server: - port: 8090 # IoT 网关服务端口 +# ==================== IoT 网关本地开发环境配置 ==================== --- #################### 消息队列相关 #################### @@ -26,16 +25,23 @@ yudao: # 针对引入的 HTTP 组件的配置 # ==================================== http: + enabled: true server-port: 8092 # ==================================== # 针对引入的 EMQX 组件的配置 # ==================================== emqx: + enabled: true + http-auth-port: 8090 # MQTT HTTP 认证服务端口 mqtt-host: 127.0.0.1 # MQTT Broker 地址 mqtt-port: 1883 # MQTT Broker 端口 mqtt-username: admin # MQTT 用户名 mqtt-password: public # MQTT 密码 - auth-port: 8101 # 认证端口 + mqtt-ssl: false # 是否开启 SSL + mqtt-topics: + - "/sys/#" # 系统主题(设备上报) + - "/ota/#" # OTA 升级主题 + - "/config/#" # 配置主题 # 消息总线配置 message-bus: @@ -50,4 +56,4 @@ logging: cn.iocoder.yudao.module.iot.gateway.protocol.mqtt: DEBUG cn.iocoder.yudao.module.iot.gateway.protocol.http: DEBUG # MQTT 客户端日志 - io.vertx.mqtt: DEBUG \ No newline at end of file + io.vertx.mqtt: DEBUG \ No newline at end of file