Merge remote-tracking branch 'origin/feature/iot' into feature/iot
This commit is contained in:
commit
74db72c9c0
|
@ -3,6 +3,9 @@ package cn.iocoder.yudao.module.iot.gateway.config;
|
|||
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpDownstreamSubscriber;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttDownstreamSubscriber;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttHttpAuthProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
|
@ -42,17 +45,24 @@ public class IotGatewayConfiguration {
|
|||
@Slf4j
|
||||
public static class MqttProtocolConfiguration {
|
||||
|
||||
// TODO @haohao:临时注释,避免报错
|
||||
// @Bean
|
||||
// public IotMqttUpstreamProtocol iotMqttUpstreamProtocol(IotGatewayProperties gatewayProperties) {
|
||||
// return new IotMqttUpstreamProtocol(gatewayProperties.getProtocol().getEmqx());
|
||||
// }
|
||||
//
|
||||
// @Bean
|
||||
// public IotMqttDownstreamSubscriber iotMqttDownstreamSubscriber(IotMqttUpstreamProtocol mqttUpstreamProtocol,
|
||||
// IotMessageBus messageBus) {
|
||||
// return new IotMqttDownstreamSubscriber(mqttUpstreamProtocol, messageBus);
|
||||
// }
|
||||
@Bean
|
||||
public IotMqttUpstreamProtocol iotMqttUpstreamProtocol(IotGatewayProperties gatewayProperties) {
|
||||
return new IotMqttUpstreamProtocol(gatewayProperties.getProtocol().getEmqx());
|
||||
}
|
||||
|
||||
@Bean
|
||||
public IotMqttDownstreamSubscriber iotMqttDownstreamSubscriber(IotMqttUpstreamProtocol mqttUpstreamProtocol,
|
||||
IotMessageBus messageBus) {
|
||||
return new IotMqttDownstreamSubscriber(mqttUpstreamProtocol, messageBus);
|
||||
}
|
||||
|
||||
/**
|
||||
* MQTT HTTP 认证协议:提供 HTTP 认证接口供 EMQX 调用
|
||||
*/
|
||||
@Bean
|
||||
public IotMqttHttpAuthProtocol mqttHttpAuthProtocol(IotGatewayProperties gatewayProperties) {
|
||||
return new IotMqttHttpAuthProtocol(gatewayProperties.getProtocol().getEmqx());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -103,6 +103,10 @@ public class IotGatewayProperties {
|
|||
*/
|
||||
@NotNull(message = "是否开启不能为空")
|
||||
private Boolean enabled;
|
||||
/**
|
||||
* HTTP 认证端口
|
||||
*/
|
||||
private Integer httpAuthPort = 8090;
|
||||
/**
|
||||
* MQTT 服务器地址
|
||||
*/
|
||||
|
@ -127,10 +131,6 @@ public class IotGatewayProperties {
|
|||
* MQTT 主题
|
||||
*/
|
||||
private List<String> mqttTopics;
|
||||
/**
|
||||
* 认证端口
|
||||
*/
|
||||
private Integer authPort;
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -1,12 +1,14 @@
|
|||
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt;
|
||||
|
||||
import cn.hutool.json.JSONObject;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceGetReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
|
||||
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
|
||||
import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceCacheService;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
|
@ -25,7 +27,7 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber<IotDevi
|
|||
private final IotMessageBus messageBus;
|
||||
|
||||
@Resource
|
||||
private IotDeviceCacheService deviceCacheService;
|
||||
private IotDeviceCommonApi deviceApi;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
|
@ -81,7 +83,9 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber<IotDevi
|
|||
String method = message.getMethod();
|
||||
|
||||
// 通过 deviceId 获取设备信息
|
||||
IotDeviceCacheService.DeviceInfo deviceInfo = deviceCacheService.getDeviceInfoById(message.getDeviceId());
|
||||
IotDeviceGetReqDTO reqDTO = new IotDeviceGetReqDTO();
|
||||
reqDTO.setId(message.getDeviceId());
|
||||
IotDeviceRespDTO deviceInfo = deviceApi.getDevice(reqDTO).getData();
|
||||
if (deviceInfo == null) {
|
||||
log.warn("[handlePropertyMessage][设备信息不存在][deviceId: {}]", message.getDeviceId());
|
||||
return;
|
||||
|
@ -116,7 +120,9 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber<IotDevi
|
|||
String method = message.getMethod();
|
||||
|
||||
// 通过 deviceId 获取设备信息
|
||||
IotDeviceCacheService.DeviceInfo deviceInfo = deviceCacheService.getDeviceInfoById(message.getDeviceId());
|
||||
IotDeviceGetReqDTO reqDTO = new IotDeviceGetReqDTO();
|
||||
reqDTO.setId(message.getDeviceId());
|
||||
IotDeviceRespDTO deviceInfo = deviceApi.getDevice(reqDTO).getData();
|
||||
if (deviceInfo == null) {
|
||||
log.warn("[handleServiceMessage][设备信息不存在][deviceId: {}]", message.getDeviceId());
|
||||
return;
|
||||
|
@ -141,7 +147,9 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber<IotDevi
|
|||
*/
|
||||
private void handleConfigMessage(IotDeviceMessage message) {
|
||||
// 通过 deviceId 获取设备信息
|
||||
IotDeviceCacheService.DeviceInfo deviceInfo = deviceCacheService.getDeviceInfoById(message.getDeviceId());
|
||||
IotDeviceGetReqDTO reqDTO = new IotDeviceGetReqDTO();
|
||||
reqDTO.setId(message.getDeviceId());
|
||||
IotDeviceRespDTO deviceInfo = deviceApi.getDevice(reqDTO).getData();
|
||||
if (deviceInfo == null) {
|
||||
log.warn("[handleConfigMessage][设备信息不存在][deviceId: {}]", message.getDeviceId());
|
||||
return;
|
||||
|
@ -163,7 +171,9 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber<IotDevi
|
|||
*/
|
||||
private void handleOtaMessage(IotDeviceMessage message) {
|
||||
// 通过 deviceId 获取设备信息
|
||||
IotDeviceCacheService.DeviceInfo deviceInfo = deviceCacheService.getDeviceInfoById(message.getDeviceId());
|
||||
IotDeviceGetReqDTO reqDTO = new IotDeviceGetReqDTO();
|
||||
reqDTO.setId(message.getDeviceId());
|
||||
IotDeviceRespDTO deviceInfo = deviceApi.getDevice(reqDTO).getData();
|
||||
if (deviceInfo == null) {
|
||||
log.warn("[handleOtaMessage][设备信息不存在][deviceId: {}]", message.getDeviceId());
|
||||
return;
|
||||
|
|
|
@ -0,0 +1,96 @@
|
|||
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttHttpAuthHandler;
|
||||
import io.vertx.core.AbstractVerticle;
|
||||
import io.vertx.core.Vertx;
|
||||
import io.vertx.core.http.HttpServer;
|
||||
import io.vertx.ext.web.Router;
|
||||
import io.vertx.ext.web.handler.BodyHandler;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* IoT 网关 MQTT HTTP 认证协议:提供 HTTP 认证接口供 EMQX 调用
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class IotMqttHttpAuthProtocol extends AbstractVerticle {
|
||||
|
||||
private final IotGatewayProperties.EmqxProperties emqxProperties;
|
||||
|
||||
private HttpServer httpServer;
|
||||
private Vertx vertx;
|
||||
|
||||
@Override
|
||||
@PostConstruct
|
||||
public void start() {
|
||||
log.info("[start][开始启动 MQTT HTTP 认证协议服务]");
|
||||
|
||||
// 创建 Vertx 实例
|
||||
this.vertx = Vertx.vertx();
|
||||
|
||||
// 创建路由
|
||||
Router router = Router.router(vertx);
|
||||
router.route().handler(BodyHandler.create());
|
||||
|
||||
// 创建 MQTT 认证处理器
|
||||
IotMqttHttpAuthHandler authHandler = new IotMqttHttpAuthHandler();
|
||||
|
||||
// 添加 MQTT 认证路由
|
||||
router.post("/mqtt/auth/authenticate").handler(authHandler::authenticate);
|
||||
router.post("/mqtt/auth/connected").handler(authHandler::connected);
|
||||
router.post("/mqtt/auth/disconnected").handler(authHandler::disconnected);
|
||||
|
||||
// 启动 HTTP 服务器,使用独立端口
|
||||
int authPort = getAuthPort();
|
||||
try {
|
||||
httpServer = vertx.createHttpServer()
|
||||
.requestHandler(router)
|
||||
.listen(authPort)
|
||||
.result();
|
||||
log.info("[start][MQTT HTTP 认证协议启动成功,端口:{}]", authPort);
|
||||
} catch (Exception e) {
|
||||
log.error("[start][MQTT HTTP 认证协议启动失败]", e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@PreDestroy
|
||||
public void stop() {
|
||||
if (httpServer != null) {
|
||||
try {
|
||||
httpServer.close().result();
|
||||
log.info("[stop][MQTT HTTP 认证协议已停止]");
|
||||
} catch (Exception e) {
|
||||
log.error("[stop][MQTT HTTP 认证协议停止失败]", e);
|
||||
}
|
||||
}
|
||||
|
||||
if (vertx != null) {
|
||||
try {
|
||||
vertx.close();
|
||||
log.info("[stop][Vertx 已关闭]");
|
||||
} catch (Exception e) {
|
||||
log.warn("[stop][关闭 Vertx 异常]", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取认证服务端口
|
||||
* 从配置中获取,默认使用 8090 端口
|
||||
*/
|
||||
private int getAuthPort() {
|
||||
return emqxProperties.getHttpAuthPort() != null ? emqxProperties.getHttpAuthPort() : 8090;
|
||||
}
|
||||
|
||||
public String getServerId() {
|
||||
return "mqtt_auth_gateway_" + getAuthPort();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,139 @@
|
|||
### IoT MQTT HTTP 认证服务测试文件
|
||||
### 用于测试独立的 MQTT HTTP 认证服务接口
|
||||
### 服务由 IotMqttHttpAuthProtocol 启动,运行在配置的端口上(默认 8090)
|
||||
### 注意:这些接口通常由 EMQX 调用,也可以用于手动测试
|
||||
|
||||
### 配置说明:
|
||||
### 在 application-local.yaml 中可以配置端口:
|
||||
### yudao:
|
||||
### iot:
|
||||
### gateway:
|
||||
### protocol:
|
||||
### emqx:
|
||||
### http-auth-port: 8090 # 可以修改为其他端口
|
||||
|
||||
### 1. MQTT 设备认证接口 => 成功认证
|
||||
POST http://localhost:8090/mqtt/auth/authenticate
|
||||
Content-Type: application/json
|
||||
|
||||
{
|
||||
"clientid": "test_product.test_device",
|
||||
"username": "test_device&test_product",
|
||||
"password": "test_device_secret_hmac_password"
|
||||
}
|
||||
|
||||
### 2. MQTT 设备认证接口 => 参数不完整(clientid 为空)
|
||||
POST http://localhost:8090/mqtt/auth/authenticate
|
||||
Content-Type: application/json
|
||||
|
||||
{
|
||||
"clientid": "",
|
||||
"username": "test_device&test_product",
|
||||
"password": "test_device_secret"
|
||||
}
|
||||
|
||||
### 3. MQTT 设备认证接口 => 参数不完整(username 为空)
|
||||
POST http://localhost:8090/mqtt/auth/authenticate
|
||||
Content-Type: application/json
|
||||
|
||||
{
|
||||
"clientid": "test_product.test_device",
|
||||
"username": "",
|
||||
"password": "test_device_secret"
|
||||
}
|
||||
|
||||
### 4. MQTT 设备认证接口 => 参数不完整(password 为空)
|
||||
POST http://localhost:8090/mqtt/auth/authenticate
|
||||
Content-Type: application/json
|
||||
|
||||
{
|
||||
"clientid": "test_product.test_device",
|
||||
"username": "test_device&test_product",
|
||||
"password": ""
|
||||
}
|
||||
|
||||
### 5. MQTT 设备认证接口 => 认证失败(错误的设备密钥)
|
||||
POST http://localhost:8090/mqtt/auth/authenticate
|
||||
Content-Type: application/json
|
||||
|
||||
{
|
||||
"clientid": "test_product.test_device",
|
||||
"username": "test_device&test_product",
|
||||
"password": "wrong_password"
|
||||
}
|
||||
|
||||
### 6. EMQX 客户端连接事件钩子 => 设备上线
|
||||
POST http://localhost:8090/mqtt/auth/connected
|
||||
Content-Type: application/json
|
||||
|
||||
{
|
||||
"clientid": "test_product.test_device",
|
||||
"username": "test_device&test_product",
|
||||
"timestamp": 1703808000000,
|
||||
"peername": "192.168.1.100:52036",
|
||||
"sockname": "127.0.0.1:1883"
|
||||
}
|
||||
|
||||
### 7. EMQX 客户端连接事件钩子 => 用户名为空
|
||||
POST http://localhost:8090/mqtt/auth/connected
|
||||
Content-Type: application/json
|
||||
|
||||
{
|
||||
"clientid": "test_product.test_device",
|
||||
"username": "",
|
||||
"timestamp": 1703808000000,
|
||||
"peername": "192.168.1.100:52036",
|
||||
"sockname": "127.0.0.1:1883"
|
||||
}
|
||||
|
||||
### 8. EMQX 客户端断开连接事件钩子 => 设备下线(正常断开)
|
||||
POST http://localhost:8090/mqtt/auth/disconnected
|
||||
Content-Type: application/json
|
||||
|
||||
{
|
||||
"clientid": "test_product.test_device",
|
||||
"username": "test_device&test_product",
|
||||
"reason": "normal",
|
||||
"timestamp": 1703808060000
|
||||
}
|
||||
|
||||
### 9. 使用自定义端口测试(如果配置了不同端口)
|
||||
### 假设配置了端口 8091:
|
||||
### POST http://localhost:8091/mqtt/auth/authenticate
|
||||
### Content-Type: application/json
|
||||
###
|
||||
### {
|
||||
### "clientid": "test_product.test_device",
|
||||
### "username": "test_device&test_product",
|
||||
### "password": "test_device_secret_hmac_password"
|
||||
### }
|
||||
|
||||
### EMQX 配置参考:
|
||||
### authentication = [
|
||||
### {
|
||||
### mechanism = http
|
||||
### method = post
|
||||
### url = "http://localhost:8090/mqtt/auth/authenticate" # 使用配置的端口
|
||||
### body = {
|
||||
### clientid = "${clientid}"
|
||||
### username = "${username}"
|
||||
### password = "${password}"
|
||||
### }
|
||||
### headers = {
|
||||
### "content-type" = "application/json"
|
||||
### }
|
||||
### }
|
||||
### ]
|
||||
###
|
||||
### webhooks = [
|
||||
### {
|
||||
### name = "client_connected"
|
||||
### url = "http://localhost:8090/mqtt/auth/connected" # 使用配置的端口
|
||||
### events = ["client.connected"]
|
||||
### },
|
||||
### {
|
||||
### name = "client_disconnected"
|
||||
### url = "http://localhost:8090/mqtt/auth/disconnected" # 使用配置的端口
|
||||
### events = ["client.disconnected"]
|
||||
### }
|
||||
### ]
|
|
@ -1,15 +1,11 @@
|
|||
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.util.ArrayUtil;
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import cn.hutool.core.util.ObjUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.extra.spring.SpringUtil;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
|
||||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
|
||||
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttAuthRouter;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttUpstreamRouter;
|
||||
import io.netty.handler.codec.mqtt.MqttQoS;
|
||||
import io.vertx.core.Vertx;
|
||||
|
@ -43,8 +39,6 @@ public class IotMqttUpstreamProtocol {
|
|||
private Vertx vertx;
|
||||
private MqttClient mqttClient;
|
||||
private IotMqttUpstreamRouter messageRouter;
|
||||
private IotMqttAuthRouter authRouter;
|
||||
private IotDeviceMessageProducer deviceMessageProducer;
|
||||
|
||||
/**
|
||||
* 服务运行状态标志
|
||||
|
@ -61,9 +55,7 @@ public class IotMqttUpstreamProtocol {
|
|||
|
||||
// 初始化组件
|
||||
this.vertx = Vertx.vertx();
|
||||
this.deviceMessageProducer = SpringUtil.getBean(IotDeviceMessageProducer.class);
|
||||
this.messageRouter = new IotMqttUpstreamRouter(this);
|
||||
this.authRouter = new IotMqttAuthRouter(this);
|
||||
|
||||
// 创建 MQTT 客户端
|
||||
MqttClientOptions options = new MqttClientOptions()
|
||||
|
@ -255,13 +247,4 @@ public class IotMqttUpstreamProtocol {
|
|||
return mqttClient;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取认证路由器
|
||||
*
|
||||
* @return 认证路由器
|
||||
*/
|
||||
public IotMqttAuthRouter getAuthRouter() {
|
||||
return authRouter;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,149 +0,0 @@
|
|||
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.extra.spring.SpringUtil;
|
||||
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
|
||||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.message.IotDeviceMessageService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* IoT 网关 MQTT 认证路由器
|
||||
* <p>
|
||||
* 处理设备的 MQTT 连接认证和连接状态管理
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class IotMqttAuthRouter {
|
||||
|
||||
private final IotMqttUpstreamProtocol protocol;
|
||||
private final IotDeviceMessageProducer deviceMessageProducer;
|
||||
private final IotDeviceTokenService deviceTokenService;
|
||||
private final IotDeviceCommonApi deviceCommonApi;
|
||||
private final IotDeviceMessageService deviceMessageService;
|
||||
|
||||
public IotMqttAuthRouter(IotMqttUpstreamProtocol protocol) {
|
||||
this.protocol = protocol;
|
||||
this.deviceMessageProducer = SpringUtil.getBean(IotDeviceMessageProducer.class);
|
||||
this.deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class);
|
||||
this.deviceCommonApi = SpringUtil.getBean(IotDeviceCommonApi.class);
|
||||
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理设备认证
|
||||
*
|
||||
* @param clientId 客户端 ID
|
||||
* @param username 用户名
|
||||
* @param password 密码
|
||||
* @return 认证结果
|
||||
*/
|
||||
public boolean authenticate(String clientId, String username, String password) {
|
||||
try {
|
||||
log.info("[authenticate][开始认证设备][clientId: {}][username: {}]", clientId, username);
|
||||
|
||||
// 1. 参数校验
|
||||
if (StrUtil.isEmpty(clientId) || StrUtil.isEmpty(username) || StrUtil.isEmpty(password)) {
|
||||
log.warn("[authenticate][认证参数不完整][clientId: {}][username: {}]", clientId, username);
|
||||
return false;
|
||||
}
|
||||
|
||||
// 2. 执行认证
|
||||
CommonResult<Boolean> result = deviceCommonApi.authDevice(new IotDeviceAuthReqDTO()
|
||||
.setClientId(clientId).setUsername(username).setPassword(password));
|
||||
result.checkError();
|
||||
if (!Boolean.TRUE.equals(result.getData())) {
|
||||
log.warn("[authenticate][设备认证失败][clientId: {}][username: {}]", clientId, username);
|
||||
return false;
|
||||
}
|
||||
|
||||
log.info("[authenticate][设备认证成功][clientId: {}][username: {}]", clientId, username);
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
log.error("[authenticate][设备认证异常][clientId: {}][username: {}]", clientId, username, e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理设备连接事件
|
||||
*
|
||||
* @param clientId 客户端 ID
|
||||
* @param username 用户名
|
||||
*/
|
||||
public void handleClientConnected(String clientId, String username) {
|
||||
try {
|
||||
log.info("[handleClientConnected][设备连接][clientId: {}][username: {}]", clientId, username);
|
||||
|
||||
// 解析设备信息并发送上线消息
|
||||
handleDeviceStateChange(username, true);
|
||||
} catch (Exception e) {
|
||||
log.error("[handleClientConnected][处理设备连接事件异常][clientId: {}][username: {}]", clientId, username, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理设备断开连接事件
|
||||
*
|
||||
* @param clientId 客户端 ID
|
||||
* @param username 用户名
|
||||
*/
|
||||
public void handleClientDisconnected(String clientId, String username) {
|
||||
try {
|
||||
log.info("[handleClientDisconnected][设备断开连接][clientId: {}][username: {}]", clientId, username);
|
||||
|
||||
// 解析设备信息并发送下线消息
|
||||
handleDeviceStateChange(username, false);
|
||||
} catch (Exception e) {
|
||||
log.error("[handleClientDisconnected][处理设备断开连接事件异常][clientId: {}][username: {}]", clientId, username, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理设备状态变化
|
||||
*
|
||||
* @param username 用户名
|
||||
* @param online 是否在线
|
||||
*/
|
||||
private void handleDeviceStateChange(String username, boolean online) {
|
||||
// 解析设备信息
|
||||
if (StrUtil.isEmpty(username) || "undefined".equals(username)) {
|
||||
log.warn("[handleDeviceStateChange][用户名为空或未定义][username: {}]", username);
|
||||
return;
|
||||
}
|
||||
|
||||
IotDeviceAuthUtils.DeviceInfo deviceInfo = deviceTokenService.parseUsername(username);
|
||||
if (deviceInfo == null) {
|
||||
log.warn("[handleDeviceStateChange][无法解析设备信息][username: {}]", username);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
// 使用 IotDeviceMessageService 构建设备状态消息
|
||||
IotDeviceMessage message;
|
||||
if (online) {
|
||||
message = deviceMessageService.buildDeviceMessageOfStateOnline(
|
||||
deviceInfo.getProductKey(), deviceInfo.getDeviceName(), protocol.getServerId());
|
||||
log.info("[handleDeviceStateChange][发送设备上线消息成功][username: {}]", username);
|
||||
} else {
|
||||
message = deviceMessageService.buildDeviceMessageOfStateOffline(
|
||||
deviceInfo.getProductKey(), deviceInfo.getDeviceName(), protocol.getServerId());
|
||||
log.info("[handleDeviceStateChange][发送设备下线消息成功][username: {}]", username);
|
||||
}
|
||||
|
||||
deviceMessageProducer.sendDeviceMessage(message);
|
||||
} catch (Exception e) {
|
||||
log.error("[handleDeviceStateChange][发送设备状态消息失败][username: {}][online: {}]", username, online, e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -6,7 +6,7 @@ import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
|||
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
|
||||
import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.message.IotDeviceMessageService;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
|
@ -49,7 +49,7 @@ public class IotMqttEventHandler extends IotMqttAbstractHandler {
|
|||
// 使用 IotDeviceMessageService 解码消息
|
||||
byte[] messageBytes = payload.getBytes(StandardCharsets.UTF_8);
|
||||
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(
|
||||
messageBytes, productKey, deviceName, protocol.getServerId());
|
||||
messageBytes, productKey, deviceName);
|
||||
|
||||
// 发送消息
|
||||
deviceMessageProducer.sendDeviceMessage(message);
|
||||
|
|
|
@ -0,0 +1,192 @@
|
|||
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router;
|
||||
|
||||
import cn.hutool.core.util.BooleanUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.extra.spring.SpringUtil;
|
||||
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import io.vertx.core.json.JsonObject;
|
||||
import io.vertx.ext.web.RoutingContext;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* IoT 网关 MQTT HTTP 认证处理器
|
||||
* <p>
|
||||
* 处理 EMQX 的认证请求和事件钩子
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
public class IotMqttHttpAuthHandler {
|
||||
|
||||
/**
|
||||
* EMQX 认证接口
|
||||
*/
|
||||
public void authenticate(RoutingContext context) {
|
||||
try {
|
||||
// 解析请求体
|
||||
JsonObject body = context.body().asJsonObject();
|
||||
if (body == null) {
|
||||
sendErrorResponse(context, 400, "请求体不能为空");
|
||||
return;
|
||||
}
|
||||
|
||||
String clientid = body.getString("clientid");
|
||||
String username = body.getString("username");
|
||||
String password = body.getString("password");
|
||||
|
||||
log.info("[authenticate][EMQX 设备认证请求][clientId: {}][username: {}]", clientid, username);
|
||||
|
||||
// 参数校验
|
||||
if (StrUtil.isEmpty(clientid) || StrUtil.isEmpty(username) || StrUtil.isEmpty(password)) {
|
||||
log.warn("[authenticate][认证参数不完整][clientId: {}][username: {}]", clientid, username);
|
||||
sendErrorResponse(context, 400, "认证参数不完整");
|
||||
return;
|
||||
}
|
||||
|
||||
// 执行设备认证
|
||||
IotDeviceCommonApi deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
|
||||
CommonResult<Boolean> result = deviceApi.authDevice(new IotDeviceAuthReqDTO()
|
||||
.setClientId(clientid)
|
||||
.setUsername(username)
|
||||
.setPassword(password));
|
||||
|
||||
result.checkError();
|
||||
if (!BooleanUtil.isTrue(result.getData())) {
|
||||
log.warn("[authenticate][设备认证失败][clientId: {}][username: {}]", clientid, username);
|
||||
sendErrorResponse(context, 401, "设备认证失败");
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("[authenticate][设备认证成功][clientId: {}][username: {}]", clientid, username);
|
||||
sendSuccessResponse(context, "认证成功");
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[authenticate][设备认证异常]", e);
|
||||
sendErrorResponse(context, 500, "认证服务异常");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* EMQX 客户端连接事件钩子
|
||||
*/
|
||||
public void connected(RoutingContext context) {
|
||||
try {
|
||||
// 解析请求体
|
||||
JsonObject body = context.body().asJsonObject();
|
||||
if (body == null) {
|
||||
sendErrorResponse(context, 400, "请求体不能为空");
|
||||
return;
|
||||
}
|
||||
|
||||
String clientid = body.getString("clientid");
|
||||
String username = body.getString("username");
|
||||
Long timestamp = body.getLong("timestamp");
|
||||
|
||||
log.info("[connected][设备连接事件][clientId: {}][username: {}]", clientid, username);
|
||||
|
||||
handleDeviceStateChange(username, true);
|
||||
sendSuccessResponse(context, "处理成功");
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[connected][处理设备连接事件失败]", e);
|
||||
sendErrorResponse(context, 500, "处理失败");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* EMQX 客户端断开连接事件钩子
|
||||
*/
|
||||
public void disconnected(RoutingContext context) {
|
||||
try {
|
||||
// 解析请求体
|
||||
JsonObject body = context.body().asJsonObject();
|
||||
if (body == null) {
|
||||
sendErrorResponse(context, 400, "请求体不能为空");
|
||||
return;
|
||||
}
|
||||
|
||||
String clientid = body.getString("clientid");
|
||||
String username = body.getString("username");
|
||||
String reason = body.getString("reason");
|
||||
Long timestamp = body.getLong("timestamp");
|
||||
|
||||
log.info("[disconnected][设备断开连接事件][clientId: {}][username: {}][reason: {}]",
|
||||
clientid, username, reason);
|
||||
|
||||
handleDeviceStateChange(username, false);
|
||||
sendSuccessResponse(context, "处理成功");
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[disconnected][处理设备断开连接事件失败]", e);
|
||||
sendErrorResponse(context, 500, "处理失败");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理设备状态变化
|
||||
*
|
||||
* @param username 用户名
|
||||
* @param online 是否在线
|
||||
*/
|
||||
private void handleDeviceStateChange(String username, boolean online) {
|
||||
if (StrUtil.isEmpty(username) || "undefined".equals(username)) {
|
||||
log.warn("[handleDeviceStateChange][用户名为空或未定义][username: {}]", username);
|
||||
return;
|
||||
}
|
||||
|
||||
// 解析设备信息
|
||||
IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(username);
|
||||
if (deviceInfo == null) {
|
||||
log.warn("[handleDeviceStateChange][无法解析设备信息][username: {}]", username);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
// 获取服务器 ID
|
||||
String serverId = "mqtt_auth_gateway";
|
||||
|
||||
// 构建设备状态消息
|
||||
IotDeviceMessageService deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
|
||||
IotDeviceMessage message;
|
||||
if (online) {
|
||||
message = IotDeviceMessage.buildStateOnline();
|
||||
log.info("[handleDeviceStateChange][发送设备上线消息成功][username: {}]", username);
|
||||
} else {
|
||||
message = IotDeviceMessage.buildStateOffline();
|
||||
log.info("[handleDeviceStateChange][发送设备下线消息成功][username: {}]", username);
|
||||
}
|
||||
|
||||
// 发送消息到消息总线
|
||||
deviceMessageService.sendDeviceMessage(message,
|
||||
deviceInfo.getProductKey(), deviceInfo.getDeviceName(), serverId);
|
||||
} catch (Exception e) {
|
||||
log.error("[handleDeviceStateChange][发送设备状态消息失败][username: {}][online: {}]",
|
||||
username, online, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送成功响应
|
||||
*/
|
||||
private void sendSuccessResponse(RoutingContext context, String message) {
|
||||
context.response()
|
||||
.setStatusCode(200)
|
||||
.putHeader("Content-Type", "text/plain; charset=utf-8")
|
||||
.end(message);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送错误响应
|
||||
*/
|
||||
private void sendErrorResponse(RoutingContext context, int statusCode, String message) {
|
||||
context.response()
|
||||
.setStatusCode(statusCode)
|
||||
.putHeader("Content-Type", "text/plain; charset=utf-8")
|
||||
.end(message);
|
||||
}
|
||||
}
|
|
@ -6,7 +6,7 @@ import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
|||
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
|
||||
import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.message.IotDeviceMessageService;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
|
@ -65,7 +65,7 @@ public class IotMqttPropertyHandler extends IotMqttAbstractHandler {
|
|||
// 使用 IotDeviceMessageService 解码消息
|
||||
byte[] messageBytes = payload.getBytes(StandardCharsets.UTF_8);
|
||||
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(
|
||||
messageBytes, productKey, deviceName, protocol.getServerId());
|
||||
messageBytes, productKey, deviceName);
|
||||
|
||||
// 发送消息
|
||||
deviceMessageProducer.sendDeviceMessage(message);
|
||||
|
@ -100,7 +100,7 @@ public class IotMqttPropertyHandler extends IotMqttAbstractHandler {
|
|||
// 使用 IotDeviceMessageService 解码消息
|
||||
byte[] messageBytes = payload.getBytes(StandardCharsets.UTF_8);
|
||||
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(
|
||||
messageBytes, productKey, deviceName, protocol.getServerId());
|
||||
messageBytes, productKey, deviceName);
|
||||
|
||||
// 发送消息
|
||||
deviceMessageProducer.sendDeviceMessage(message);
|
||||
|
@ -132,7 +132,7 @@ public class IotMqttPropertyHandler extends IotMqttAbstractHandler {
|
|||
// 使用 IotDeviceMessageService 解码消息
|
||||
byte[] messageBytes = payload.getBytes(StandardCharsets.UTF_8);
|
||||
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(
|
||||
messageBytes, productKey, deviceName, protocol.getServerId());
|
||||
messageBytes, productKey, deviceName);
|
||||
|
||||
// 发送消息
|
||||
deviceMessageProducer.sendDeviceMessage(message);
|
||||
|
|
|
@ -6,7 +6,7 @@ import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
|||
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
|
||||
import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.message.IotDeviceMessageService;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
|
@ -49,7 +49,7 @@ public class IotMqttServiceHandler extends IotMqttAbstractHandler {
|
|||
// 使用 IotDeviceMessageService 解码消息
|
||||
byte[] messageBytes = payload.getBytes(StandardCharsets.UTF_8);
|
||||
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(
|
||||
messageBytes, productKey, deviceName, protocol.getServerId());
|
||||
messageBytes, productKey, deviceName);
|
||||
|
||||
// 发送消息
|
||||
deviceMessageProducer.sendDeviceMessage(message);
|
||||
|
|
|
@ -5,7 +5,7 @@ import cn.hutool.extra.spring.SpringUtil;
|
|||
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
|
||||
import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.message.IotDeviceMessageService;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import io.vertx.mqtt.messages.MqttPublishMessage;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
server:
|
||||
port: 8090 # IoT 网关服务端口
|
||||
# ==================== IoT 网关本地开发环境配置 ====================
|
||||
|
||||
--- #################### 消息队列相关 ####################
|
||||
|
||||
|
@ -26,16 +25,23 @@ yudao:
|
|||
# 针对引入的 HTTP 组件的配置
|
||||
# ====================================
|
||||
http:
|
||||
enabled: true
|
||||
server-port: 8092
|
||||
# ====================================
|
||||
# 针对引入的 EMQX 组件的配置
|
||||
# ====================================
|
||||
emqx:
|
||||
enabled: true
|
||||
http-auth-port: 8090 # MQTT HTTP 认证服务端口
|
||||
mqtt-host: 127.0.0.1 # MQTT Broker 地址
|
||||
mqtt-port: 1883 # MQTT Broker 端口
|
||||
mqtt-username: admin # MQTT 用户名
|
||||
mqtt-password: public # MQTT 密码
|
||||
auth-port: 8101 # 认证端口
|
||||
mqtt-ssl: false # 是否开启 SSL
|
||||
mqtt-topics:
|
||||
- "/sys/#" # 系统主题(设备上报)
|
||||
- "/ota/#" # OTA 升级主题
|
||||
- "/config/#" # 配置主题
|
||||
|
||||
# 消息总线配置
|
||||
message-bus:
|
||||
|
|
Loading…
Reference in New Issue