diff --git a/yudao-dependencies/pom.xml b/yudao-dependencies/pom.xml
index 23859c7834..fa54eef606 100644
--- a/yudao-dependencies/pom.xml
+++ b/yudao-dependencies/pom.xml
@@ -35,6 +35,8 @@
3.3.3
2.3.2
+ 3.3.3
+ 3.4.3
2.2.7
@@ -285,12 +287,21 @@
yudao-spring-boot-starter-mq
${revision}
-
org.apache.rocketmq
rocketmq-spring-boot-starter
${rocketmq-spring.version}
+
+ org.springframework.kafka
+ spring-kafka
+ ${kafka-spring.version}
+
+
+ org.springframework.boot
+ spring-boot-starter-amqp
+ ${rabbitmq-spring.version}
+
diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgTypeEnum.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgTypeEnum.java
index cdec8d7979..295f35cffd 100644
--- a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgTypeEnum.java
+++ b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgTypeEnum.java
@@ -22,7 +22,7 @@ public enum IotDataBridgTypeEnum implements ArrayValuable {
MQTT(10),
DATABASE(20),
- REDIS(21),
+ REDIS_STREAM(21),
ROCKETMQ(30),
RABBITMQ(31),
diff --git a/yudao-module-iot/yudao-module-iot-biz/pom.xml b/yudao-module-iot/yudao-module-iot-biz/pom.xml
index ea1dde86fe..1b897c5d7f 100644
--- a/yudao-module-iot/yudao-module-iot-biz/pom.xml
+++ b/yudao-module-iot/yudao-module-iot-biz/pom.xml
@@ -81,6 +81,16 @@
rocketmq-spring-boot-starter
true
+
+ org.springframework.kafka
+ spring-kafka
+ true
+
+
+ org.springframework.boot
+ spring-boot-starter-amqp
+ true
+
org.pf4j
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java
index 213b0cda10..bb0623a1ff 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java
@@ -170,4 +170,104 @@ public class IotDataBridgeDO extends BaseDO {
}
+ /**
+ * Kafka 配置
+ */
+ @Data
+ public static class KafkaMQConfig implements Config {
+
+ /**
+ * Kafka 服务器地址
+ */
+ private String bootstrapServers;
+ /**
+ * 用户名
+ */
+ private String username;
+ /**
+ * 密码
+ */
+ private String password;
+ /**
+ * 是否启用 SSL
+ */
+ private Boolean ssl;
+
+ /**
+ * 主题
+ */
+ private String topic;
+
+ }
+
+ /**
+ * RabbitMQ 配置
+ */
+ @Data
+ public static class RabbitMQConfig implements Config {
+
+ /**
+ * RabbitMQ 服务器地址
+ */
+ private String host;
+ /**
+ * 端口
+ */
+ private Integer port;
+ /**
+ * 虚拟主机
+ */
+ private String virtualHost;
+ /**
+ * 用户名
+ */
+ private String username;
+ /**
+ * 密码
+ */
+ private String password;
+
+ /**
+ * 交换机名称
+ */
+ private String exchange;
+ /**
+ * 路由键
+ */
+ private String routingKey;
+ /**
+ * 队列名称
+ */
+ private String queue;
+ }
+
+ /**
+ * Redis Stream MQ 配置
+ */
+ @Data
+ public static class RedisStreamMQConfig implements Config {
+
+ /**
+ * Redis 服务器地址
+ */
+ private String host;
+ /**
+ * 端口
+ */
+ private Integer port;
+ /**
+ * 密码
+ */
+ private String password;
+ /**
+ * 数据库索引
+ */
+ private Integer database;
+
+ /**
+ * 主题
+ */
+ private String topic;
+ }
+
}
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java
new file mode 100644
index 0000000000..96b1edd330
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java
@@ -0,0 +1,80 @@
+package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import lombok.extern.slf4j.Slf4j;
+
+import java.time.Duration;
+
+// TODO @芋艿:因为下面的,都是有状态的,所以通过 guava 缓存连接,然后通过 RemovalNotification 实现关闭。例如说,一次新建有效期是 10 分钟;
+// TODO @芋艿:mq-redis
+// TODO @芋艿:mq-数据库
+// TODO @芋艿:kafka
+// TODO @芋艿:rocketmq
+// TODO @芋艿:rabbitmq
+// TODO @芋艿:mqtt
+// TODO @芋艿:tcp
+// TODO @芋艿:websocket
+
+/**
+ * 带缓存功能的数据桥梁执行器抽象类
+ *
+ * @author HUIHUI
+ */
+@Slf4j
+public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridgeExecute {
+
+ /**
+ * Producer 缓存
+ */
+ private final LoadingCache