authenticateEmqxConnection(IotDeviceEmqxAuthReqDTO authReqDTO) {
- Boolean result = deviceUpstreamService.authenticateEmqxConnection(authReqDTO);
+ boolean result = deviceUpstreamService.authenticateEmqxConnection(authReqDTO);
return success(result);
}
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/device/IotDeviceRespVO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/device/IotDeviceRespVO.java
index 38e41de70d..cc45c0280f 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/device/IotDeviceRespVO.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/device/IotDeviceRespVO.java
@@ -79,18 +79,6 @@ public class IotDeviceRespVO {
@ExcelProperty("设备密钥")
private String deviceSecret;
- @Schema(description = "MQTT 客户端 ID", example = "24602")
- @ExcelProperty("MQTT 客户端 ID")
- private String mqttClientId;
-
- @Schema(description = "MQTT 用户名", example = "芋艿")
- @ExcelProperty("MQTT 用户名")
- private String mqttUsername;
-
- @Schema(description = "MQTT 密码")
- @ExcelProperty("MQTT 密码")
- private String mqttPassword;
-
@Schema(description = "认证类型(如一机一密、动态注册)", example = "2")
@ExcelProperty("认证类型(如一机一密、动态注册)")
private String authType;
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/control/IotDeviceUpstreamService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/control/IotDeviceUpstreamService.java
index cb2e5b4003..dba529df2c 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/control/IotDeviceUpstreamService.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/control/IotDeviceUpstreamService.java
@@ -67,6 +67,6 @@ public interface IotDeviceUpstreamService {
*
* @param authReqDTO Emqx 连接认证 DTO
*/
- Boolean authenticateEmqxConnection(IotDeviceEmqxAuthReqDTO authReqDTO);
+ boolean authenticateEmqxConnection(IotDeviceEmqxAuthReqDTO authReqDTO);
}
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/control/IotDeviceUpstreamServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/control/IotDeviceUpstreamServiceImpl.java
index 9ce0efbb1e..29e8096a95 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/control/IotDeviceUpstreamServiceImpl.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/control/IotDeviceUpstreamServiceImpl.java
@@ -174,7 +174,7 @@ public class IotDeviceUpstreamServiceImpl implements IotDeviceUpstreamService {
}
private void registerDevice0(String productKey, String deviceName, Long gatewayId,
- IotDeviceUpstreamAbstractReqDTO registerReqDTO) {
+ IotDeviceUpstreamAbstractReqDTO registerReqDTO) {
// 1.1 注册设备
IotDeviceDO device = deviceService.getDeviceByProductKeyAndDeviceNameFromCache(productKey, deviceName);
boolean registerNew = device == null;
@@ -280,16 +280,15 @@ public class IotDeviceUpstreamServiceImpl implements IotDeviceUpstreamService {
sendDeviceMessage(message, device);
}
- // TODO @haohao:建议返回 boolean;
@Override
- public Boolean authenticateEmqxConnection(IotDeviceEmqxAuthReqDTO authReqDTO) {
+ public boolean authenticateEmqxConnection(IotDeviceEmqxAuthReqDTO authReqDTO) {
log.info("[authenticateEmqxConnection][认证 Emqx 连接: {}]", authReqDTO);
// 1. 校验设备是否存在
// username 格式:${DeviceName}&${ProductKey}
String[] usernameParts = authReqDTO.getUsername().split("&");
if (usernameParts.length != 2) {
log.error("[authenticateEmqxConnection][认证失败,username 格式不正确]");
- return Boolean.FALSE;
+ return false;
}
String deviceName = usernameParts[0];
String productKey = usernameParts[1];
@@ -298,19 +297,18 @@ public class IotDeviceUpstreamServiceImpl implements IotDeviceUpstreamService {
if (device == null) {
log.error("[authenticateEmqxConnection][设备({}/{}) 不存在]",
productKey, deviceName);
- return Boolean.FALSE;
+ return false;
}
// 2. 校验密码
String deviceSecret = device.getDeviceSecret();
String clientId = authReqDTO.getClientId();
MqttSignResult sign = MqttSignUtils.calculate(productKey, deviceName, deviceSecret, clientId);
- // TODO @haohao:notEquals,尽量不走取反逻辑哈
- if (!StrUtil.equals(sign.getPassword(), authReqDTO.getPassword())) {
- log.error("[authenticateEmqxConnection][认证失败,密码不正确]");
- return Boolean.FALSE;
+ if (StrUtil.equals(sign.getPassword(), authReqDTO.getPassword())) {
+ log.info("[authenticateEmqxConnection][认证成功]");
+ return true;
}
- log.info("[authenticateEmqxConnection][认证成功]");
- return Boolean.TRUE;
+ log.error("[authenticateEmqxConnection][认证失败,密码不正确]");
+ return false;
}
private void updateDeviceLastTime(IotDeviceDO device, IotDeviceUpstreamAbstractReqDTO reqDTO) {
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/util/MqttSignUtils.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/util/MqttSignUtils.java
index bf364c53ea..7a6b6e441a 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/util/MqttSignUtils.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/util/MqttSignUtils.java
@@ -1,9 +1,10 @@
package cn.iocoder.yudao.module.iot.util;
+import cn.hutool.crypto.digest.HMac;
+import cn.hutool.crypto.digest.HmacAlgorithm;
+import lombok.AllArgsConstructor;
import lombok.Getter;
-import javax.crypto.Mac;
-import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
/**
@@ -13,10 +14,6 @@ import java.nio.charset.StandardCharsets;
*/
public class MqttSignUtils {
- private static final String SIGN_METHOD = "hmacsha256";
-
- // TODO @haohao:calculate 方法,可以融合么?
-
/**
* 计算 MQTT 连接参数
*
@@ -26,14 +23,7 @@ public class MqttSignUtils {
* @return 包含 clientId, username, password 的结果对象
*/
public static MqttSignResult calculate(String productKey, String deviceName, String deviceSecret) {
- String clientId = productKey + "." + deviceName;
- String username = deviceName + "&" + productKey;
- // 生成 password
- // TODO @haohao:signContent 和 signContentBuilder 风格保持统一的实现哈
- String signContent = String.format("clientId%sdeviceName%sdeviceSecret%sproductKey%s",
- clientId, deviceName, deviceSecret, productKey);
- String password = sign(signContent, deviceSecret);
- return new MqttSignResult(clientId, username, password);
+ return calculate(productKey, deviceName, deviceSecret, productKey + "." + deviceName);
}
/**
@@ -47,56 +37,31 @@ public class MqttSignUtils {
*/
public static MqttSignResult calculate(String productKey, String deviceName, String deviceSecret, String clientId) {
String username = deviceName + "&" + productKey;
- String signContentBuilder = "clientId" + clientId +
- "deviceName" + deviceName +
- "deviceSecret" + deviceSecret +
- "productKey" + productKey;
+ // 构建签名内容
+ StringBuilder signContentBuilder = new StringBuilder()
+ .append("clientId").append(clientId)
+ .append("deviceName").append(deviceName)
+ .append("deviceSecret").append(deviceSecret)
+ .append("productKey").append(productKey);
- String password = sign(signContentBuilder, deviceSecret);
+ // 使用 HMac 计算签名
+ byte[] key = deviceSecret.getBytes(StandardCharsets.UTF_8);
+ String signContent = signContentBuilder.toString();
+ HMac mac = new HMac(HmacAlgorithm.HmacSHA256, key);
+ String password = mac.digestHex(signContent);
return new MqttSignResult(clientId, username, password);
}
- // TODO @haohao:hutool 貌似有工具类可以用哈。
- private static String sign(String content, String key) {
- try {
- Mac mac = Mac.getInstance(SIGN_METHOD);
- mac.init(new SecretKeySpec(key.getBytes(StandardCharsets.UTF_8), SIGN_METHOD));
- byte[] signData = mac.doFinal(content.getBytes(StandardCharsets.UTF_8));
- return bytesToHex(signData);
- } catch (Exception e) {
- throw new RuntimeException("Failed to sign content with HmacSHA256", e);
- }
- }
-
- private static String bytesToHex(byte[] bytes) {
- StringBuilder hexString = new StringBuilder(bytes.length * 2);
- for (byte b : bytes) {
- String hex = Integer.toHexString(0xFF & b);
- if (hex.length() == 1) {
- hexString.append('0');
- }
- hexString.append(hex);
- }
- return hexString.toString();
- }
-
/**
* MQTT 签名结果类
*/
@Getter
- // TODO @haohao:可以用 lombok 哈
+ @AllArgsConstructor
public static class MqttSignResult {
-
private final String clientId;
private final String username;
private final String password;
-
- public MqttSignResult(String clientId, String username, String password) {
- this.clientId = clientId;
- this.username = username;
- this.password = password;
- }
-
}
+
}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/pom.xml b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/pom.xml
index 34cb91d545..818c08b333 100644
--- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/pom.xml
+++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/pom.xml
@@ -160,5 +160,9 @@
io.vertx
vertx-web
+
+ io.vertx
+ vertx-mqtt
+
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/config/IotPluginEmqxProperties.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/config/IotPluginEmqxProperties.java
index 9004e864ef..fbc689b353 100644
--- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/config/IotPluginEmqxProperties.java
+++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/config/IotPluginEmqxProperties.java
@@ -17,22 +17,29 @@ public class IotPluginEmqxProperties {
/**
* 服务主机
*/
- private String host;
-
+ private String mqttHost;
/**
* 服务端口
*/
- private int port;
+ private int mqttPort;
+ /**
+ * 服务用户名
+ */
+ private String mqttUsername;
+ /**
+ * 服务密码
+ */
+ private String mqttPassword;
/**
* 是否启用 SSL
*/
- private boolean ssl;
+ private boolean mqttSsl;
/**
* 订阅的主题
*/
- private String topics;
+ private String mqttTopics;
/**
* 认证端口
diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/downstream/IotDeviceDownstreamHandlerImpl.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/downstream/IotDeviceDownstreamHandlerImpl.java
index 1d3ccba3ab..fbbfd7a5c8 100644
--- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/downstream/IotDeviceDownstreamHandlerImpl.java
+++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/downstream/IotDeviceDownstreamHandlerImpl.java
@@ -7,8 +7,6 @@ import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamH
/**
* EMQX 插件的 {@link IotDeviceDownstreamHandler} 实现类
*
- * 但是:由于设备通过 HTTP 短链接接入,导致其实无法下行指导给 device 设备,所以基本都是直接返回失败!!!
- * 类似 MQTT、WebSocket、TCP 插件,是可以实现下行指令的。
*
* @author 芋道源码
*/
diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java
index 49fdbe499f..bd0751511c 100644
--- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java
+++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java
@@ -1,20 +1,30 @@
package cn.iocoder.yudao.module.iot.plugin.emqx.upstream;
+import cn.hutool.core.date.DateUtil;
+import cn.hutool.json.JSONObject;
+import cn.hutool.json.JSONUtil;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
+import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDevicePropertyReportReqDTO;
import cn.iocoder.yudao.module.iot.plugin.common.config.IotPluginCommonProperties;
import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamServer;
+import cn.iocoder.yudao.module.iot.plugin.common.util.IotPluginCommonUtils;
import cn.iocoder.yudao.module.iot.plugin.emqx.config.IotPluginEmqxProperties;
import cn.iocoder.yudao.module.iot.plugin.emqx.upstream.router.IotDeviceAuthVertxHandler;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;
+import io.vertx.mqtt.MqttClient;
+import io.vertx.mqtt.MqttClientOptions;
import lombok.extern.slf4j.Slf4j;
+import java.time.LocalDateTime;
+import java.util.UUID;
+
/**
* IoT 设备下行服务端,接收来自 device 设备的请求,转发给 server 服务器
*
- * 协议:HTTP
+ * 协议:HTTP、MQTT
*
* @author haohao
*/
@@ -23,13 +33,16 @@ public class IotDeviceUpstreamServer {
private final Vertx vertx;
private final HttpServer server;
+ private final MqttClient client;
private final IotPluginEmqxProperties emqxProperties;
+ private final IotDeviceUpstreamApi deviceUpstreamApi;
public IotDeviceUpstreamServer(IotPluginCommonProperties commonProperties,
IotPluginEmqxProperties emqxProperties,
IotDeviceUpstreamApi deviceUpstreamApi,
IotDeviceDownstreamServer deviceDownstreamServer) {
this.emqxProperties = emqxProperties;
+ this.deviceUpstreamApi = deviceUpstreamApi;
// 创建 Vertx 实例
this.vertx = Vertx.vertx();
// 创建 Router 实例
@@ -39,18 +52,104 @@ public class IotDeviceUpstreamServer {
.handler(new IotDeviceAuthVertxHandler(deviceUpstreamApi));
// 创建 HttpServer 实例
this.server = vertx.createHttpServer().requestHandler(router);
+
+ // 创建 MQTT 客户端
+ MqttClientOptions options = new MqttClientOptions()
+ .setClientId("yudao-iot-server-" + UUID.randomUUID())
+ .setUsername(emqxProperties.getMqttUsername())
+ .setPassword(emqxProperties.getMqttPassword())
+ .setSsl(emqxProperties.isMqttSsl());
+ client = MqttClient.create(vertx, options);
}
/**
- * 启动 HTTP 服务器
+ * 启动 HTTP 服务器、MQTT 客户端
*/
public void start() {
+ // 1. 启动 HTTP 服务器
log.info("[start][开始启动]");
server.listen(emqxProperties.getAuthPort())
.toCompletionStage()
.toCompletableFuture()
.join();
- log.info("[start][启动完成,端口({})]", this.server.actualPort());
+ log.info("[start][HTTP服务器启动完成,端口({})]", this.server.actualPort());
+
+ // 2. 连接 MQTT Broker
+ connectMqtt();
+
+ // 3. 添加 MQTT 断开重连监听器
+ client.closeHandler(v -> {
+ log.warn("[closeHandler][MQTT 连接已断开,准备重连]");
+ // 等待 5 秒后重连,避免频繁重连
+ vertx.setTimer(5000, id -> {
+ log.info("[closeHandler][开始重新连接 MQTT]");
+ connectMqtt();
+ });
+ });
+
+ // 4. 设置 MQTT 消息处理器
+ client.publishHandler(message -> {
+ String topic = message.topicName();
+ String payload = message.payload().toString();
+ log.info("[messageHandler][接收到消息][topic: {}][payload: {}]", topic, payload);
+
+ try {
+ // 4.1 处理设备属性上报消息: /{productKey}/{deviceName}/event/property/post
+ if (topic.contains("/event/property/post")) {
+ // 4.2 解析消息内容
+ JSONObject jsonObject = JSONUtil.parseObj(payload);
+ String requestId = jsonObject.getStr("id");
+ Long timestamp = jsonObject.getLong("timestamp");
+
+ // 4.3 从 topic 中解析设备标识
+ String[] topicParts = topic.split("/");
+ String productKey = topicParts[1];
+ String deviceName = topicParts[2];
+
+ // 4.4 构建设备属性上报请求对象
+ IotDevicePropertyReportReqDTO devicePropertyReportReqDTO = ((IotDevicePropertyReportReqDTO) new IotDevicePropertyReportReqDTO()
+ .setRequestId(requestId)
+ .setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now())
+ .setProductKey(productKey).setDeviceName(deviceName))
+ .setProperties(jsonObject.getJSONObject("params"));
+
+ // 4.5 调用上游 API 处理设备上报数据
+ deviceUpstreamApi.reportDeviceProperty(devicePropertyReportReqDTO);
+ log.info("[messageHandler][处理设备上行消息成功][topic: {}][devicePropertyReportReqDTO: {}]",
+ topic, JSONUtil.toJsonStr(devicePropertyReportReqDTO));
+ }
+ } catch (Exception e) {
+ log.error("[messageHandler][处理消息失败][topic: {}][payload: {}]", topic, payload, e);
+ }
+ });
+ }
+
+ /**
+ * 连接 MQTT Broker 并订阅主题
+ */
+ private void connectMqtt() {
+ // 连接 MQTT Broker
+ client.connect(emqxProperties.getMqttPort(), emqxProperties.getMqttHost())
+ .onSuccess(connAck -> {
+ log.info("[connectMqtt][MQTT客户端连接成功]");
+ // 连接成功后订阅主题
+ String mqttTopics = emqxProperties.getMqttTopics();
+ String[] topics = mqttTopics.split(",");
+ for (String topic : topics) {
+ client.subscribe(topic, 1)
+ .onSuccess(v -> log.info("[connectMqtt][成功订阅主题: {}]", topic))
+ .onFailure(err -> log.error("[connectMqtt][订阅主题失败: {}]", topic, err));
+ }
+ log.info("[connectMqtt][开始订阅设备上行消息主题]");
+ })
+ .onFailure(err -> {
+ log.error("[connectMqtt][连接 MQTT Broker 失败]", err);
+ // 连接失败后,等待 5 秒重试
+ vertx.setTimer(5000, id -> {
+ log.info("[connectMqtt][准备重新连接 MQTT]");
+ connectMqtt();
+ });
+ });
}
/**
@@ -67,6 +166,14 @@ public class IotDeviceUpstreamServer {
.join();
}
+ // 关闭 MQTT 客户端
+ if (client != null) {
+ client.disconnect()
+ .toCompletionStage()
+ .toCompletableFuture()
+ .join();
+ }
+
// 关闭 Vertx 实例
if (vertx != null) {
vertx.close()
diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/resources/application.yml b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/resources/application.yml
index f31880f24a..7927313fd3 100644
--- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/resources/application.yml
+++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/resources/application.yml
@@ -10,8 +10,10 @@ yudao:
downstream-port: 8100
plugin-key: yudao-module-iot-plugin-emqx
emqx:
- host: 127.0.0.1
- port: 1883
- ssl: false
- topics: "/sys/#"
+ mqtt-host: 127.0.0.1
+ mqtt-port: 1883
+ mqtt-ssl: false
+ mqtt-username: yudao
+ mqtt-password: yudao
+ mqtt-topics: "/+/#"
auth-port: 8101