【功能完善】IoT: 更新 MQTT 主题配置为数组,重构 EMQX 认证逻辑,优化异常处理和响应格式

This commit is contained in:
安浩浩 2025-03-02 20:47:50 +08:00
parent c6b58b0ebf
commit 3c9985978b
8 changed files with 47 additions and 92 deletions

View File

@ -1,39 +0,0 @@
package cn.iocoder.yudao.module.iot.framework.tdengine.core;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
import java.sql.Timestamp;
import java.util.Map;
// TODO @haohao这个还需要的么
/**
* TaosAspect 是一个处理 Taos 数据库返回值的切面
*/
@Aspect
@Component
@Slf4j
public class TaosAspect {
@Around("execution(java.util.Map<String,Object> cn.iocoder.yudao.module.iot.dal.tdengine.*.*(..))")
public Object handleType(ProceedingJoinPoint joinPoint) {
Map<String, Object> result = null;
try {
result = (Map<String, Object>) joinPoint.proceed();
result.replaceAll((key, value) -> {
if (value instanceof byte[]) {
return new String((byte[]) value);
} else if (value instanceof Timestamp) {
return ((Timestamp) value).getTime();
}
return value;
});
} catch (Throwable e) {
log.error("TaosAspect handleType error", e);
}
return result;
}
}

View File

@ -49,4 +49,18 @@ public class IotPluginCommonUtils {
.end(result);
}
/**
* 将对象转换为JSON字符串后写入响应
*
* @param routingContext 路由上下文
* @param data 要转换为JSON的数据对象
*/
@SuppressWarnings("deprecation")
public static void writeJson(RoutingContext routingContext, Object data) {
routingContext.response()
.setStatusCode(200)
.putHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE)
.end(JsonUtils.toJsonString(data));
}
}

View File

@ -36,11 +36,10 @@ public class IotPluginEmqxProperties {
*/
private boolean mqttSsl;
// TODO @haohao这个是不是改成数组
/**
* 订阅的主题
* 订阅的主题列表
*/
private String mqttTopics;
private String[] mqttTopics;
/**
* 认证端口

View File

@ -67,6 +67,7 @@ public class IotDeviceUpstreamServer {
router.route().handler(BodyHandler.create()); // 处理 Body
router.post(IotDeviceAuthVertxHandler.PATH)
// TODO @haohao疑问mqtt 的认证需要通过 http
// 回复MQTT 认证不必须通过 HTTP 进行 HTTP 认证是 EMQX MQTT 服务器支持的一种灵活的认证方式
.handler(new IotDeviceAuthVertxHandler(deviceUpstreamApi));
// 创建 HttpServer 实例
this.server = vertx.createHttpServer().requestHandler(router);
@ -165,15 +166,14 @@ public class IotDeviceUpstreamServer {
* @return 订阅结果的Future
*/
private Future<Void> subscribeToTopics() {
String topicsStr = emqxProperties.getMqttTopics();
if (topicsStr == null || topicsStr.trim().isEmpty()) {
String[] topics = emqxProperties.getMqttTopics();
if (topics == null || topics.length == 0) {
log.warn("[subscribeToTopics] 未配置MQTT主题跳过订阅");
return Future.succeededFuture();
}
log.info("[subscribeToTopics] 开始订阅设备上行消息主题");
String[] topics = topicsStr.split(TOPIC_SEPARATOR);
Future<Void> compositeFuture = Future.succeededFuture();
for (String topic : topics) {

View File

@ -10,9 +10,12 @@ import io.vertx.ext.web.RoutingContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.Collections;
/**
* IoT Emqx 连接认证的 Vert.x Handler
* <a href="https://docs.emqx.com/zh/emqx/latest/access-control/authn/http.html">...</a>
* <a href=
* "https://docs.emqx.com/zh/emqx/latest/access-control/authn/http.html">...</a>
*
* @author haohao
*/
@ -27,30 +30,29 @@ public class IotDeviceAuthVertxHandler implements Handler<RoutingContext> {
@Override
@SuppressWarnings("unchecked")
public void handle(RoutingContext routingContext) {
// TODO @haohaotry catch 兜底异常
try {
JsonObject json = routingContext.body().asJsonObject();
String clientId = json.getString("clientid");
String username = json.getString("username");
String password = json.getString("password");
IotDeviceEmqxAuthReqDTO authReqDTO = buildDeviceEmqxAuthReqDTO(clientId, username, password);
// 构建认证请求DTO
IotDeviceEmqxAuthReqDTO authReqDTO = new IotDeviceEmqxAuthReqDTO()
.setClientId(clientId)
.setUsername(username)
.setPassword(password);
// 调用认证API
CommonResult<Boolean> authResult = deviceUpstreamApi.authenticateEmqxConnection(authReqDTO);
if (authResult.getCode() != 0 || !authResult.getData()) {
denyAccess(routingContext);
IotPluginCommonUtils.writeJson(routingContext, Collections.singletonMap("result", "deny"));
return;
}
// TODO @haohao貌似可以考虑封装一个 writeJson 里面有个参数是 data然后里面去 JsonUtils.toJsonString(data)
IotPluginCommonUtils.writeJson(routingContext, "{\"result\": \"allow\"}");
}
// TODO @haohao下面两个简单方法貌似可以考虑不抽小方法哈
private void denyAccess(RoutingContext routingContext) {
IotPluginCommonUtils.writeJson(routingContext, "{\"result\": \"deny\"}");
IotPluginCommonUtils.writeJson(routingContext, Collections.singletonMap("result", "allow"));
} catch (Exception e) {
log.error("[handle][EMQX认证异常]", e);
IotPluginCommonUtils.writeJson(routingContext, Collections.singletonMap("result", "deny"));
}
private IotDeviceEmqxAuthReqDTO buildDeviceEmqxAuthReqDTO(String clientId, String username, String password) {
return new IotDeviceEmqxAuthReqDTO().setClientId(clientId).setUsername(username).setPassword(password);
}
}

View File

@ -15,5 +15,6 @@ yudao:
mqtt-ssl: false
mqtt-username: yudao
mqtt-password: 123456
mqtt-topics: "/sys/#"
mqtt-topics:
- "/sys/#"
auth-port: 8101

View File

@ -4,7 +4,6 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
/**
* 独立运行入口
@ -16,14 +15,7 @@ public class IotHttpPluginApplication {
public static void main(String[] args) {
SpringApplication application = new SpringApplication(IotHttpPluginApplication.class);
application.setWebApplicationType(WebApplicationType.NONE);
ConfigurableApplicationContext context = application.run(args);
// 手动获取 VertxService 并启动
// TODO @haohao可以放在 bean init 里么回复会和插件模式冲突 @芋艿测试下
// TODO @haohao貌似去掉没有问题额
// IotDeviceUpstreamServer vertxService = context.getBean(IotDeviceUpstreamServer.class);
// vertxService.start();
application.run(args);
log.info("[main][独立模式启动完成]");
}

View File

@ -2,11 +2,9 @@ package cn.iocoder.yudao.module.iot.plugin.http.config;
import cn.hutool.core.lang.Assert;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import lombok.extern.slf4j.Slf4j;
import org.pf4j.PluginWrapper;
import org.pf4j.spring.SpringPlugin;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
@ -29,11 +27,6 @@ public class IotHttpVertxPlugin extends SpringPlugin {
ApplicationContext pluginContext = getApplicationContext();
Assert.notNull(pluginContext, "pluginContext 不能为空");
// 2. 启动 Vert.x
// TODO @haohao貌似去掉没有问题额
// IotDeviceUpstreamServer vertxService = pluginContext.getBean(IotDeviceUpstreamServer.class);
// vertxService.start();
log.info("[HttpVertxPlugin][HttpVertxPlugin 插件启动成功...]");
} catch (Exception e) {
log.error("[HttpVertxPlugin][HttpVertxPlugin 插件开启动异常...]", e);
@ -44,13 +37,6 @@ public class IotHttpVertxPlugin extends SpringPlugin {
public void stop() {
log.info("[HttpVertxPlugin][HttpVertxPlugin 插件停止开始...]");
try {
// 停止服务器
// ApplicationContext pluginContext = getApplicationContext();
// if (pluginContext != null) {
// IotDeviceUpstreamServer vertxService = pluginContext.getBean(IotDeviceUpstreamServer.class);
// vertxService.stop();
// }
log.info("[HttpVertxPlugin][HttpVertxPlugin 插件停止成功...]");
} catch (Exception e) {
log.error("[HttpVertxPlugin][HttpVertxPlugin 插件停止异常...]", e);