Merge branch 'feature/iot' of https://gitee.com/zhijiantianya/ruoyi-vue-pro into feature/iot
This commit is contained in:
commit
0cb148a33d
Binary file not shown.
|
@ -67,7 +67,7 @@
|
||||||
<bizlog-sdk.version>3.0.6</bizlog-sdk.version>
|
<bizlog-sdk.version>3.0.6</bizlog-sdk.version>
|
||||||
<mqtt.version>1.2.5</mqtt.version>
|
<mqtt.version>1.2.5</mqtt.version>
|
||||||
<pf4j-spring.version>0.9.0</pf4j-spring.version>
|
<pf4j-spring.version>0.9.0</pf4j-spring.version>
|
||||||
<vertx.version>4.4.0</vertx.version>
|
<vertx.version>4.5.11</vertx.version>
|
||||||
<!-- 三方云服务相关 -->
|
<!-- 三方云服务相关 -->
|
||||||
<okio.version>3.5.0</okio.version>
|
<okio.version>3.5.0</okio.version>
|
||||||
<okhttp3.version>4.11.0</okhttp3.version>
|
<okhttp3.version>4.11.0</okhttp3.version>
|
||||||
|
@ -626,6 +626,12 @@
|
||||||
<artifactId>vertx-web</artifactId>
|
<artifactId>vertx-web</artifactId>
|
||||||
<version>${vertx.version}</version>
|
<version>${vertx.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<!-- Vert.x MQTT 模块 -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.vertx</groupId>
|
||||||
|
<artifactId>vertx-mqtt</artifactId>
|
||||||
|
<version>${vertx.version}</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
</dependencyManagement>
|
</dependencyManagement>
|
||||||
|
|
|
@ -1,37 +0,0 @@
|
||||||
package cn.iocoder.yudao.module.iot.api;
|
|
||||||
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
// TODO 芋艿:纠结下
|
|
||||||
/**
|
|
||||||
* 服务注册表 - 插架模块使用,无法使用 Spring 注入
|
|
||||||
*/
|
|
||||||
public class ServiceRegistry {
|
|
||||||
|
|
||||||
private static final Map<Class<?>, Object> services = new HashMap<>();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 注册服务
|
|
||||||
*
|
|
||||||
* @param serviceClass 服务类
|
|
||||||
* @param serviceImpl 服务实现
|
|
||||||
* @param <T> 服务类
|
|
||||||
*/
|
|
||||||
public static <T> void registerService(Class<T> serviceClass, T serviceImpl) {
|
|
||||||
services.put(serviceClass, serviceImpl);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 获得服务
|
|
||||||
*
|
|
||||||
* @param serviceClass 服务类
|
|
||||||
* @param <T> 服务类
|
|
||||||
* @return 服务实现
|
|
||||||
*/
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
public static <T> T getService(Class<T> serviceClass) {
|
|
||||||
return (T) services.get(serviceClass);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,39 +0,0 @@
|
||||||
package cn.iocoder.yudao.module.iot.mqttrpc.common;
|
|
||||||
|
|
||||||
import lombok.AllArgsConstructor;
|
|
||||||
import lombok.Builder;
|
|
||||||
import lombok.Data;
|
|
||||||
import lombok.NoArgsConstructor;
|
|
||||||
|
|
||||||
// TODO @芋艿:要不要加个 mqtt 值了的前缀
|
|
||||||
/**
|
|
||||||
* MQTT RPC 请求
|
|
||||||
*/
|
|
||||||
@Data
|
|
||||||
@Builder
|
|
||||||
@NoArgsConstructor
|
|
||||||
@AllArgsConstructor
|
|
||||||
public class RpcRequest {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 方法名
|
|
||||||
*/
|
|
||||||
private String method;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 参数
|
|
||||||
*/
|
|
||||||
// TODO @haohao:object 对象会不会不好序列化?
|
|
||||||
private Object[] params;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 关联 ID
|
|
||||||
*/
|
|
||||||
private String correlationId;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 回复地址
|
|
||||||
*/
|
|
||||||
private String replyTo;
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,33 +0,0 @@
|
||||||
package cn.iocoder.yudao.module.iot.mqttrpc.common;
|
|
||||||
|
|
||||||
import lombok.AllArgsConstructor;
|
|
||||||
import lombok.Builder;
|
|
||||||
import lombok.Data;
|
|
||||||
import lombok.NoArgsConstructor;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* MQTT RPC 响应
|
|
||||||
*/
|
|
||||||
@Data
|
|
||||||
@Builder
|
|
||||||
@NoArgsConstructor
|
|
||||||
@AllArgsConstructor
|
|
||||||
public class RpcResponse {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 关联 ID
|
|
||||||
*/
|
|
||||||
private String correlationId;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 结果
|
|
||||||
*/
|
|
||||||
// TODO @haohao:object 对象会不会不好反序列化?
|
|
||||||
private Object result;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 错误
|
|
||||||
*/
|
|
||||||
private String error;
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,19 +0,0 @@
|
||||||
package cn.iocoder.yudao.module.iot.mqttrpc.common;
|
|
||||||
|
|
||||||
import cn.hutool.json.JSONUtil;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 序列化工具类
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
public class SerializationUtils {
|
|
||||||
|
|
||||||
public static String serialize(Object obj) {
|
|
||||||
return JSONUtil.toJsonStr(obj);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static <T> T deserialize(String json, Class<T> clazz) {
|
|
||||||
return JSONUtil.toBean(json, clazz);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,22 +1,20 @@
|
||||||
package cn.iocoder.yudao.module.iot.framework.plugin;
|
package cn.iocoder.yudao.module.iot.framework.plugin;
|
||||||
|
|
||||||
import java.util.List;
|
import cn.hutool.core.collection.CollUtil;
|
||||||
|
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
|
||||||
import javax.annotation.Resource;
|
import cn.iocoder.yudao.module.iot.dal.dataobject.plugininfo.PluginInfoDO;
|
||||||
|
import cn.iocoder.yudao.module.iot.enums.plugin.IotPluginStatusEnum;
|
||||||
|
import cn.iocoder.yudao.module.iot.service.plugin.PluginInfoService;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.pf4j.spring.SpringPluginManager;
|
import org.pf4j.spring.SpringPluginManager;
|
||||||
import org.springframework.boot.ApplicationArguments;
|
import org.springframework.boot.ApplicationArguments;
|
||||||
import org.springframework.boot.ApplicationRunner;
|
import org.springframework.boot.ApplicationRunner;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import javax.annotation.Resource;
|
||||||
|
import java.util.List;
|
||||||
import cn.iocoder.yudao.module.iot.service.plugin.PluginInfoService;
|
|
||||||
import cn.hutool.core.collection.CollUtil;
|
|
||||||
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
|
|
||||||
import cn.iocoder.yudao.module.iot.dal.dataobject.plugininfo.PluginInfoDO;
|
|
||||||
import cn.iocoder.yudao.module.iot.enums.plugin.IotPluginStatusEnum;
|
|
||||||
|
|
||||||
|
// TODO @芋艿:需要 review 下
|
||||||
@Component
|
@Component
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class PluginStart implements ApplicationRunner {
|
public class PluginStart implements ApplicationRunner {
|
||||||
|
|
|
@ -1,42 +1,35 @@
|
||||||
package cn.iocoder.yudao.module.iot.framework.plugin;
|
package cn.iocoder.yudao.module.iot.framework.plugin;
|
||||||
|
|
||||||
import cn.iocoder.yudao.module.iot.api.ServiceRegistry;
|
|
||||||
import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi;
|
|
||||||
import cn.iocoder.yudao.module.iot.framework.plugin.listener.CustomPluginStateListener;
|
import cn.iocoder.yudao.module.iot.framework.plugin.listener.CustomPluginStateListener;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.pf4j.spring.SpringPluginManager;
|
import org.pf4j.spring.SpringPluginManager;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
import org.springframework.context.annotation.DependsOn;
|
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import java.nio.file.Paths;
|
||||||
|
|
||||||
|
// TODO @芋艿:需要 review 下
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Configuration
|
@Configuration
|
||||||
public class UnifiedConfiguration {
|
public class UnifiedConfiguration {
|
||||||
|
|
||||||
private static final String SERVICE_REGISTRY_INITIALIZED_MARKER = "serviceRegistryInitializedMarker";
|
@Value("${pf4j.pluginsDir:pluginsDir}")
|
||||||
|
private String pluginsDir;
|
||||||
@Resource
|
|
||||||
private DeviceDataApi deviceDataApi;
|
|
||||||
|
|
||||||
@Bean(SERVICE_REGISTRY_INITIALIZED_MARKER)
|
|
||||||
public Object serviceRegistryInitializedMarker() {
|
|
||||||
ServiceRegistry.registerService(DeviceDataApi.class, deviceDataApi);
|
|
||||||
log.info("[init][将 DeviceDataApi 实例注册到 ServiceRegistry 中]");
|
|
||||||
return new Object();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
@DependsOn(SERVICE_REGISTRY_INITIALIZED_MARKER)
|
// @DependsOn("deviceDataApiImpl")
|
||||||
public SpringPluginManager pluginManager() {
|
public SpringPluginManager pluginManager() {
|
||||||
log.info("[init][实例化 SpringPluginManager]");
|
log.info("[init][实例化 SpringPluginManager]");
|
||||||
SpringPluginManager springPluginManager = new SpringPluginManager() {
|
SpringPluginManager springPluginManager = new SpringPluginManager(Paths.get(pluginsDir)) {
|
||||||
|
// SpringPluginManager springPluginManager = new SpringPluginManager() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void startPlugins() {
|
public void startPlugins() {
|
||||||
// 禁用插件启动,避免插件启动时,启动所有插件
|
// 禁用插件启动,避免插件启动时,启动所有插件
|
||||||
log.info("[init][禁用默认启动所有插件]");
|
log.info("[init][禁用默认启动所有插件]");
|
||||||
}
|
}
|
||||||
|
|
||||||
};
|
};
|
||||||
springPluginManager.addPluginStateListener(new CustomPluginStateListener());
|
springPluginManager.addPluginStateListener(new CustomPluginStateListener());
|
||||||
return springPluginManager;
|
return springPluginManager;
|
||||||
|
|
|
@ -5,6 +5,7 @@ import org.pf4j.PluginStateEvent;
|
||||||
import org.pf4j.PluginStateListener;
|
import org.pf4j.PluginStateListener;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
// TODO @芋艿:需要 review 下
|
||||||
@Component
|
@Component
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class CustomPluginStateListener implements PluginStateListener {
|
public class CustomPluginStateListener implements PluginStateListener {
|
||||||
|
|
|
@ -1,40 +0,0 @@
|
||||||
package cn.iocoder.yudao.module.iot.mqttrpc.config;
|
|
||||||
|
|
||||||
import lombok.Data;
|
|
||||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
|
||||||
import org.springframework.context.annotation.Configuration;
|
|
||||||
|
|
||||||
@Data
|
|
||||||
@Configuration
|
|
||||||
@ConfigurationProperties(prefix = "mqtt")
|
|
||||||
public class MqttConfig {
|
|
||||||
/**
|
|
||||||
* MQTT 代理地址
|
|
||||||
*/
|
|
||||||
private String broker;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* MQTT 用户名
|
|
||||||
*/
|
|
||||||
private String username;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* MQTT 密码
|
|
||||||
*/
|
|
||||||
private String password;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* MQTT 客户端 ID
|
|
||||||
*/
|
|
||||||
private String clientId;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* MQTT 请求主题
|
|
||||||
*/
|
|
||||||
private String requestTopic;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* MQTT 响应主题前缀
|
|
||||||
*/
|
|
||||||
private String responseTopicPrefix;
|
|
||||||
}
|
|
|
@ -1,100 +0,0 @@
|
||||||
package cn.iocoder.yudao.module.iot.mqttrpc.server;
|
|
||||||
|
|
||||||
import cn.hutool.core.lang.UUID;
|
|
||||||
import cn.iocoder.yudao.module.iot.mqttrpc.common.RpcRequest;
|
|
||||||
import cn.iocoder.yudao.module.iot.mqttrpc.common.RpcResponse;
|
|
||||||
import cn.iocoder.yudao.module.iot.mqttrpc.common.SerializationUtils;
|
|
||||||
import cn.iocoder.yudao.module.iot.mqttrpc.config.MqttConfig;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.eclipse.paho.client.mqttv3.*;
|
|
||||||
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
|
||||||
import org.springframework.stereotype.Service;
|
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
|
||||||
import javax.annotation.PreDestroy;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
// TODO @芋艿:server 逻辑,再瞅瞅;
|
|
||||||
// TODO @haohao:如果只写在 iot biz 里,那么后续 server => client 貌似不方便?微信再讨论下~;
|
|
||||||
@Service
|
|
||||||
@Slf4j
|
|
||||||
public class RpcServer {
|
|
||||||
|
|
||||||
private final MqttConfig mqttConfig;
|
|
||||||
private final MqttClient mqttClient;
|
|
||||||
private final Map<String, MethodInvoker> methodRegistry = new HashMap<>();
|
|
||||||
|
|
||||||
public RpcServer(MqttConfig mqttConfig) throws MqttException {
|
|
||||||
this.mqttConfig = mqttConfig;
|
|
||||||
this.mqttClient = new MqttClient(mqttConfig.getBroker(), "rpc-server-" + UUID.randomUUID(), new MemoryPersistence());
|
|
||||||
MqttConnectOptions options = new MqttConnectOptions();
|
|
||||||
options.setAutomaticReconnect(true);
|
|
||||||
options.setCleanSession(true);
|
|
||||||
options.setUserName(mqttConfig.getUsername());
|
|
||||||
options.setPassword(mqttConfig.getPassword().toCharArray());
|
|
||||||
this.mqttClient.connect(options);
|
|
||||||
}
|
|
||||||
|
|
||||||
@PostConstruct
|
|
||||||
public void init() throws MqttException {
|
|
||||||
mqttClient.subscribe(mqttConfig.getRequestTopic(), this::handleRequest);
|
|
||||||
log.info("RPC Server subscribed to topic: {}", mqttConfig.getRequestTopic());
|
|
||||||
}
|
|
||||||
|
|
||||||
private void handleRequest(String topic, MqttMessage message) {
|
|
||||||
RpcRequest request = SerializationUtils.deserialize(new String(message.getPayload()), RpcRequest.class);
|
|
||||||
RpcResponse response = new RpcResponse();
|
|
||||||
response.setCorrelationId(request.getCorrelationId());
|
|
||||||
|
|
||||||
try {
|
|
||||||
MethodInvoker invoker = methodRegistry.get(request.getMethod());
|
|
||||||
if (invoker == null) {
|
|
||||||
throw new NoSuchMethodException("Unknown method: " + request.getMethod());
|
|
||||||
}
|
|
||||||
Object result = invoker.invoke(request.getParams());
|
|
||||||
response.setResult(result);
|
|
||||||
} catch (Exception e) {
|
|
||||||
response.setError(e.getMessage());
|
|
||||||
log.error("Error processing RPC request: {}", e.getMessage(), e);
|
|
||||||
}
|
|
||||||
|
|
||||||
String replyPayload = SerializationUtils.serialize(response);
|
|
||||||
MqttMessage replyMessage = new MqttMessage(replyPayload.getBytes());
|
|
||||||
replyMessage.setQos(1);
|
|
||||||
try {
|
|
||||||
mqttClient.publish(request.getReplyTo(), replyMessage);
|
|
||||||
log.info("Published response to {}", request.getReplyTo());
|
|
||||||
} catch (MqttException e) {
|
|
||||||
log.error("Failed to publish response: {}", e.getMessage(), e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 注册可调用的方法
|
|
||||||
*
|
|
||||||
* @param methodName 方法名称
|
|
||||||
* @param invoker 方法调用器
|
|
||||||
*/
|
|
||||||
public void registerMethod(String methodName, MethodInvoker invoker) {
|
|
||||||
methodRegistry.put(methodName, invoker);
|
|
||||||
log.info("Registered method: {}", methodName);
|
|
||||||
}
|
|
||||||
|
|
||||||
@PreDestroy
|
|
||||||
public void cleanup() throws MqttException {
|
|
||||||
mqttClient.disconnect();
|
|
||||||
log.info("RPC Server disconnected");
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 方法调用器接口
|
|
||||||
*/
|
|
||||||
@FunctionalInterface
|
|
||||||
public interface MethodInvoker {
|
|
||||||
|
|
||||||
Object invoke(Object[] params) throws Exception;
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,43 +0,0 @@
|
||||||
package cn.iocoder.yudao.module.iot.service.plugin;
|
|
||||||
|
|
||||||
import cn.iocoder.yudao.module.iot.mqttrpc.server.RpcServer;
|
|
||||||
import lombok.RequiredArgsConstructor;
|
|
||||||
import org.springframework.stereotype.Service;
|
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
|
||||||
|
|
||||||
@Service
|
|
||||||
@RequiredArgsConstructor
|
|
||||||
public class ExampleService {
|
|
||||||
|
|
||||||
private final RpcServer rpcServer;
|
|
||||||
|
|
||||||
@PostConstruct
|
|
||||||
public void registerMethods() {
|
|
||||||
rpcServer.registerMethod("add", params -> {
|
|
||||||
if (params.length != 2) {
|
|
||||||
throw new IllegalArgumentException("add方法需要两个参数");
|
|
||||||
}
|
|
||||||
int a = ((Number) params[0]).intValue();
|
|
||||||
int b = ((Number) params[1]).intValue();
|
|
||||||
return add(a, b);
|
|
||||||
});
|
|
||||||
|
|
||||||
rpcServer.registerMethod("concat", params -> {
|
|
||||||
if (params.length != 2) {
|
|
||||||
throw new IllegalArgumentException("concat方法需要两个参数");
|
|
||||||
}
|
|
||||||
String str1 = params[0].toString();
|
|
||||||
String str2 = params[1].toString();
|
|
||||||
return concat(str1, str2);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private int add(int a, int b) {
|
|
||||||
return a + b;
|
|
||||||
}
|
|
||||||
|
|
||||||
private String concat(String a, String b) {
|
|
||||||
return a + b;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,6 @@
|
||||||
|
plugin.id=emqx-plugin
|
||||||
|
plugin.class=cn.iocoder.yudao.module.iot.plugin.EmqxPlugin
|
||||||
|
plugin.version=0.0.1
|
||||||
|
plugin.provider=ahh
|
||||||
|
plugin.dependencies=
|
||||||
|
plugin.description=emqx-plugin-0.0.1
|
|
@ -0,0 +1,164 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="
|
||||||
|
http://maven.apache.org/POM/4.0.0
|
||||||
|
http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<parent>
|
||||||
|
<artifactId>yudao-module-iot-plugin</artifactId>
|
||||||
|
<groupId>cn.iocoder.boot</groupId>
|
||||||
|
<version>${revision}</version>
|
||||||
|
</parent>
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<packaging>jar</packaging>
|
||||||
|
|
||||||
|
<artifactId>yudao-module-iot-emqx-plugin</artifactId>
|
||||||
|
|
||||||
|
<name>${project.artifactId}</name>
|
||||||
|
<description>
|
||||||
|
物联网 插件模块 - emqx 插件
|
||||||
|
</description>
|
||||||
|
|
||||||
|
<properties>
|
||||||
|
<!-- 插件相关 -->
|
||||||
|
<plugin.id>emqx-plugin</plugin.id>
|
||||||
|
<plugin.class>cn.iocoder.yudao.module.iot.plugin.EmqxPlugin</plugin.class>
|
||||||
|
<plugin.version>0.0.1</plugin.version>
|
||||||
|
<plugin.provider>ahh</plugin.provider>
|
||||||
|
<plugin.description>emqx-plugin-0.0.1</plugin.description>
|
||||||
|
<plugin.dependencies/>
|
||||||
|
</properties>
|
||||||
|
|
||||||
|
<build>
|
||||||
|
<plugins>
|
||||||
|
<!-- DOESN'T WORK WITH MAVEN 3 (I defined the plugin metadata in properties section)
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.codehaus.mojo</groupId>
|
||||||
|
<artifactId>properties-maven-plugin</artifactId>
|
||||||
|
<version>1.0-alpha-2</version>
|
||||||
|
<executions>
|
||||||
|
<execution>
|
||||||
|
<phase>initialize</phase>
|
||||||
|
<goals>
|
||||||
|
<goal>read-project-properties</goal>
|
||||||
|
</goals>
|
||||||
|
<configuration>
|
||||||
|
<files>
|
||||||
|
<file>plugin.properties</file>
|
||||||
|
</files>
|
||||||
|
</configuration>
|
||||||
|
</execution>
|
||||||
|
</executions>
|
||||||
|
</plugin>
|
||||||
|
-->
|
||||||
|
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
|
<artifactId>maven-antrun-plugin</artifactId>
|
||||||
|
<version>1.6</version>
|
||||||
|
<executions>
|
||||||
|
<execution>
|
||||||
|
<id>unzip jar file</id>
|
||||||
|
<phase>package</phase>
|
||||||
|
<configuration>
|
||||||
|
<target>
|
||||||
|
<unzip src="target/${project.artifactId}-${project.version}.${project.packaging}"
|
||||||
|
dest="target/plugin-classes"/>
|
||||||
|
</target>
|
||||||
|
</configuration>
|
||||||
|
<goals>
|
||||||
|
<goal>run</goal>
|
||||||
|
</goals>
|
||||||
|
</execution>
|
||||||
|
</executions>
|
||||||
|
</plugin>
|
||||||
|
|
||||||
|
<plugin>
|
||||||
|
<artifactId>maven-assembly-plugin</artifactId>
|
||||||
|
<version>2.3</version>
|
||||||
|
<configuration>
|
||||||
|
<descriptors>
|
||||||
|
<descriptor>
|
||||||
|
src/main/assembly/assembly.xml
|
||||||
|
</descriptor>
|
||||||
|
</descriptors>
|
||||||
|
<appendAssemblyId>false</appendAssemblyId>
|
||||||
|
</configuration>
|
||||||
|
<executions>
|
||||||
|
<execution>
|
||||||
|
<id>make-assembly</id>
|
||||||
|
<phase>package</phase>
|
||||||
|
<goals>
|
||||||
|
<goal>attached</goal>
|
||||||
|
</goals>
|
||||||
|
</execution>
|
||||||
|
</executions>
|
||||||
|
</plugin>
|
||||||
|
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
|
<artifactId>maven-jar-plugin</artifactId>
|
||||||
|
<version>2.4</version>
|
||||||
|
<configuration>
|
||||||
|
<archive>
|
||||||
|
<manifestEntries>
|
||||||
|
<Plugin-Id>${plugin.id}</Plugin-Id>
|
||||||
|
<Plugin-Class>${plugin.class}</Plugin-Class>
|
||||||
|
<Plugin-Version>${plugin.version}</Plugin-Version>
|
||||||
|
<Plugin-Provider>${plugin.provider}</Plugin-Provider>
|
||||||
|
<Plugin-Description>${plugin.description}</Plugin-Description>
|
||||||
|
<Plugin-Dependencies>${plugin.dependencies}</Plugin-Dependencies>
|
||||||
|
</manifestEntries>
|
||||||
|
</archive>
|
||||||
|
</configuration>
|
||||||
|
</plugin>
|
||||||
|
|
||||||
|
<plugin>
|
||||||
|
<artifactId>maven-deploy-plugin</artifactId>
|
||||||
|
<configuration>
|
||||||
|
<skip>true</skip>
|
||||||
|
</configuration>
|
||||||
|
</plugin>
|
||||||
|
</plugins>
|
||||||
|
</build>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<!-- 其他依赖项 -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-starter-web</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<!-- PF4J Spring 集成 -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.pf4j</groupId>
|
||||||
|
<artifactId>pf4j-spring</artifactId>
|
||||||
|
<scope>provided</scope>
|
||||||
|
</dependency>
|
||||||
|
<!-- 项目依赖 -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>cn.iocoder.boot</groupId>
|
||||||
|
<artifactId>yudao-module-iot-api</artifactId>
|
||||||
|
<version>${revision}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.projectlombok</groupId>
|
||||||
|
<artifactId>lombok</artifactId>
|
||||||
|
<version>${lombok.version}</version>
|
||||||
|
<scope>provided</scope>
|
||||||
|
</dependency>
|
||||||
|
<!-- Vert.x 核心依赖 -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.vertx</groupId>
|
||||||
|
<artifactId>vertx-core</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<!-- Vert.x Web 模块 -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.vertx</groupId>
|
||||||
|
<artifactId>vertx-web</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<!-- MQTT -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.eclipse.paho</groupId>
|
||||||
|
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
</project>
|
|
@ -0,0 +1,31 @@
|
||||||
|
<assembly>
|
||||||
|
<id>plugin</id>
|
||||||
|
<formats>
|
||||||
|
<format>zip</format>
|
||||||
|
</formats>
|
||||||
|
<includeBaseDirectory>false</includeBaseDirectory>
|
||||||
|
<dependencySets>
|
||||||
|
<dependencySet>
|
||||||
|
<useProjectArtifact>false</useProjectArtifact>
|
||||||
|
<scope>runtime</scope>
|
||||||
|
<outputDirectory>lib</outputDirectory>
|
||||||
|
<includes>
|
||||||
|
<include>*:jar:*</include>
|
||||||
|
</includes>
|
||||||
|
</dependencySet>
|
||||||
|
</dependencySets>
|
||||||
|
<!--
|
||||||
|
<fileSets>
|
||||||
|
<fileSet>
|
||||||
|
<directory>target/classes</directory>
|
||||||
|
<outputDirectory>classes</outputDirectory>
|
||||||
|
</fileSet>
|
||||||
|
</fileSets>
|
||||||
|
-->
|
||||||
|
<fileSets>
|
||||||
|
<fileSet>
|
||||||
|
<directory>target/plugin-classes</directory>
|
||||||
|
<outputDirectory>classes</outputDirectory>
|
||||||
|
</fileSet>
|
||||||
|
</fileSets>
|
||||||
|
</assembly>
|
|
@ -0,0 +1,45 @@
|
||||||
|
package cn.iocoder.yudao.module.iot.plugin;
|
||||||
|
|
||||||
|
import cn.iocoder.yudao.module.iot.api.ServiceRegistry;
|
||||||
|
import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.pf4j.Plugin;
|
||||||
|
import org.pf4j.PluginWrapper;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class EmqxPlugin extends Plugin {
|
||||||
|
|
||||||
|
private ExecutorService executorService;
|
||||||
|
@Resource
|
||||||
|
private DeviceDataApi deviceDataApi;
|
||||||
|
|
||||||
|
public EmqxPlugin(PluginWrapper wrapper) {
|
||||||
|
super(wrapper);
|
||||||
|
this.executorService = Executors.newSingleThreadExecutor();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void start() {
|
||||||
|
log.info("EmqxPlugin.start()");
|
||||||
|
|
||||||
|
if (executorService.isShutdown() || executorService.isTerminated()) {
|
||||||
|
executorService = Executors.newSingleThreadExecutor();
|
||||||
|
}
|
||||||
|
|
||||||
|
deviceDataApi = ServiceRegistry.getService(DeviceDataApi.class);
|
||||||
|
if (deviceDataApi == null) {
|
||||||
|
log.error("未能从 ServiceRegistry 获取 DeviceDataApi 实例,请确保主程序已正确注册!");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop() {
|
||||||
|
log.info("EmqxPlugin.stop()");
|
||||||
|
}
|
||||||
|
}
|
|
@ -127,7 +127,7 @@
|
||||||
<!-- 其他依赖项 -->
|
<!-- 其他依赖项 -->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.springframework.boot</groupId>
|
<groupId>org.springframework.boot</groupId>
|
||||||
<artifactId>spring-boot-starter-web</artifactId>
|
<artifactId>spring-boot-starter</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
<!-- PF4J Spring 集成 -->
|
<!-- PF4J Spring 集成 -->
|
||||||
<dependency>
|
<dependency>
|
||||||
|
@ -147,20 +147,11 @@
|
||||||
<version>${lombok.version}</version>
|
<version>${lombok.version}</version>
|
||||||
<scope>provided</scope>
|
<scope>provided</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
<!-- Vert.x 核心依赖 -->
|
<!-- Vert.x Web -->
|
||||||
<dependency>
|
|
||||||
<groupId>io.vertx</groupId>
|
|
||||||
<artifactId>vertx-core</artifactId>
|
|
||||||
</dependency>
|
|
||||||
<!-- Vert.x Web 模块 -->
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>io.vertx</groupId>
|
<groupId>io.vertx</groupId>
|
||||||
<artifactId>vertx-web</artifactId>
|
<artifactId>vertx-web</artifactId>
|
||||||
</dependency>
|
<version>4.5.11</version>
|
||||||
<!-- MQTT -->
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.eclipse.paho</groupId>
|
|
||||||
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
|
|
||||||
</dependency>
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
</project>
|
</project>
|
|
@ -5,7 +5,9 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||||
|
|
||||||
@SpringBootApplication
|
@SpringBootApplication
|
||||||
public class HttpPluginSpringbootApplication {
|
public class HttpPluginSpringbootApplication {
|
||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
SpringApplication.run(HttpPluginSpringbootApplication.class, args);
|
SpringApplication.run(HttpPluginSpringbootApplication.class, args);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -0,0 +1,34 @@
|
||||||
|
package cn.iocoder.yudao.module.iot.config;
|
||||||
|
|
||||||
|
import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi;
|
||||||
|
import cn.iocoder.yudao.module.iot.api.device.dto.DeviceDataCreateReqDTO;
|
||||||
|
import cn.iocoder.yudao.module.iot.plugin.HttpVertxPlugin;
|
||||||
|
import org.pf4j.DefaultPluginManager;
|
||||||
|
import org.pf4j.PluginWrapper;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
|
// TODO 芋艿:临时实现;
|
||||||
|
@Configuration
|
||||||
|
public class TestConfiguration {
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public DeviceDataApi deviceDataApi() {
|
||||||
|
return new DeviceDataApi() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void saveDeviceData(DeviceDataCreateReqDTO createDTO) {
|
||||||
|
System.out.println("saveDeviceData");
|
||||||
|
}
|
||||||
|
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO @haohao:可能要看下,有没更好的方式
|
||||||
|
@Bean(initMethod = "start")
|
||||||
|
public HttpVertxPlugin HttpVertxPlugin() {
|
||||||
|
PluginWrapper pluginWrapper = new PluginWrapper(new DefaultPluginManager(), null, null, null);
|
||||||
|
return new HttpVertxPlugin(pluginWrapper);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -1,38 +0,0 @@
|
||||||
|
|
||||||
package cn.iocoder.yudao.module.iot.controller;
|
|
||||||
|
|
||||||
import cn.iocoder.yudao.module.iot.mqttrpc.client.RpcClient;
|
|
||||||
import lombok.RequiredArgsConstructor;
|
|
||||||
import org.springframework.web.bind.annotation.PostMapping;
|
|
||||||
import org.springframework.web.bind.annotation.RequestMapping;
|
|
||||||
import org.springframework.web.bind.annotation.RequestParam;
|
|
||||||
import org.springframework.web.bind.annotation.RestController;
|
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
|
||||||
import java.util.concurrent.CompletableFuture;
|
|
||||||
|
|
||||||
// TODO 芋艿:后续 review 下
|
|
||||||
/**
|
|
||||||
* 插件实例 RPC 接口
|
|
||||||
*
|
|
||||||
* @author 芋道源码
|
|
||||||
*/
|
|
||||||
@RestController
|
|
||||||
@RequestMapping("/rpc")
|
|
||||||
@RequiredArgsConstructor
|
|
||||||
public class RpcController {
|
|
||||||
|
|
||||||
@Resource
|
|
||||||
private RpcClient rpcClient;
|
|
||||||
|
|
||||||
@PostMapping("/add")
|
|
||||||
public CompletableFuture<Object> add(@RequestParam int a, @RequestParam int b) throws Exception {
|
|
||||||
return rpcClient.call("add", new Object[]{a, b}, 10);
|
|
||||||
}
|
|
||||||
|
|
||||||
@PostMapping("/concat")
|
|
||||||
public CompletableFuture<Object> concat(@RequestParam String str1, @RequestParam String str2) throws Exception {
|
|
||||||
return rpcClient.call("concat", new Object[]{str1, str2}, 10);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,93 +0,0 @@
|
||||||
package cn.iocoder.yudao.module.iot.mqttrpc.client;
|
|
||||||
|
|
||||||
import cn.iocoder.yudao.module.iot.mqttrpc.common.RpcRequest;
|
|
||||||
import cn.iocoder.yudao.module.iot.mqttrpc.common.RpcResponse;
|
|
||||||
import cn.iocoder.yudao.module.iot.mqttrpc.common.SerializationUtils;
|
|
||||||
import cn.iocoder.yudao.module.iot.mqttrpc.config.MqttConfig;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.eclipse.paho.client.mqttv3.MqttClient;
|
|
||||||
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
|
||||||
import org.eclipse.paho.client.mqttv3.MqttException;
|
|
||||||
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
|
||||||
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
|
||||||
import org.springframework.stereotype.Service;
|
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
|
||||||
import javax.annotation.PreDestroy;
|
|
||||||
import java.util.UUID;
|
|
||||||
import java.util.concurrent.*;
|
|
||||||
|
|
||||||
// TODO @芋艿:需要考虑,怎么公用!
|
|
||||||
@Service
|
|
||||||
@Slf4j
|
|
||||||
public class RpcClient {
|
|
||||||
|
|
||||||
private final MqttConfig mqttConfig;
|
|
||||||
private final MqttClient mqttClient;
|
|
||||||
private final ConcurrentMap<String, CompletableFuture<RpcResponse>> pendingRequests = new ConcurrentHashMap<>();
|
|
||||||
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
|
|
||||||
|
|
||||||
public RpcClient(MqttConfig mqttConfig) throws MqttException {
|
|
||||||
this.mqttConfig = mqttConfig;
|
|
||||||
this.mqttClient = new MqttClient(mqttConfig.getBroker(), mqttConfig.getClientId(), new MemoryPersistence());
|
|
||||||
MqttConnectOptions options = new MqttConnectOptions();
|
|
||||||
options.setAutomaticReconnect(true);
|
|
||||||
options.setCleanSession(true);
|
|
||||||
options.setUserName(mqttConfig.getUsername());
|
|
||||||
options.setPassword(mqttConfig.getPassword().toCharArray());
|
|
||||||
this.mqttClient.connect(options);
|
|
||||||
}
|
|
||||||
|
|
||||||
@PostConstruct
|
|
||||||
public void init() throws MqttException {
|
|
||||||
mqttClient.subscribe(mqttConfig.getResponseTopicPrefix() + "#", this::handleResponse);
|
|
||||||
log.info("RPC Client subscribed to topics: {}", mqttConfig.getResponseTopicPrefix() + "#");
|
|
||||||
}
|
|
||||||
|
|
||||||
private void handleResponse(String topic, MqttMessage message) {
|
|
||||||
String correlationId = topic.substring(mqttConfig.getResponseTopicPrefix().length());
|
|
||||||
RpcResponse response = SerializationUtils.deserialize(new String(message.getPayload()), RpcResponse.class);
|
|
||||||
CompletableFuture<RpcResponse> future = pendingRequests.remove(correlationId);
|
|
||||||
if (future != null) {
|
|
||||||
if (response.getError() != null) {
|
|
||||||
future.completeExceptionally(new RuntimeException(response.getError()));
|
|
||||||
} else {
|
|
||||||
future.complete(response);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log.warn("Received response for unknown correlationId: {}", correlationId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public CompletableFuture<Object> call(String method, Object[] params, int timeoutSeconds) throws MqttException {
|
|
||||||
String correlationId = UUID.randomUUID().toString();
|
|
||||||
String replyTo = mqttConfig.getResponseTopicPrefix() + correlationId;
|
|
||||||
|
|
||||||
RpcRequest request = new RpcRequest(method, params, correlationId, replyTo);
|
|
||||||
String payload = SerializationUtils.serialize(request);
|
|
||||||
MqttMessage message = new MqttMessage(payload.getBytes());
|
|
||||||
message.setQos(1);
|
|
||||||
mqttClient.publish(mqttConfig.getRequestTopic(), message);
|
|
||||||
|
|
||||||
CompletableFuture<RpcResponse> futureResponse = new CompletableFuture<>();
|
|
||||||
pendingRequests.put(correlationId, futureResponse);
|
|
||||||
|
|
||||||
// 设置超时
|
|
||||||
scheduler.schedule(() -> {
|
|
||||||
CompletableFuture<RpcResponse> removed = pendingRequests.remove(correlationId);
|
|
||||||
if (removed != null) {
|
|
||||||
removed.completeExceptionally(new TimeoutException("RPC call timed out"));
|
|
||||||
}
|
|
||||||
}, timeoutSeconds, TimeUnit.SECONDS);
|
|
||||||
|
|
||||||
// 返回最终的结果
|
|
||||||
return futureResponse.thenApply(RpcResponse::getResult);
|
|
||||||
}
|
|
||||||
|
|
||||||
@PreDestroy
|
|
||||||
public void cleanup() throws MqttException {
|
|
||||||
mqttClient.disconnect();
|
|
||||||
scheduler.shutdown();
|
|
||||||
log.info("RPC Client disconnected");
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,41 +0,0 @@
|
||||||
package cn.iocoder.yudao.module.iot.mqttrpc.config;
|
|
||||||
|
|
||||||
import lombok.Data;
|
|
||||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
|
||||||
import org.springframework.context.annotation.Configuration;
|
|
||||||
|
|
||||||
@Data
|
|
||||||
@Configuration
|
|
||||||
@ConfigurationProperties(prefix = "mqtt")
|
|
||||||
public class MqttConfig {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* MQTT 代理地址
|
|
||||||
*/
|
|
||||||
private String broker;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* MQTT 用户名
|
|
||||||
*/
|
|
||||||
private String username;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* MQTT 密码
|
|
||||||
*/
|
|
||||||
private String password;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* MQTT 客户端 ID
|
|
||||||
*/
|
|
||||||
private String clientId;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* MQTT 请求主题
|
|
||||||
*/
|
|
||||||
private String requestTopic;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* MQTT 响应主题前缀
|
|
||||||
*/
|
|
||||||
private String responseTopicPrefix;
|
|
||||||
}
|
|
|
@ -1,22 +1,22 @@
|
||||||
package cn.iocoder.yudao.module.iot.plugin;
|
package cn.iocoder.yudao.module.iot.plugin;
|
||||||
|
|
||||||
import cn.iocoder.yudao.module.iot.api.ServiceRegistry;
|
import cn.hutool.extra.spring.SpringUtil;
|
||||||
import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi;
|
import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi;
|
||||||
import io.vertx.core.Vertx;
|
import io.vertx.core.Vertx;
|
||||||
import io.vertx.ext.web.Router;
|
import io.vertx.ext.web.Router;
|
||||||
import io.vertx.ext.web.handler.BodyHandler;
|
import io.vertx.ext.web.handler.BodyHandler;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.pf4j.PluginWrapper;
|
import org.pf4j.PluginWrapper;
|
||||||
import org.pf4j.spring.SpringPlugin;
|
import org.pf4j.spring.SpringPlugin;
|
||||||
import org.springframework.context.ApplicationContext;
|
import org.springframework.context.ApplicationContext;
|
||||||
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
|
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class HttpVertxPlugin extends SpringPlugin {
|
public class HttpVertxPlugin extends SpringPlugin {
|
||||||
|
|
||||||
private static final int PORT = 8092;
|
private static final int PORT = 8092;
|
||||||
private Vertx vertx;
|
private Vertx vertx;
|
||||||
|
|
||||||
private DeviceDataApi deviceDataApi;
|
private DeviceDataApi deviceDataApi;
|
||||||
|
|
||||||
public HttpVertxPlugin(PluginWrapper wrapper) {
|
public HttpVertxPlugin(PluginWrapper wrapper) {
|
||||||
|
@ -28,7 +28,7 @@ public class HttpVertxPlugin extends SpringPlugin {
|
||||||
log.info("HttpVertxPlugin.start()");
|
log.info("HttpVertxPlugin.start()");
|
||||||
|
|
||||||
// 获取 DeviceDataApi 实例
|
// 获取 DeviceDataApi 实例
|
||||||
deviceDataApi = ServiceRegistry.getService(DeviceDataApi.class);
|
deviceDataApi = SpringUtil.getBean(DeviceDataApi.class);
|
||||||
if (deviceDataApi == null) {
|
if (deviceDataApi == null) {
|
||||||
log.error("未能从 ServiceRegistry 获取 DeviceDataApi 实例,请确保主程序已正确注册!");
|
log.error("未能从 ServiceRegistry 获取 DeviceDataApi 实例,请确保主程序已正确注册!");
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -1,6 +1,3 @@
|
||||||
server:
|
|
||||||
port: 8092
|
|
||||||
|
|
||||||
spring:
|
spring:
|
||||||
application:
|
application:
|
||||||
name: yudao-module-iot-http-plugin
|
name: yudao-module-iot-http-plugin
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
plugin.id=mqtt-plugin
|
plugin.id=mqtt-plugin
|
||||||
|
plugin.description=Vert.x MQTT plugin
|
||||||
plugin.class=cn.iocoder.yudao.module.iot.plugin.MqttPlugin
|
plugin.class=cn.iocoder.yudao.module.iot.plugin.MqttPlugin
|
||||||
plugin.version=0.0.1
|
plugin.version=1.0.0
|
||||||
|
plugin.requires=
|
||||||
plugin.provider=ahh
|
plugin.provider=ahh
|
||||||
plugin.dependencies=
|
plugin.license=Apache-2.0
|
||||||
plugin.description=mqtt-plugin-0.0.1
|
|
||||||
|
|
|
@ -145,10 +145,11 @@
|
||||||
<version>${lombok.version}</version>
|
<version>${lombok.version}</version>
|
||||||
<scope>provided</scope>
|
<scope>provided</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
<!-- MQTT -->
|
<!-- Vert.x MQTT -->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.eclipse.paho</groupId>
|
<groupId>io.vertx</groupId>
|
||||||
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
|
<artifactId>vertx-mqtt</artifactId>
|
||||||
|
<version>4.5.11</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
</project>
|
</project>
|
|
@ -1,45 +1,36 @@
|
||||||
package cn.iocoder.yudao.module.iot.plugin;
|
package cn.iocoder.yudao.module.iot.plugin;
|
||||||
|
|
||||||
import cn.iocoder.yudao.module.iot.api.ServiceRegistry;
|
|
||||||
import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.pf4j.Plugin;
|
import org.pf4j.Plugin;
|
||||||
import org.pf4j.PluginWrapper;
|
import org.pf4j.PluginWrapper;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class MqttPlugin extends Plugin {
|
public class MqttPlugin extends Plugin {
|
||||||
|
|
||||||
private ExecutorService executorService;
|
private MqttServerExtension mqttServerExtension;
|
||||||
@Resource
|
|
||||||
private DeviceDataApi deviceDataApi;
|
|
||||||
|
|
||||||
public MqttPlugin(PluginWrapper wrapper) {
|
public MqttPlugin(PluginWrapper wrapper) {
|
||||||
super(wrapper);
|
super(wrapper);
|
||||||
this.executorService = Executors.newSingleThreadExecutor();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void start() {
|
public void start() {
|
||||||
log.info("MqttPlugin.start()");
|
log.info("MQTT Plugin started.");
|
||||||
|
mqttServerExtension = new MqttServerExtension();
|
||||||
if (executorService.isShutdown() || executorService.isTerminated()) {
|
mqttServerExtension.startMqttServer();
|
||||||
executorService = Executors.newSingleThreadExecutor();
|
|
||||||
}
|
|
||||||
|
|
||||||
deviceDataApi = ServiceRegistry.getService(DeviceDataApi.class);
|
|
||||||
if (deviceDataApi == null) {
|
|
||||||
log.error("未能从 ServiceRegistry 获取 DeviceDataApi 实例,请确保主程序已正确注册!");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void stop() {
|
public void stop() {
|
||||||
log.info("MqttPlugin.stop()");
|
log.info("MQTT Plugin stopped.");
|
||||||
|
if (mqttServerExtension != null) {
|
||||||
|
mqttServerExtension.stopMqttServer().onComplete(ar -> {
|
||||||
|
if (ar.succeeded()) {
|
||||||
|
log.info("Stopped MQTT Server successfully");
|
||||||
|
} else {
|
||||||
|
log.error("Failed to stop MQTT Server: {}", ar.cause().getMessage());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -0,0 +1,231 @@
|
||||||
|
package cn.iocoder.yudao.module.iot.plugin;
|
||||||
|
|
||||||
|
import io.netty.handler.codec.mqtt.MqttProperties;
|
||||||
|
import io.netty.handler.codec.mqtt.MqttQoS;
|
||||||
|
import io.vertx.core.Future;
|
||||||
|
import io.vertx.core.Vertx;
|
||||||
|
import io.vertx.core.buffer.Buffer;
|
||||||
|
import io.vertx.mqtt.MqttEndpoint;
|
||||||
|
import io.vertx.mqtt.MqttServer;
|
||||||
|
import io.vertx.mqtt.MqttServerOptions;
|
||||||
|
import io.vertx.mqtt.MqttTopicSubscription;
|
||||||
|
import io.vertx.mqtt.messages.MqttDisconnectMessage;
|
||||||
|
import io.vertx.mqtt.messages.MqttPublishMessage;
|
||||||
|
import io.vertx.mqtt.messages.MqttSubscribeMessage;
|
||||||
|
import io.vertx.mqtt.messages.MqttUnsubscribeMessage;
|
||||||
|
import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.pf4j.Extension;
|
||||||
|
|
||||||
|
import java.nio.charset.Charset;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 根据官方示例,整合常见 MQTT 功能到 PF4J 的 Extension 类中
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Extension
|
||||||
|
public class MqttServerExtension {
|
||||||
|
|
||||||
|
private Vertx vertx;
|
||||||
|
private MqttServer mqttServer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 启动 MQTT 服务端
|
||||||
|
* 可根据需要决定是否启用 SSL/TLS、WebSocket、多实例部署等
|
||||||
|
*/
|
||||||
|
public void startMqttServer() {
|
||||||
|
// 初始化 Vert.x
|
||||||
|
vertx = Vertx.vertx();
|
||||||
|
|
||||||
|
// ========== 如果需要 SSL/TLS,请参考下面注释,启用注释并替换端口、证书路径等 ==========
|
||||||
|
// MqttServerOptions options = new MqttServerOptions()
|
||||||
|
// .setPort(8883)
|
||||||
|
// .setKeyCertOptions(new PemKeyCertOptions()
|
||||||
|
// .setKeyPath("./src/test/resources/tls/server-key.pem")
|
||||||
|
// .setCertPath("./src/test/resources/tls/server-cert.pem"))
|
||||||
|
// .setSsl(true);
|
||||||
|
|
||||||
|
// ========== 如果需要 WebSocket,请设置 setUseWebSocket(true) ==========
|
||||||
|
// options.setUseWebSocket(true);
|
||||||
|
|
||||||
|
// ========== 默认不启用 SSL 的示例 ==========
|
||||||
|
MqttServerOptions options = new MqttServerOptions()
|
||||||
|
.setPort(1883)
|
||||||
|
.setHost("0.0.0.0")
|
||||||
|
.setUseWebSocket(false); // 如果需要 WebSocket,请改为 true
|
||||||
|
|
||||||
|
mqttServer = MqttServer.create(vertx, options);
|
||||||
|
|
||||||
|
// 指定 endpointHandler,处理客户端连接等
|
||||||
|
mqttServer.endpointHandler(endpoint -> {
|
||||||
|
handleClientConnect(endpoint);
|
||||||
|
handleDisconnect(endpoint);
|
||||||
|
handleSubscribe(endpoint);
|
||||||
|
handleUnsubscribe(endpoint);
|
||||||
|
handlePublish(endpoint);
|
||||||
|
handlePing(endpoint);
|
||||||
|
});
|
||||||
|
|
||||||
|
// 启动监听
|
||||||
|
mqttServer.listen(ar -> {
|
||||||
|
if (ar.succeeded()) {
|
||||||
|
log.info("MQTT server is listening on port {}", mqttServer.actualPort());
|
||||||
|
} else {
|
||||||
|
log.error("Error on starting the server", ar.cause());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 优雅关闭 MQTT 服务端
|
||||||
|
*/
|
||||||
|
public Future<Void> stopMqttServer() {
|
||||||
|
if (mqttServer != null) {
|
||||||
|
return mqttServer.close().onComplete(ar -> {
|
||||||
|
if (ar.succeeded()) {
|
||||||
|
log.info("MQTT server closed.");
|
||||||
|
if (vertx != null) {
|
||||||
|
vertx.close();
|
||||||
|
log.info("Vert.x instance closed.");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.error("Failed to close MQTT server: {}", ar.cause().getMessage());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return Future.succeededFuture();
|
||||||
|
}
|
||||||
|
|
||||||
|
// ==================== 以下为官方示例中常见事件的处理封装 ====================
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处理客户端连接 (CONNECT)
|
||||||
|
*/
|
||||||
|
private void handleClientConnect(MqttEndpoint endpoint) {
|
||||||
|
// 打印 CONNECT 的主要信息
|
||||||
|
log.info("MQTT client [{}] request to connect, clean session = {}",
|
||||||
|
endpoint.clientIdentifier(), endpoint.isCleanSession());
|
||||||
|
|
||||||
|
if (endpoint.auth() != null) {
|
||||||
|
log.info("[username = {}, password = {}]", endpoint.auth().getUsername(), endpoint.auth().getPassword());
|
||||||
|
}
|
||||||
|
log.info("[properties = {}]", endpoint.connectProperties());
|
||||||
|
|
||||||
|
if (endpoint.will() != null) {
|
||||||
|
log.info("[will topic = {}, msg = {}, QoS = {}, isRetain = {}]",
|
||||||
|
endpoint.will().getWillTopic(),
|
||||||
|
new String(endpoint.will().getWillMessageBytes()),
|
||||||
|
endpoint.will().getWillQos(),
|
||||||
|
endpoint.will().isWillRetain());
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("[keep alive timeout = {}]", endpoint.keepAliveTimeSeconds());
|
||||||
|
|
||||||
|
// 接受远程客户端的连接
|
||||||
|
endpoint.accept(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处理客户端主动断开 (DISCONNECT)
|
||||||
|
*/
|
||||||
|
private void handleDisconnect(MqttEndpoint endpoint) {
|
||||||
|
endpoint.disconnectMessageHandler((MqttDisconnectMessage disconnectMessage) -> {
|
||||||
|
log.info("Received disconnect from client [{}], reason code = {}",
|
||||||
|
endpoint.clientIdentifier(), disconnectMessage.code());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处理客户端订阅 (SUBSCRIBE)
|
||||||
|
*/
|
||||||
|
private void handleSubscribe(MqttEndpoint endpoint) {
|
||||||
|
endpoint.subscribeHandler((MqttSubscribeMessage subscribe) -> {
|
||||||
|
List<MqttSubAckReasonCode> reasonCodes = new ArrayList<>();
|
||||||
|
for (MqttTopicSubscription s : subscribe.topicSubscriptions()) {
|
||||||
|
log.info("Subscription for {} with QoS {}", s.topicName(), s.qualityOfService());
|
||||||
|
// 将客户端请求的 QoS 转换为返回给客户端的 reason code(可能是错误码或实际 granted QoS)
|
||||||
|
reasonCodes.add(MqttSubAckReasonCode.qosGranted(s.qualityOfService()));
|
||||||
|
}
|
||||||
|
// 回复 SUBACK,MQTT 5.0 时可指定 reasonCodes、properties
|
||||||
|
endpoint.subscribeAcknowledge(subscribe.messageId(), reasonCodes, MqttProperties.NO_PROPERTIES);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处理客户端取消订阅 (UNSUBSCRIBE)
|
||||||
|
*/
|
||||||
|
private void handleUnsubscribe(MqttEndpoint endpoint) {
|
||||||
|
endpoint.unsubscribeHandler((MqttUnsubscribeMessage unsubscribe) -> {
|
||||||
|
for (String topic : unsubscribe.topics()) {
|
||||||
|
log.info("Unsubscription for {}", topic);
|
||||||
|
}
|
||||||
|
// 回复 UNSUBACK,MQTT 5.0 时可指定 reasonCodes、properties
|
||||||
|
endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处理客户端发布的消息 (PUBLISH)
|
||||||
|
*/
|
||||||
|
private void handlePublish(MqttEndpoint endpoint) {
|
||||||
|
// 接收 PUBLISH 消息
|
||||||
|
endpoint.publishHandler((MqttPublishMessage message) -> {
|
||||||
|
String payload = message.payload().toString(Charset.defaultCharset());
|
||||||
|
log.info("Received message [{}] on topic [{}] with QoS [{}]",
|
||||||
|
payload, message.topicName(), message.qosLevel());
|
||||||
|
|
||||||
|
// 根据不同 QoS,回复客户端
|
||||||
|
if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
|
||||||
|
endpoint.publishAcknowledge(message.messageId());
|
||||||
|
} else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
|
||||||
|
endpoint.publishReceived(message.messageId());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// 如果 QoS = 2,需要处理 PUBREL
|
||||||
|
endpoint.publishReleaseHandler(messageId -> {
|
||||||
|
endpoint.publishComplete(messageId);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处理客户端 PINGREQ
|
||||||
|
*/
|
||||||
|
private void handlePing(MqttEndpoint endpoint) {
|
||||||
|
endpoint.pingHandler(v -> {
|
||||||
|
// 这里仅做日志, PINGRESP 已自动发送
|
||||||
|
log.info("Ping received from client [{}]", endpoint.clientIdentifier());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// ==================== 如果需要服务端向客户端发布消息,可用以下示例 ====================
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 服务端主动向已连接的某个 endpoint 发布消息的示例
|
||||||
|
* 如果使用 MQTT 5.0,可以传递更多消息属性
|
||||||
|
*/
|
||||||
|
public void publishToClient(MqttEndpoint endpoint, String topic, String content) {
|
||||||
|
endpoint.publish(topic,
|
||||||
|
Buffer.buffer(content),
|
||||||
|
MqttQoS.AT_LEAST_ONCE, // QoS 自行选择
|
||||||
|
false,
|
||||||
|
false);
|
||||||
|
|
||||||
|
// 处理 QoS 1 和 QoS 2 的 ACK
|
||||||
|
endpoint.publishAcknowledgeHandler(messageId -> {
|
||||||
|
log.info("Received PUBACK from client [{}] for messageId = {}", endpoint.clientIdentifier(), messageId);
|
||||||
|
}).publishReceivedHandler(messageId -> {
|
||||||
|
endpoint.publishRelease(messageId);
|
||||||
|
}).publishCompletionHandler(messageId -> {
|
||||||
|
log.info("Received PUBCOMP from client [{}] for messageId = {}", endpoint.clientIdentifier(), messageId);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// ==================== 如果需要多实例部署,用于多核扩展,可参考以下思路 ====================
|
||||||
|
// 例如,在宿主应用或插件中循环启动多个 MqttServerExtension 实例,或使用 Vert.x 的 deployVerticle:
|
||||||
|
// DeploymentOptions options = new DeploymentOptions().setInstances(10);
|
||||||
|
// vertx.deployVerticle(() -> new MyMqttVerticle(), options);
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue