【代码评审】IoT:MQTT 插件

This commit is contained in:
YunaiV 2025-02-27 12:45:42 +08:00
parent 006ef40c4b
commit 36dd18d41f
6 changed files with 35 additions and 10 deletions

View File

@ -59,9 +59,11 @@ public class MqttSignUtils {
@Getter @Getter
@AllArgsConstructor @AllArgsConstructor
public static class MqttSignResult { public static class MqttSignResult {
private final String clientId; private final String clientId;
private final String username; private final String username;
private final String password; private final String password;
} }
} }

View File

@ -36,6 +36,7 @@ public class IotPluginEmqxProperties {
*/ */
private boolean mqttSsl; private boolean mqttSsl;
// TODO @haohao这个是不是改成数组
/** /**
* 订阅的主题 * 订阅的主题
*/ */

View File

@ -26,6 +26,7 @@ public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandle
private static final String SYS_TOPIC_PREFIX = "/sys/"; private static final String SYS_TOPIC_PREFIX = "/sys/";
// TODO @haohao讨论感觉 mqtt http可以做个相对统一的格式哈
// 设备服务调用 标准 JSON // 设备服务调用 标准 JSON
// 请求Topic/sys/${productKey}/${deviceName}/thing/service/${tsl.service.identifier} // 请求Topic/sys/${productKey}/${deviceName}/thing/service/${tsl.service.identifier}
// 响应Topic/sys/${productKey}/${deviceName}/thing/service/${tsl.service.identifier}_reply // 响应Topic/sys/${productKey}/${deviceName}/thing/service/${tsl.service.identifier}_reply

View File

@ -26,17 +26,33 @@ import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
public class IotDeviceUpstreamServer { public class IotDeviceUpstreamServer {
private static final int RECONNECT_DELAY_MS = 5000; // 重连延迟时间(毫秒) /**
private static final int CONNECTION_TIMEOUT_MS = 10000; // 连接超时时间(毫秒) * 重连延迟时间(毫秒)
private static final String TOPIC_SEPARATOR = ","; // 主题分隔符 */
private static final MqttQoS DEFAULT_QOS = MqttQoS.AT_LEAST_ONCE; // 默认QoS级别 private static final int RECONNECT_DELAY_MS = 5000;
/**
* 连接超时时间(毫秒)
*/
private static final int CONNECTION_TIMEOUT_MS = 10000;
/**
* 主题分隔符
*/
private static final String TOPIC_SEPARATOR = ",";
/**
* 默认 QoS 级别
*/
private static final MqttQoS DEFAULT_QOS = MqttQoS.AT_LEAST_ONCE;
private final Vertx vertx; private final Vertx vertx;
private final HttpServer server; private final HttpServer server;
private final MqttClient client; private final MqttClient client;
private final IotPluginEmqxProperties emqxProperties; private final IotPluginEmqxProperties emqxProperties;
private final IotDeviceMqttMessageHandler mqttMessageHandler; private final IotDeviceMqttMessageHandler mqttMessageHandler;
private volatile boolean isRunning = false; // 服务运行状态标志
/**
* 服务运行状态标志
*/
private volatile boolean isRunning = false;
public IotDeviceUpstreamServer(IotPluginEmqxProperties emqxProperties, public IotDeviceUpstreamServer(IotPluginEmqxProperties emqxProperties,
IotDeviceUpstreamApi deviceUpstreamApi, IotDeviceUpstreamApi deviceUpstreamApi,
@ -50,6 +66,7 @@ public class IotDeviceUpstreamServer {
Router router = Router.router(vertx); Router router = Router.router(vertx);
router.route().handler(BodyHandler.create()); // 处理 Body router.route().handler(BodyHandler.create()); // 处理 Body
router.post(IotDeviceAuthVertxHandler.PATH) router.post(IotDeviceAuthVertxHandler.PATH)
// TODO @haohao疑问mqtt 的认证需要通过 http
.handler(new IotDeviceAuthVertxHandler(deviceUpstreamApi)); .handler(new IotDeviceAuthVertxHandler(deviceUpstreamApi));
// 创建 HttpServer 实例 // 创建 HttpServer 实例
this.server = vertx.createHttpServer().requestHandler(router); this.server = vertx.createHttpServer().requestHandler(router);

View File

@ -27,7 +27,7 @@ public class IotDeviceAuthVertxHandler implements Handler<RoutingContext> {
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void handle(RoutingContext routingContext) { public void handle(RoutingContext routingContext) {
// TODO @haohaotry catch 兜底异常
JsonObject json = routingContext.body().asJsonObject(); JsonObject json = routingContext.body().asJsonObject();
String clientId = json.getString("clientid"); String clientId = json.getString("clientid");
String username = json.getString("username"); String username = json.getString("username");
@ -40,9 +40,11 @@ public class IotDeviceAuthVertxHandler implements Handler<RoutingContext> {
denyAccess(routingContext); denyAccess(routingContext);
return; return;
} }
// TODO @haohao貌似可以考虑封装一个 writeJson 里面有个参数是 data然后里面去 JsonUtils.toJsonString(data)
IotPluginCommonUtils.writeJson(routingContext, "{\"result\": \"allow\"}"); IotPluginCommonUtils.writeJson(routingContext, "{\"result\": \"allow\"}");
} }
// TODO @haohao下面两个简单方法貌似可以考虑不抽小方法哈
private void denyAccess(RoutingContext routingContext) { private void denyAccess(RoutingContext routingContext) {
IotPluginCommonUtils.writeJson(routingContext, "{\"result\": \"deny\"}"); IotPluginCommonUtils.writeJson(routingContext, "{\"result\": \"deny\"}");
} }

View File

@ -26,9 +26,11 @@ import java.util.Arrays;
@Slf4j @Slf4j
public class IotDeviceMqttMessageHandler { public class IotDeviceMqttMessageHandler {
// TODO @haohao讨论感觉 mqtt http可以做个相对统一的格式哈
// 设备上报属性 标准 JSON // 设备上报属性 标准 JSON
// 请求 Topic/sys/${productKey}/${deviceName}/thing/event/property/post // 请求 Topic/sys/${productKey}/${deviceName}/thing/event/property/post
// 响应 Topic/sys/${productKey}/${deviceName}/thing/event/property/post_reply // 响应 Topic/sys/${productKey}/${deviceName}/thing/event/property/post_reply
// 设备上报事件 标准 JSON // 设备上报事件 标准 JSON
// 请求 Topic/sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post // 请求 Topic/sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post
// 响应 Topic/sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post_reply // 响应 Topic/sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post_reply