diff --git a/yudao-dependencies/pom.xml b/yudao-dependencies/pom.xml
index 23859c7834..7bdba64627 100644
--- a/yudao-dependencies/pom.xml
+++ b/yudao-dependencies/pom.xml
@@ -35,6 +35,7 @@
3.3.3
2.3.2
+ 3.3.3
2.2.7
@@ -285,13 +286,16 @@
yudao-spring-boot-starter-mq
${revision}
-
org.apache.rocketmq
rocketmq-spring-boot-starter
${rocketmq-spring.version}
-
+
+ org.springframework.kafka
+ spring-kafka
+ ${kafka-spring.version}
+
cn.iocoder.boot
diff --git a/yudao-module-iot/yudao-module-iot-biz/pom.xml b/yudao-module-iot/yudao-module-iot-biz/pom.xml
index ea1dde86fe..1c07e49408 100644
--- a/yudao-module-iot/yudao-module-iot-biz/pom.xml
+++ b/yudao-module-iot/yudao-module-iot-biz/pom.xml
@@ -81,6 +81,11 @@
rocketmq-spring-boot-starter
true
+
+ org.springframework.kafka
+ spring-kafka
+ true
+
org.pf4j
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java
index 213b0cda10..220edef718 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java
@@ -170,4 +170,38 @@ public class IotDataBridgeDO extends BaseDO {
}
+ /**
+ * Kafka 配置
+ */
+ @Data
+ public static class KafkaMQConfig implements Config {
+
+ /**
+ * Kafka 服务器地址
+ */
+ private String bootstrapServers;
+ /**
+ * 用户名
+ */
+ private String username;
+ /**
+ * 密码
+ */
+ private String password;
+ /**
+ * 是否启用 SSL
+ */
+ private Boolean ssl;
+
+ /**
+ * 生产者组 ID
+ */
+ private String groupId;
+ /**
+ * 主题
+ */
+ private String topic;
+
+ }
+
}
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java
index 264bce553e..e8fbb0ccb3 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java
@@ -8,7 +8,7 @@ import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
/**
- * 带缓存功能的数据桥接执行器抽象类
+ * 带缓存功能的数据桥梁执行器抽象类
*
* @author HUIHUI
*/
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java
index cd00f4f3e7..1617c3b091 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java
@@ -12,7 +12,7 @@ import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
public interface IotDataBridgeExecute {
/**
- * 执行数据桥接操作
+ * 执行数据桥梁操作
*
* @param message 设备消息
* @param dataBridge 数据桥梁
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java
index 76f1b793f6..27b8bc6bba 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java
@@ -32,7 +32,7 @@ public class IotHttpDataBridgeExecute implements IotDataBridgeExecute {
@Override
public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
- // 1.1 校验数据桥接的类型 == HTTP
+ // 1.1 校验数据桥梁的类型 == HTTP
if (!IotDataBridgTypeEnum.HTTP.getType().equals(dataBridge.getType())) {
return;
}
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java
new file mode 100644
index 0000000000..8c0ef2b038
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java
@@ -0,0 +1,124 @@
+package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
+
+import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
+import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum;
+import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Component;
+
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Kafka 的 {@link IotDataBridgeExecute} 实现类
+ *
+ * @author HUIHUI
+ */
+@Component
+@Slf4j
+public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute {
+
+ @Override
+ public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
+ // 1.1 校验数据桥梁的类型 == KAFKA
+ if (!IotDataBridgTypeEnum.KAFKA.getType().equals(dataBridge.getType())) {
+ return;
+ }
+ // 1.2 执行 Kafka 发送消息
+ executeKafka(message, (IotDataBridgeDO.KafkaMQConfig) dataBridge.getConfig());
+ }
+
+ private void executeKafka(IotDeviceMessage message, IotDataBridgeDO.KafkaMQConfig config) {
+ try {
+ // 1. 获取或创建 KafkaTemplate
+ KafkaTemplate kafkaTemplate = (KafkaTemplate) getProducer(config);
+
+ // 2. 发送消息并等待结果
+ kafkaTemplate.send(config.getTopic(), message.toString())
+ .get(10, TimeUnit.SECONDS); // 添加超时等待
+ log.info("[executeKafka][message({}) 发送成功]", message);
+ } catch (TimeoutException e) {
+ log.error("[executeKafka][message({}) config({}) 发送超时]", message, config, e);
+ } catch (Exception e) {
+ log.error("[executeKafka][message({}) config({}) 发送异常]", message, config, e);
+ }
+ }
+
+ @Override
+ protected Object initProducer(Object config) {
+ IotDataBridgeDO.KafkaMQConfig kafkaConfig = (IotDataBridgeDO.KafkaMQConfig) config;
+
+ // 1.1 构建生产者配置
+ Map props = new HashMap<>();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers());
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+ // 1.2 如果配置了认证信息
+ if (kafkaConfig.getUsername() != null && kafkaConfig.getPassword() != null) {
+ props.put("security.protocol", "SASL_PLAINTEXT");
+ props.put("sasl.mechanism", "PLAIN");
+ props.put("sasl.jaas.config",
+ "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
+ + kafkaConfig.getUsername() + "\" password=\"" + kafkaConfig.getPassword() + "\";");
+ }
+
+ // 1.3 如果启用 SSL
+ if (Boolean.TRUE.equals(kafkaConfig.getSsl())) {
+ props.put("security.protocol", "SSL");
+ }
+
+ // 2. 创建 KafkaTemplate
+ DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(props);
+ return new KafkaTemplate<>(producerFactory);
+ }
+
+ @Override
+ protected void closeProducer(Object producer) {
+ if (producer instanceof KafkaTemplate) {
+ ((KafkaTemplate, ?>) producer).destroy();
+ }
+ }
+
+ // TODO @芋艿:测试代码,后续清理
+ public static void main(String[] args) {
+ // 1. 创建一个共享的实例
+ IotKafkaMQDataBridgeExecute action = new IotKafkaMQDataBridgeExecute();
+
+ // 2. 创建共享的配置
+ IotDataBridgeDO.KafkaMQConfig config = new IotDataBridgeDO.KafkaMQConfig();
+ config.setBootstrapServers("127.0.0.1:9092");
+ config.setTopic("test-topic");
+ config.setSsl(false);
+ config.setUsername(null);
+ config.setPassword(null);
+
+ // 3. 创建共享的消息
+ IotDeviceMessage message = IotDeviceMessage.builder()
+ .requestId("TEST-001")
+ .productKey("testProduct")
+ .deviceName("testDevice")
+ .deviceKey("testDeviceKey")
+ .type("property")
+ .identifier("temperature")
+ .data("{\"value\": 60}")
+ .reportTime(LocalDateTime.now())
+ .tenantId(1L)
+ .build();
+
+ // 4. 执行两次测试,验证缓存
+ log.info("[main][第一次执行,应该会创建新的 producer]");
+ action.executeKafka(message, config);
+
+ log.info("[main][第二次执行,应该会复用缓存的 producer]");
+ action.executeKafka(message, config);
+ }
+
+}
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java
index af701cd903..3b53252539 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java
@@ -24,7 +24,7 @@ public class IotRocketMQDataBridgeExecute extends AbstractCacheableDataBridgeExe
@Override
public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
- // 1.1 校验数据桥接的类型 == ROCKETMQ
+ // 1.1 校验数据桥梁的类型 == ROCKETMQ
if (!IotDataBridgTypeEnum.ROCKETMQ.getType().equals(dataBridge.getType())) {
return;
}