From 603649d2485ec36f30b77ae2a17957fb4bf28100 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E6=B5=A9=E6=B5=A9?= <1036606149@qq.com> Date: Mon, 6 Jan 2025 18:59:26 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90=E5=8A=9F=E8=83=BD=E5=AE=8C=E5=96=84?= =?UTF-8?q?=E3=80=91IoT:=20=E6=96=B0=E5=A2=9E=20MQTT=20RPC=20=E6=94=AF?= =?UTF-8?q?=E6=8C=81=EF=BC=8C=E5=8C=85=E5=90=AB=E8=AF=B7=E6=B1=82=E5=92=8C?= =?UTF-8?q?=E5=93=8D=E5=BA=94=E6=A8=A1=E5=9E=8B=E3=80=81=E5=BA=8F=E5=88=97?= =?UTF-8?q?=E5=8C=96=E5=B7=A5=E5=85=B7=E3=80=81MQTT=20=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E5=8F=8A=E5=AE=A2=E6=88=B7=E7=AB=AF/=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E5=99=A8=E5=AE=9E=E7=8E=B0=EF=BC=8C=E6=8F=90=E4=BE=9B=E7=A4=BA?= =?UTF-8?q?=E4=BE=8B=E6=9C=8D=E5=8A=A1=E5=92=8C=E6=8E=A7=E5=88=B6=E5=99=A8?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3=EF=BC=8C=E4=BC=98=E5=8C=96=E6=8F=92=E4=BB=B6?= =?UTF-8?q?=E7=BB=93=E6=9E=84=E4=BB=A5=E6=94=AF=E6=8C=81=20HTTP=20?= =?UTF-8?q?=E6=8F=92=E4=BB=B6=E7=9A=84=E9=9B=86=E6=88=90=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../module/iot/mqttrpc/common/RpcRequest.java | 37 ++++++++ .../iot/mqttrpc/common/RpcResponse.java | 32 +++++++ .../mqttrpc/common/SerializationUtils.java | 18 ++++ .../module/iot/mqttrpc/config/MqttConfig.java | 40 ++++++++ .../module/iot/mqttrpc/server/RpcServer.java | 95 +++++++++++++++++++ .../iot/service/plugin/ExampleService.java | 43 +++++++++ .../service/plugin/PluginInfoServiceImpl.java | 1 - .../yudao-module-iot-http-plugin/pom.xml | 5 + .../iot/HttpPluginSpringbootApplication.java | 11 +++ .../module/iot/controller/RpcController.java | 31 ++++++ .../module/iot/mqttrpc/client/RpcClient.java | 95 +++++++++++++++++++ .../module/iot/mqttrpc/config/MqttConfig.java | 41 ++++++++ .../src/main/resources/application.yml | 15 +++ .../yudao-module-iot-mqtt-plugin/pom.xml | 6 +- .../yudao/module/iot/plugin/MqttPlugin.java | 13 +-- 15 files changed, 471 insertions(+), 12 deletions(-) create mode 100644 yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/common/RpcRequest.java create mode 100644 yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/common/RpcResponse.java create mode 100644 yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/common/SerializationUtils.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/config/MqttConfig.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/server/RpcServer.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/ExampleService.java create mode 100644 yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/HttpPluginSpringbootApplication.java create mode 100644 yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/controller/RpcController.java create mode 100644 yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/client/RpcClient.java create mode 100644 yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/config/MqttConfig.java create mode 100644 yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/resources/application.yml diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/common/RpcRequest.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/common/RpcRequest.java new file mode 100644 index 0000000000..14e84175c0 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/common/RpcRequest.java @@ -0,0 +1,37 @@ +package cn.iocoder.yudao.module.iot.mqttrpc.common; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * MQTT RPC 请求 + * + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class RpcRequest { + + /** + * 方法名 + */ + private String method; + + /** + * 参数 + */ + private Object[] params; + + /** + * 关联 ID + */ + private String correlationId; + + /** + * 回复地址 + */ + private String replyTo; +} diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/common/RpcResponse.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/common/RpcResponse.java new file mode 100644 index 0000000000..675a6ee71b --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/common/RpcResponse.java @@ -0,0 +1,32 @@ +package cn.iocoder.yudao.module.iot.mqttrpc.common; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * MQTT RPC 响应 + * + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class RpcResponse { + + /** + * 关联 ID + */ + private String correlationId; + + /** + * 结果 + */ + private Object result; + + /** + * 错误 + */ + private String error; +} diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/common/SerializationUtils.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/common/SerializationUtils.java new file mode 100644 index 0000000000..1529e2dba1 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/common/SerializationUtils.java @@ -0,0 +1,18 @@ +package cn.iocoder.yudao.module.iot.mqttrpc.common; + +import cn.hutool.json.JSONUtil; + +/** + * 序列化工具类 + * + */ +public class SerializationUtils { + + public static String serialize(Object obj) { + return JSONUtil.toJsonStr(obj); + } + + public static T deserialize(String json, Class clazz) { + return JSONUtil.toBean(json, clazz); + } +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/config/MqttConfig.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/config/MqttConfig.java new file mode 100644 index 0000000000..c7a0500030 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/config/MqttConfig.java @@ -0,0 +1,40 @@ +package cn.iocoder.yudao.module.iot.mqttrpc.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +@Data +@Configuration +@ConfigurationProperties(prefix = "mqtt") +public class MqttConfig { + /** + * MQTT 代理地址 + */ + private String broker; + + /** + * MQTT 用户名 + */ + private String username; + + /** + * MQTT 密码 + */ + private String password; + + /** + * MQTT 客户端 ID + */ + private String clientId; + + /** + * MQTT 请求主题 + */ + private String requestTopic; + + /** + * MQTT 响应主题前缀 + */ + private String responseTopicPrefix; +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/server/RpcServer.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/server/RpcServer.java new file mode 100644 index 0000000000..be6ca6f831 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/server/RpcServer.java @@ -0,0 +1,95 @@ +package cn.iocoder.yudao.module.iot.mqttrpc.server; + +import cn.hutool.core.lang.UUID; +import cn.iocoder.yudao.module.iot.mqttrpc.common.RpcRequest; +import cn.iocoder.yudao.module.iot.mqttrpc.common.RpcResponse; +import cn.iocoder.yudao.module.iot.mqttrpc.common.SerializationUtils; +import cn.iocoder.yudao.module.iot.mqttrpc.config.MqttConfig; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.util.HashMap; +import java.util.Map; + +@Service +@Slf4j +public class RpcServer { + + private final MqttConfig mqttConfig; + private final MqttClient mqttClient; + private final Map methodRegistry = new HashMap<>(); + + public RpcServer(MqttConfig mqttConfig) throws MqttException { + this.mqttConfig = mqttConfig; + this.mqttClient = new MqttClient(mqttConfig.getBroker(), "rpc-server-" + UUID.randomUUID(), new MemoryPersistence()); + MqttConnectOptions options = new MqttConnectOptions(); + options.setAutomaticReconnect(true); + options.setCleanSession(true); + options.setUserName(mqttConfig.getUsername()); + options.setPassword(mqttConfig.getPassword().toCharArray()); + this.mqttClient.connect(options); + } + + @PostConstruct + public void init() throws MqttException { + mqttClient.subscribe(mqttConfig.getRequestTopic(), this::handleRequest); + log.info("RPC Server subscribed to topic: {}", mqttConfig.getRequestTopic()); + } + + private void handleRequest(String topic, MqttMessage message) { + RpcRequest request = SerializationUtils.deserialize(new String(message.getPayload()), RpcRequest.class); + RpcResponse response = new RpcResponse(); + response.setCorrelationId(request.getCorrelationId()); + + try { + MethodInvoker invoker = methodRegistry.get(request.getMethod()); + if (invoker == null) { + throw new NoSuchMethodException("Unknown method: " + request.getMethod()); + } + Object result = invoker.invoke(request.getParams()); + response.setResult(result); + } catch (Exception e) { + response.setError(e.getMessage()); + log.error("Error processing RPC request: {}", e.getMessage(), e); + } + + String replyPayload = SerializationUtils.serialize(response); + MqttMessage replyMessage = new MqttMessage(replyPayload.getBytes()); + replyMessage.setQos(1); + try { + mqttClient.publish(request.getReplyTo(), replyMessage); + log.info("Published response to {}", request.getReplyTo()); + } catch (MqttException e) { + log.error("Failed to publish response: {}", e.getMessage(), e); + } + } + + /** + * 注册可调用的方法 + * + * @param methodName 方法名称 + * @param invoker 方法调用器 + */ + public void registerMethod(String methodName, MethodInvoker invoker) { + methodRegistry.put(methodName, invoker); + log.info("Registered method: {}", methodName); + } + + @PreDestroy + public void cleanup() throws MqttException { + mqttClient.disconnect(); + log.info("RPC Server disconnected"); + } + + /** + * 方法调用器接口 + */ + @FunctionalInterface + public interface MethodInvoker { + Object invoke(Object[] params) throws Exception; + } +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/ExampleService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/ExampleService.java new file mode 100644 index 0000000000..22ebe8b4f2 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/ExampleService.java @@ -0,0 +1,43 @@ +package cn.iocoder.yudao.module.iot.service.plugin; + +import cn.iocoder.yudao.module.iot.mqttrpc.server.RpcServer; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; + +@Service +@RequiredArgsConstructor +public class ExampleService { + + private final RpcServer rpcServer; + + @PostConstruct + public void registerMethods() { + rpcServer.registerMethod("add", params -> { + if (params.length != 2) { + throw new IllegalArgumentException("add方法需要两个参数"); + } + int a = ((Number) params[0]).intValue(); + int b = ((Number) params[1]).intValue(); + return add(a, b); + }); + + rpcServer.registerMethod("concat", params -> { + if (params.length != 2) { + throw new IllegalArgumentException("concat方法需要两个参数"); + } + String str1 = params[0].toString(); + String str2 = params[1].toString(); + return concat(str1, str2); + }); + } + + private int add(int a, int b) { + return a + b; + } + + private String concat(String a, String b) { + return a + b; + } +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/PluginInfoServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/PluginInfoServiceImpl.java index 7c856447bc..77cf590a0c 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/PluginInfoServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/PluginInfoServiceImpl.java @@ -18,7 +18,6 @@ import org.springframework.stereotype.Service; import org.springframework.validation.annotation.Validated; import org.springframework.web.multipart.MultipartFile; -import javax.annotation.PostConstruct; import java.io.File; import java.io.IOException; import java.nio.file.*; diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/pom.xml b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/pom.xml index 1ecf140a47..27c1d19a0a 100644 --- a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/pom.xml +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/pom.xml @@ -150,5 +150,10 @@ netty-all 4.1.63.Final + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/HttpPluginSpringbootApplication.java b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/HttpPluginSpringbootApplication.java new file mode 100644 index 0000000000..6b553f92bf --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/HttpPluginSpringbootApplication.java @@ -0,0 +1,11 @@ +package cn.iocoder.yudao.module.iot; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class HttpPluginSpringbootApplication { + public static void main(String[] args) { + SpringApplication.run(HttpPluginSpringbootApplication.class, args); + } +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/controller/RpcController.java b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/controller/RpcController.java new file mode 100644 index 0000000000..a5175a7862 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/controller/RpcController.java @@ -0,0 +1,31 @@ + +package cn.iocoder.yudao.module.iot.controller; + +import cn.iocoder.yudao.module.iot.mqttrpc.client.RpcClient; +import lombok.RequiredArgsConstructor; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import javax.annotation.Resource; +import java.util.concurrent.CompletableFuture; + +@RestController +@RequestMapping("/rpc") +@RequiredArgsConstructor +public class RpcController { + + @Resource + private RpcClient rpcClient; + + @PostMapping("/add") + public CompletableFuture add(@RequestParam int a, @RequestParam int b) throws Exception { + return rpcClient.call("add", new Object[]{a, b}, 10); + } + + @PostMapping("/concat") + public CompletableFuture concat(@RequestParam String str1, @RequestParam String str2) throws Exception { + return rpcClient.call("concat", new Object[]{str1, str2}, 10); + } +} diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/client/RpcClient.java b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/client/RpcClient.java new file mode 100644 index 0000000000..73c1d936ce --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/client/RpcClient.java @@ -0,0 +1,95 @@ +package cn.iocoder.yudao.module.iot.mqttrpc.client; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import cn.iocoder.yudao.module.iot.mqttrpc.common.RpcRequest; +import cn.iocoder.yudao.module.iot.mqttrpc.common.RpcResponse; +import cn.iocoder.yudao.module.iot.mqttrpc.common.SerializationUtils; +import cn.iocoder.yudao.module.iot.mqttrpc.config.MqttConfig; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.util.UUID; +import java.util.concurrent.*; + +@Service +@Slf4j +public class RpcClient { + + private final MqttConfig mqttConfig; + private final MqttClient mqttClient; + private final ConcurrentMap> pendingRequests = new ConcurrentHashMap<>(); + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + + public RpcClient(MqttConfig mqttConfig) throws MqttException { + this.mqttConfig = mqttConfig; + this.mqttClient = new MqttClient(mqttConfig.getBroker(), mqttConfig.getClientId(), new MemoryPersistence()); + MqttConnectOptions options = new MqttConnectOptions(); + options.setAutomaticReconnect(true); + options.setCleanSession(true); + options.setUserName(mqttConfig.getUsername()); + options.setPassword(mqttConfig.getPassword().toCharArray()); + this.mqttClient.connect(options); + } + + @PostConstruct + public void init() throws MqttException { + mqttClient.subscribe(mqttConfig.getResponseTopicPrefix() + "#", this::handleResponse); + log.info("RPC Client subscribed to topics: {}", mqttConfig.getResponseTopicPrefix() + "#"); + } + + private void handleResponse(String topic, MqttMessage message) { + String correlationId = topic.substring(mqttConfig.getResponseTopicPrefix().length()); + RpcResponse response = SerializationUtils.deserialize(new String(message.getPayload()), RpcResponse.class); + CompletableFuture future = pendingRequests.remove(correlationId); + if (future != null) { + if (response.getError() != null) { + future.completeExceptionally(new RuntimeException(response.getError())); + } else { + future.complete(response); + } + } else { + log.warn("Received response for unknown correlationId: {}", correlationId); + } + } + + public CompletableFuture call(String method, Object[] params, int timeoutSeconds) throws MqttException { + String correlationId = UUID.randomUUID().toString(); + String replyTo = mqttConfig.getResponseTopicPrefix() + correlationId; + + RpcRequest request = new RpcRequest(method, params, correlationId, replyTo); + String payload = SerializationUtils.serialize(request); + MqttMessage message = new MqttMessage(payload.getBytes()); + message.setQos(1); + mqttClient.publish(mqttConfig.getRequestTopic(), message); + + CompletableFuture futureResponse = new CompletableFuture<>(); + pendingRequests.put(correlationId, futureResponse); + + // 设置超时 + scheduler.schedule(() -> { + CompletableFuture removed = pendingRequests.remove(correlationId); + if (removed != null) { + removed.completeExceptionally(new TimeoutException("RPC call timed out")); + } + }, timeoutSeconds, TimeUnit.SECONDS); + + // 返回最终的结果 + return futureResponse.thenApply(RpcResponse::getResult); + } + + @PreDestroy + public void cleanup() throws MqttException { + mqttClient.disconnect(); + scheduler.shutdown(); + log.info("RPC Client disconnected"); + } +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/config/MqttConfig.java b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/config/MqttConfig.java new file mode 100644 index 0000000000..89569b0c3d --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/config/MqttConfig.java @@ -0,0 +1,41 @@ +package cn.iocoder.yudao.module.iot.mqttrpc.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +@Data +@Configuration +@ConfigurationProperties(prefix = "mqtt") +public class MqttConfig { + + /** + * MQTT 代理地址 + */ + private String broker; + + /** + * MQTT 用户名 + */ + private String username; + + /** + * MQTT 密码 + */ + private String password; + + /** + * MQTT 客户端 ID + */ + private String clientId; + + /** + * MQTT 请求主题 + */ + private String requestTopic; + + /** + * MQTT 响应主题前缀 + */ + private String responseTopicPrefix; +} diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/resources/application.yml b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/resources/application.yml new file mode 100644 index 0000000000..ea2234f83e --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/resources/application.yml @@ -0,0 +1,15 @@ +server: + port: 8092 + +spring: + application: + name: yudao-module-iot-http-plugin + +# MQTT-RPC 配置 +mqtt: + broker: tcp://chaojiniu.top:1883 + username: haohao + password: ahh@123456 + clientId: mqtt-rpc-client-${random.int} + requestTopic: rpc/request + responseTopicPrefix: rpc/response/ diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/pom.xml b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/pom.xml index d5d48d09a4..9607e0f93c 100644 --- a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/pom.xml +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/pom.xml @@ -145,10 +145,10 @@ ${lombok.version} provided + - io.netty - netty-all - 4.1.63.Final + org.eclipse.paho + org.eclipse.paho.client.mqttv3 \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttPlugin.java b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttPlugin.java index 5b50c71241..b3749e4025 100644 --- a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttPlugin.java +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttPlugin.java @@ -1,11 +1,12 @@ package cn.iocoder.yudao.module.iot.plugin; -import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi; import cn.iocoder.yudao.module.iot.api.ServiceRegistry; +import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi; import lombok.extern.slf4j.Slf4j; -import org.pf4j.PluginWrapper; import org.pf4j.Plugin; +import org.pf4j.PluginWrapper; +import javax.annotation.Resource; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -13,11 +14,11 @@ import java.util.concurrent.Executors; public class MqttPlugin extends Plugin { private ExecutorService executorService; + @Resource private DeviceDataApi deviceDataApi; public MqttPlugin(PluginWrapper wrapper) { super(wrapper); - // 初始化线程池 this.executorService = Executors.newSingleThreadExecutor(); } @@ -25,24 +26,20 @@ public class MqttPlugin extends Plugin { public void start() { log.info("MqttPlugin.start()"); - // 重新初始化线程池,确保它是活跃的 if (executorService.isShutdown() || executorService.isTerminated()) { executorService = Executors.newSingleThreadExecutor(); } - // 从 ServiceRegistry 中获取主程序暴露的 DeviceDataApi 接口实例 deviceDataApi = ServiceRegistry.getService(DeviceDataApi.class); if (deviceDataApi == null) { log.error("未能从 ServiceRegistry 获取 DeviceDataApi 实例,请确保主程序已正确注册!"); return; } + } @Override public void stop() { log.info("MqttPlugin.stop()"); - // 停止线程池 - executorService.shutdownNow(); } - } \ No newline at end of file