From 824a801b39225d356bf5a7d6745e62d084b93a83 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=AE=89=E6=B5=A9=E6=B5=A9?= <1036606149@qq.com>
Date: Fri, 7 Mar 2025 22:36:38 +0800
Subject: [PATCH] =?UTF-8?q?=E3=80=90=E5=8A=9F=E8=83=BD=E5=AE=8C=E5=96=84?=
=?UTF-8?q?=E3=80=91IoT:=20=E6=B7=BB=E5=8A=A0=20Webhook=20=E5=A4=84?=
=?UTF-8?q?=E7=90=86=E5=99=A8=E4=BB=A5=E5=A4=84=E7=90=86=E8=AE=BE=E5=A4=87?=
=?UTF-8?q?=E8=BF=9E=E6=8E=A5=E5=92=8C=E6=96=AD=E5=BC=80=E4=BA=8B=E4=BB=B6?=
=?UTF-8?q?=EF=BC=8C=E6=9B=B4=E6=96=B0=E8=AE=BE=E5=A4=87=E7=8A=B6=E6=80=81?=
=?UTF-8?q?=E7=AE=A1=E7=90=86=E9=80=BB=E8=BE=91?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../upstream/IotDeviceUpstreamServer.java | 4 +
.../router/IotDeviceWebhookVertxHandler.java | 154 ++++++++++++++++++
2 files changed, 158 insertions(+)
create mode 100644 yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceWebhookVertxHandler.java
diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java
index c9db423e99..040985ba9a 100644
--- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java
+++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java
@@ -5,6 +5,7 @@ import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.plugin.emqx.config.IotPluginEmqxProperties;
import cn.iocoder.yudao.module.iot.plugin.emqx.upstream.router.IotDeviceAuthVertxHandler;
import cn.iocoder.yudao.module.iot.plugin.emqx.upstream.router.IotDeviceMqttMessageHandler;
+import cn.iocoder.yudao.module.iot.plugin.emqx.upstream.router.IotDeviceWebhookVertxHandler;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
@@ -70,6 +71,9 @@ public class IotDeviceUpstreamServer {
// TODO @haohao:疑问,mqtt 的认证,需要通过 http 呀?
// 回复:MQTT 认证不必须通过 HTTP 进行,但 HTTP 认证是 EMQX 等 MQTT 服务器支持的一种灵活的认证方式
.handler(new IotDeviceAuthVertxHandler(deviceUpstreamApi));
+ // 添加 Webhook 处理器,用于处理设备连接和断开连接事件
+ router.post(IotDeviceWebhookVertxHandler.PATH)
+ .handler(new IotDeviceWebhookVertxHandler(deviceUpstreamApi));
// 创建 HttpServer 实例
this.server = vertx.createHttpServer().requestHandler(router);
this.mqttMessageHandler = new IotDeviceMqttMessageHandler(deviceUpstreamApi, client);
diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceWebhookVertxHandler.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceWebhookVertxHandler.java
new file mode 100644
index 0000000000..a2499826fd
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceWebhookVertxHandler.java
@@ -0,0 +1,154 @@
+package cn.iocoder.yudao.module.iot.plugin.emqx.upstream.router;
+
+import cn.hutool.core.util.StrUtil;
+import cn.iocoder.yudao.framework.common.pojo.CommonResult;
+import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
+import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceStateUpdateReqDTO;
+import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum;
+import cn.iocoder.yudao.module.iot.plugin.common.util.IotPluginCommonUtils;
+import io.vertx.core.Handler;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.web.RoutingContext;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.time.LocalDateTime;
+import java.util.Collections;
+
+/**
+ * IoT Emqx Webhook 事件处理的 Vert.x Handler
+ * EMQX
+ * Webhook
+ *
+ * @author haohao
+ */
+@RequiredArgsConstructor
+@Slf4j
+public class IotDeviceWebhookVertxHandler implements Handler {
+
+ public static final String PATH = "/mqtt/webhook";
+
+ private final IotDeviceUpstreamApi deviceUpstreamApi;
+
+ @Override
+ public void handle(RoutingContext routingContext) {
+ try {
+ // 解析请求体
+ JsonObject json = routingContext.body().asJsonObject();
+ String event = json.getString("event");
+ String clientId = json.getString("clientid");
+ String username = json.getString("username");
+
+ // 处理不同的事件类型
+ switch (event) {
+ case "client.connected":
+ handleClientConnected(clientId, username);
+ break;
+ case "client.disconnected":
+ handleClientDisconnected(clientId, username);
+ break;
+ default:
+ log.info("[handle][未处理的 Webhook 事件] event={}, clientId={}, username={}", event, clientId,
+ username);
+ break;
+ }
+
+ // 返回成功响应
+ IotPluginCommonUtils.writeJson(routingContext, Collections.singletonMap("result", "success"));
+ } catch (Exception e) {
+ log.error("[handle][处理 Webhook 事件异常]", e);
+ IotPluginCommonUtils.writeJson(routingContext, Collections.singletonMap("result", "error"));
+ }
+ }
+
+ /**
+ * 处理客户端连接事件
+ *
+ * @param clientId 客户端ID
+ * @param username 用户名
+ */
+ private void handleClientConnected(String clientId, String username) {
+ if (StrUtil.isEmpty(username) || "undefined".equals(username)) {
+ log.warn("[handleClientConnected][客户端连接事件,但用户名为空] clientId={}", clientId);
+ return;
+ }
+
+ // 解析产品标识和设备名称
+ String[] parts = parseUsername(username);
+ if (parts == null) {
+ return;
+ }
+
+ // 更新设备状态为在线
+ IotDeviceStateUpdateReqDTO updateReqDTO = new IotDeviceStateUpdateReqDTO();
+ updateReqDTO.setProductKey(parts[1]);
+ updateReqDTO.setDeviceName(parts[0]);
+ updateReqDTO.setState(IotDeviceStateEnum.ONLINE.getState());
+ updateReqDTO.setProcessId(IotPluginCommonUtils.getProcessId());
+ updateReqDTO.setReportTime(LocalDateTime.now());
+
+ CommonResult result = deviceUpstreamApi.updateDeviceState(updateReqDTO);
+ if (result.getCode() != 0 || !result.getData()) {
+ log.error("[handleClientConnected][更新设备状态为在线失败] clientId={}, username={}, code={}, msg={}",
+ clientId, username, result.getCode(), result.getMsg());
+ } else {
+ log.info("[handleClientConnected][更新设备状态为在线成功] clientId={}, username={}", clientId, username);
+ }
+ }
+
+ /**
+ * 处理客户端断开连接事件
+ *
+ * @param clientId 客户端ID
+ * @param username 用户名
+ */
+ private void handleClientDisconnected(String clientId, String username) {
+ if (StrUtil.isEmpty(username) || "undefined".equals(username)) {
+ log.warn("[handleClientDisconnected][客户端断开连接事件,但用户名为空] clientId={}", clientId);
+ return;
+ }
+
+ // 解析产品标识和设备名称
+ String[] parts = parseUsername(username);
+ if (parts == null) {
+ return;
+ }
+
+ // 更新设备状态为离线
+ IotDeviceStateUpdateReqDTO offlineReqDTO = new IotDeviceStateUpdateReqDTO();
+ offlineReqDTO.setProductKey(parts[1]);
+ offlineReqDTO.setDeviceName(parts[0]);
+ offlineReqDTO.setState(IotDeviceStateEnum.OFFLINE.getState());
+ offlineReqDTO.setProcessId(IotPluginCommonUtils.getProcessId());
+ offlineReqDTO.setReportTime(LocalDateTime.now());
+
+ CommonResult offlineResult = deviceUpstreamApi.updateDeviceState(offlineReqDTO);
+ if (offlineResult.getCode() != 0 || !offlineResult.getData()) {
+ log.error("[handleClientDisconnected][更新设备状态为离线失败] clientId={}, username={}, code={}, msg={}",
+ clientId, username, offlineResult.getCode(), offlineResult.getMsg());
+ } else {
+ log.info("[handleClientDisconnected][更新设备状态为离线成功] clientId={}, username={}", clientId, username);
+ }
+ }
+
+ /**
+ * 解析用户名,格式为 deviceName&productKey
+ *
+ * @param username 用户名
+ * @return 解析结果,[0]为deviceName,[1]为productKey,解析失败返回null
+ */
+ private String[] parseUsername(String username) {
+ if (StrUtil.isEmpty(username)) {
+ return null;
+ }
+
+ String[] parts = username.split("&");
+ if (parts.length != 2) {
+ log.warn("[parseUsername][用户名格式不正确,无法解析产品标识和设备名称] username={}", username);
+ return null;
+ }
+
+ return parts;
+ }
+}
\ No newline at end of file