diff --git a/yudao-module-iot/pom.xml b/yudao-module-iot/pom.xml
index 6251887c46..8b4192662e 100644
--- a/yudao-module-iot/pom.xml
+++ b/yudao-module-iot/pom.xml
@@ -11,6 +11,7 @@
yudao-module-iot-biz
yudao-module-iot-net-components
yudao-module-iot-protocol
+ yudao-module-iot-core
4.0.0
diff --git a/yudao-module-iot/yudao-module-iot-core/pom.xml b/yudao-module-iot/yudao-module-iot-core/pom.xml
new file mode 100644
index 0000000000..2da96dc8e9
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-core/pom.xml
@@ -0,0 +1,24 @@
+
+
+
+ yudao-module-iot
+ cn.iocoder.boot
+ ${revision}
+
+
+ yudao-module-iot-message-bus
+
+ 4.0.0
+
+ yudao-module-iot-core
+ pom
+
+ ${project.artifactId}
+
+ iot 模块下,提供 biz 和 gateway-server 模块的核心功能。
+ 例如说:消息总线、消息协议(编解码)等。
+
+
+
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/pom.xml b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/pom.xml
new file mode 100644
index 0000000000..436ec9ec67
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/pom.xml
@@ -0,0 +1,65 @@
+
+
+
+ yudao-module-iot-core
+ cn.iocoder.boot
+ ${revision}
+
+ 4.0.0
+ yudao-module-iot-message-bus
+ jar
+
+ ${project.artifactId}
+
+ iot 模块下,提供消息总线的功能。
+ 可选择使用 spring event、redis stream、rocketmq、kafka、rabbitmq 等。
+
+
+
+
+ cn.iocoder.boot
+ yudao-common
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter
+
+
+
+
+ org.springframework.data
+ spring-data-redis
+ true
+
+
+
+ org.apache.rocketmq
+ rocketmq-spring-boot-starter
+ true
+
+
+
+ org.springframework.amqp
+ spring-rabbit
+ true
+
+
+
+ org.springframework.kafka
+ spring-kafka
+ true
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+
+
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/config/IotMessageBusAutoConfiguration.java b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/config/IotMessageBusAutoConfiguration.java
new file mode 100644
index 0000000000..2bd9d82d5f
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/config/IotMessageBusAutoConfiguration.java
@@ -0,0 +1,37 @@
+package cn.iocoder.yudao.module.iot.messagebus.config;
+
+import cn.iocoder.yudao.module.iot.messagebus.core.IotMessageBus;
+import cn.iocoder.yudao.module.iot.messagebus.core.local.LocalIotMessageBus;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.autoconfigure.AutoConfiguration;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * IoT 消息总线自动配置
+ *
+ * @author 芋道源码
+ */
+@AutoConfiguration
+@EnableConfigurationProperties(IotMessageBusProperties.class)
+@Slf4j
+public class IotMessageBusAutoConfiguration {
+
+ // ==================== Local 实现 ====================
+
+ @Configuration
+ @ConditionalOnProperty(prefix = "yudao.iot.message-bus", name = "type", havingValue = "local", matchIfMissing = true)
+ public static class LocalIotMessageBusConfiguration {
+
+ @Bean
+ public IotMessageBus localIotMessageBus(ApplicationContext applicationContext) {
+ log.info("[localIotMessageBus][创建 Local IoT 消息总线]");
+ return new LocalIotMessageBus(applicationContext);
+ }
+
+ }
+
+}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/config/IotMessageBusProperties.java b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/config/IotMessageBusProperties.java
new file mode 100644
index 0000000000..eac974ee54
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/config/IotMessageBusProperties.java
@@ -0,0 +1,28 @@
+package cn.iocoder.yudao.module.iot.messagebus.config;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.validation.annotation.Validated;
+
+import jakarta.validation.constraints.NotEmpty;
+import jakarta.validation.constraints.NotNull;
+
+/**
+ * IoT 消息总线配置属性
+ *
+ * @author 芋道源码
+ */
+@ConfigurationProperties("yudao.iot.message-bus")
+@Data
+@Validated
+public class IotMessageBusProperties {
+
+ /**
+ * 消息总线类型
+ *
+ * 可选值:local、redis、rocketmq、rabbitmq
+ */
+ @NotNull(message = "IoT 消息总线类型不能为空")
+ private String type = "local";
+
+}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/AbstractIotMessageBus.java b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/AbstractIotMessageBus.java
new file mode 100644
index 0000000000..bc4cd91840
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/AbstractIotMessageBus.java
@@ -0,0 +1,69 @@
+package cn.iocoder.yudao.module.iot.messagebus.core;
+
+import cn.hutool.core.collection.CollUtil;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * IoT 消息总线抽象基类
+ *
+ * 提供通用的订阅者管理功能
+ *
+ * @author 芋道源码
+ */
+@Slf4j
+public abstract class AbstractIotMessageBus implements IotMessageBus {
+
+ /**
+ * 订阅者映射表
+ * Key: topic
+ */
+ private final Map>> subscribers = new ConcurrentHashMap<>();
+
+ @Override
+ public void register(String topic, IotMessageBusSubscriber> subscriber) {
+ // 执行注册
+ doRegister(topic, subscriber);
+
+ // 添加订阅者映射
+ List> topicSubscribers = subscribers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>());
+ topicSubscribers.add(subscriber);
+ topicSubscribers.sort(Comparator.comparingInt(IotMessageBusSubscriber::order));
+ log.info("[register][topic({}) 注册订阅者({})成功]", topic, subscriber.getClass().getName());
+ }
+
+ /**
+ * 注册订阅者
+ *
+ * @param topic 主题
+ * @param subscriber 订阅者
+ */
+ protected abstract void doRegister(String topic, IotMessageBusSubscriber> subscriber);
+
+ /**
+ * 通知订阅者
+ *
+ * @param message 消息
+ */
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ protected void notifySubscribers(String topic, Object message) {
+ List> topicSubscribers = subscribers.get(topic);
+ if (CollUtil.isEmpty(topicSubscribers)) {
+ return;
+ }
+ for (IotMessageBusSubscriber subscriber : topicSubscribers) {
+ try {
+ subscriber.onMessage(topic, message);
+ } catch (Exception e) {
+ log.error("[notifySubscribers][topic({}) message({}) 通知订阅者({})失败]",
+ topic, e, subscriber.getClass().getName());
+ }
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/IotMessageBus.java b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/IotMessageBus.java
new file mode 100644
index 0000000000..de8bc067a7
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/IotMessageBus.java
@@ -0,0 +1,28 @@
+package cn.iocoder.yudao.module.iot.messagebus.core;
+
+/**
+ * IoT 消息总线接口
+ *
+ * 用于在 IoT 系统中发布和订阅消息,支持多种消息中间件实现
+ *
+ * @author 芋道源码
+ */
+public interface IotMessageBus {
+
+ /**
+ * 发布消息到消息总线
+ *
+ * @param topic 主题
+ * @param message 消息内容
+ */
+ void post(String topic, Object message);
+
+ /**
+ * 注册消息订阅者
+ *
+ * @param topic 主题
+ * @param subscriber 订阅者
+ */
+ void register(String topic, IotMessageBusSubscriber> subscriber);
+
+}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/IotMessageBusSubscriber.java b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/IotMessageBusSubscriber.java
new file mode 100644
index 0000000000..aec1b3b3e2
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/IotMessageBusSubscriber.java
@@ -0,0 +1,27 @@
+package cn.iocoder.yudao.module.iot.messagebus.core;
+
+/**
+ * IoT 消息总线订阅者接口
+ *
+ * 用于处理从消息总线接收到的消息
+ *
+ * @author 芋道源码
+ */
+public interface IotMessageBusSubscriber {
+
+ /**
+ * 处理接收到的消息
+ *
+ * @param topic 主题
+ * @param message 消息内容
+ */
+ void onMessage(String topic, T message);
+
+ /**
+ * 获取订阅者的顺序
+ *
+ * @return 顺序值
+ */
+ int order();
+
+}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessage.java b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessage.java
new file mode 100644
index 0000000000..bd048e558a
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessage.java
@@ -0,0 +1,14 @@
+package cn.iocoder.yudao.module.iot.messagebus.core.local;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class LocalIotMessage {
+
+ private String topic;
+
+ private Object message;
+
+}
diff --git a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessageBus.java b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessageBus.java
new file mode 100644
index 0000000000..eb09ac7de6
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessageBus.java
@@ -0,0 +1,39 @@
+package cn.iocoder.yudao.module.iot.messagebus.core.local;
+
+import cn.iocoder.yudao.module.iot.messagebus.core.AbstractIotMessageBus;
+import cn.iocoder.yudao.module.iot.messagebus.core.IotMessageBus;
+import cn.iocoder.yudao.module.iot.messagebus.core.IotMessageBusSubscriber;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.event.EventListener;
+
+/**
+ * 本地的 {@link IotMessageBus} 实现类
+ *
+ * 注意:仅适用于单机场景!!!
+ *
+ * @author 芋道源码
+ */
+@RequiredArgsConstructor
+@Slf4j
+public class LocalIotMessageBus extends AbstractIotMessageBus {
+
+ private final ApplicationContext applicationContext;
+
+ @Override
+ public void post(String topic, Object message) {
+ applicationContext.publishEvent(new LocalIotMessage(topic, message));
+ }
+
+ @Override
+ protected void doRegister(String topic, IotMessageBusSubscriber> subscriber) {
+ // 无需实现,交给 Spring @EventListener 监听
+ }
+
+ @EventListener
+ public void onMessage(LocalIotMessage message) {
+ notifySubscribers(message.getTopic(), message.getMessage());
+ }
+
+}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/resources/META-INF/spring.factories b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/resources/META-INF/spring.factories
new file mode 100644
index 0000000000..04096de530
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/resources/META-INF/spring.factories
@@ -0,0 +1,2 @@
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
+cn.iocoder.yudao.module.iot.messagebus.config.IotMessageBusAutoConfiguration
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessageBusIntegrationTest.java b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessageBusIntegrationTest.java
new file mode 100644
index 0000000000..cf6a1cd142
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessageBusIntegrationTest.java
@@ -0,0 +1,162 @@
+package cn.iocoder.yudao.module.iot.messagebus.core.local;
+
+import cn.iocoder.yudao.module.iot.messagebus.config.IotMessageBusAutoConfiguration;
+import cn.iocoder.yudao.module.iot.messagebus.core.IotMessageBus;
+import cn.iocoder.yudao.module.iot.messagebus.core.IotMessageBusSubscriber;
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Import;
+import org.springframework.test.context.TestPropertySource;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * {@link LocalIotMessageBus} 集成测试
+ *
+ * @author 芋道源码
+ */
+@SpringBootTest(classes = LocalIotMessageBusIntegrationTest.class)
+@Import(IotMessageBusAutoConfiguration.class)
+@TestPropertySource(properties = {
+ "yudao.iot.message-bus.type=local"
+})
+@Slf4j
+public class LocalIotMessageBusIntegrationTest {
+
+ @Resource
+ private IotMessageBus messageBus;
+
+ /**
+ * 1 topic 2 subscriber
+ */
+ @Test
+ public void testSendMessageWithTwoSubscribers() throws InterruptedException {
+ // 准备
+ String topic = "test-topic";
+ String testMessage = "Hello IoT Message Bus!";
+ // 用于等待消息处理完成
+ CountDownLatch latch = new CountDownLatch(2);
+ // 用于记录接收到的消息
+ AtomicInteger subscriber1Count = new AtomicInteger(0);
+ AtomicInteger subscriber2Count = new AtomicInteger(0);
+
+ // 创建第一个订阅者
+ IotMessageBusSubscriber subscriber1 = new IotMessageBusSubscriber<>() {
+
+ @Override
+ public void onMessage(String topic, String message) {
+ log.info("[订阅者1] 收到消息 - Topic: {}, Message: {}", topic, message);
+ subscriber1Count.incrementAndGet();
+ assertEquals("test-topic", topic);
+ assertEquals(testMessage, message);
+ latch.countDown();
+ }
+
+ @Override
+ public int order() {
+ return 1;
+ }
+
+ };
+ // 创建第二个订阅者
+ IotMessageBusSubscriber subscriber2 = new IotMessageBusSubscriber<>() {
+
+ @Override
+ public void onMessage(String topic, String message) {
+ log.info("[订阅者2] 收到消息 - Topic: {}, Message: {}", topic, message);
+ subscriber2Count.incrementAndGet();
+ assertEquals("test-topic", topic);
+ assertEquals(testMessage, message);
+ latch.countDown();
+ }
+
+ @Override
+ public int order() {
+ return 0;
+ }
+
+ };
+ // 注册订阅者
+ messageBus.register(topic, subscriber1);
+ messageBus.register(topic, subscriber2);
+
+ // 发送消息
+ log.info("[测试] 发送消息 - Topic: {}, Message: {}", topic, testMessage);
+ messageBus.post(topic, testMessage);
+ // 等待消息处理完成(最多等待5秒)
+ boolean completed = latch.await(5, TimeUnit.SECONDS);
+
+ // 验证结果
+ assertTrue(completed, "消息处理超时");
+ assertEquals(1, subscriber1Count.get(), "订阅者1应该收到1条消息");
+ assertEquals(1, subscriber2Count.get(), "订阅者2应该收到1条消息");
+ log.info("[测试] 测试完成 - 订阅者1收到{}条消息,订阅者2收到{}条消息", subscriber1Count.get(), subscriber2Count.get());
+ }
+
+ /**
+ * 2 topic 2 subscriber
+ */
+ @Test
+ public void testMultipleTopics() throws InterruptedException {
+ // 准备
+ String topic1 = "device-status";
+ String topic2 = "device-data";
+ String message1 = "设备在线";
+ String message2 = "温度:25°C";
+ CountDownLatch latch = new CountDownLatch(2);
+
+ // 创建订阅者 1 - 只订阅设备状态
+ IotMessageBusSubscriber statusSubscriber = new IotMessageBusSubscriber<>() {
+
+ @Override
+ public void onMessage(String topic, String message) {
+ log.info("[状态订阅者] 收到消息 - Topic: {}, Message: {}", topic, message);
+ assertEquals(topic1, topic);
+ assertEquals(message1, message);
+ latch.countDown();
+ }
+
+ @Override
+ public int order() {
+ return 0;
+ }
+
+ };
+ // 创建订阅者 2 - 只订阅设备数据
+ IotMessageBusSubscriber dataSubscriber = new IotMessageBusSubscriber<>() {
+
+ @Override
+ public void onMessage(String topic, String message) {
+ log.info("[数据订阅者] 收到消息 - Topic: {}, Message: {}", topic, message);
+ assertEquals(topic2, topic);
+ assertEquals(message2, message);
+ latch.countDown();
+ }
+
+ @Override
+ public int order() {
+ return 1;
+ }
+
+ };
+ // 注册订阅者到不同主题
+ messageBus.register(topic1, statusSubscriber);
+ messageBus.register(topic2, dataSubscriber);
+
+ // 发送消息到不同主题
+ messageBus.post(topic1, message1);
+ messageBus.post(topic2, message2);
+ // 等待消息处理完成
+ boolean completed = latch.await(5, TimeUnit.SECONDS);
+ assertTrue(completed, "消息处理超时");
+ log.info("[测试] 多主题测试完成");
+ }
+
+}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/resources/application-test.yml b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/resources/application-test.yml
new file mode 100644
index 0000000000..59571cd4dd
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/resources/application-test.yml
@@ -0,0 +1,4 @@
+yudao:
+ iot:
+ message-bus:
+ type: local
\ No newline at end of file