【代码评审】IoT:评审 plugin 实现
This commit is contained in:
parent
a2532013ec
commit
e998b0c7eb
|
@ -22,6 +22,7 @@
|
||||||
<artifactId>yudao-common</artifactId>
|
<artifactId>yudao-common</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
<!-- PF4J -->
|
<!-- PF4J -->
|
||||||
|
<!-- TODO 芋艿:这个依赖,要不要放在 api 包 -->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.pf4j</groupId>
|
<groupId>org.pf4j</groupId>
|
||||||
<artifactId>pf4j-spring</artifactId>
|
<artifactId>pf4j-spring</artifactId>
|
||||||
|
|
|
@ -7,6 +7,7 @@ import java.util.Map;
|
||||||
* 服务注册表 - 插架模块使用,无法使用 Spring 注入
|
* 服务注册表 - 插架模块使用,无法使用 Spring 注入
|
||||||
*/
|
*/
|
||||||
public class ServiceRegistry {
|
public class ServiceRegistry {
|
||||||
|
|
||||||
private static final Map<Class<?>, Object> services = new HashMap<>();
|
private static final Map<Class<?>, Object> services = new HashMap<>();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -31,4 +32,5 @@ public class ServiceRegistry {
|
||||||
public static <T> T getService(Class<T> serviceClass) {
|
public static <T> T getService(Class<T> serviceClass) {
|
||||||
return (T) services.get(serviceClass);
|
return (T) services.get(serviceClass);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
package cn.iocoder.yudao.module.iot.api;
|
package cn.iocoder.yudao.module.iot.api.device;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 设备数据 API
|
* 设备数据 API
|
|
@ -81,6 +81,7 @@
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.pf4j</groupId>
|
<groupId>org.pf4j</groupId>
|
||||||
<artifactId>pf4j-spring</artifactId>
|
<artifactId>pf4j-spring</artifactId>
|
||||||
|
<!-- TODO @芋艿:可以放到 bom 里配置 -->
|
||||||
<exclusions>
|
<exclusions>
|
||||||
<exclusion>
|
<exclusion>
|
||||||
<groupId>org.slf4j</groupId>
|
<groupId>org.slf4j</groupId>
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
package cn.iocoder.yudao.module.iot.api.device;
|
package cn.iocoder.yudao.module.iot.api.device;
|
||||||
|
|
||||||
import cn.iocoder.yudao.module.iot.api.DeviceDataApi;
|
|
||||||
import cn.iocoder.yudao.module.iot.service.device.IotDeviceDataService;
|
import cn.iocoder.yudao.module.iot.service.device.IotDeviceDataService;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.validation.annotation.Validated;
|
import org.springframework.validation.annotation.Validated;
|
||||||
|
@ -15,11 +14,11 @@ import javax.annotation.Resource;
|
||||||
public class DeviceDataApiImpl implements DeviceDataApi {
|
public class DeviceDataApiImpl implements DeviceDataApi {
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private IotDeviceDataService iotDeviceDataService;
|
private IotDeviceDataService deviceDataService;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void saveDeviceData(String productKey, String deviceName, String message) {
|
public void saveDeviceData(String productKey, String deviceName, String message) {
|
||||||
iotDeviceDataService.saveDeviceData(productKey, deviceName, message);
|
deviceDataService.saveDeviceData(productKey, deviceName, message);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -1 +1,6 @@
|
||||||
|
/**
|
||||||
|
* 占位
|
||||||
|
*
|
||||||
|
* TODO 芋艿:后续删除
|
||||||
|
*/
|
||||||
package cn.iocoder.yudao.module.iot.api;
|
package cn.iocoder.yudao.module.iot.api;
|
|
@ -1,6 +1,6 @@
|
||||||
package cn.iocoder.yudao.module.iot.framework.plugin;
|
package cn.iocoder.yudao.module.iot.framework.plugin;
|
||||||
|
|
||||||
import cn.iocoder.yudao.module.iot.api.DeviceDataApi;
|
import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi;
|
||||||
import cn.iocoder.yudao.module.iot.api.ServiceRegistry;
|
import cn.iocoder.yudao.module.iot.api.ServiceRegistry;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
|
@ -26,9 +26,10 @@ public class ServiceRegistryConfiguration {
|
||||||
/**
|
/**
|
||||||
* 定义一个标记用的 Bean,用于表示 ServiceRegistry 已初始化完成
|
* 定义一个标记用的 Bean,用于表示 ServiceRegistry 已初始化完成
|
||||||
*/
|
*/
|
||||||
@Bean("serviceRegistryInitializedMarker")
|
@Bean("serviceRegistryInitializedMarker") // TODO @haohao:1)这个名字,可以搞个 public static final 常量;2)是不是 conditionBefore 啥
|
||||||
public Object serviceRegistryInitializedMarker() {
|
public Object serviceRegistryInitializedMarker() {
|
||||||
// 返回任意对象即可,这里返回null都可以,但最好返回个实际对象
|
// 返回任意对象即可,这里返回 null 都可以,但最好返回个实际对象
|
||||||
return new Object();
|
return new Object();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -21,7 +21,7 @@ public interface IotDeviceDataService {
|
||||||
* @param productKey 产品 key
|
* @param productKey 产品 key
|
||||||
* @param deviceName 设备名称
|
* @param deviceName 设备名称
|
||||||
* @param message 消息
|
* @param message 消息
|
||||||
* <p>JSON 格式,参见 <a href="https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services?spm=a2c4g.11186623.0.0.3a3335aeUdzkz2#concept-mvc-4tw-y2b">...</a>
|
* <p>参见 <a href="https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services?spm=a2c4g.11186623.0.0.3a3335aeUdzkz2#concept-mvc-4tw-y2b">JSON 格式</a>
|
||||||
*/
|
*/
|
||||||
void saveDeviceData(String productKey, String deviceName, String message);
|
void saveDeviceData(String productKey, String deviceName, String message);
|
||||||
|
|
||||||
|
|
|
@ -13,6 +13,7 @@ import org.springframework.validation.annotation.Validated;
|
||||||
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
|
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
|
||||||
import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.PLUGIN_INSTANCE_NOT_EXISTS;
|
import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.PLUGIN_INSTANCE_NOT_EXISTS;
|
||||||
|
|
||||||
|
// TODO @haohao:可以搞个 plugin 包,然后把 plugininfo、plugininstance
|
||||||
/**
|
/**
|
||||||
* IoT 插件实例 Service 实现类
|
* IoT 插件实例 Service 实现类
|
||||||
*
|
*
|
||||||
|
|
|
@ -2,7 +2,7 @@ package cn.iocoder.yudao.module.iot.plugin;
|
||||||
|
|
||||||
import cn.hutool.json.JSONObject;
|
import cn.hutool.json.JSONObject;
|
||||||
import cn.hutool.json.JSONUtil;
|
import cn.hutool.json.JSONUtil;
|
||||||
import cn.iocoder.yudao.module.iot.api.DeviceDataApi;
|
import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi;
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.channel.ChannelFutureListener;
|
import io.netty.channel.ChannelFutureListener;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
@ -12,12 +12,9 @@ import io.netty.util.CharsetUtil;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 基于 Netty 的 HTTP 处理器,用于接收设备上报的数据并调用主程序的 DeviceDataApi 接口进行处理。
|
* 基于 Netty 的 HTTP 处理器,用于接收设备上报的数据并调用主程序的 DeviceDataApi 接口进行处理。
|
||||||
* <p>
|
*
|
||||||
* 请求格式:
|
* 1. 请求格式:JSON 格式,地址为 POST /sys/{productKey}/{deviceName}/thing/event/property/post
|
||||||
* POST /sys/{productKey}/{deviceName}/thing/event/property/post
|
* 2. 返回结果:JSON 格式,包含统一的 code、data、id、message、method、version 字段
|
||||||
* 请求体为 JSON 格式数据。
|
|
||||||
* <p>
|
|
||||||
* 返回结果为 JSON 格式,包含统一的 code、data、id、message、method、version 字段。
|
|
||||||
*/
|
*/
|
||||||
public class HttpHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
|
public class HttpHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
|
||||||
|
|
||||||
|
@ -29,10 +26,9 @@ public class HttpHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
|
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
|
||||||
String uri = request.uri();
|
|
||||||
|
|
||||||
// 期望的路径格式: /sys/{productKey}/{deviceName}/thing/event/property/post
|
// 期望的路径格式: /sys/{productKey}/{deviceName}/thing/event/property/post
|
||||||
// 使用 "/" 拆分路径
|
// 使用 "/" 拆分路径
|
||||||
|
String uri = request.uri();
|
||||||
String[] parts = uri.split("/");
|
String[] parts = uri.split("/");
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -52,25 +48,19 @@ public class HttpHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
|
||||||
&& "event".equals(parts[5])
|
&& "event".equals(parts[5])
|
||||||
&& "property".equals(parts[6])
|
&& "property".equals(parts[6])
|
||||||
&& "post".equals(parts[7]);
|
&& "post".equals(parts[7]);
|
||||||
|
|
||||||
if (!isCorrectPath) {
|
if (!isCorrectPath) {
|
||||||
// 如果路径不匹配,返回 404 错误
|
|
||||||
writeResponse(ctx, HttpResponseStatus.NOT_FOUND, "Not Found");
|
writeResponse(ctx, HttpResponseStatus.NOT_FOUND, "Not Found");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
String productKey = parts[2];
|
String productKey = parts[2];
|
||||||
String deviceName = parts[3];
|
String deviceName = parts[3];
|
||||||
|
|
||||||
// 从请求中获取原始数据
|
// 从请求中获取原始数据,尝试解析请求数据为 JSON 对象
|
||||||
String requestBody = request.content().toString(CharsetUtil.UTF_8);
|
String requestBody = request.content().toString(CharsetUtil.UTF_8);
|
||||||
|
|
||||||
// 尝试解析请求数据为 JSON 对象
|
|
||||||
JSONObject jsonData;
|
JSONObject jsonData;
|
||||||
try {
|
try {
|
||||||
jsonData = JSONUtil.parseObj(requestBody);
|
jsonData = JSONUtil.parseObj(requestBody);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// 数据不是合法的 JSON 格式,返回 400 错误
|
|
||||||
JSONObject res = createResponseJson(
|
JSONObject res = createResponseJson(
|
||||||
400,
|
400,
|
||||||
new JSONObject(),
|
new JSONObject(),
|
||||||
|
@ -82,8 +72,6 @@ public class HttpHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
|
||||||
writeResponse(ctx, HttpResponseStatus.BAD_REQUEST, res.toString());
|
writeResponse(ctx, HttpResponseStatus.BAD_REQUEST, res.toString());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 获取请求中的 id 字段,若不存在则为 null
|
|
||||||
String id = jsonData.getStr("id", null);
|
String id = jsonData.getStr("id", null);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -101,7 +89,6 @@ public class HttpHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
|
||||||
);
|
);
|
||||||
writeResponse(ctx, HttpResponseStatus.OK, successRes.toString());
|
writeResponse(ctx, HttpResponseStatus.OK, successRes.toString());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// 保存数据过程中出现异常,返回 500 错误
|
|
||||||
JSONObject errorRes = createResponseJson(
|
JSONObject errorRes = createResponseJson(
|
||||||
500,
|
500,
|
||||||
new JSONObject(),
|
new JSONObject(),
|
||||||
|
@ -128,7 +115,7 @@ public class HttpHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
|
||||||
private JSONObject createResponseJson(int code, JSONObject data, String id, String message, String method, String version) {
|
private JSONObject createResponseJson(int code, JSONObject data, String id, String message, String method, String version) {
|
||||||
JSONObject res = new JSONObject();
|
JSONObject res = new JSONObject();
|
||||||
res.set("code", code);
|
res.set("code", code);
|
||||||
res.set("data", data != null ? data : new JSONObject()); // 确保 data 不为 null
|
res.set("data", data != null ? data : new JSONObject());
|
||||||
res.set("id", id);
|
res.set("id", id);
|
||||||
res.set("message", message);
|
res.set("message", message);
|
||||||
res.set("method", method);
|
res.set("method", method);
|
||||||
|
@ -137,24 +124,24 @@ public class HttpHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 向客户端返回 HTTP 响应的辅助方法。
|
* 向客户端返回 HTTP 响应的辅助方法
|
||||||
*
|
*
|
||||||
* @param ctx 通道上下文
|
* @param ctx 通道上下文
|
||||||
* @param status HTTP 响应状态码(网络层面的)
|
* @param status HTTP 响应状态码(网络层面的)
|
||||||
* @param content 响应内容(JSON 字符串或其他文本)
|
* @param content 响应内容(JSON 字符串或其他文本)
|
||||||
*/
|
*/
|
||||||
private void writeResponse(ChannelHandlerContext ctx, HttpResponseStatus status, String content) {
|
private void writeResponse(ChannelHandlerContext ctx, HttpResponseStatus status, String content) {
|
||||||
|
// 设置响应头为 JSON 类型和正确的编码
|
||||||
FullHttpResponse response = new DefaultFullHttpResponse(
|
FullHttpResponse response = new DefaultFullHttpResponse(
|
||||||
HttpVersion.HTTP_1_1,
|
HttpVersion.HTTP_1_1,
|
||||||
status,
|
status,
|
||||||
Unpooled.copiedBuffer(content, CharsetUtil.UTF_8)
|
Unpooled.copiedBuffer(content, CharsetUtil.UTF_8)
|
||||||
);
|
);
|
||||||
|
|
||||||
// 设置响应头为 JSON 类型和正确的编码
|
|
||||||
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=UTF-8");
|
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=UTF-8");
|
||||||
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
|
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
|
||||||
|
|
||||||
// 发送响应并在发送完成后关闭连接
|
// 发送响应并在发送完成后关闭连接
|
||||||
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
|
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -1,6 +1,6 @@
|
||||||
package cn.iocoder.yudao.module.iot.plugin;
|
package cn.iocoder.yudao.module.iot.plugin;
|
||||||
|
|
||||||
import cn.iocoder.yudao.module.iot.api.DeviceDataApi;
|
import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi;
|
||||||
import cn.iocoder.yudao.module.iot.api.ServiceRegistry;
|
import cn.iocoder.yudao.module.iot.api.ServiceRegistry;
|
||||||
import io.netty.bootstrap.ServerBootstrap;
|
import io.netty.bootstrap.ServerBootstrap;
|
||||||
import io.netty.channel.*;
|
import io.netty.channel.*;
|
||||||
|
@ -18,12 +18,14 @@ import java.util.concurrent.Executors;
|
||||||
public class HttpPlugin extends Plugin {
|
public class HttpPlugin extends Plugin {
|
||||||
|
|
||||||
private static final int PORT = 8092;
|
private static final int PORT = 8092;
|
||||||
|
|
||||||
private final ExecutorService executorService;
|
private final ExecutorService executorService;
|
||||||
private DeviceDataApi deviceDataApi; // 用于保存设备数据的 API
|
private DeviceDataApi deviceDataApi;
|
||||||
|
|
||||||
public HttpPlugin(PluginWrapper wrapper) {
|
public HttpPlugin(PluginWrapper wrapper) {
|
||||||
super(wrapper);
|
super(wrapper);
|
||||||
this.executorService = Executors.newSingleThreadExecutor(); // 创建单线程池
|
// 创建单线程池
|
||||||
|
this.executorService = Executors.newSingleThreadExecutor();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -44,10 +46,13 @@ public class HttpPlugin extends Plugin {
|
||||||
@Override
|
@Override
|
||||||
public void stop() {
|
public void stop() {
|
||||||
log.info("HttpPlugin.stop()");
|
log.info("HttpPlugin.stop()");
|
||||||
executorService.shutdownNow(); // 停止线程池
|
// 停止线程池
|
||||||
|
executorService.shutdownNow();
|
||||||
}
|
}
|
||||||
|
|
||||||
// 启动 HTTP 服务
|
/**
|
||||||
|
* 启动 HTTP 服务
|
||||||
|
*/
|
||||||
private void startHttpServer() {
|
private void startHttpServer() {
|
||||||
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
|
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
|
||||||
EventLoopGroup workerGroup = new NioEventLoopGroup();
|
EventLoopGroup workerGroup = new NioEventLoopGroup();
|
||||||
|
@ -56,7 +61,8 @@ public class HttpPlugin extends Plugin {
|
||||||
ServerBootstrap bootstrap = new ServerBootstrap();
|
ServerBootstrap bootstrap = new ServerBootstrap();
|
||||||
bootstrap.group(bossGroup, workerGroup)
|
bootstrap.group(bossGroup, workerGroup)
|
||||||
.channel(NioServerSocketChannel.class)
|
.channel(NioServerSocketChannel.class)
|
||||||
.childHandler(new ChannelInitializer<Channel>() {
|
.childHandler(new ChannelInitializer<>() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void initChannel(Channel channel) {
|
protected void initChannel(Channel channel) {
|
||||||
channel.pipeline().addLast(new HttpServerCodec());
|
channel.pipeline().addLast(new HttpServerCodec());
|
||||||
|
@ -64,6 +70,7 @@ public class HttpPlugin extends Plugin {
|
||||||
// 将从 ServiceRegistry 获取的 deviceDataApi 传入处理器
|
// 将从 ServiceRegistry 获取的 deviceDataApi 传入处理器
|
||||||
channel.pipeline().addLast(new HttpHandler(deviceDataApi));
|
channel.pipeline().addLast(new HttpHandler(deviceDataApi));
|
||||||
}
|
}
|
||||||
|
|
||||||
});
|
});
|
||||||
|
|
||||||
// 绑定端口并启动服务器
|
// 绑定端口并启动服务器
|
||||||
|
@ -78,4 +85,5 @@ public class HttpPlugin extends Plugin {
|
||||||
workerGroup.shutdownGracefully();
|
workerGroup.shutdownGracefully();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue