review:【IoT 物联网】mqtt broker 协议

This commit is contained in:
YunaiV 2025-06-12 09:55:17 +08:00
parent 74db72c9c0
commit af0ff1ce26
5 changed files with 17 additions and 5 deletions

View File

@ -48,15 +48,15 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber<IotDevi
@Override @Override
public void onMessage(IotDeviceMessage message) { public void onMessage(IotDeviceMessage message) {
log.info("[onMessage][接收到下行消息:{}]", message); log.info("[onMessage][接收到下行消息:{}]", message);
try { try {
// 根据消息方法处理不同的下行消息 // 根据消息方法处理不同的下行消息
String method = message.getMethod(); String method = message.getMethod();
if (method == null) { if (method == null) {
log.warn("[onMessage][消息方法为空]"); log.warn("[onMessage][消息({})方法为空]", message);
return; return;
} }
// TODO @haohao看看怎么融合下
if (method.startsWith("thing.service.property.")) { if (method.startsWith("thing.service.property.")) {
handlePropertyMessage(message); handlePropertyMessage(message);
} else if (method.startsWith("thing.service.") && !method.contains("property") && !method.contains("config") } else if (method.startsWith("thing.service.") && !method.contains("property") && !method.contains("config")
@ -83,6 +83,7 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber<IotDevi
String method = message.getMethod(); String method = message.getMethod();
// 通过 deviceId 获取设备信息 // 通过 deviceId 获取设备信息
// TODO @haohao通过缓存拿
IotDeviceGetReqDTO reqDTO = new IotDeviceGetReqDTO(); IotDeviceGetReqDTO reqDTO = new IotDeviceGetReqDTO();
reqDTO.setId(message.getDeviceId()); reqDTO.setId(message.getDeviceId());
IotDeviceRespDTO deviceInfo = deviceApi.getDevice(reqDTO).getData(); IotDeviceRespDTO deviceInfo = deviceApi.getDevice(reqDTO).getData();

View File

@ -12,6 +12,7 @@ import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
// TODO @haohao可以融合到 IotMqttUpstreamProtocol 主要考虑一个协议一个 protocol
/** /**
* IoT 网关 MQTT HTTP 认证协议提供 HTTP 认证接口供 EMQX 调用 * IoT 网关 MQTT HTTP 认证协议提供 HTTP 认证接口供 EMQX 调用
* *
@ -82,6 +83,7 @@ public class IotMqttHttpAuthProtocol extends AbstractVerticle {
} }
} }
// TODO @haohao这里不搞默认必须填写哈
/** /**
* 获取认证服务端口 * 获取认证服务端口
* 从配置中获取默认使用 8090 端口 * 从配置中获取默认使用 8090 端口

View File

@ -1,3 +1,4 @@
### TODO 芋艿:这个文件是不是要留着;
### IoT MQTT HTTP 认证服务测试文件 ### IoT MQTT HTTP 认证服务测试文件
### 用于测试独立的 MQTT HTTP 认证服务接口 ### 用于测试独立的 MQTT HTTP 认证服务接口
### 服务由 IotMqttHttpAuthProtocol 启动,运行在配置的端口上(默认 8090 ### 服务由 IotMqttHttpAuthProtocol 启动,运行在配置的端口上(默认 8090
@ -101,7 +102,7 @@ Content-Type: application/json
### 假设配置了端口 8091 ### 假设配置了端口 8091
### POST http://localhost:8091/mqtt/auth/authenticate ### POST http://localhost:8091/mqtt/auth/authenticate
### Content-Type: application/json ### Content-Type: application/json
### ###
### { ### {
### "clientid": "test_product.test_device", ### "clientid": "test_product.test_device",
### "username": "test_device&test_product", ### "username": "test_device&test_product",
@ -136,4 +137,4 @@ Content-Type: application/json
### url = "http://localhost:8090/mqtt/auth/disconnected" # 使用配置的端口 ### url = "http://localhost:8090/mqtt/auth/disconnected" # 使用配置的端口
### events = ["client.disconnected"] ### events = ["client.disconnected"]
### } ### }
### ] ### ]

View File

@ -9,6 +9,7 @@ import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttUpstreamRouter; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttUpstreamRouter;
import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Vertx; import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.mqtt.MqttClient; import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions; import io.vertx.mqtt.MqttClientOptions;
import jakarta.annotation.PostConstruct; import jakarta.annotation.PostConstruct;
@ -59,6 +60,7 @@ public class IotMqttUpstreamProtocol {
// 创建 MQTT 客户端 // 创建 MQTT 客户端
MqttClientOptions options = new MqttClientOptions() MqttClientOptions options = new MqttClientOptions()
// TODO @haohaoclientid 建议也配置文件想通名字会冲突哇
.setClientId("yudao-iot-gateway-" + IdUtil.fastSimpleUUID()) .setClientId("yudao-iot-gateway-" + IdUtil.fastSimpleUUID())
.setUsername(emqxProperties.getMqttUsername()) .setUsername(emqxProperties.getMqttUsername())
.setPassword(emqxProperties.getMqttPassword()) .setPassword(emqxProperties.getMqttPassword())
@ -121,6 +123,7 @@ public class IotMqttUpstreamProtocol {
*/ */
private void connectMqtt() { private void connectMqtt() {
// 检查必要的 MQTT 配置 // 检查必要的 MQTT 配置
// TODO @haohao这些通过配置文件的 validate 去做哈简化下
String host = emqxProperties.getMqttHost(); String host = emqxProperties.getMqttHost();
Integer port = emqxProperties.getMqttPort(); Integer port = emqxProperties.getMqttPort();
if (StrUtil.isBlank(host)) { if (StrUtil.isBlank(host)) {
@ -149,6 +152,7 @@ public class IotMqttUpstreamProtocol {
// 订阅主题 // 订阅主题
subscribeToTopics(); subscribeToTopics();
}) })
// TODO @haohao这个要不要改成必须连接成功不做重试不然启动也蛮危险的
.exceptionally(error -> { .exceptionally(error -> {
log.error("[connectMqtt][连接 MQTT Broker 失败]", error); log.error("[connectMqtt][连接 MQTT Broker 失败]", error);
reconnectWithDelay(); reconnectWithDelay();
@ -178,12 +182,14 @@ public class IotMqttUpstreamProtocol {
*/ */
private void subscribeToTopics() { private void subscribeToTopics() {
List<String> topicList = emqxProperties.getMqttTopics(); List<String> topicList = emqxProperties.getMqttTopics();
// TODO @haohao建议 topicList 直接 validate 校验
if (CollUtil.isEmpty(topicList)) { if (CollUtil.isEmpty(topicList)) {
log.warn("[subscribeToTopics][未配置 MQTT 主题,使用默认主题]"); log.warn("[subscribeToTopics][未配置 MQTT 主题,使用默认主题]");
topicList = List.of("/sys/#"); // 默认订阅所有系统主题 topicList = List.of("/sys/#"); // 默认订阅所有系统主题
} }
for (String topic : topicList) { for (String topic : topicList) {
// TODO @haohao直接 validate 校验嘿嘿主要保证核心逻辑简单点
if (StrUtil.isBlank(topic)) { if (StrUtil.isBlank(topic)) {
log.warn("[subscribeToTopics][跳过空主题]"); log.warn("[subscribeToTopics][跳过空主题]");
continue; continue;
@ -224,7 +230,7 @@ public class IotMqttUpstreamProtocol {
return; return;
} }
mqttClient.publish(topic, io.vertx.core.buffer.Buffer.buffer(payload), DEFAULT_QOS, false, false) mqttClient.publish(topic, Buffer.buffer(payload), DEFAULT_QOS, false, false)
.onSuccess(v -> log.debug("[publishMessage][发送消息成功][topic: {}]", topic)) .onSuccess(v -> log.debug("[publishMessage][发送消息成功][topic: {}]", topic))
.onFailure(err -> log.error("[publishMessage][发送消息失败][topic: {}]", topic, err)); .onFailure(err -> log.error("[publishMessage][发送消息失败][topic: {}]", topic, err));
} }
@ -238,6 +244,7 @@ public class IotMqttUpstreamProtocol {
return IotDeviceMessageUtils.generateServerId(emqxProperties.getMqttPort()); return IotDeviceMessageUtils.generateServerId(emqxProperties.getMqttPort());
} }
// TODO @haohao这个要删除哇
/** /**
* 获取 MQTT 客户端 * 获取 MQTT 客户端
* *

View File

@ -56,6 +56,7 @@ public class IotMqttEventHandler extends IotMqttAbstractHandler {
log.info("[handle][处理设备事件上报成功][topic: {}]", topic); log.info("[handle][处理设备事件上报成功][topic: {}]", topic);
// 发送响应消息 // 发送响应消息
// TODO @haohao这里应该只 ack reply biz 业务处理了handleUpstreamDeviceMessage
String method = "thing.event." + eventIdentifier + ".post"; String method = "thing.event." + eventIdentifier + ".post";
sendResponse(topic, JSONUtil.parseObj(payload), method); sendResponse(topic, JSONUtil.parseObj(payload), method);
} catch (Exception e) { } catch (Exception e) {