From 39fb31d4002fd19beca1d4d1341d73cc3823b0e5 Mon Sep 17 00:00:00 2001 From: YunaiV Date: Sun, 25 May 2025 11:19:35 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E3=80=90IoT=20=E7=89=A9=E8=81=94?= =?UTF-8?q?=E7=BD=91=E3=80=91=E5=A2=9E=E5=8A=A0=E6=B6=88=E6=81=AF=E6=80=BB?= =?UTF-8?q?=E7=BA=BF=EF=BC=88messagebus=EF=BC=89=E7=9A=84=20local=20?= =?UTF-8?q?=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- yudao-module-iot/pom.xml | 1 + .../yudao-module-iot-core/pom.xml | 24 +++ .../yudao-module-iot-message-bus/pom.xml | 65 +++++++ .../IotMessageBusAutoConfiguration.java | 37 ++++ .../config/IotMessageBusProperties.java | 28 +++ .../core/AbstractIotMessageBus.java | 69 ++++++++ .../iot/messagebus/core/IotMessageBus.java | 28 +++ .../core/IotMessageBusSubscriber.java | 27 +++ .../core/local/LocalIotMessage.java | 14 ++ .../core/local/LocalIotMessageBus.java | 39 +++++ .../main/resources/META-INF/spring.factories | 2 + .../LocalIotMessageBusIntegrationTest.java | 162 ++++++++++++++++++ .../src/test/resources/application-test.yml | 4 + 13 files changed, 500 insertions(+) create mode 100644 yudao-module-iot/yudao-module-iot-core/pom.xml create mode 100644 yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/pom.xml create mode 100644 yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/config/IotMessageBusAutoConfiguration.java create mode 100644 yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/config/IotMessageBusProperties.java create mode 100644 yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/AbstractIotMessageBus.java create mode 100644 yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/IotMessageBus.java create mode 100644 yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/IotMessageBusSubscriber.java create mode 100644 yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessage.java create mode 100644 yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessageBus.java create mode 100644 yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/resources/META-INF/spring.factories create mode 100644 yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessageBusIntegrationTest.java create mode 100644 yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/resources/application-test.yml diff --git a/yudao-module-iot/pom.xml b/yudao-module-iot/pom.xml index 6251887c46..8b4192662e 100644 --- a/yudao-module-iot/pom.xml +++ b/yudao-module-iot/pom.xml @@ -11,6 +11,7 @@ yudao-module-iot-biz yudao-module-iot-net-components yudao-module-iot-protocol + yudao-module-iot-core 4.0.0 diff --git a/yudao-module-iot/yudao-module-iot-core/pom.xml b/yudao-module-iot/yudao-module-iot-core/pom.xml new file mode 100644 index 0000000000..2da96dc8e9 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-core/pom.xml @@ -0,0 +1,24 @@ + + + + yudao-module-iot + cn.iocoder.boot + ${revision} + + + yudao-module-iot-message-bus + + 4.0.0 + + yudao-module-iot-core + pom + + ${project.artifactId} + + iot 模块下,提供 biz 和 gateway-server 模块的核心功能。 + 例如说:消息总线、消息协议(编解码)等。 + + + \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/pom.xml b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/pom.xml new file mode 100644 index 0000000000..436ec9ec67 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/pom.xml @@ -0,0 +1,65 @@ + + + + yudao-module-iot-core + cn.iocoder.boot + ${revision} + + 4.0.0 + yudao-module-iot-message-bus + jar + + ${project.artifactId} + + iot 模块下,提供消息总线的功能。 + 可选择使用 spring event、redis stream、rocketmq、kafka、rabbitmq 等。 + + + + + cn.iocoder.boot + yudao-common + + + + + org.springframework.boot + spring-boot-starter + + + + + org.springframework.data + spring-data-redis + true + + + + org.apache.rocketmq + rocketmq-spring-boot-starter + true + + + + org.springframework.amqp + spring-rabbit + true + + + + org.springframework.kafka + spring-kafka + true + + + + + org.springframework.boot + spring-boot-starter-test + test + + + + \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/config/IotMessageBusAutoConfiguration.java b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/config/IotMessageBusAutoConfiguration.java new file mode 100644 index 0000000000..2bd9d82d5f --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/config/IotMessageBusAutoConfiguration.java @@ -0,0 +1,37 @@ +package cn.iocoder.yudao.module.iot.messagebus.config; + +import cn.iocoder.yudao.module.iot.messagebus.core.IotMessageBus; +import cn.iocoder.yudao.module.iot.messagebus.core.local.LocalIotMessageBus; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * IoT 消息总线自动配置 + * + * @author 芋道源码 + */ +@AutoConfiguration +@EnableConfigurationProperties(IotMessageBusProperties.class) +@Slf4j +public class IotMessageBusAutoConfiguration { + + // ==================== Local 实现 ==================== + + @Configuration + @ConditionalOnProperty(prefix = "yudao.iot.message-bus", name = "type", havingValue = "local", matchIfMissing = true) + public static class LocalIotMessageBusConfiguration { + + @Bean + public IotMessageBus localIotMessageBus(ApplicationContext applicationContext) { + log.info("[localIotMessageBus][创建 Local IoT 消息总线]"); + return new LocalIotMessageBus(applicationContext); + } + + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/config/IotMessageBusProperties.java b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/config/IotMessageBusProperties.java new file mode 100644 index 0000000000..eac974ee54 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/config/IotMessageBusProperties.java @@ -0,0 +1,28 @@ +package cn.iocoder.yudao.module.iot.messagebus.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.validation.annotation.Validated; + +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; + +/** + * IoT 消息总线配置属性 + * + * @author 芋道源码 + */ +@ConfigurationProperties("yudao.iot.message-bus") +@Data +@Validated +public class IotMessageBusProperties { + + /** + * 消息总线类型 + * + * 可选值:local、redis、rocketmq、rabbitmq + */ + @NotNull(message = "IoT 消息总线类型不能为空") + private String type = "local"; + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/AbstractIotMessageBus.java b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/AbstractIotMessageBus.java new file mode 100644 index 0000000000..bc4cd91840 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/AbstractIotMessageBus.java @@ -0,0 +1,69 @@ +package cn.iocoder.yudao.module.iot.messagebus.core; + +import cn.hutool.core.collection.CollUtil; +import lombok.extern.slf4j.Slf4j; + +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * IoT 消息总线抽象基类 + * + * 提供通用的订阅者管理功能 + * + * @author 芋道源码 + */ +@Slf4j +public abstract class AbstractIotMessageBus implements IotMessageBus { + + /** + * 订阅者映射表 + * Key: topic + */ + private final Map>> subscribers = new ConcurrentHashMap<>(); + + @Override + public void register(String topic, IotMessageBusSubscriber subscriber) { + // 执行注册 + doRegister(topic, subscriber); + + // 添加订阅者映射 + List> topicSubscribers = subscribers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()); + topicSubscribers.add(subscriber); + topicSubscribers.sort(Comparator.comparingInt(IotMessageBusSubscriber::order)); + log.info("[register][topic({}) 注册订阅者({})成功]", topic, subscriber.getClass().getName()); + } + + /** + * 注册订阅者 + * + * @param topic 主题 + * @param subscriber 订阅者 + */ + protected abstract void doRegister(String topic, IotMessageBusSubscriber subscriber); + + /** + * 通知订阅者 + * + * @param message 消息 + */ + @SuppressWarnings({"rawtypes", "unchecked"}) + protected void notifySubscribers(String topic, Object message) { + List> topicSubscribers = subscribers.get(topic); + if (CollUtil.isEmpty(topicSubscribers)) { + return; + } + for (IotMessageBusSubscriber subscriber : topicSubscribers) { + try { + subscriber.onMessage(topic, message); + } catch (Exception e) { + log.error("[notifySubscribers][topic({}) message({}) 通知订阅者({})失败]", + topic, e, subscriber.getClass().getName()); + } + } + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/IotMessageBus.java b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/IotMessageBus.java new file mode 100644 index 0000000000..de8bc067a7 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/IotMessageBus.java @@ -0,0 +1,28 @@ +package cn.iocoder.yudao.module.iot.messagebus.core; + +/** + * IoT 消息总线接口 + * + * 用于在 IoT 系统中发布和订阅消息,支持多种消息中间件实现 + * + * @author 芋道源码 + */ +public interface IotMessageBus { + + /** + * 发布消息到消息总线 + * + * @param topic 主题 + * @param message 消息内容 + */ + void post(String topic, Object message); + + /** + * 注册消息订阅者 + * + * @param topic 主题 + * @param subscriber 订阅者 + */ + void register(String topic, IotMessageBusSubscriber subscriber); + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/IotMessageBusSubscriber.java b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/IotMessageBusSubscriber.java new file mode 100644 index 0000000000..aec1b3b3e2 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/IotMessageBusSubscriber.java @@ -0,0 +1,27 @@ +package cn.iocoder.yudao.module.iot.messagebus.core; + +/** + * IoT 消息总线订阅者接口 + * + * 用于处理从消息总线接收到的消息 + * + * @author 芋道源码 + */ +public interface IotMessageBusSubscriber { + + /** + * 处理接收到的消息 + * + * @param topic 主题 + * @param message 消息内容 + */ + void onMessage(String topic, T message); + + /** + * 获取订阅者的顺序 + * + * @return 顺序值 + */ + int order(); + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessage.java b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessage.java new file mode 100644 index 0000000000..bd048e558a --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessage.java @@ -0,0 +1,14 @@ +package cn.iocoder.yudao.module.iot.messagebus.core.local; + +import lombok.AllArgsConstructor; +import lombok.Data; + +@Data +@AllArgsConstructor +public class LocalIotMessage { + + private String topic; + + private Object message; + +} diff --git a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessageBus.java b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessageBus.java new file mode 100644 index 0000000000..eb09ac7de6 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessageBus.java @@ -0,0 +1,39 @@ +package cn.iocoder.yudao.module.iot.messagebus.core.local; + +import cn.iocoder.yudao.module.iot.messagebus.core.AbstractIotMessageBus; +import cn.iocoder.yudao.module.iot.messagebus.core.IotMessageBus; +import cn.iocoder.yudao.module.iot.messagebus.core.IotMessageBusSubscriber; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.ApplicationContext; +import org.springframework.context.event.EventListener; + +/** + * 本地的 {@link IotMessageBus} 实现类 + * + * 注意:仅适用于单机场景!!! + * + * @author 芋道源码 + */ +@RequiredArgsConstructor +@Slf4j +public class LocalIotMessageBus extends AbstractIotMessageBus { + + private final ApplicationContext applicationContext; + + @Override + public void post(String topic, Object message) { + applicationContext.publishEvent(new LocalIotMessage(topic, message)); + } + + @Override + protected void doRegister(String topic, IotMessageBusSubscriber subscriber) { + // 无需实现,交给 Spring @EventListener 监听 + } + + @EventListener + public void onMessage(LocalIotMessage message) { + notifySubscribers(message.getTopic(), message.getMessage()); + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/resources/META-INF/spring.factories b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/resources/META-INF/spring.factories new file mode 100644 index 0000000000..04096de530 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/resources/META-INF/spring.factories @@ -0,0 +1,2 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ +cn.iocoder.yudao.module.iot.messagebus.config.IotMessageBusAutoConfiguration \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessageBusIntegrationTest.java b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessageBusIntegrationTest.java new file mode 100644 index 0000000000..cf6a1cd142 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessageBusIntegrationTest.java @@ -0,0 +1,162 @@ +package cn.iocoder.yudao.module.iot.messagebus.core.local; + +import cn.iocoder.yudao.module.iot.messagebus.config.IotMessageBusAutoConfiguration; +import cn.iocoder.yudao.module.iot.messagebus.core.IotMessageBus; +import cn.iocoder.yudao.module.iot.messagebus.core.IotMessageBusSubscriber; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Import; +import org.springframework.test.context.TestPropertySource; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * {@link LocalIotMessageBus} 集成测试 + * + * @author 芋道源码 + */ +@SpringBootTest(classes = LocalIotMessageBusIntegrationTest.class) +@Import(IotMessageBusAutoConfiguration.class) +@TestPropertySource(properties = { + "yudao.iot.message-bus.type=local" +}) +@Slf4j +public class LocalIotMessageBusIntegrationTest { + + @Resource + private IotMessageBus messageBus; + + /** + * 1 topic 2 subscriber + */ + @Test + public void testSendMessageWithTwoSubscribers() throws InterruptedException { + // 准备 + String topic = "test-topic"; + String testMessage = "Hello IoT Message Bus!"; + // 用于等待消息处理完成 + CountDownLatch latch = new CountDownLatch(2); + // 用于记录接收到的消息 + AtomicInteger subscriber1Count = new AtomicInteger(0); + AtomicInteger subscriber2Count = new AtomicInteger(0); + + // 创建第一个订阅者 + IotMessageBusSubscriber subscriber1 = new IotMessageBusSubscriber<>() { + + @Override + public void onMessage(String topic, String message) { + log.info("[订阅者1] 收到消息 - Topic: {}, Message: {}", topic, message); + subscriber1Count.incrementAndGet(); + assertEquals("test-topic", topic); + assertEquals(testMessage, message); + latch.countDown(); + } + + @Override + public int order() { + return 1; + } + + }; + // 创建第二个订阅者 + IotMessageBusSubscriber subscriber2 = new IotMessageBusSubscriber<>() { + + @Override + public void onMessage(String topic, String message) { + log.info("[订阅者2] 收到消息 - Topic: {}, Message: {}", topic, message); + subscriber2Count.incrementAndGet(); + assertEquals("test-topic", topic); + assertEquals(testMessage, message); + latch.countDown(); + } + + @Override + public int order() { + return 0; + } + + }; + // 注册订阅者 + messageBus.register(topic, subscriber1); + messageBus.register(topic, subscriber2); + + // 发送消息 + log.info("[测试] 发送消息 - Topic: {}, Message: {}", topic, testMessage); + messageBus.post(topic, testMessage); + // 等待消息处理完成(最多等待5秒) + boolean completed = latch.await(5, TimeUnit.SECONDS); + + // 验证结果 + assertTrue(completed, "消息处理超时"); + assertEquals(1, subscriber1Count.get(), "订阅者1应该收到1条消息"); + assertEquals(1, subscriber2Count.get(), "订阅者2应该收到1条消息"); + log.info("[测试] 测试完成 - 订阅者1收到{}条消息,订阅者2收到{}条消息", subscriber1Count.get(), subscriber2Count.get()); + } + + /** + * 2 topic 2 subscriber + */ + @Test + public void testMultipleTopics() throws InterruptedException { + // 准备 + String topic1 = "device-status"; + String topic2 = "device-data"; + String message1 = "设备在线"; + String message2 = "温度:25°C"; + CountDownLatch latch = new CountDownLatch(2); + + // 创建订阅者 1 - 只订阅设备状态 + IotMessageBusSubscriber statusSubscriber = new IotMessageBusSubscriber<>() { + + @Override + public void onMessage(String topic, String message) { + log.info("[状态订阅者] 收到消息 - Topic: {}, Message: {}", topic, message); + assertEquals(topic1, topic); + assertEquals(message1, message); + latch.countDown(); + } + + @Override + public int order() { + return 0; + } + + }; + // 创建订阅者 2 - 只订阅设备数据 + IotMessageBusSubscriber dataSubscriber = new IotMessageBusSubscriber<>() { + + @Override + public void onMessage(String topic, String message) { + log.info("[数据订阅者] 收到消息 - Topic: {}, Message: {}", topic, message); + assertEquals(topic2, topic); + assertEquals(message2, message); + latch.countDown(); + } + + @Override + public int order() { + return 1; + } + + }; + // 注册订阅者到不同主题 + messageBus.register(topic1, statusSubscriber); + messageBus.register(topic2, dataSubscriber); + + // 发送消息到不同主题 + messageBus.post(topic1, message1); + messageBus.post(topic2, message2); + // 等待消息处理完成 + boolean completed = latch.await(5, TimeUnit.SECONDS); + assertTrue(completed, "消息处理超时"); + log.info("[测试] 多主题测试完成"); + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/resources/application-test.yml b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/resources/application-test.yml new file mode 100644 index 0000000000..59571cd4dd --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/resources/application-test.yml @@ -0,0 +1,4 @@ +yudao: + iot: + message-bus: + type: local \ No newline at end of file