From 3c9985978b081cbdff7541b13f98d347b26d1fd9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E6=B5=A9=E6=B5=A9?= <1036606149@qq.com> Date: Sun, 2 Mar 2025 20:47:50 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90=E5=8A=9F=E8=83=BD=E5=AE=8C=E5=96=84?= =?UTF-8?q?=E3=80=91IoT:=20=E6=9B=B4=E6=96=B0=20MQTT=20=E4=B8=BB=E9=A2=98?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E4=B8=BA=E6=95=B0=E7=BB=84=EF=BC=8C=E9=87=8D?= =?UTF-8?q?=E6=9E=84=20EMQX=20=E8=AE=A4=E8=AF=81=E9=80=BB=E8=BE=91?= =?UTF-8?q?=EF=BC=8C=E4=BC=98=E5=8C=96=E5=BC=82=E5=B8=B8=E5=A4=84=E7=90=86?= =?UTF-8?q?=E5=92=8C=E5=93=8D=E5=BA=94=E6=A0=BC=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../framework/tdengine/core/TaosAspect.java | 39 --------------- .../common/util/IotPluginCommonUtils.java | 14 ++++++ .../emqx/config/IotPluginEmqxProperties.java | 5 +- .../upstream/IotDeviceUpstreamServer.java | 6 +-- .../router/IotDeviceAuthVertxHandler.java | 48 ++++++++++--------- .../src/main/resources/application.yml | 3 +- .../plugin/http/IotHttpPluginApplication.java | 10 +--- .../http/config/IotHttpVertxPlugin.java | 14 ------ 8 files changed, 47 insertions(+), 92 deletions(-) delete mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/tdengine/core/TaosAspect.java diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/tdengine/core/TaosAspect.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/tdengine/core/TaosAspect.java deleted file mode 100644 index d83f34d04c..0000000000 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/tdengine/core/TaosAspect.java +++ /dev/null @@ -1,39 +0,0 @@ -package cn.iocoder.yudao.module.iot.framework.tdengine.core; - -import lombok.extern.slf4j.Slf4j; -import org.aspectj.lang.ProceedingJoinPoint; -import org.aspectj.lang.annotation.Around; -import org.aspectj.lang.annotation.Aspect; -import org.springframework.stereotype.Component; - -import java.sql.Timestamp; -import java.util.Map; - -// TODO @haohao:这个还需要的么? -/** - * TaosAspect 是一个处理 Taos 数据库返回值的切面。 - */ -@Aspect -@Component -@Slf4j -public class TaosAspect { - - @Around("execution(java.util.Map cn.iocoder.yudao.module.iot.dal.tdengine.*.*(..))") - public Object handleType(ProceedingJoinPoint joinPoint) { - Map result = null; - try { - result = (Map) joinPoint.proceed(); - result.replaceAll((key, value) -> { - if (value instanceof byte[]) { - return new String((byte[]) value); - } else if (value instanceof Timestamp) { - return ((Timestamp) value).getTime(); - } - return value; - }); - } catch (Throwable e) { - log.error("TaosAspect handleType error", e); - } - return result; - } -} diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/util/IotPluginCommonUtils.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/util/IotPluginCommonUtils.java index a632c73c70..753e62c94b 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/util/IotPluginCommonUtils.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-common/src/main/java/cn/iocoder/yudao/module/iot/plugin/common/util/IotPluginCommonUtils.java @@ -49,4 +49,18 @@ public class IotPluginCommonUtils { .end(result); } + /** + * 将对象转换为JSON字符串后写入响应 + * + * @param routingContext 路由上下文 + * @param data 要转换为JSON的数据对象 + */ + @SuppressWarnings("deprecation") + public static void writeJson(RoutingContext routingContext, Object data) { + routingContext.response() + .setStatusCode(200) + .putHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE) + .end(JsonUtils.toJsonString(data)); + } + } diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/config/IotPluginEmqxProperties.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/config/IotPluginEmqxProperties.java index 72a085bd9d..4117e71820 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/config/IotPluginEmqxProperties.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/config/IotPluginEmqxProperties.java @@ -36,11 +36,10 @@ public class IotPluginEmqxProperties { */ private boolean mqttSsl; - // TODO @haohao:这个是不是改成数组? /** - * 订阅的主题 + * 订阅的主题列表 */ - private String mqttTopics; + private String[] mqttTopics; /** * 认证端口 diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java index 5dd627671f..54479df158 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java @@ -67,6 +67,7 @@ public class IotDeviceUpstreamServer { router.route().handler(BodyHandler.create()); // 处理 Body router.post(IotDeviceAuthVertxHandler.PATH) // TODO @haohao:疑问,mqtt 的认证,需要通过 http 呀? + // 回复:MQTT 认证不必须通过 HTTP 进行,但 HTTP 认证是 EMQX 等 MQTT 服务器支持的一种灵活的认证方式 .handler(new IotDeviceAuthVertxHandler(deviceUpstreamApi)); // 创建 HttpServer 实例 this.server = vertx.createHttpServer().requestHandler(router); @@ -165,15 +166,14 @@ public class IotDeviceUpstreamServer { * @return 订阅结果的Future */ private Future subscribeToTopics() { - String topicsStr = emqxProperties.getMqttTopics(); - if (topicsStr == null || topicsStr.trim().isEmpty()) { + String[] topics = emqxProperties.getMqttTopics(); + if (topics == null || topics.length == 0) { log.warn("[subscribeToTopics] 未配置MQTT主题,跳过订阅"); return Future.succeededFuture(); } log.info("[subscribeToTopics] 开始订阅设备上行消息主题"); - String[] topics = topicsStr.split(TOPIC_SEPARATOR); Future compositeFuture = Future.succeededFuture(); for (String topic : topics) { diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceAuthVertxHandler.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceAuthVertxHandler.java index 8eac1ffbde..350de674cd 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceAuthVertxHandler.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceAuthVertxHandler.java @@ -10,9 +10,12 @@ import io.vertx.ext.web.RoutingContext; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import java.util.Collections; + /** * IoT Emqx 连接认证的 Vert.x Handler - * ... + * ... * * @author haohao */ @@ -27,30 +30,29 @@ public class IotDeviceAuthVertxHandler implements Handler { @Override @SuppressWarnings("unchecked") public void handle(RoutingContext routingContext) { - // TODO @haohao:try catch 兜底异常 - JsonObject json = routingContext.body().asJsonObject(); - String clientId = json.getString("clientid"); - String username = json.getString("username"); - String password = json.getString("password"); + try { + JsonObject json = routingContext.body().asJsonObject(); + String clientId = json.getString("clientid"); + String username = json.getString("username"); + String password = json.getString("password"); - IotDeviceEmqxAuthReqDTO authReqDTO = buildDeviceEmqxAuthReqDTO(clientId, username, password); + // 构建认证请求DTO + IotDeviceEmqxAuthReqDTO authReqDTO = new IotDeviceEmqxAuthReqDTO() + .setClientId(clientId) + .setUsername(username) + .setPassword(password); - CommonResult authResult = deviceUpstreamApi.authenticateEmqxConnection(authReqDTO); - if (authResult.getCode() != 0 || !authResult.getData()) { - denyAccess(routingContext); - return; + // 调用认证API + CommonResult authResult = deviceUpstreamApi.authenticateEmqxConnection(authReqDTO); + if (authResult.getCode() != 0 || !authResult.getData()) { + IotPluginCommonUtils.writeJson(routingContext, Collections.singletonMap("result", "deny")); + return; + } + + IotPluginCommonUtils.writeJson(routingContext, Collections.singletonMap("result", "allow")); + } catch (Exception e) { + log.error("[handle][EMQX认证异常]", e); + IotPluginCommonUtils.writeJson(routingContext, Collections.singletonMap("result", "deny")); } - // TODO @haohao:貌似可以考虑封装一个 writeJson ,里面有个参数是 data,然后里面去 JsonUtils.toJsonString(data) - IotPluginCommonUtils.writeJson(routingContext, "{\"result\": \"allow\"}"); } - - // TODO @haohao:下面两个简单方法,貌似可以考虑不抽小方法哈。 - private void denyAccess(RoutingContext routingContext) { - IotPluginCommonUtils.writeJson(routingContext, "{\"result\": \"deny\"}"); - } - - private IotDeviceEmqxAuthReqDTO buildDeviceEmqxAuthReqDTO(String clientId, String username, String password) { - return new IotDeviceEmqxAuthReqDTO().setClientId(clientId).setUsername(username).setPassword(password); - } - } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/resources/application.yml b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/resources/application.yml index 9343d3614e..c00621c82a 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/resources/application.yml +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/resources/application.yml @@ -15,5 +15,6 @@ yudao: mqtt-ssl: false mqtt-username: yudao mqtt-password: 123456 - mqtt-topics: "/sys/#" + mqtt-topics: + - "/sys/#" auth-port: 8101 diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-http/src/main/java/cn/iocoder/yudao/module/iot/plugin/http/IotHttpPluginApplication.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-http/src/main/java/cn/iocoder/yudao/module/iot/plugin/http/IotHttpPluginApplication.java index 0aa08505aa..a88b34eb31 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-http/src/main/java/cn/iocoder/yudao/module/iot/plugin/http/IotHttpPluginApplication.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-http/src/main/java/cn/iocoder/yudao/module/iot/plugin/http/IotHttpPluginApplication.java @@ -4,7 +4,6 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.WebApplicationType; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.context.ConfigurableApplicationContext; /** * 独立运行入口 @@ -16,14 +15,7 @@ public class IotHttpPluginApplication { public static void main(String[] args) { SpringApplication application = new SpringApplication(IotHttpPluginApplication.class); application.setWebApplicationType(WebApplicationType.NONE); - ConfigurableApplicationContext context = application.run(args); - - // 手动获取 VertxService 并启动 - // TODO @haohao:可以放在 bean 的 init 里么?回复:会和插件模式冲突 @芋艿,测试下 - // TODO @haohao:貌似去掉,没有问题额。。。 -// IotDeviceUpstreamServer vertxService = context.getBean(IotDeviceUpstreamServer.class); -// vertxService.start(); - + application.run(args); log.info("[main][独立模式启动完成]"); } diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-http/src/main/java/cn/iocoder/yudao/module/iot/plugin/http/config/IotHttpVertxPlugin.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-http/src/main/java/cn/iocoder/yudao/module/iot/plugin/http/config/IotHttpVertxPlugin.java index ac9a933401..674980d005 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-http/src/main/java/cn/iocoder/yudao/module/iot/plugin/http/config/IotHttpVertxPlugin.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-http/src/main/java/cn/iocoder/yudao/module/iot/plugin/http/config/IotHttpVertxPlugin.java @@ -2,11 +2,9 @@ package cn.iocoder.yudao.module.iot.plugin.http.config; import cn.hutool.core.lang.Assert; import cn.hutool.extra.spring.SpringUtil; -import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi; import lombok.extern.slf4j.Slf4j; import org.pf4j.PluginWrapper; import org.pf4j.spring.SpringPlugin; -import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.AnnotationConfigApplicationContext; @@ -29,11 +27,6 @@ public class IotHttpVertxPlugin extends SpringPlugin { ApplicationContext pluginContext = getApplicationContext(); Assert.notNull(pluginContext, "pluginContext 不能为空"); - // 2. 启动 Vert.x - // TODO @haohao:貌似去掉,没有问题额。。。 -// IotDeviceUpstreamServer vertxService = pluginContext.getBean(IotDeviceUpstreamServer.class); -// vertxService.start(); - log.info("[HttpVertxPlugin][HttpVertxPlugin 插件启动成功...]"); } catch (Exception e) { log.error("[HttpVertxPlugin][HttpVertxPlugin 插件开启动异常...]", e); @@ -44,13 +37,6 @@ public class IotHttpVertxPlugin extends SpringPlugin { public void stop() { log.info("[HttpVertxPlugin][HttpVertxPlugin 插件停止开始...]"); try { - // 停止服务器 -// ApplicationContext pluginContext = getApplicationContext(); -// if (pluginContext != null) { -// IotDeviceUpstreamServer vertxService = pluginContext.getBean(IotDeviceUpstreamServer.class); -// vertxService.stop(); -// } - log.info("[HttpVertxPlugin][HttpVertxPlugin 插件停止成功...]"); } catch (Exception e) { log.error("[HttpVertxPlugin][HttpVertxPlugin 插件停止异常...]", e);