reactor:【IoT 物联网】合并 MQTT 上行协议与 HTTP 认证协议,优化消息处理逻辑
This commit is contained in:
parent
af0ff1ce26
commit
c658ac69c0
|
@ -4,7 +4,6 @@ import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
|
|||
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpDownstreamSubscriber;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttDownstreamSubscriber;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttHttpAuthProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
|
@ -45,8 +44,11 @@ public class IotGatewayConfiguration {
|
|||
@Slf4j
|
||||
public static class MqttProtocolConfiguration {
|
||||
|
||||
/**
|
||||
* MQTT 统一协议:集成上行协议和HTTP认证协议
|
||||
*/
|
||||
@Bean
|
||||
public IotMqttUpstreamProtocol iotMqttUpstreamProtocol(IotGatewayProperties gatewayProperties) {
|
||||
public IotMqttUpstreamProtocol iotMqttUnifiedProtocol(IotGatewayProperties gatewayProperties) {
|
||||
return new IotMqttUpstreamProtocol(gatewayProperties.getProtocol().getEmqx());
|
||||
}
|
||||
|
||||
|
@ -55,14 +57,6 @@ public class IotGatewayConfiguration {
|
|||
IotMessageBus messageBus) {
|
||||
return new IotMqttDownstreamSubscriber(mqttUpstreamProtocol, messageBus);
|
||||
}
|
||||
|
||||
/**
|
||||
* MQTT HTTP 认证协议:提供 HTTP 认证接口供 EMQX 调用
|
||||
*/
|
||||
@Bean
|
||||
public IotMqttHttpAuthProtocol mqttHttpAuthProtocol(IotGatewayProperties gatewayProperties) {
|
||||
return new IotMqttHttpAuthProtocol(gatewayProperties.getProtocol().getEmqx());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,14 +1,14 @@
|
|||
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.json.JSONObject;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceGetReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
|
||||
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
|
||||
import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
|
@ -27,7 +27,7 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber<IotDevi
|
|||
private final IotMessageBus messageBus;
|
||||
|
||||
@Resource
|
||||
private IotDeviceCommonApi deviceApi;
|
||||
private IotDeviceService deviceService;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
|
@ -56,137 +56,87 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber<IotDevi
|
|||
return;
|
||||
}
|
||||
|
||||
// TODO @haohao:看看怎么融合下
|
||||
if (method.startsWith("thing.service.property.")) {
|
||||
handlePropertyMessage(message);
|
||||
} else if (method.startsWith("thing.service.") && !method.contains("property") && !method.contains("config")
|
||||
&& !method.contains("ota")) {
|
||||
handleServiceMessage(message);
|
||||
} else if (method.startsWith("thing.service.config.")) {
|
||||
handleConfigMessage(message);
|
||||
} else if (method.startsWith("thing.service.ota.")) {
|
||||
handleOtaMessage(message);
|
||||
} else {
|
||||
log.warn("[onMessage][未知的消息方法:{}]", method);
|
||||
}
|
||||
// 处理下行消息
|
||||
handleDownstreamMessage(message);
|
||||
} catch (Exception e) {
|
||||
log.error("[onMessage][处理下行消息失败:{}]", message, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理属性相关消息
|
||||
* 处理下行消息
|
||||
*
|
||||
* @param message 设备消息
|
||||
*/
|
||||
private void handlePropertyMessage(IotDeviceMessage message) {
|
||||
String method = message.getMethod();
|
||||
|
||||
// 通过 deviceId 获取设备信息
|
||||
// TODO @haohao:通过缓存拿;
|
||||
IotDeviceGetReqDTO reqDTO = new IotDeviceGetReqDTO();
|
||||
reqDTO.setId(message.getDeviceId());
|
||||
IotDeviceRespDTO deviceInfo = deviceApi.getDevice(reqDTO).getData();
|
||||
private void handleDownstreamMessage(IotDeviceMessage message) {
|
||||
// 1. 获取设备信息(使用缓存)
|
||||
IotDeviceRespDTO deviceInfo = deviceService.getDeviceFromCache(message.getDeviceId());
|
||||
if (deviceInfo == null) {
|
||||
log.warn("[handlePropertyMessage][设备信息不存在][deviceId: {}]", message.getDeviceId());
|
||||
log.warn("[handleDownstreamMessage][设备信息不存在][deviceId: {}]", message.getDeviceId());
|
||||
return;
|
||||
}
|
||||
|
||||
String productKey = deviceInfo.getProductKey();
|
||||
String deviceName = deviceInfo.getDeviceName();
|
||||
|
||||
if ("thing.service.property.set".equals(method)) {
|
||||
// 属性设置
|
||||
String topic = IotDeviceTopicEnum.buildPropertySetTopic(productKey, deviceName);
|
||||
JSONObject payload = buildDownstreamPayload(message, method);
|
||||
protocol.publishMessage(topic, payload.toString());
|
||||
log.info("[handlePropertyMessage][发送属性设置消息][topic: {}]", topic);
|
||||
} else if ("thing.service.property.get".equals(method)) {
|
||||
// 属性获取
|
||||
String topic = IotDeviceTopicEnum.buildPropertyGetTopic(productKey, deviceName);
|
||||
JSONObject payload = buildDownstreamPayload(message, method);
|
||||
protocol.publishMessage(topic, payload.toString());
|
||||
log.info("[handlePropertyMessage][发送属性获取消息][topic: {}]", topic);
|
||||
} else {
|
||||
log.warn("[handlePropertyMessage][未知的属性操作:{}]", method);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理服务调用消息
|
||||
*
|
||||
* @param message 设备消息
|
||||
*/
|
||||
private void handleServiceMessage(IotDeviceMessage message) {
|
||||
String method = message.getMethod();
|
||||
|
||||
// 通过 deviceId 获取设备信息
|
||||
IotDeviceGetReqDTO reqDTO = new IotDeviceGetReqDTO();
|
||||
reqDTO.setId(message.getDeviceId());
|
||||
IotDeviceRespDTO deviceInfo = deviceApi.getDevice(reqDTO).getData();
|
||||
if (deviceInfo == null) {
|
||||
log.warn("[handleServiceMessage][设备信息不存在][deviceId: {}]", message.getDeviceId());
|
||||
// 2. 根据方法构建主题
|
||||
String topic = buildTopicByMethod(message.getMethod(), deviceInfo.getProductKey(), deviceInfo.getDeviceName());
|
||||
if (StrUtil.isBlank(topic)) {
|
||||
log.warn("[handleDownstreamMessage][未知的消息方法:{}]", message.getMethod());
|
||||
return;
|
||||
}
|
||||
|
||||
String productKey = deviceInfo.getProductKey();
|
||||
String deviceName = deviceInfo.getDeviceName();
|
||||
|
||||
// 从方法中提取服务标识符
|
||||
String serviceIdentifier = method.substring("thing.service.".length());
|
||||
|
||||
String topic = IotDeviceTopicEnum.buildServiceTopic(productKey, deviceName, serviceIdentifier);
|
||||
JSONObject payload = buildDownstreamPayload(message, method);
|
||||
protocol.publishMessage(topic, payload.toString());
|
||||
log.info("[handleServiceMessage][发送服务调用消息][topic: {}]", topic);
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理配置设置消息
|
||||
*
|
||||
* @param message 设备消息
|
||||
*/
|
||||
private void handleConfigMessage(IotDeviceMessage message) {
|
||||
// 通过 deviceId 获取设备信息
|
||||
IotDeviceGetReqDTO reqDTO = new IotDeviceGetReqDTO();
|
||||
reqDTO.setId(message.getDeviceId());
|
||||
IotDeviceRespDTO deviceInfo = deviceApi.getDevice(reqDTO).getData();
|
||||
if (deviceInfo == null) {
|
||||
log.warn("[handleConfigMessage][设备信息不存在][deviceId: {}]", message.getDeviceId());
|
||||
return;
|
||||
}
|
||||
|
||||
String productKey = deviceInfo.getProductKey();
|
||||
String deviceName = deviceInfo.getDeviceName();
|
||||
|
||||
String topic = IotDeviceTopicEnum.buildConfigSetTopic(productKey, deviceName);
|
||||
// 3. 构建载荷
|
||||
JSONObject payload = buildDownstreamPayload(message, message.getMethod());
|
||||
|
||||
// 4. 发送消息
|
||||
protocol.publishMessage(topic, payload.toString());
|
||||
log.info("[handleConfigMessage][发送配置设置消息][topic: {}]", topic);
|
||||
log.info("[handleDownstreamMessage][发送下行消息成功][method: {}][topic: {}]", message.getMethod(), topic);
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理 OTA 升级消息
|
||||
* 根据方法构建主题
|
||||
*
|
||||
* @param message 设备消息
|
||||
* @param method 消息方法
|
||||
* @param productKey 产品标识
|
||||
* @param deviceName 设备名称
|
||||
* @return 构建的主题,如果方法不支持返回 null
|
||||
*/
|
||||
private void handleOtaMessage(IotDeviceMessage message) {
|
||||
// 通过 deviceId 获取设备信息
|
||||
IotDeviceGetReqDTO reqDTO = new IotDeviceGetReqDTO();
|
||||
reqDTO.setId(message.getDeviceId());
|
||||
IotDeviceRespDTO deviceInfo = deviceApi.getDevice(reqDTO).getData();
|
||||
if (deviceInfo == null) {
|
||||
log.warn("[handleOtaMessage][设备信息不存在][deviceId: {}]", message.getDeviceId());
|
||||
return;
|
||||
private String buildTopicByMethod(String method, String productKey, String deviceName) {
|
||||
if (StrUtil.isBlank(method)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
String productKey = deviceInfo.getProductKey();
|
||||
String deviceName = deviceInfo.getDeviceName();
|
||||
// 属性相关操作
|
||||
if (method.startsWith("thing.service.property.")) {
|
||||
if ("thing.service.property.set".equals(method)) {
|
||||
return IotDeviceTopicEnum.buildPropertySetTopic(productKey, deviceName);
|
||||
} else if ("thing.service.property.get".equals(method)) {
|
||||
return IotDeviceTopicEnum.buildPropertyGetTopic(productKey, deviceName);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
String topic = IotDeviceTopicEnum.buildOtaUpgradeTopic(productKey, deviceName);
|
||||
JSONObject payload = buildDownstreamPayload(message, message.getMethod());
|
||||
protocol.publishMessage(topic, payload.toString());
|
||||
log.info("[handleOtaMessage][发送 OTA 升级消息][topic: {}]", topic);
|
||||
// 配置设置操作
|
||||
if (method.startsWith("thing.service.config.")) {
|
||||
return IotDeviceTopicEnum.buildConfigSetTopic(productKey, deviceName);
|
||||
}
|
||||
|
||||
// OTA 升级操作
|
||||
if (method.startsWith("thing.service.ota.")) {
|
||||
return IotDeviceTopicEnum.buildOtaUpgradeTopic(productKey, deviceName);
|
||||
}
|
||||
|
||||
// 一般服务调用操作
|
||||
if (method.startsWith("thing.service.")) {
|
||||
// 排除属性、配置、OTA相关的服务调用
|
||||
if (method.contains("property") || method.contains("config") || method.contains("ota")) {
|
||||
return null; // 已在上面处理
|
||||
}
|
||||
// 从方法中提取服务标识符
|
||||
String serviceIdentifier = method.substring("thing.service.".length());
|
||||
return IotDeviceTopicEnum.buildServiceTopic(productKey, deviceName, serviceIdentifier);
|
||||
}
|
||||
|
||||
// 不支持的方法
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -1,98 +0,0 @@
|
|||
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttHttpAuthHandler;
|
||||
import io.vertx.core.AbstractVerticle;
|
||||
import io.vertx.core.Vertx;
|
||||
import io.vertx.core.http.HttpServer;
|
||||
import io.vertx.ext.web.Router;
|
||||
import io.vertx.ext.web.handler.BodyHandler;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
// TODO @haohao:可以融合到 IotMqttUpstreamProtocol 么?主要考虑,一个协议,一个 protocol;
|
||||
/**
|
||||
* IoT 网关 MQTT HTTP 认证协议:提供 HTTP 认证接口供 EMQX 调用
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class IotMqttHttpAuthProtocol extends AbstractVerticle {
|
||||
|
||||
private final IotGatewayProperties.EmqxProperties emqxProperties;
|
||||
|
||||
private HttpServer httpServer;
|
||||
private Vertx vertx;
|
||||
|
||||
@Override
|
||||
@PostConstruct
|
||||
public void start() {
|
||||
log.info("[start][开始启动 MQTT HTTP 认证协议服务]");
|
||||
|
||||
// 创建 Vertx 实例
|
||||
this.vertx = Vertx.vertx();
|
||||
|
||||
// 创建路由
|
||||
Router router = Router.router(vertx);
|
||||
router.route().handler(BodyHandler.create());
|
||||
|
||||
// 创建 MQTT 认证处理器
|
||||
IotMqttHttpAuthHandler authHandler = new IotMqttHttpAuthHandler();
|
||||
|
||||
// 添加 MQTT 认证路由
|
||||
router.post("/mqtt/auth/authenticate").handler(authHandler::authenticate);
|
||||
router.post("/mqtt/auth/connected").handler(authHandler::connected);
|
||||
router.post("/mqtt/auth/disconnected").handler(authHandler::disconnected);
|
||||
|
||||
// 启动 HTTP 服务器,使用独立端口
|
||||
int authPort = getAuthPort();
|
||||
try {
|
||||
httpServer = vertx.createHttpServer()
|
||||
.requestHandler(router)
|
||||
.listen(authPort)
|
||||
.result();
|
||||
log.info("[start][MQTT HTTP 认证协议启动成功,端口:{}]", authPort);
|
||||
} catch (Exception e) {
|
||||
log.error("[start][MQTT HTTP 认证协议启动失败]", e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@PreDestroy
|
||||
public void stop() {
|
||||
if (httpServer != null) {
|
||||
try {
|
||||
httpServer.close().result();
|
||||
log.info("[stop][MQTT HTTP 认证协议已停止]");
|
||||
} catch (Exception e) {
|
||||
log.error("[stop][MQTT HTTP 认证协议停止失败]", e);
|
||||
}
|
||||
}
|
||||
|
||||
if (vertx != null) {
|
||||
try {
|
||||
vertx.close();
|
||||
log.info("[stop][Vertx 已关闭]");
|
||||
} catch (Exception e) {
|
||||
log.warn("[stop][关闭 Vertx 异常]", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO @haohao:这里不搞默认,必须填写哈;
|
||||
/**
|
||||
* 获取认证服务端口
|
||||
* 从配置中获取,默认使用 8090 端口
|
||||
*/
|
||||
private int getAuthPort() {
|
||||
return emqxProperties.getHttpAuthPort() != null ? emqxProperties.getHttpAuthPort() : 8090;
|
||||
}
|
||||
|
||||
public String getServerId() {
|
||||
return "mqtt_auth_gateway_" + getAuthPort();
|
||||
}
|
||||
}
|
|
@ -1,140 +0,0 @@
|
|||
### TODO 芋艿:这个文件是不是要留着;
|
||||
### IoT MQTT HTTP 认证服务测试文件
|
||||
### 用于测试独立的 MQTT HTTP 认证服务接口
|
||||
### 服务由 IotMqttHttpAuthProtocol 启动,运行在配置的端口上(默认 8090)
|
||||
### 注意:这些接口通常由 EMQX 调用,也可以用于手动测试
|
||||
|
||||
### 配置说明:
|
||||
### 在 application-local.yaml 中可以配置端口:
|
||||
### yudao:
|
||||
### iot:
|
||||
### gateway:
|
||||
### protocol:
|
||||
### emqx:
|
||||
### http-auth-port: 8090 # 可以修改为其他端口
|
||||
|
||||
### 1. MQTT 设备认证接口 => 成功认证
|
||||
POST http://localhost:8090/mqtt/auth/authenticate
|
||||
Content-Type: application/json
|
||||
|
||||
{
|
||||
"clientid": "test_product.test_device",
|
||||
"username": "test_device&test_product",
|
||||
"password": "test_device_secret_hmac_password"
|
||||
}
|
||||
|
||||
### 2. MQTT 设备认证接口 => 参数不完整(clientid 为空)
|
||||
POST http://localhost:8090/mqtt/auth/authenticate
|
||||
Content-Type: application/json
|
||||
|
||||
{
|
||||
"clientid": "",
|
||||
"username": "test_device&test_product",
|
||||
"password": "test_device_secret"
|
||||
}
|
||||
|
||||
### 3. MQTT 设备认证接口 => 参数不完整(username 为空)
|
||||
POST http://localhost:8090/mqtt/auth/authenticate
|
||||
Content-Type: application/json
|
||||
|
||||
{
|
||||
"clientid": "test_product.test_device",
|
||||
"username": "",
|
||||
"password": "test_device_secret"
|
||||
}
|
||||
|
||||
### 4. MQTT 设备认证接口 => 参数不完整(password 为空)
|
||||
POST http://localhost:8090/mqtt/auth/authenticate
|
||||
Content-Type: application/json
|
||||
|
||||
{
|
||||
"clientid": "test_product.test_device",
|
||||
"username": "test_device&test_product",
|
||||
"password": ""
|
||||
}
|
||||
|
||||
### 5. MQTT 设备认证接口 => 认证失败(错误的设备密钥)
|
||||
POST http://localhost:8090/mqtt/auth/authenticate
|
||||
Content-Type: application/json
|
||||
|
||||
{
|
||||
"clientid": "test_product.test_device",
|
||||
"username": "test_device&test_product",
|
||||
"password": "wrong_password"
|
||||
}
|
||||
|
||||
### 6. EMQX 客户端连接事件钩子 => 设备上线
|
||||
POST http://localhost:8090/mqtt/auth/connected
|
||||
Content-Type: application/json
|
||||
|
||||
{
|
||||
"clientid": "test_product.test_device",
|
||||
"username": "test_device&test_product",
|
||||
"timestamp": 1703808000000,
|
||||
"peername": "192.168.1.100:52036",
|
||||
"sockname": "127.0.0.1:1883"
|
||||
}
|
||||
|
||||
### 7. EMQX 客户端连接事件钩子 => 用户名为空
|
||||
POST http://localhost:8090/mqtt/auth/connected
|
||||
Content-Type: application/json
|
||||
|
||||
{
|
||||
"clientid": "test_product.test_device",
|
||||
"username": "",
|
||||
"timestamp": 1703808000000,
|
||||
"peername": "192.168.1.100:52036",
|
||||
"sockname": "127.0.0.1:1883"
|
||||
}
|
||||
|
||||
### 8. EMQX 客户端断开连接事件钩子 => 设备下线(正常断开)
|
||||
POST http://localhost:8090/mqtt/auth/disconnected
|
||||
Content-Type: application/json
|
||||
|
||||
{
|
||||
"clientid": "test_product.test_device",
|
||||
"username": "test_device&test_product",
|
||||
"reason": "normal",
|
||||
"timestamp": 1703808060000
|
||||
}
|
||||
|
||||
### 9. 使用自定义端口测试(如果配置了不同端口)
|
||||
### 假设配置了端口 8091:
|
||||
### POST http://localhost:8091/mqtt/auth/authenticate
|
||||
### Content-Type: application/json
|
||||
###
|
||||
### {
|
||||
### "clientid": "test_product.test_device",
|
||||
### "username": "test_device&test_product",
|
||||
### "password": "test_device_secret_hmac_password"
|
||||
### }
|
||||
|
||||
### EMQX 配置参考:
|
||||
### authentication = [
|
||||
### {
|
||||
### mechanism = http
|
||||
### method = post
|
||||
### url = "http://localhost:8090/mqtt/auth/authenticate" # 使用配置的端口
|
||||
### body = {
|
||||
### clientid = "${clientid}"
|
||||
### username = "${username}"
|
||||
### password = "${password}"
|
||||
### }
|
||||
### headers = {
|
||||
### "content-type" = "application/json"
|
||||
### }
|
||||
### }
|
||||
### ]
|
||||
###
|
||||
### webhooks = [
|
||||
### {
|
||||
### name = "client_connected"
|
||||
### url = "http://localhost:8090/mqtt/auth/connected" # 使用配置的端口
|
||||
### events = ["client.connected"]
|
||||
### },
|
||||
### {
|
||||
### name = "client_disconnected"
|
||||
### url = "http://localhost:8090/mqtt/auth/disconnected" # 使用配置的端口
|
||||
### events = ["client.disconnected"]
|
||||
### }
|
||||
### ]
|
|
@ -6,15 +6,18 @@ import cn.hutool.core.util.ObjUtil;
|
|||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
|
||||
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttUpstreamRouter;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttHttpAuthHandler;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttUpstreamHandler;
|
||||
import io.netty.handler.codec.mqtt.MqttQoS;
|
||||
import io.vertx.core.Vertx;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.http.HttpServer;
|
||||
import io.vertx.ext.web.Router;
|
||||
import io.vertx.ext.web.handler.BodyHandler;
|
||||
import io.vertx.mqtt.MqttClient;
|
||||
import io.vertx.mqtt.MqttClientOptions;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.List;
|
||||
|
@ -22,11 +25,14 @@ import java.util.concurrent.CompletableFuture;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* IoT 网关 MQTT 协议:接收设备上行消息
|
||||
* IoT 网关 MQTT 统一协议
|
||||
* <p>
|
||||
* 集成了 MQTT 上行协议和 HTTP 认证协议的功能:
|
||||
* 1. MQTT 客户端:连接 EMQX,处理设备上行和下行消息
|
||||
* 2. HTTP 认证服务:为 EMQX 提供设备认证接口
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class IotMqttUpstreamProtocol {
|
||||
|
||||
|
@ -37,30 +43,143 @@ public class IotMqttUpstreamProtocol {
|
|||
|
||||
private final IotGatewayProperties.EmqxProperties emqxProperties;
|
||||
|
||||
// 共享资源
|
||||
private Vertx vertx;
|
||||
|
||||
// MQTT 客户端相关
|
||||
private MqttClient mqttClient;
|
||||
private IotMqttUpstreamRouter messageRouter;
|
||||
private IotMqttUpstreamHandler upstreamHandler;
|
||||
|
||||
// HTTP 认证服务相关
|
||||
private HttpServer httpAuthServer;
|
||||
private IotMqttHttpAuthHandler authHandler;
|
||||
|
||||
/**
|
||||
* 服务运行状态标志
|
||||
*/
|
||||
private volatile boolean isRunning = false;
|
||||
|
||||
/**
|
||||
* 构造函数
|
||||
*/
|
||||
public IotMqttUpstreamProtocol(IotGatewayProperties.EmqxProperties emqxProperties) {
|
||||
this.emqxProperties = emqxProperties;
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
public void start() {
|
||||
if (isRunning) {
|
||||
log.warn("[start][MQTT 协议服务已经在运行中,请勿重复启动]");
|
||||
log.warn("[start][MQTT 统一协议服务已经在运行中,请勿重复启动]");
|
||||
return;
|
||||
}
|
||||
log.info("[start][开始启动 MQTT 协议服务]");
|
||||
log.info("[start][开始启动 MQTT 统一协议服务]");
|
||||
|
||||
// 初始化组件
|
||||
this.vertx = Vertx.vertx();
|
||||
this.messageRouter = new IotMqttUpstreamRouter(this);
|
||||
try {
|
||||
// 1. 创建共享的 Vertx 实例
|
||||
this.vertx = Vertx.vertx();
|
||||
log.info("[start][共享 Vertx 实例创建成功]");
|
||||
|
||||
// 2. 启动 HTTP 认证服务
|
||||
startHttpAuthServer();
|
||||
|
||||
// 3. 启动 MQTT 客户端
|
||||
startMqttClient();
|
||||
|
||||
isRunning = true;
|
||||
log.info("[start][MQTT 统一协议服务启动完成]");
|
||||
} catch (Exception e) {
|
||||
log.error("[start][MQTT 统一协议服务启动失败]", e);
|
||||
// 启动失败时清理资源
|
||||
stop();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void stop() {
|
||||
if (!isRunning) {
|
||||
log.warn("[stop][MQTT 统一协议服务已经停止,无需再次停止]");
|
||||
return;
|
||||
}
|
||||
log.info("[stop][开始停止 MQTT 统一协议服务]");
|
||||
|
||||
// 1. 停止 MQTT 客户端
|
||||
stopMqttClient();
|
||||
|
||||
// 2. 停止 HTTP 认证服务
|
||||
stopHttpAuthServer();
|
||||
|
||||
// 3. 关闭 Vertx 实例
|
||||
if (vertx != null) {
|
||||
try {
|
||||
vertx.close();
|
||||
log.info("[stop][Vertx 实例已关闭]");
|
||||
} catch (Exception e) {
|
||||
log.warn("[stop][关闭 Vertx 实例失败]", e);
|
||||
}
|
||||
}
|
||||
|
||||
isRunning = false;
|
||||
log.info("[stop][MQTT 统一协议服务已停止]");
|
||||
}
|
||||
|
||||
/**
|
||||
* 启动 HTTP 认证服务
|
||||
*/
|
||||
private void startHttpAuthServer() {
|
||||
log.info("[startHttpAuthServer][开始启动 HTTP 认证服务]");
|
||||
|
||||
// 创建路由
|
||||
Router router = Router.router(vertx);
|
||||
router.route().handler(BodyHandler.create());
|
||||
|
||||
// 创建认证处理器
|
||||
this.authHandler = new IotMqttHttpAuthHandler();
|
||||
|
||||
// 添加认证路由
|
||||
router.post("/mqtt/auth/authenticate").handler(authHandler::authenticate);
|
||||
router.post("/mqtt/auth/connected").handler(authHandler::connected);
|
||||
router.post("/mqtt/auth/disconnected").handler(authHandler::disconnected);
|
||||
|
||||
// 启动 HTTP 服务器
|
||||
int authPort = emqxProperties.getHttpAuthPort();
|
||||
try {
|
||||
httpAuthServer = vertx.createHttpServer()
|
||||
.requestHandler(router)
|
||||
.listen(authPort)
|
||||
.result();
|
||||
log.info("[startHttpAuthServer][HTTP 认证服务启动成功,端口:{}]", authPort);
|
||||
} catch (Exception e) {
|
||||
log.error("[startHttpAuthServer][HTTP 认证服务启动失败]", e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 停止 HTTP 认证服务
|
||||
*/
|
||||
private void stopHttpAuthServer() {
|
||||
if (httpAuthServer != null) {
|
||||
try {
|
||||
httpAuthServer.close().result();
|
||||
log.info("[stopHttpAuthServer][HTTP 认证服务已停止]");
|
||||
} catch (Exception e) {
|
||||
log.error("[stopHttpAuthServer][HTTP 认证服务停止失败]", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 启动 MQTT 客户端
|
||||
*/
|
||||
private void startMqttClient() {
|
||||
log.info("[startMqttClient][开始启动 MQTT 客户端]");
|
||||
|
||||
// 初始化消息处理器
|
||||
this.upstreamHandler = new IotMqttUpstreamHandler();
|
||||
|
||||
// 创建 MQTT 客户端
|
||||
MqttClientOptions options = new MqttClientOptions()
|
||||
// TODO @haohao:clientid 建议也配置文件;想通名字,会冲突哇?
|
||||
.setClientId("yudao-iot-gateway-" + IdUtil.fastSimpleUUID())
|
||||
.setUsername(emqxProperties.getMqttUsername())
|
||||
.setPassword(emqxProperties.getMqttPassword())
|
||||
|
@ -72,14 +191,10 @@ public class IotMqttUpstreamProtocol {
|
|||
connectMqtt();
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void stop() {
|
||||
if (!isRunning) {
|
||||
log.warn("[stop][MQTT 协议服务已经停止,无需再次停止]");
|
||||
return;
|
||||
}
|
||||
log.info("[stop][开始停止 MQTT 协议服务]");
|
||||
|
||||
/**
|
||||
* 停止 MQTT 客户端
|
||||
*/
|
||||
private void stopMqttClient() {
|
||||
// 1. 取消 MQTT 主题订阅
|
||||
if (mqttClient != null && mqttClient.isConnected()) {
|
||||
List<String> topicList = emqxProperties.getMqttTopics();
|
||||
|
@ -87,9 +202,9 @@ public class IotMqttUpstreamProtocol {
|
|||
for (String topic : topicList) {
|
||||
try {
|
||||
mqttClient.unsubscribe(topic);
|
||||
log.debug("[stop][取消订阅主题: {}]", topic);
|
||||
log.debug("[stopMqttClient][取消订阅主题: {}]", topic);
|
||||
} catch (Exception e) {
|
||||
log.warn("[stop][取消订阅主题异常: {}]", topic, e);
|
||||
log.warn("[stopMqttClient][取消订阅主题异常: {}]", topic, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -99,33 +214,20 @@ public class IotMqttUpstreamProtocol {
|
|||
try {
|
||||
if (mqttClient != null && mqttClient.isConnected()) {
|
||||
mqttClient.disconnect();
|
||||
log.info("[stopMqttClient][MQTT 客户端已断开]");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("[stop][关闭 MQTT 客户端异常]", e);
|
||||
log.warn("[stopMqttClient][关闭 MQTT 客户端异常]", e);
|
||||
}
|
||||
|
||||
// 3. 关闭 Vertx
|
||||
try {
|
||||
if (vertx != null) {
|
||||
vertx.close();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("[stop][关闭 Vertx 异常]", e);
|
||||
}
|
||||
|
||||
// 4. 更新状态
|
||||
isRunning = false;
|
||||
log.info("[stop][MQTT 协议服务已停止]");
|
||||
}
|
||||
|
||||
/**
|
||||
* 连接 MQTT Broker 并订阅主题
|
||||
*/
|
||||
private void connectMqtt() {
|
||||
// 检查必要的 MQTT 配置
|
||||
// TODO @haohao:这些通过配置文件的 validate 去做哈,简化下;
|
||||
String host = emqxProperties.getMqttHost();
|
||||
Integer port = emqxProperties.getMqttPort();
|
||||
|
||||
if (StrUtil.isBlank(host)) {
|
||||
String msg = "[connectMqtt][MQTT Host 为空,无法连接]";
|
||||
log.error(msg);
|
||||
|
@ -133,7 +235,7 @@ public class IotMqttUpstreamProtocol {
|
|||
}
|
||||
if (port == null) {
|
||||
log.warn("[connectMqtt][MQTT Port 为 null,使用默认端口 1883]");
|
||||
port = 1883; // 默认 MQTT 端口
|
||||
port = 1883;
|
||||
}
|
||||
|
||||
final Integer finalPort = port;
|
||||
|
@ -152,7 +254,6 @@ public class IotMqttUpstreamProtocol {
|
|||
// 订阅主题
|
||||
subscribeToTopics();
|
||||
})
|
||||
// TODO @haohao:这个要不要改成,必须连接成功?不做重试;不然启动也蛮危险的?
|
||||
.exceptionally(error -> {
|
||||
log.error("[connectMqtt][连接 MQTT Broker 失败]", error);
|
||||
reconnectWithDelay();
|
||||
|
@ -162,10 +263,9 @@ public class IotMqttUpstreamProtocol {
|
|||
// 等待连接完成
|
||||
try {
|
||||
connectFuture.get(10, TimeUnit.SECONDS);
|
||||
isRunning = true;
|
||||
log.info("[connectMqtt][MQTT 协议服务启动完成]");
|
||||
log.info("[connectMqtt][MQTT 客户端启动完成]");
|
||||
} catch (Exception e) {
|
||||
log.error("[connectMqtt][MQTT 协议服务启动失败]", e);
|
||||
log.error("[connectMqtt][MQTT 客户端启动失败]", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -173,7 +273,7 @@ public class IotMqttUpstreamProtocol {
|
|||
* 设置 MQTT 消息处理器
|
||||
*/
|
||||
private void setupMessageHandler() {
|
||||
mqttClient.publishHandler(messageRouter::route);
|
||||
mqttClient.publishHandler(upstreamHandler::handle);
|
||||
log.debug("[setupMessageHandler][MQTT 消息处理器设置完成]");
|
||||
}
|
||||
|
||||
|
@ -182,76 +282,54 @@ public class IotMqttUpstreamProtocol {
|
|||
*/
|
||||
private void subscribeToTopics() {
|
||||
List<String> topicList = emqxProperties.getMqttTopics();
|
||||
// TODO @haohao:建议 topicList 直接 validate 校验
|
||||
if (CollUtil.isEmpty(topicList)) {
|
||||
log.warn("[subscribeToTopics][未配置 MQTT 主题,使用默认主题]");
|
||||
topicList = List.of("/sys/#"); // 默认订阅所有系统主题
|
||||
log.warn("[subscribeToTopics][没有配置要订阅的主题]");
|
||||
return;
|
||||
}
|
||||
|
||||
for (String topic : topicList) {
|
||||
// TODO @haohao:直接 validate 校验;嘿嘿,主要保证核心逻辑,简单点
|
||||
if (StrUtil.isBlank(topic)) {
|
||||
log.warn("[subscribeToTopics][跳过空主题]");
|
||||
continue;
|
||||
}
|
||||
|
||||
mqttClient.subscribe(topic, DEFAULT_QOS.value())
|
||||
.onSuccess(ack -> log.info("[subscribeToTopics][订阅主题成功: {}]", topic))
|
||||
.onFailure(err -> log.error("[subscribeToTopics][订阅主题失败: {}]", topic, err));
|
||||
mqttClient.subscribe(topic, DEFAULT_QOS.value(), subscribeResult -> {
|
||||
if (subscribeResult.succeeded()) {
|
||||
log.info("[subscribeToTopics][订阅主题成功: {}]", topic);
|
||||
} else {
|
||||
log.error("[subscribeToTopics][订阅主题失败: {}]", topic, subscribeResult.cause());
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 重连 MQTT 客户端
|
||||
* 延迟重连
|
||||
*/
|
||||
private void reconnectWithDelay() {
|
||||
if (!isRunning) {
|
||||
log.info("[reconnectWithDelay][服务已停止,不再尝试重连]");
|
||||
return;
|
||||
}
|
||||
|
||||
// 默认重连延迟 5 秒
|
||||
int reconnectDelayMs = 5000;
|
||||
vertx.setTimer(reconnectDelayMs, id -> {
|
||||
log.info("[reconnectWithDelay][开始重新连接 MQTT]");
|
||||
connectMqtt();
|
||||
vertx.setTimer(5000, timerId -> {
|
||||
if (isRunning && (mqttClient == null || !mqttClient.isConnected())) {
|
||||
log.info("[reconnectWithDelay][开始重连 MQTT Broker]");
|
||||
connectMqtt();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布消息到 MQTT
|
||||
* 发布消息到 MQTT Broker
|
||||
*
|
||||
* @param topic 主题
|
||||
* @param payload 消息内容
|
||||
*/
|
||||
public void publishMessage(String topic, String payload) {
|
||||
if (mqttClient == null || !mqttClient.isConnected()) {
|
||||
log.warn("[publishMessage][MQTT 客户端未连接,无法发送消息][topic: {}]", topic);
|
||||
return;
|
||||
if (mqttClient != null && mqttClient.isConnected()) {
|
||||
mqttClient.publish(topic, Buffer.buffer(payload), DEFAULT_QOS, false, false);
|
||||
log.debug("[publishMessage][发布消息成功][topic: {}]", topic);
|
||||
} else {
|
||||
log.warn("[publishMessage][MQTT 客户端未连接,无法发布消息][topic: {}]", topic);
|
||||
}
|
||||
|
||||
mqttClient.publish(topic, Buffer.buffer(payload), DEFAULT_QOS, false, false)
|
||||
.onSuccess(v -> log.debug("[publishMessage][发送消息成功][topic: {}]", topic))
|
||||
.onFailure(err -> log.error("[publishMessage][发送消息失败][topic: {}]", topic, err));
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取服务器 ID
|
||||
*
|
||||
* @return 服务器 ID
|
||||
*/
|
||||
public String getServerId() {
|
||||
return IotDeviceMessageUtils.generateServerId(emqxProperties.getMqttPort());
|
||||
}
|
||||
|
||||
// TODO @haohao:这个要删除哇?
|
||||
/**
|
||||
* 获取 MQTT 客户端
|
||||
*
|
||||
* @return MQTT 客户端
|
||||
*/
|
||||
public MqttClient getMqttClient() {
|
||||
return mqttClient;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,38 +0,0 @@
|
|||
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* IoT 网关 MQTT 协议的路由处理器抽象基类
|
||||
* <p>
|
||||
* 提供通用的处理方法,所有 MQTT 消息处理器都应继承此类
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
public abstract class IotMqttAbstractHandler {
|
||||
|
||||
/**
|
||||
* 处理 MQTT 消息
|
||||
*
|
||||
* @param topic 主题
|
||||
* @param payload 消息内容
|
||||
*/
|
||||
public abstract void handle(String topic, String payload);
|
||||
|
||||
/**
|
||||
* 解析主题,获取主题各部分
|
||||
*
|
||||
* @param topic 主题
|
||||
* @return 主题各部分数组,如果解析失败返回 null
|
||||
*/
|
||||
protected String[] parseTopic(String topic) {
|
||||
String[] topicParts = topic.split("/");
|
||||
if (topicParts.length < 7) {
|
||||
log.warn("[parseTopic][主题格式不正确][topic: {}]", topic);
|
||||
return null;
|
||||
}
|
||||
return topicParts;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,109 +0,0 @@
|
|||
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router;
|
||||
|
||||
import cn.hutool.json.JSONObject;
|
||||
import cn.hutool.json.JSONUtil;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
|
||||
import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
/**
|
||||
* IoT 网关 MQTT 事件处理器
|
||||
* <p>
|
||||
* 处理设备事件相关的 MQTT 消息
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class IotMqttEventHandler extends IotMqttAbstractHandler {
|
||||
|
||||
private final IotMqttUpstreamProtocol protocol;
|
||||
private final IotDeviceMessageProducer deviceMessageProducer;
|
||||
private final IotDeviceMessageService deviceMessageService;
|
||||
|
||||
@Override
|
||||
public void handle(String topic, String payload) {
|
||||
try {
|
||||
log.info("[handle][接收到设备事件上报][topic: {}]", topic);
|
||||
|
||||
// 解析消息内容
|
||||
String[] topicParts = parseTopic(topic);
|
||||
if (topicParts == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 构建设备消息
|
||||
String productKey = topicParts[2];
|
||||
String deviceName = topicParts[3];
|
||||
String eventIdentifier = getEventIdentifier(topicParts, topic);
|
||||
if (eventIdentifier == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 使用 IotDeviceMessageService 解码消息
|
||||
byte[] messageBytes = payload.getBytes(StandardCharsets.UTF_8);
|
||||
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(
|
||||
messageBytes, productKey, deviceName);
|
||||
|
||||
// 发送消息
|
||||
deviceMessageProducer.sendDeviceMessage(message);
|
||||
log.info("[handle][处理设备事件上报成功][topic: {}]", topic);
|
||||
|
||||
// 发送响应消息
|
||||
// TODO @haohao:这里应该只 ack 哈;reply 在 biz 业务处理了。handleUpstreamDeviceMessage
|
||||
String method = "thing.event." + eventIdentifier + ".post";
|
||||
sendResponse(topic, JSONUtil.parseObj(payload), method);
|
||||
} catch (Exception e) {
|
||||
log.error("[handle][处理设备事件上报失败][topic: {}][payload: {}]", topic, payload, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 从主题部分中获取事件标识符
|
||||
*
|
||||
* @param topicParts 主题各部分
|
||||
* @param topic 原始主题,用于日志
|
||||
* @return 事件标识符,如果获取失败返回 null
|
||||
*/
|
||||
private String getEventIdentifier(String[] topicParts, String topic) {
|
||||
try {
|
||||
return topicParts[6];
|
||||
} catch (ArrayIndexOutOfBoundsException e) {
|
||||
log.warn("[getEventIdentifier][无法从主题中获取事件标识符][topic: {}]", topic);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送响应消息
|
||||
*
|
||||
* @param topic 原始主题
|
||||
* @param jsonObject 原始消息 JSON 对象
|
||||
* @param method 响应方法
|
||||
*/
|
||||
private void sendResponse(String topic, JSONObject jsonObject, String method) {
|
||||
try {
|
||||
String replyTopic = IotDeviceTopicEnum.getReplyTopic(topic);
|
||||
|
||||
// 构建响应消息
|
||||
JSONObject response = new JSONObject();
|
||||
response.set("id", jsonObject.getStr("id"));
|
||||
response.set("code", 200);
|
||||
response.set("method", method);
|
||||
response.set("data", new JSONObject());
|
||||
|
||||
// 发送响应
|
||||
protocol.publishMessage(replyTopic, response.toString());
|
||||
log.debug("[sendResponse][发送响应消息成功][topic: {}]", replyTopic);
|
||||
} catch (Exception e) {
|
||||
log.error("[sendResponse][发送响应消息失败][topic: {}]", topic, e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,171 +0,0 @@
|
|||
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router;
|
||||
|
||||
import cn.hutool.json.JSONObject;
|
||||
import cn.hutool.json.JSONUtil;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
|
||||
import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
/**
|
||||
* IoT 网关 MQTT 属性处理器
|
||||
* <p>
|
||||
* 处理设备属性相关的 MQTT 消息
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class IotMqttPropertyHandler extends IotMqttAbstractHandler {
|
||||
|
||||
private final IotMqttUpstreamProtocol protocol;
|
||||
private final IotDeviceMessageProducer deviceMessageProducer;
|
||||
private final IotDeviceMessageService deviceMessageService;
|
||||
|
||||
@Override
|
||||
public void handle(String topic, String payload) {
|
||||
if (topic.endsWith(IotDeviceTopicEnum.PROPERTY_POST_TOPIC.getTopic())) {
|
||||
// 属性上报
|
||||
handlePropertyPost(topic, payload);
|
||||
} else if (topic.contains(IotDeviceTopicEnum.PROPERTY_SET_TOPIC.getTopic())) {
|
||||
// 属性设置响应
|
||||
handlePropertySetReply(topic, payload);
|
||||
} else if (topic.contains(IotDeviceTopicEnum.PROPERTY_GET_TOPIC.getTopic())) {
|
||||
// 属性获取响应
|
||||
handlePropertyGetReply(topic, payload);
|
||||
} else {
|
||||
log.warn("[handle][未知的属性主题][topic: {}]", topic);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理设备属性上报消息
|
||||
*
|
||||
* @param topic 主题
|
||||
* @param payload 消息内容
|
||||
*/
|
||||
private void handlePropertyPost(String topic, String payload) {
|
||||
try {
|
||||
log.info("[handlePropertyPost][接收到设备属性上报][topic: {}]", topic);
|
||||
|
||||
// 解析主题获取设备信息
|
||||
String[] topicParts = parseTopic(topic);
|
||||
if (topicParts == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
String productKey = topicParts[2];
|
||||
String deviceName = topicParts[3];
|
||||
|
||||
// 使用 IotDeviceMessageService 解码消息
|
||||
byte[] messageBytes = payload.getBytes(StandardCharsets.UTF_8);
|
||||
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(
|
||||
messageBytes, productKey, deviceName);
|
||||
|
||||
// 发送消息
|
||||
deviceMessageProducer.sendDeviceMessage(message);
|
||||
log.info("[handlePropertyPost][处理设备属性上报成功][topic: {}]", topic);
|
||||
|
||||
// 发送响应消息
|
||||
sendResponse(topic, JSONUtil.parseObj(payload), "thing.event.property.post");
|
||||
} catch (Exception e) {
|
||||
log.error("[handlePropertyPost][处理设备属性上报失败][topic: {}][payload: {}]", topic, payload, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理属性设置响应消息
|
||||
*
|
||||
* @param topic 主题
|
||||
* @param payload 消息内容
|
||||
*/
|
||||
private void handlePropertySetReply(String topic, String payload) {
|
||||
try {
|
||||
log.info("[handlePropertySetReply][接收到属性设置响应][topic: {}]", topic);
|
||||
|
||||
// 解析主题获取设备信息
|
||||
String[] topicParts = parseTopic(topic);
|
||||
if (topicParts == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
String productKey = topicParts[2];
|
||||
String deviceName = topicParts[3];
|
||||
|
||||
// 使用 IotDeviceMessageService 解码消息
|
||||
byte[] messageBytes = payload.getBytes(StandardCharsets.UTF_8);
|
||||
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(
|
||||
messageBytes, productKey, deviceName);
|
||||
|
||||
// 发送消息
|
||||
deviceMessageProducer.sendDeviceMessage(message);
|
||||
log.info("[handlePropertySetReply][处理属性设置响应成功][topic: {}]", topic);
|
||||
} catch (Exception e) {
|
||||
log.error("[handlePropertySetReply][处理属性设置响应失败][topic: {}][payload: {}]", topic, payload, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理属性获取响应消息
|
||||
*
|
||||
* @param topic 主题
|
||||
* @param payload 消息内容
|
||||
*/
|
||||
private void handlePropertyGetReply(String topic, String payload) {
|
||||
try {
|
||||
log.info("[handlePropertyGetReply][接收到属性获取响应][topic: {}]", topic);
|
||||
|
||||
// 解析主题获取设备信息
|
||||
String[] topicParts = parseTopic(topic);
|
||||
if (topicParts == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
String productKey = topicParts[2];
|
||||
String deviceName = topicParts[3];
|
||||
|
||||
// 使用 IotDeviceMessageService 解码消息
|
||||
byte[] messageBytes = payload.getBytes(StandardCharsets.UTF_8);
|
||||
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(
|
||||
messageBytes, productKey, deviceName);
|
||||
|
||||
// 发送消息
|
||||
deviceMessageProducer.sendDeviceMessage(message);
|
||||
log.info("[handlePropertyGetReply][处理属性获取响应成功][topic: {}]", topic);
|
||||
} catch (Exception e) {
|
||||
log.error("[handlePropertyGetReply][处理属性获取响应失败][topic: {}][payload: {}]", topic, payload, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送响应消息
|
||||
*
|
||||
* @param topic 原始主题
|
||||
* @param jsonObject 原始消息 JSON 对象
|
||||
* @param method 响应方法
|
||||
*/
|
||||
private void sendResponse(String topic, JSONObject jsonObject, String method) {
|
||||
try {
|
||||
String replyTopic = IotDeviceTopicEnum.getReplyTopic(topic);
|
||||
|
||||
// 构建响应消息
|
||||
JSONObject response = new JSONObject();
|
||||
response.set("id", jsonObject.getStr("id"));
|
||||
response.set("code", 200);
|
||||
response.set("method", method);
|
||||
response.set("data", new JSONObject());
|
||||
|
||||
// 发送响应
|
||||
protocol.publishMessage(replyTopic, response.toString());
|
||||
log.debug("[sendResponse][发送响应消息成功][topic: {}]", replyTopic);
|
||||
} catch (Exception e) {
|
||||
log.error("[sendResponse][发送响应消息失败][topic: {}]", topic, e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,109 +0,0 @@
|
|||
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router;
|
||||
|
||||
import cn.hutool.json.JSONObject;
|
||||
import cn.hutool.json.JSONUtil;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
|
||||
import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
/**
|
||||
* IoT 网关 MQTT 服务处理器
|
||||
* <p>
|
||||
* 处理设备服务调用相关的 MQTT 消息
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class IotMqttServiceHandler extends IotMqttAbstractHandler {
|
||||
|
||||
private final IotMqttUpstreamProtocol protocol;
|
||||
private final IotDeviceMessageProducer deviceMessageProducer;
|
||||
private final IotDeviceMessageService deviceMessageService;
|
||||
|
||||
@Override
|
||||
public void handle(String topic, String payload) {
|
||||
try {
|
||||
log.info("[handle][接收到设备服务调用响应][topic: {}]", topic);
|
||||
|
||||
// 解析消息内容
|
||||
String[] topicParts = parseTopic(topic);
|
||||
if (topicParts == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 构建设备消息
|
||||
String productKey = topicParts[2];
|
||||
String deviceName = topicParts[3];
|
||||
String serviceIdentifier = getServiceIdentifier(topicParts, topic);
|
||||
if (serviceIdentifier == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 使用 IotDeviceMessageService 解码消息
|
||||
byte[] messageBytes = payload.getBytes(StandardCharsets.UTF_8);
|
||||
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(
|
||||
messageBytes, productKey, deviceName);
|
||||
|
||||
// 发送消息
|
||||
deviceMessageProducer.sendDeviceMessage(message);
|
||||
log.info("[handle][处理设备服务调用响应成功][topic: {}]", topic);
|
||||
|
||||
// 发送响应消息
|
||||
String method = "thing.service." + serviceIdentifier;
|
||||
sendResponse(topic, JSONUtil.parseObj(payload), method);
|
||||
} catch (Exception e) {
|
||||
log.error("[handle][处理设备服务调用响应失败][topic: {}][payload: {}]", topic, payload, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 从主题部分中获取服务标识符
|
||||
*
|
||||
* @param topicParts 主题各部分
|
||||
* @param topic 原始主题,用于日志
|
||||
* @return 服务标识符,如果获取失败返回 null
|
||||
*/
|
||||
private String getServiceIdentifier(String[] topicParts, String topic) {
|
||||
try {
|
||||
// 服务主题格式:/sys/{productKey}/{deviceName}/thing/service/{serviceIdentifier}
|
||||
return topicParts[6];
|
||||
} catch (ArrayIndexOutOfBoundsException e) {
|
||||
log.warn("[getServiceIdentifier][无法从主题中获取服务标识符][topic: {}]", topic);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送响应消息
|
||||
*
|
||||
* @param topic 原始主题
|
||||
* @param jsonObject 原始消息 JSON 对象
|
||||
* @param method 响应方法
|
||||
*/
|
||||
private void sendResponse(String topic, JSONObject jsonObject, String method) {
|
||||
try {
|
||||
String replyTopic = IotDeviceTopicEnum.getReplyTopic(topic);
|
||||
|
||||
// 构建响应消息
|
||||
JSONObject response = new JSONObject();
|
||||
response.set("id", jsonObject.getStr("id"));
|
||||
response.set("code", 200);
|
||||
response.set("method", method);
|
||||
response.set("data", new JSONObject());
|
||||
|
||||
// 发送响应
|
||||
protocol.publishMessage(replyTopic, response.toString());
|
||||
log.debug("[sendResponse][发送响应消息成功][topic: {}]", replyTopic);
|
||||
} catch (Exception e) {
|
||||
log.error("[sendResponse][发送响应消息失败][topic: {}]", topic, e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,145 @@
|
|||
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.extra.spring.SpringUtil;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import io.vertx.mqtt.messages.MqttPublishMessage;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
/**
|
||||
* IoT 网关 MQTT 协议的【上行】处理器
|
||||
* <p>
|
||||
* 处理设备上行消息,包括事件上报、属性上报、服务调用响应等
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
public class IotMqttUpstreamHandler {
|
||||
|
||||
private final IotDeviceMessageProducer deviceMessageProducer;
|
||||
private final IotDeviceMessageService deviceMessageService;
|
||||
|
||||
public IotMqttUpstreamHandler() {
|
||||
this.deviceMessageProducer = SpringUtil.getBean(IotDeviceMessageProducer.class);
|
||||
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理 MQTT 发布消息
|
||||
*/
|
||||
public void handle(MqttPublishMessage message) {
|
||||
String topic = message.topicName();
|
||||
String payload = message.payload().toString(StandardCharsets.UTF_8);
|
||||
|
||||
if (StrUtil.isBlank(topic)) {
|
||||
log.warn("[handle][主题为空,忽略消息]");
|
||||
return;
|
||||
}
|
||||
|
||||
if (StrUtil.isBlank(payload)) {
|
||||
log.warn("[handle][消息内容为空][topic: {}]", topic);
|
||||
return;
|
||||
}
|
||||
|
||||
log.debug("[handle][收到 MQTT 消息][topic: {}]", topic);
|
||||
handle(topic, payload);
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理 MQTT 消息
|
||||
*
|
||||
* @param topic 主题
|
||||
* @param payload 消息内容
|
||||
*/
|
||||
public void handle(String topic, String payload) {
|
||||
try {
|
||||
// 1. 识别并验证消息类型
|
||||
String messageType = getMessageType(topic);
|
||||
if (messageType == null) {
|
||||
log.warn("[handle][未知的消息类型][topic: {}]", topic);
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 处理消息
|
||||
processMessage(topic, payload, messageType);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[handle][处理消息失败][topic: {}][payload: {}]", topic, payload, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理消息的统一逻辑
|
||||
*/
|
||||
private void processMessage(String topic, String payload, String messageType) {
|
||||
log.info("[processMessage][接收到{}][topic: {}]", messageType, topic);
|
||||
|
||||
// 解析主题获取设备信息
|
||||
String[] topicParts = parseTopic(topic);
|
||||
if (topicParts == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
String productKey = topicParts[2];
|
||||
String deviceName = topicParts[3];
|
||||
|
||||
// 解码消息
|
||||
byte[] messageBytes = payload.getBytes(StandardCharsets.UTF_8);
|
||||
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(
|
||||
messageBytes, productKey, deviceName);
|
||||
|
||||
// 发送消息到队列
|
||||
deviceMessageProducer.sendDeviceMessage(message);
|
||||
|
||||
// 记录成功日志
|
||||
log.info("[processMessage][处理{}成功][topic: {}]", messageType, topic);
|
||||
}
|
||||
|
||||
/**
|
||||
* 识别消息类型
|
||||
*
|
||||
* @param topic 主题
|
||||
* @return 消息类型描述,如果不支持返回 null
|
||||
*/
|
||||
private String getMessageType(String topic) {
|
||||
// 设备事件上报: /sys/{productKey}/{deviceName}/thing/event/{eventIdentifier}/post
|
||||
if (topic.contains("/thing/event/") && topic.endsWith("/post")) {
|
||||
return "设备事件上报";
|
||||
}
|
||||
|
||||
// 设备属性操作: /sys/{productKey}/{deviceName}/thing/property/post
|
||||
// 或属性响应: /sys/{productKey}/{deviceName}/thing/service/property/set_reply
|
||||
if (topic.endsWith("/thing/property/post") ||
|
||||
topic.contains("/thing/service/property/set") ||
|
||||
topic.contains("/thing/service/property/get")) {
|
||||
return "设备属性操作";
|
||||
}
|
||||
|
||||
// 设备服务调用: /sys/{productKey}/{deviceName}/thing/service/{serviceIdentifier}
|
||||
if (topic.contains("/thing/service/") && !topic.contains("/property/")) {
|
||||
return "设备服务调用";
|
||||
}
|
||||
|
||||
// 不支持的消息类型
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析主题,获取主题各部分
|
||||
*
|
||||
* @param topic 主题
|
||||
* @return 主题各部分数组,如果解析失败返回 null
|
||||
*/
|
||||
private String[] parseTopic(String topic) {
|
||||
String[] topicParts = topic.split("/");
|
||||
if (topicParts.length < 7) {
|
||||
log.warn("[parseTopic][主题格式不正确][topic: {}]", topic);
|
||||
return null;
|
||||
}
|
||||
return topicParts;
|
||||
}
|
||||
}
|
|
@ -1,108 +0,0 @@
|
|||
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.extra.spring.SpringUtil;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
|
||||
import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import io.vertx.mqtt.messages.MqttPublishMessage;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* IoT 网关 MQTT 上行路由器
|
||||
* <p>
|
||||
* 根据消息主题路由到不同的处理器
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class IotMqttUpstreamRouter {
|
||||
|
||||
private final IotMqttUpstreamProtocol protocol;
|
||||
private final IotDeviceMessageProducer deviceMessageProducer;
|
||||
private final IotDeviceMessageService deviceMessageService;
|
||||
|
||||
// 处理器实例
|
||||
private IotMqttPropertyHandler propertyHandler;
|
||||
private IotMqttEventHandler eventHandler;
|
||||
private IotMqttServiceHandler serviceHandler;
|
||||
|
||||
public IotMqttUpstreamRouter(IotMqttUpstreamProtocol protocol) {
|
||||
this.protocol = protocol;
|
||||
this.deviceMessageProducer = SpringUtil.getBean(IotDeviceMessageProducer.class);
|
||||
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
|
||||
// 初始化处理器
|
||||
this.propertyHandler = new IotMqttPropertyHandler(protocol, deviceMessageProducer, deviceMessageService);
|
||||
this.eventHandler = new IotMqttEventHandler(protocol, deviceMessageProducer, deviceMessageService);
|
||||
this.serviceHandler = new IotMqttServiceHandler(protocol, deviceMessageProducer, deviceMessageService);
|
||||
}
|
||||
|
||||
/**
|
||||
* 路由 MQTT 消息
|
||||
*
|
||||
* @param message MQTT 发布消息
|
||||
*/
|
||||
public void route(MqttPublishMessage message) {
|
||||
String topic = message.topicName();
|
||||
String payload = message.payload().toString();
|
||||
log.info("[route][接收到 MQTT 消息][topic: {}][payload: {}]", topic, payload);
|
||||
|
||||
try {
|
||||
if (StrUtil.isEmpty(payload)) {
|
||||
log.warn("[route][消息内容为空][topic: {}]", topic);
|
||||
return;
|
||||
}
|
||||
|
||||
// 根据主题路由到相应的处理器
|
||||
if (isPropertyTopic(topic)) {
|
||||
propertyHandler.handle(topic, payload);
|
||||
} else if (isEventTopic(topic)) {
|
||||
eventHandler.handle(topic, payload);
|
||||
} else if (isServiceTopic(topic)) {
|
||||
serviceHandler.handle(topic, payload);
|
||||
} else {
|
||||
log.warn("[route][未知的消息类型][topic: {}]", topic);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[route][处理 MQTT 消息失败][topic: {}][payload: {}]", topic, payload, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否为属性相关主题
|
||||
*
|
||||
* @param topic 主题
|
||||
* @return 是否为属性主题
|
||||
*/
|
||||
private boolean isPropertyTopic(String topic) {
|
||||
return topic.endsWith(IotDeviceTopicEnum.PROPERTY_POST_TOPIC.getTopic()) ||
|
||||
topic.contains(IotDeviceTopicEnum.PROPERTY_SET_TOPIC.getTopic()) ||
|
||||
topic.contains(IotDeviceTopicEnum.PROPERTY_GET_TOPIC.getTopic());
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否为事件相关主题
|
||||
*
|
||||
* @param topic 主题
|
||||
* @return 是否为事件主题
|
||||
*/
|
||||
private boolean isEventTopic(String topic) {
|
||||
return topic.contains(IotDeviceTopicEnum.EVENT_POST_TOPIC_PREFIX.getTopic()) &&
|
||||
topic.endsWith(IotDeviceTopicEnum.EVENT_POST_TOPIC_SUFFIX.getTopic());
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否为服务相关主题
|
||||
*
|
||||
* @param topic 主题
|
||||
* @return 是否为服务主题
|
||||
*/
|
||||
private boolean isServiceTopic(String topic) {
|
||||
return topic.contains(IotDeviceTopicEnum.SERVICE_TOPIC_PREFIX.getTopic()) &&
|
||||
!isPropertyTopic(topic); // 排除属性相关的服务调用
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue