!1269 【代码优化】IoT: 优化数据桥梁

Merge pull request !1269 from puhui999/iot
This commit is contained in:
芋道源码 2025-03-13 00:18:30 +00:00 committed by Gitee
commit 27e08266e0
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
30 changed files with 859 additions and 471 deletions

View File

@ -15,5 +15,7 @@ public class DictTypeConstants {
public static final String VALIDATE_TYPE = "iot_validate_type";
public static final String DEVICE_STATE = "iot_device_state";
public static final String IOT_DATA_BRIDGE_DIRECTION_ENUM = "iot_data_bridge_direction_enum";
public static final String IOT_DATA_BRIDGE_TYPE_ENUM = "iot_data_bridge_type_enum";
}

View File

@ -70,4 +70,7 @@ public interface ErrorCodeConstants {
// ========== MQTT 通信相关 1-050-009-000 ==========
ErrorCode MQTT_TOPIC_ILLEGAL = new ErrorCode(1_050_009_000, "topic illegal");
// ========== IoT 数据桥梁 1-050-010-000 ==========
ErrorCode DATA_BRIDGE_NOT_EXISTS = new ErrorCode(1_050_010_000, "IoT 数据桥梁不存在");
}

View File

@ -13,14 +13,14 @@ import java.util.Arrays;
*/
@RequiredArgsConstructor
@Getter
public enum IotDataBridgDirectionEnum implements ArrayValuable<Integer> {
public enum IotDataBridgeDirectionEnum implements ArrayValuable<Integer> {
INPUT(1), // 输入
OUTPUT(2); // 输出
private final Integer type;
public static final Integer[] ARRAYS = Arrays.stream(values()).map(IotDataBridgDirectionEnum::getType).toArray(Integer[]::new);
public static final Integer[] ARRAYS = Arrays.stream(values()).map(IotDataBridgeDirectionEnum::getType).toArray(Integer[]::new);
@Override
public Integer[] array() {

View File

@ -13,25 +13,26 @@ import java.util.Arrays;
*/
@RequiredArgsConstructor
@Getter
public enum IotDataBridgTypeEnum implements ArrayValuable<Integer> {
public enum IotDataBridgeTypeEnum implements ArrayValuable<Integer> {
HTTP(1),
TCP(2),
WEBSOCKET(3),
HTTP(1, "HTTP"),
TCP(2, "TCP"),
WEBSOCKET(3, "WEBSOCKET"),
MQTT(10),
MQTT(10, "MQTT"),
DATABASE(20),
REDIS_STREAM(21),
DATABASE(20, "DATABASE"),
REDIS_STREAM(21, "REDIS_STREAM"),
ROCKETMQ(30),
RABBITMQ(31),
KAFKA(32)
;
ROCKETMQ(30, "ROCKETMQ"),
RABBITMQ(31, "RABBITMQ"),
KAFKA(32, "KAFKA");
private final Integer type;
public static final Integer[] ARRAYS = Arrays.stream(values()).map(IotDataBridgTypeEnum::getType).toArray(Integer[]::new);
private final String name;
public static final Integer[] ARRAYS = Arrays.stream(values()).map(IotDataBridgeTypeEnum::getType).toArray(Integer[]::new);
@Override
public Integer[] array() {

View File

@ -0,0 +1,93 @@
package cn.iocoder.yudao.module.iot.controller.admin.rule;
import cn.iocoder.yudao.framework.apilog.core.annotation.ApiAccessLog;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.framework.common.pojo.PageParam;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
import cn.iocoder.yudao.framework.excel.core.util.ExcelUtils;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.IotDataBridgePageReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.IotDataBridgeRespVO;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.IotDataBridgeSaveReqVO;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
import cn.iocoder.yudao.module.iot.service.rule.IotDataBridgeService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.annotation.Resource;
import jakarta.servlet.http.HttpServletResponse;
import jakarta.validation.Valid;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import java.io.IOException;
import java.util.List;
import static cn.iocoder.yudao.framework.apilog.core.enums.OperateTypeEnum.EXPORT;
import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success;
@Tag(name = "管理后台 - IoT 数据桥梁")
@RestController
@RequestMapping("/iot/data-bridge")
@Validated
public class IotDataBridgeController {
@Resource
private IotDataBridgeService dataBridgeService;
@PostMapping("/create")
@Operation(summary = "创建IoT 数据桥梁")
@PreAuthorize("@ss.hasPermission('iot:data-bridge:create')")
public CommonResult<Long> createDataBridge(@Valid @RequestBody IotDataBridgeSaveReqVO createReqVO) {
return success(dataBridgeService.createDataBridge(createReqVO));
}
@PutMapping("/update")
@Operation(summary = "更新IoT 数据桥梁")
@PreAuthorize("@ss.hasPermission('iot:data-bridge:update')")
public CommonResult<Boolean> updateDataBridge(@Valid @RequestBody IotDataBridgeSaveReqVO updateReqVO) {
dataBridgeService.updateDataBridge(updateReqVO);
return success(true);
}
@DeleteMapping("/delete")
@Operation(summary = "删除IoT 数据桥梁")
@Parameter(name = "id", description = "编号", required = true)
@PreAuthorize("@ss.hasPermission('iot:data-bridge:delete')")
public CommonResult<Boolean> deleteDataBridge(@RequestParam("id") Long id) {
dataBridgeService.deleteDataBridge(id);
return success(true);
}
@GetMapping("/get")
@Operation(summary = "获得IoT 数据桥梁")
@Parameter(name = "id", description = "编号", required = true, example = "1024")
@PreAuthorize("@ss.hasPermission('iot:data-bridge:query')")
public CommonResult<IotDataBridgeRespVO> getDataBridge(@RequestParam("id") Long id) {
IotDataBridgeDO dataBridge = dataBridgeService.getDataBridge(id);
return success(BeanUtils.toBean(dataBridge, IotDataBridgeRespVO.class));
}
@GetMapping("/page")
@Operation(summary = "获得IoT 数据桥梁分页")
@PreAuthorize("@ss.hasPermission('iot:data-bridge:query')")
public CommonResult<PageResult<IotDataBridgeRespVO>> getDataBridgePage(@Valid IotDataBridgePageReqVO pageReqVO) {
PageResult<IotDataBridgeDO> pageResult = dataBridgeService.getDataBridgePage(pageReqVO);
return success(BeanUtils.toBean(pageResult, IotDataBridgeRespVO.class));
}
@GetMapping("/export-excel")
@Operation(summary = "导出IoT 数据桥梁 Excel")
@PreAuthorize("@ss.hasPermission('iot:data-bridge:export')")
@ApiAccessLog(operateType = EXPORT)
public void exportDataBridgeExcel(@Valid IotDataBridgePageReqVO pageReqVO,
HttpServletResponse response) throws IOException {
pageReqVO.setPageSize(PageParam.PAGE_SIZE_NONE);
List<IotDataBridgeDO> list = dataBridgeService.getDataBridgePage(pageReqVO).getList();
// 导出 Excel
ExcelUtils.write(response, "IoT 数据桥梁.xls", "数据", IotDataBridgeRespVO.class,
BeanUtils.toBean(list, IotDataBridgeRespVO.class));
}
}

View File

@ -0,0 +1,39 @@
package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge;
import cn.iocoder.yudao.framework.common.pojo.PageParam;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.springframework.format.annotation.DateTimeFormat;
import java.time.LocalDateTime;
import static cn.iocoder.yudao.framework.common.util.date.DateUtils.FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND;
@Schema(description = "管理后台 - IoT 数据桥梁分页 Request VO")
@Data
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class IotDataBridgePageReqVO extends PageParam {
@Schema(description = "桥梁名称", example = "赵六")
private String name;
@Schema(description = "桥梁描述", example = "随便")
private String description;
@Schema(description = "桥梁状态", example = "2")
private Integer status;
@Schema(description = "桥梁方向")
private Integer direction;
@Schema(description = "桥梁类型", example = "1")
private Integer type;
@Schema(description = "创建时间")
@DateTimeFormat(pattern = FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND)
private LocalDateTime[] createTime;
}

View File

@ -0,0 +1,57 @@
package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge;
import cn.iocoder.yudao.framework.excel.core.annotations.DictFormat;
import cn.iocoder.yudao.framework.excel.core.convert.DictConvert;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeConfig;
import com.alibaba.excel.annotation.ExcelIgnoreUnannotated;
import com.alibaba.excel.annotation.ExcelProperty;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.time.LocalDateTime;
import static cn.iocoder.yudao.module.iot.enums.DictTypeConstants.IOT_DATA_BRIDGE_DIRECTION_ENUM;
import static cn.iocoder.yudao.module.iot.enums.DictTypeConstants.IOT_DATA_BRIDGE_TYPE_ENUM;
import static cn.iocoder.yudao.module.system.enums.DictTypeConstants.COMMON_STATUS;
@Schema(description = "管理后台 - IoT 数据桥梁 Response VO")
@Data
@ExcelIgnoreUnannotated
public class IotDataBridgeRespVO {
@Schema(description = "桥梁编号", requiredMode = Schema.RequiredMode.REQUIRED, example = "18564")
@ExcelProperty("桥梁编号")
private Long id;
@Schema(description = "桥梁名称", requiredMode = Schema.RequiredMode.REQUIRED, example = "赵六")
@ExcelProperty("桥梁名称")
private String name;
@Schema(description = "桥梁描述", example = "随便")
@ExcelProperty("桥梁描述")
private String description;
@Schema(description = "桥梁状态", requiredMode = Schema.RequiredMode.REQUIRED, example = "2")
@ExcelProperty(value = "桥梁状态", converter = DictConvert.class)
@DictFormat(COMMON_STATUS)
private Integer status;
@Schema(description = "桥梁方向", requiredMode = Schema.RequiredMode.REQUIRED)
@ExcelProperty(value = "桥梁方向", converter = DictConvert.class)
@DictFormat(IOT_DATA_BRIDGE_DIRECTION_ENUM)
private Integer direction;
@Schema(description = "桥梁类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "1")
@ExcelProperty(value = "桥梁类型", converter = DictConvert.class)
@DictFormat(IOT_DATA_BRIDGE_TYPE_ENUM)
private Integer type;
@Schema(description = "桥梁配置")
@ExcelProperty("桥梁配置")
private IotDataBridgeConfig config;
@Schema(description = "创建时间", requiredMode = Schema.RequiredMode.REQUIRED)
@ExcelProperty("创建时间")
private LocalDateTime createTime;
}

View File

@ -0,0 +1,39 @@
package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeConfig;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
@Schema(description = "管理后台 - IoT 数据桥梁新增/修改 Request VO")
@Data
public class IotDataBridgeSaveReqVO {
@Schema(description = "桥梁编号", requiredMode = Schema.RequiredMode.REQUIRED, example = "18564")
private Long id;
@Schema(description = "桥梁名称", requiredMode = Schema.RequiredMode.REQUIRED, example = "赵六")
@NotEmpty(message = "桥梁名称不能为空")
private String name;
@Schema(description = "桥梁描述", example = "随便")
private String description;
@Schema(description = "桥梁状态", requiredMode = Schema.RequiredMode.REQUIRED, example = "2")
@NotNull(message = "桥梁状态不能为空")
private Integer status;
@Schema(description = "桥梁方向", requiredMode = Schema.RequiredMode.REQUIRED)
@NotNull(message = "桥梁方向不能为空")
private Integer direction;
@Schema(description = "桥梁类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "1")
@NotNull(message = "桥梁类型不能为空")
private Integer type;
@Schema(description = "桥梁配置")
@NotNull(message = "桥梁配置不能为空")
private IotDataBridgeConfig config;
}

View File

@ -0,0 +1,32 @@
package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.Data;
/**
* 抽象类 IotDataBridgeConfig
*
* 用于表示数据桥梁配置数据的通用类型根据具体的 "type" 字段动态映射到对应的子类
* 提供多态支持适用于不同类型的数据结构序列化和反序列化场景
*
* @author HUIHUI
*/
@Data
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible = true)
@JsonSubTypes({
@JsonSubTypes.Type(value = IotDataBridgeHttpConfig.class, name = "HTTP"),
@JsonSubTypes.Type(value = IotDataBridgeKafkaMQConfig.class, name = "KAFKA"),
@JsonSubTypes.Type(value = IotDataBridgeMqttConfig.class, name = "MQTT"),
@JsonSubTypes.Type(value = IotDataBridgeRabbitMQConfig.class, name = "RABBITMQ"),
@JsonSubTypes.Type(value = IotDataBridgeRedisStreamMQConfig.class, name = "REDIS_STREAM"),
@JsonSubTypes.Type(value = IotDataBridgeRocketMQConfig.class, name = "ROCKETMQ"),
})
public abstract class IotDataBridgeConfig {
/**
* 配置类型
*/
private String type;
}

View File

@ -0,0 +1,36 @@
package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config;
import lombok.Data;
import java.util.Map;
/**
* HTTP 配置
*
* @author HUIHUI
*/
@Data
public class IotDataBridgeHttpConfig extends IotDataBridgeConfig {
/**
* 请求 URL
*/
private String url;
/**
* 请求方法
*/
private String method;
/**
* 请求头
*/
private Map<String, String> headers;
/**
* 请求参数
*/
private Map<String, String> query;
/**
* 请求体
*/
private String body;
}

View File

@ -0,0 +1,35 @@
package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config;
import lombok.Data;
/**
* Kafka 配置
*
* @author HUIHUI
*/
@Data
public class IotDataBridgeKafkaMQConfig extends IotDataBridgeConfig {
/**
* Kafka 服务器地址
*/
private String bootstrapServers;
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 是否启用 SSL
*/
private Boolean ssl;
/**
* 主题
*/
private String topic;
}

View File

@ -0,0 +1,34 @@
package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config;
import lombok.Data;
/**
* MQTT 配置
*
* @author HUIHUI
*/
@Data
public class IotDataBridgeMqttConfig extends IotDataBridgeConfig {
/**
* MQTT 服务器地址
*/
private String url;
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 客户端编号
*/
private String clientId;
/**
* 主题
*/
private String topic;
}

View File

@ -0,0 +1,46 @@
package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config;
import lombok.Data;
/**
* RabbitMQ 配置
*
* @author HUIHUI
*/
@Data
public class IotDataBridgeRabbitMQConfig extends IotDataBridgeConfig {
/**
* RabbitMQ 服务器地址
*/
private String host;
/**
* 端口
*/
private Integer port;
/**
* 虚拟主机
*/
private String virtualHost;
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 交换机名称
*/
private String exchange;
/**
* 路由键
*/
private String routingKey;
/**
* 队列名称
*/
private String queue;
}

View File

@ -0,0 +1,34 @@
package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config;
import lombok.Data;
/**
* Redis Stream MQ 配置
*
* @author HUIHUI
*/
@Data
public class IotDataBridgeRedisStreamMQConfig extends IotDataBridgeConfig {
/**
* Redis 服务器地址
*/
private String host;
/**
* 端口
*/
private Integer port;
/**
* 密码
*/
private String password;
/**
* 数据库索引
*/
private Integer database;
/**
* 主题
*/
private String topic;
}

View File

@ -0,0 +1,39 @@
package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config;
import lombok.Data;
/**
* RocketMQ 配置
*
* @author HUIHUI
*/
@Data
public class IotDataBridgeRocketMQConfig extends IotDataBridgeConfig {
/**
* RocketMQ 名称服务器地址
*/
private String nameServer;
/**
* 访问密钥
*/
private String accessKey;
/**
* 秘密钥匙
*/
private String secretKey;
/**
* 生产者组
*/
private String group;
/**
* 主题
*/
private String topic;
/**
* 标签
*/
private String tags;
}

View File

@ -0,0 +1 @@
package cn.iocoder.yudao.module.iot.controller.admin.rule.vo;

View File

@ -1,22 +1,22 @@
package cn.iocoder.yudao.module.iot.dal.dataobject.rule;
import cn.iocoder.yudao.framework.mybatis.core.dataobject.BaseDO;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeConfig;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeDirectionEnum;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum;
import com.baomidou.mybatisplus.annotation.KeySequence;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.extension.handlers.JacksonTypeHandler;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.*;
import java.util.Map;
/**
* IoT 数据桥梁 DO
*
* @author 芋道源码
*/
@TableName("iot_data_bridge")
@TableName(value = "iot_data_bridge", autoResultMap = true)
@KeySequence("iot_data_bridge_seq") // 用于 OraclePostgreSQLKingbaseDB2H2 数据库的主键自增如果是 MySQL 等数据库可不写
@Data
@EqualsAndHashCode(callSuper = true)
@ -48,14 +48,14 @@ public class IotDataBridgeDO extends BaseDO {
/**
* 桥梁方向
*
* 枚举 {@link cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgDirectionEnum}
* 枚举 {@link IotDataBridgeDirectionEnum}
*/
private Integer direction;
/**
* 桥梁类型
*
* 枚举 {@link cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum}
* 枚举 {@link IotDataBridgeTypeEnum}
*/
private Integer type;
@ -63,211 +63,6 @@ public class IotDataBridgeDO extends BaseDO {
* 桥梁配置
*/
@TableField(typeHandler = JacksonTypeHandler.class)
private Config config;
/**
* 文件客户端的配置
* 不同实现的客户端需要不同的配置通过子类来定义
*
* @author 芋道源码
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS)
// @JsonTypeInfo 注解的作用Jackson 多态
// 1. 序列化到时数据库时增加 @class 属性
// 2. 反序列化到内存对象时通过 @class 属性可以创建出正确的类型
public interface Config {
}
/**
* HTTP 配置
*/
@Data
public static class HttpConfig implements Config {
/**
* 请求 URL
*/
private String url;
/**
* 请求方法
*/
private String method;
/**
* 请求头
*/
private Map<String, String> headers;
/**
* 请求参数
*/
private Map<String, String> query;
/**
* 请求体
*/
private String body;
}
/**
* MQTT 配置
*/
@Data
public static class MqttConfig implements Config {
/**
* MQTT 服务器地址
*/
private String url;
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 客户端编号
*/
private String clientId;
/**
* 主题
*/
private String topic;
}
/**
* RocketMQ 配置
*/
@Data
public static class RocketMQConfig implements Config {
/**
* RocketMQ 名称服务器地址
*/
private String nameServer;
/**
* 访问密钥
*/
private String accessKey;
/**
* 秘密钥匙
*/
private String secretKey;
/**
* 生产者组
*/
private String group;
/**
* 主题
*/
private String topic;
/**
* 标签
*/
private String tags;
}
/**
* Kafka 配置
*/
@Data
public static class KafkaMQConfig implements Config {
/**
* Kafka 服务器地址
*/
private String bootstrapServers;
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 是否启用 SSL
*/
private Boolean ssl;
/**
* 主题
*/
private String topic;
}
/**
* RabbitMQ 配置
*/
@Data
public static class RabbitMQConfig implements Config {
/**
* RabbitMQ 服务器地址
*/
private String host;
/**
* 端口
*/
private Integer port;
/**
* 虚拟主机
*/
private String virtualHost;
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 交换机名称
*/
private String exchange;
/**
* 路由键
*/
private String routingKey;
/**
* 队列名称
*/
private String queue;
}
/**
* Redis Stream MQ 配置
*/
@Data
public static class RedisStreamMQConfig implements Config {
/**
* Redis 服务器地址
*/
private String host;
/**
* 端口
*/
private Integer port;
/**
* 密码
*/
private String password;
/**
* 数据库索引
*/
private Integer database;
/**
* 主题
*/
private String topic;
}
private IotDataBridgeConfig config;
}

View File

@ -1,9 +1,29 @@
package cn.iocoder.yudao.module.iot.dal.mysql.rule;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.mybatis.core.mapper.BaseMapperX;
import cn.iocoder.yudao.framework.mybatis.core.query.LambdaQueryWrapperX;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.IotDataBridgePageReqVO;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
import org.apache.ibatis.annotations.Mapper;
/**
* IoT 数据桥梁 Mapper
*
* @author HUIHUI
*/
@Mapper
public interface IotDataBridgeMapper extends BaseMapperX<IotDataBridgeDO> {
default PageResult<IotDataBridgeDO> selectPage(IotDataBridgePageReqVO reqVO) {
return selectPage(reqVO, new LambdaQueryWrapperX<IotDataBridgeDO>()
.likeIfPresent(IotDataBridgeDO::getName, reqVO.getName())
.likeIfPresent(IotDataBridgeDO::getDescription, reqVO.getDescription())
.eqIfPresent(IotDataBridgeDO::getStatus, reqVO.getStatus())
.eqIfPresent(IotDataBridgeDO::getDirection, reqVO.getDirection())
.eqIfPresent(IotDataBridgeDO::getType, reqVO.getType())
.betweenIfPresent(IotDataBridgeDO::getCreateTime, reqVO.getCreateTime())
.orderByDesc(IotDataBridgeDO::getId));
}
}

View File

@ -1,20 +1,54 @@
package cn.iocoder.yudao.module.iot.service.rule;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.IotDataBridgePageReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.IotDataBridgeSaveReqVO;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
import jakarta.validation.Valid;
/**
* IoT 数据桥梁 Service 接口
* IoT 数据桥梁 Service 接口
*
* @author 芋道源码
* @author HUIHUI
*/
public interface IotDataBridgeService {
/**
* 获得指定数据桥梁
* 创建IoT 数据桥梁
*
* @param id 数据桥梁编号
* @return 数据桥梁
* @param createReqVO 创建信息
* @return 编号
*/
IotDataBridgeDO getIotDataBridge(Long id);
Long createDataBridge(@Valid IotDataBridgeSaveReqVO createReqVO);
/**
* 更新IoT 数据桥梁
*
* @param updateReqVO 更新信息
*/
void updateDataBridge(@Valid IotDataBridgeSaveReqVO updateReqVO);
/**
* 删除IoT 数据桥梁
*
* @param id 编号
*/
void deleteDataBridge(Long id);
/**
* 获得IoT 数据桥梁
*
* @param id 编号
* @return IoT 数据桥梁
*/
IotDataBridgeDO getDataBridge(Long id);
/**
* 获得IoT 数据桥梁分页
*
* @param pageReqVO 分页查询
* @return IoT 数据桥梁分页
*/
PageResult<IotDataBridgeDO> getDataBridgePage(IotDataBridgePageReqVO pageReqVO);
}

View File

@ -1,45 +1,70 @@
package cn.iocoder.yudao.module.iot.service.rule;
import cn.hutool.core.map.MapUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.IotDataBridgePageReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.IotDataBridgeSaveReqVO;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
import cn.iocoder.yudao.module.iot.dal.mysql.rule.IotDataBridgeMapper;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.validation.annotation.Validated;
import java.util.Objects;
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DATA_BRIDGE_NOT_EXISTS;
/**
* IoT 数据桥梁 Service 实现类
* IoT 数据桥梁 Service 实现类
*
* @author 芋道源码
* @author HUIHUI
*/
@Service
@Validated
@Slf4j
public class IotDataBridgeServiceImpl implements IotDataBridgeService {
@Resource
private IotDataBridgeMapper dataBridgeMapper;
// TODO @芋艿临时测试
@Override
public IotDataBridgeDO getIotDataBridge(Long id) {
if (Objects.equals(id, 1L)) {
IotDataBridgeDO.HttpConfig config = new IotDataBridgeDO.HttpConfig()
.setUrl("http://127.0.0.1:48080/test")
// .setMethod("POST")
.setMethod("GET")
.setQuery(MapUtil.of("aaa", "bbb"))
.setHeaders(MapUtil.of("ccc", "ddd"))
.setBody(JsonUtils.toJsonString(MapUtil.of("eee", "fff")));
return IotDataBridgeDO.builder().id(1L).name("芋道").description("芋道源码").status(0).direction(1)
.type(IotDataBridgTypeEnum.HTTP.getType()).config(config).build();
public Long createDataBridge(IotDataBridgeSaveReqVO createReqVO) {
// 插入
IotDataBridgeDO dataBridge = BeanUtils.toBean(createReqVO, IotDataBridgeDO.class);
dataBridgeMapper.insert(dataBridge);
// 返回
return dataBridge.getId();
}
@Override
public void updateDataBridge(IotDataBridgeSaveReqVO updateReqVO) {
// 校验存在
validateDataBridgeExists(updateReqVO.getId());
// 更新
IotDataBridgeDO updateObj = BeanUtils.toBean(updateReqVO, IotDataBridgeDO.class);
dataBridgeMapper.updateById(updateObj);
}
@Override
public void deleteDataBridge(Long id) {
// 校验存在
validateDataBridgeExists(id);
// 删除
dataBridgeMapper.deleteById(id);
}
private void validateDataBridgeExists(Long id) {
if (dataBridgeMapper.selectById(id) == null) {
throw exception(DATA_BRIDGE_NOT_EXISTS);
}
}
@Override
public IotDataBridgeDO getDataBridge(Long id) {
return dataBridgeMapper.selectById(id);
}
@Override
public PageResult<IotDataBridgeDO> getDataBridgePage(IotDataBridgePageReqVO pageReqVO) {
return dataBridgeMapper.selectPage(pageReqVO);
}
}

View File

@ -22,7 +22,7 @@ public interface IotRuleSceneAction {
* 2. 非空的情况设备触发
* @param config 配置
*/
void execute(@Nullable IotDeviceMessage message, IotRuleSceneDO.ActionConfig config);
void execute(@Nullable IotDeviceMessage message, IotRuleSceneDO.ActionConfig config) throws Exception;
/**
* 获得类型

View File

@ -26,17 +26,17 @@ public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction {
@Resource
private IotDataBridgeService dataBridgeService;
@Resource
private List<IotDataBridgeExecute> dataBridgeExecutes;
private List<IotDataBridgeExecute<?>> dataBridgeExecutes;
@Override
public void execute(IotDeviceMessage message, IotRuleSceneDO.ActionConfig config) {
public void execute(IotDeviceMessage message, IotRuleSceneDO.ActionConfig config) throws Exception {
// 1.1 如果消息为空直接返回
if (message == null) {
return;
}
// 1.2 获得数据桥梁
Assert.notNull(config.getDataBridgeId(), "数据桥梁编号不能为空");
IotDataBridgeDO dataBridge = dataBridgeService.getIotDataBridge(config.getDataBridgeId());
IotDataBridgeDO dataBridge = dataBridgeService.getDataBridge(config.getDataBridgeId());
if (dataBridge == null || dataBridge.getConfig() == null) {
log.error("[execute][message({}) config({}) 对应的数据桥梁不存在]", message, config);
return;
@ -47,7 +47,9 @@ public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction {
}
// 2. 执行数据桥接操作
dataBridgeExecutes.forEach(execute -> execute.execute(message, dataBridge));
for (IotDataBridgeExecute<?> execute : dataBridgeExecutes) {
execute.execute(message, dataBridge);
}
}
@Override

View File

@ -1,8 +1,11 @@
package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
@ -15,22 +18,36 @@ import java.time.Duration;
/**
* 带缓存功能的数据桥梁执行器抽象类
*
* 该类提供了一个通用的缓存机制用于管理各类数据桥接的生产者(Producer)实例
*
* 主要特点:
* - 基于Guava Cache实现高效的生产者实例缓存管理
* - 自动处理生产者的生命周期创建获取关闭
* - 支持30分钟未访问自动过期清理机制
* - 异常处理与日志记录便于问题排查
*
* 子类需要实现:
* - initProducer(Config) - 初始化特定类型的生产者实例
* - closeProducer(Producer) - 关闭生产者实例并释放资源
*
* @param <Config> 配置信息类型用于初始化生产者
* @param <Producer> 生产者类型负责将数据发送到目标系统
* @author HUIHUI
*/
@Slf4j
public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridgeExecute {
public abstract class AbstractCacheableDataBridgeExecute<Config, Producer> implements IotDataBridgeExecute<Config> {
// TODO @huihuiAbstractCacheableDataBridgeExecute<Producer> 这样下面的 Object, Object 就有了类型另外 IotDataBridgeDO.Config 可以替代一个 Object
/**
* Producer 缓存
*/
private final LoadingCache<Object, Object> PRODUCER_CACHE = CacheBuilder.newBuilder()
.expireAfterAccess(Duration.ofMinutes(30))
.removalListener(notification -> {
Object producer = notification.getValue();
private final LoadingCache<Config, Producer> PRODUCER_CACHE = CacheBuilder.newBuilder()
.expireAfterAccess(Duration.ofMinutes(30)) // 30 分钟未访问就提前过期
.removalListener((RemovalListener<Config, Producer>) notification -> {
Producer producer = notification.getValue();
if (producer == null) {
return;
}
try {
closeProducer(producer);
log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已关闭]", notification.getKey());
@ -38,15 +55,18 @@ public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridg
log.error("[PRODUCER_CACHE][配置({}) 对应的 producer 关闭失败]", notification.getKey(), e);
}
})
.build(new CacheLoader<Object, Object>() {
.build(new CacheLoader<Config, Producer>() {
@Override
public Object load(Object config) throws Exception {
Object producer = initProducer(config);
log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已创建并启动]", config);
return producer;
public Producer load(Config config) throws Exception {
try {
Producer producer = initProducer(config);
log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已创建并启动]", config);
return producer;
} catch (Exception e) {
log.error("[PRODUCER_CACHE][配置({}) 对应的 producer 创建启动失败]", config, e);
throw e; // 抛出异常触发缓存加载失败机制
}
}
});
/**
@ -55,7 +75,7 @@ public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridg
* @param config 配置信息
* @return 生产者对象
*/
protected Object getProducer(Object config) throws Exception {
protected Producer getProducer(Config config) throws Exception {
return PRODUCER_CACHE.get(config);
}
@ -66,13 +86,29 @@ public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridg
* @return 生产者对象
* @throws Exception 如果初始化失败
*/
protected abstract Object initProducer(Object config) throws Exception;
protected abstract Producer initProducer(Config config) throws Exception;
/**
* 关闭生产者
*
* @param producer 生产者对象
*/
protected abstract void closeProducer(Object producer);
protected abstract void closeProducer(Producer producer) throws Exception;
@Override
@SuppressWarnings({"unchecked"})
public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
// 1.1 校验数据桥梁类型
if (!getType().equals(dataBridge.getType())) {
return;
}
// 1.2 执行对应的数据桥梁发送消息
try {
execute0(message, (Config) dataBridge.getConfig());
} catch (Exception e) {
log.error("[execute][桥梁配置 config({}) 对应的 message({}) 发送异常]", dataBridge.getConfig(), message, e);
}
}
}

View File

@ -9,9 +9,14 @@ import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
*
* @author HUIHUI
*/
public interface IotDataBridgeExecute {
public interface IotDataBridgeExecute<Config> {
// TODO @huihui要不搞个 getType然后 execute0 由子类实现这样子类的 executeRedisStream 其实就是 execute0
/**
* 获取数据桥梁类型
*
* @return 数据桥梁类型
*/
Integer getType();
/**
* 执行数据桥梁操作
@ -19,6 +24,23 @@ public interface IotDataBridgeExecute {
* @param message 设备消息
* @param dataBridge 数据桥梁
*/
void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge);
@SuppressWarnings({"unchecked"})
default void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) throws Exception {
// 1.1 校验数据桥梁类型
if (!getType().equals(dataBridge.getType())) {
return;
}
// 1.2 执行对应的数据桥梁发送消息
execute0(message, (Config) dataBridge.getConfig());
}
/**
* 真正执行数据桥梁操作
*
* @param message 设备消息
* @param config 桥梁配置
*/
void execute0(IotDeviceMessage message, Config config) throws Exception;
}

View File

@ -3,8 +3,8 @@ package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.framework.common.util.http.HttpUtils;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeHttpConfig;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
@ -25,23 +25,19 @@ import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_
*/
@Component
@Slf4j
public class IotHttpDataBridgeExecute implements IotDataBridgeExecute {
public class IotHttpDataBridgeExecute implements IotDataBridgeExecute<IotDataBridgeHttpConfig> {
@Resource
private RestTemplate restTemplate;
@Override
public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
// 1.1 校验数据桥梁的类型 == HTTP
if (!IotDataBridgTypeEnum.HTTP.getType().equals(dataBridge.getType())) {
return;
}
// 1.2 执行 HTTP 请求
executeHttp(message, (IotDataBridgeDO.HttpConfig) dataBridge.getConfig());
public Integer getType() {
return IotDataBridgeTypeEnum.HTTP.getType();
}
@Override
@SuppressWarnings({"unchecked", "deprecation"})
private void executeHttp(IotDeviceMessage message, IotDataBridgeDO.HttpConfig config) {
public void execute0(IotDeviceMessage message, IotDataBridgeHttpConfig config) {
String url = null;
HttpMethod method = HttpMethod.valueOf(config.getMethod().toUpperCase());
HttpEntity<String> requestEntity = null;

View File

@ -1,11 +1,13 @@
package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeKafkaMQConfig;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@ -15,65 +17,53 @@ import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Kafka {@link IotDataBridgeExecute} 实现类
*
* @author HUIHUI
*/
@ConditionalOnClass(name = "org.springframework.kafka.core.KafkaTemplate")
@Component
@Slf4j
public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute {
public class IotKafkaMQDataBridgeExecute extends
AbstractCacheableDataBridgeExecute<IotDataBridgeKafkaMQConfig, KafkaTemplate<String, String>> {
private static final Duration SEND_TIMEOUT = Duration.ofMillis(10);
private static final Duration SEND_TIMEOUT = Duration.ofMillis(10000); // 10 秒超时时间
@Override
public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
// 1. 校验数据桥梁的类型 == KAFKA
if (!IotDataBridgTypeEnum.KAFKA.getType().equals(dataBridge.getType())) {
return;
}
// 2. 执行 Kafka 发送消息
executeKafka(message, (IotDataBridgeDO.KafkaMQConfig) dataBridge.getConfig());
}
@SuppressWarnings("unchecked")
private void executeKafka(IotDeviceMessage message, IotDataBridgeDO.KafkaMQConfig config) {
try {
// 1. 获取或创建 KafkaTemplate
KafkaTemplate<String, String> kafkaTemplate = (KafkaTemplate<String, String>) getProducer(config);
// 2. 发送消息并等待结果
kafkaTemplate.send(config.getTopic(), message.toString())
.get(SEND_TIMEOUT.getSeconds(), TimeUnit.SECONDS); // 添加超时等待
log.info("[executeKafka][message({}) 发送成功]", message);
} catch (TimeoutException e) {
log.error("[executeKafka][message({}) config({}) 发送超时]", message, config, e);
} catch (Exception e) {
log.error("[executeKafka][message({}) config({}) 发送异常]", message, config, e);
}
public Integer getType() {
return IotDataBridgeTypeEnum.KAFKA.getType();
}
@Override
protected Object initProducer(Object config) {
IotDataBridgeDO.KafkaMQConfig kafkaConfig = (IotDataBridgeDO.KafkaMQConfig) config;
public void execute0(IotDeviceMessage message, IotDataBridgeKafkaMQConfig config) throws Exception {
// 1. 获取或创建 KafkaTemplate
KafkaTemplate<String, String> kafkaTemplate = getProducer(config);
// 2. 发送消息并等待结果
kafkaTemplate.send(config.getTopic(), message.toString())
.get(SEND_TIMEOUT.getSeconds(), TimeUnit.SECONDS); // 添加超时等待
log.info("[execute0][message({}) 发送成功]", message);
}
@Override
protected KafkaTemplate<String, String> initProducer(IotDataBridgeKafkaMQConfig config) {
// 1.1 构建生产者配置
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers());
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 1.2 如果配置了认证信息
if (kafkaConfig.getUsername() != null && kafkaConfig.getPassword() != null) {
if (config.getUsername() != null && config.getPassword() != null) {
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
+ kafkaConfig.getUsername() + "\" password=\"" + kafkaConfig.getPassword() + "\";");
+ config.getUsername() + "\" password=\"" + config.getPassword() + "\";");
}
// 1.3 如果启用 SSL
if (Boolean.TRUE.equals(kafkaConfig.getSsl())) {
if (Boolean.TRUE.equals(config.getSsl())) {
props.put("security.protocol", "SSL");
}
@ -83,10 +73,8 @@ public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExec
}
@Override
protected void closeProducer(Object producer) {
if (producer instanceof KafkaTemplate) {
((KafkaTemplate<?, ?>) producer).destroy();
}
protected void closeProducer(KafkaTemplate<String, String> producer) {
producer.destroy();
}
// TODO @芋艿测试代码后续清理
@ -95,7 +83,7 @@ public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExec
IotKafkaMQDataBridgeExecute action = new IotKafkaMQDataBridgeExecute();
// 2. 创建共享的配置
IotDataBridgeDO.KafkaMQConfig config = new IotDataBridgeDO.KafkaMQConfig();
IotDataBridgeKafkaMQConfig config = new IotDataBridgeKafkaMQConfig();
config.setBootstrapServers("127.0.0.1:9092");
config.setTopic("test-topic");
config.setSsl(false);
@ -117,10 +105,10 @@ public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExec
// 4. 执行两次测试验证缓存
log.info("[main][第一次执行,应该会创建新的 producer]");
action.executeKafka(message, config);
action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config));
log.info("[main][第二次执行,应该会复用缓存的 producer]");
action.executeKafka(message, config);
action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config));
}
}

View File

@ -1,12 +1,14 @@
package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeRabbitMQConfig;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
@ -17,51 +19,44 @@ import java.time.LocalDateTime;
*
* @author HUIHUI
*/
@ConditionalOnClass(name = "com.rabbitmq.client.Channel")
@Component
@Slf4j
public class IotRabbitMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute {
public class IotRabbitMQDataBridgeExecute extends
AbstractCacheableDataBridgeExecute<IotDataBridgeRabbitMQConfig, Channel> {
@Override
public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
// 1.1 校验数据桥梁的类型 == RABBITMQ
if (!IotDataBridgTypeEnum.RABBITMQ.getType().equals(dataBridge.getType())) {
return;
}
// 1.2 执行 RabbitMQ 发送消息
executeRabbitMQ(message, (IotDataBridgeDO.RabbitMQConfig) dataBridge.getConfig());
public Integer getType() {
return IotDataBridgeTypeEnum.RABBITMQ.getType();
}
private void executeRabbitMQ(IotDeviceMessage message, IotDataBridgeDO.RabbitMQConfig config) {
try {
// 1. 获取或创建 Channel
Channel channel = (Channel) getProducer(config);
@Override
public void execute0(IotDeviceMessage message, IotDataBridgeRabbitMQConfig config) throws Exception {
// 1. 获取或创建 Channel
Channel channel = getProducer(config);
// 2.1 声明交换机队列和绑定关系
channel.exchangeDeclare(config.getExchange(), "direct", true);
channel.queueDeclare(config.getQueue(), true, false, false, null);
channel.queueBind(config.getQueue(), config.getExchange(), config.getRoutingKey());
// 2.1 声明交换机队列和绑定关系
channel.exchangeDeclare(config.getExchange(), "direct", true);
channel.queueDeclare(config.getQueue(), true, false, false, null);
channel.queueBind(config.getQueue(), config.getExchange(), config.getRoutingKey());
// 2.2 发送消息
channel.basicPublish(config.getExchange(), config.getRoutingKey(), null,
message.toString().getBytes(StandardCharsets.UTF_8));
log.info("[executeRabbitMQ][message({}) config({}) 发送成功]", message, config);
} catch (Exception e) {
log.error("[executeRabbitMQ][message({}) config({}) 发送异常]", message, config, e);
}
// 2.2 发送消息
channel.basicPublish(config.getExchange(), config.getRoutingKey(), null,
message.toString().getBytes(StandardCharsets.UTF_8));
log.info("[executeRabbitMQ][message({}) config({}) 发送成功]", message, config);
}
@Override
@SuppressWarnings("resource")
protected Object initProducer(Object config) throws Exception {
IotDataBridgeDO.RabbitMQConfig rabbitConfig = (IotDataBridgeDO.RabbitMQConfig) config;
protected Channel initProducer(IotDataBridgeRabbitMQConfig config) throws Exception {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(rabbitConfig.getHost());
factory.setPort(rabbitConfig.getPort());
factory.setVirtualHost(rabbitConfig.getVirtualHost());
factory.setUsername(rabbitConfig.getUsername());
factory.setPassword(rabbitConfig.getPassword());
factory.setHost(config.getHost());
factory.setPort(config.getPort());
factory.setVirtualHost(config.getVirtualHost());
factory.setUsername(config.getUsername());
factory.setPassword(config.getPassword());
// 2. 创建连接
Connection connection = factory.newConnection();
@ -71,20 +66,13 @@ public class IotRabbitMQDataBridgeExecute extends AbstractCacheableDataBridgeExe
}
@Override
protected void closeProducer(Object producer) {
if (producer instanceof Channel) {
try {
Channel channel = (Channel) producer;
if (channel.isOpen()) {
channel.close();
}
Connection connection = channel.getConnection();
if (connection.isOpen()) {
connection.close();
}
} catch (Exception e) {
log.error("[closeProducer][关闭 RabbitMQ 连接异常]", e);
}
protected void closeProducer(Channel channel) throws Exception {
if (channel.isOpen()) {
channel.close();
}
Connection connection = channel.getConnection();
if (connection.isOpen()) {
connection.close();
}
}
@ -94,7 +82,7 @@ public class IotRabbitMQDataBridgeExecute extends AbstractCacheableDataBridgeExe
IotRabbitMQDataBridgeExecute action = new IotRabbitMQDataBridgeExecute();
// 2. 创建共享的配置
IotDataBridgeDO.RabbitMQConfig config = new IotDataBridgeDO.RabbitMQConfig();
IotDataBridgeRabbitMQConfig config = new IotDataBridgeRabbitMQConfig();
config.setHost("localhost");
config.setPort(5672);
config.setVirtualHost("/");
@ -118,10 +106,12 @@ public class IotRabbitMQDataBridgeExecute extends AbstractCacheableDataBridgeExe
.build();
// 4. 执行两次测试验证缓存
log.info("[main][第一次执行,应该会创建新的 channel]");
action.executeRabbitMQ(message, config);
// 4. 执行两次测试验证缓存
log.info("[main][第一次执行,应该会创建新的 producer]");
action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config));
log.info("[main][第二次执行,应该会复用缓存的 channel]");
action.executeRabbitMQ(message, config);
log.info("[main][第二次执行,应该会复用缓存的 producer]");
action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config));
}
}

View File

@ -2,8 +2,9 @@ package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeRedisStreamMQConfig;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
@ -29,47 +30,36 @@ import java.time.LocalDateTime;
*/
@Component
@Slf4j
public class IotRedisStreamMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute {
public class IotRedisStreamMQDataBridgeExecute extends
AbstractCacheableDataBridgeExecute<IotDataBridgeRedisStreamMQConfig, RedisTemplate<String, Object>> {
@Override
public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
// 1.1 校验数据桥梁类型
if (!IotDataBridgTypeEnum.REDIS_STREAM.getType().equals(dataBridge.getType())) {
return;
}
// 1.2 执行消息发送
executeRedisStream(message, (IotDataBridgeDO.RedisStreamMQConfig) dataBridge.getConfig());
}
@SuppressWarnings("unchecked")
// TODO @huihuitry catch 交给父类来做子类不处理异常
private void executeRedisStream(IotDeviceMessage message, IotDataBridgeDO.RedisStreamMQConfig config) {
try {
// 1. 获取 RedisTemplate
RedisTemplate<String, Object> redisTemplate = (RedisTemplate<String, Object>) getProducer(config);
// 2. 创建并发送 Stream 记录
ObjectRecord<String, IotDeviceMessage> record = StreamRecords.newRecord()
.ofObject(message).withStreamKey(config.getTopic());
String recordId = String.valueOf(redisTemplate.opsForStream().add(record));
log.info("[executeRedisStream][消息发送成功] messageId: {}, config: {}", recordId, config);
} catch (Exception e) {
log.error("[executeRedisStream][消息发送失败] message: {}, config: {}", message, config, e);
}
public Integer getType() {
return IotDataBridgeTypeEnum.REDIS_STREAM.getType();
}
@Override
protected Object initProducer(Object config) {
IotDataBridgeDO.RedisStreamMQConfig redisConfig = (IotDataBridgeDO.RedisStreamMQConfig) config;
public void execute0(IotDeviceMessage message, IotDataBridgeRedisStreamMQConfig config) throws Exception {
// 1. 获取 RedisTemplate
RedisTemplate<String, Object> redisTemplate = getProducer(config);
// 2. 创建并发送 Stream 记录
ObjectRecord<String, IotDeviceMessage> record = StreamRecords.newRecord()
.ofObject(message).withStreamKey(config.getTopic());
String recordId = String.valueOf(redisTemplate.opsForStream().add(record));
log.info("[executeRedisStream][消息发送成功] messageId: {}, config: {}", recordId, config);
}
@Override
protected RedisTemplate<String, Object> initProducer(IotDataBridgeRedisStreamMQConfig config) {
// 1.1 创建 Redisson 配置
Config redissonConfig = new Config();
SingleServerConfig serverConfig = redissonConfig.useSingleServer()
.setAddress("redis://" + redisConfig.getHost() + ":" + redisConfig.getPort())
.setDatabase(redisConfig.getDatabase());
.setAddress("redis://" + config.getHost() + ":" + config.getPort())
.setDatabase(config.getDatabase());
// 1.2 设置密码如果有
if (StrUtil.isNotBlank(redisConfig.getPassword())) {
serverConfig.setPassword(redisConfig.getPassword());
if (StrUtil.isNotBlank(config.getPassword())) {
serverConfig.setPassword(config.getPassword());
}
// TODO @huihui看看能不能简化一些按道理说不用这么多的哈
@ -90,17 +80,10 @@ public class IotRedisStreamMQDataBridgeExecute extends AbstractCacheableDataBrid
}
@Override
protected void closeProducer(Object producer) {
// TODO @huihuitry catch 交给父类来做子类不处理异常
if (producer instanceof RedisTemplate) {
RedisConnectionFactory factory = ((RedisTemplate<?, ?>) producer).getConnectionFactory();
try {
if (factory != null) {
((RedissonConnectionFactory) factory).destroy();
}
} catch (Exception e) {
log.error("[closeProducer][关闭 redisson 连接异常]", e);
}
protected void closeProducer(RedisTemplate<String, Object> producer) throws Exception {
RedisConnectionFactory factory = producer.getConnectionFactory();
if (factory != null) {
((RedissonConnectionFactory) factory).destroy();
}
}
@ -119,7 +102,7 @@ public class IotRedisStreamMQDataBridgeExecute extends AbstractCacheableDataBrid
IotRedisStreamMQDataBridgeExecute action = new IotRedisStreamMQDataBridgeExecute();
// 2. 创建共享的配置
IotDataBridgeDO.RedisStreamMQConfig config = new IotDataBridgeDO.RedisStreamMQConfig();
IotDataBridgeRedisStreamMQConfig config = new IotDataBridgeRedisStreamMQConfig();
config.setHost("127.0.0.1");
config.setPort(6379);
config.setDatabase(0);
@ -140,11 +123,11 @@ public class IotRedisStreamMQDataBridgeExecute extends AbstractCacheableDataBrid
.build();
// 4. 执行两次测试验证缓存
log.info("[main][第一次执行,应该会创建新的 RedisTemplate]");
action.executeRedisStream(message, config);
log.info("[main][第一次执行,应该会创建新的 producer]");
action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config));
log.info("[main][第二次执行,应该会复用缓存的 RedisTemplate]");
action.executeRedisStream(message, config);
log.info("[main][第二次执行,应该会复用缓存的 producer]");
action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config));
}
}

View File

@ -1,7 +1,8 @@
package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeRocketMQConfig;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
@ -9,6 +10,7 @@ import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
@ -18,58 +20,49 @@ import java.time.LocalDateTime;
*
* @author HUIHUI
*/
@ConditionalOnClass(name = "org.apache.rocketmq.client.producer.DefaultMQProducer")
@Component
@Slf4j
public class IotRocketMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute {
public class IotRocketMQDataBridgeExecute extends
AbstractCacheableDataBridgeExecute<IotDataBridgeRocketMQConfig, DefaultMQProducer> {
@Override
public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
// 1.1 校验数据桥梁的类型 == ROCKETMQ
if (!IotDataBridgTypeEnum.ROCKETMQ.getType().equals(dataBridge.getType())) {
return;
}
// 1.2 执行 RocketMQ 发送消息
executeRocketMQ(message, (IotDataBridgeDO.RocketMQConfig) dataBridge.getConfig());
public Integer getType() {
return IotDataBridgeTypeEnum.ROCKETMQ.getType();
}
private void executeRocketMQ(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) {
try {
// 1. 获取或创建 Producer
DefaultMQProducer producer = (DefaultMQProducer) getProducer(config);
@Override
public void execute0(IotDeviceMessage message, IotDataBridgeRocketMQConfig config) throws Exception {
// 1. 获取或创建 Producer
DefaultMQProducer producer = getProducer(config);
// 2.1 创建消息对象指定TopicTag和消息体
Message msg = new Message(
config.getTopic(),
config.getTags(),
message.toString().getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 2.2 发送同步消息并处理结果
SendResult sendResult = producer.send(msg);
// 2.3 处理发送结果
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
log.info("[executeRocketMQ][message({}) config({}) 发送成功,结果({})]", message, config, sendResult);
} else {
log.error("[executeRocketMQ][message({}) config({}) 发送失败,结果({})]", message, config, sendResult);
}
} catch (Exception e) {
log.error("[executeRocketMQ][message({}) config({}) 发送异常]", message, config, e);
// 2.1 创建消息对象指定TopicTag和消息体
Message msg = new Message(
config.getTopic(),
config.getTags(),
message.toString().getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 2.2 发送同步消息并处理结果
SendResult sendResult = producer.send(msg);
// 2.3 处理发送结果
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
log.info("[executeRocketMQ][message({}) config({}) 发送成功,结果({})]", message, config, sendResult);
} else {
log.error("[executeRocketMQ][message({}) config({}) 发送失败,结果({})]", message, config, sendResult);
}
}
@Override
protected Object initProducer(Object config) throws Exception {
IotDataBridgeDO.RocketMQConfig rocketMQConfig = (IotDataBridgeDO.RocketMQConfig) config;
DefaultMQProducer producer = new DefaultMQProducer(rocketMQConfig.getGroup());
producer.setNamesrvAddr(rocketMQConfig.getNameServer());
protected DefaultMQProducer initProducer(IotDataBridgeRocketMQConfig config) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(config.getGroup());
producer.setNamesrvAddr(config.getNameServer());
producer.start();
return producer;
}
@Override
protected void closeProducer(Object producer) {
if (producer instanceof DefaultMQProducer) {
((DefaultMQProducer) producer).shutdown();
}
protected void closeProducer(DefaultMQProducer producer) {
producer.shutdown();
}
// TODO @芋艿测试代码后续清理
@ -78,7 +71,7 @@ public class IotRocketMQDataBridgeExecute extends AbstractCacheableDataBridgeExe
IotRocketMQDataBridgeExecute action = new IotRocketMQDataBridgeExecute();
// 2. 创建共享的配置
IotDataBridgeDO.RocketMQConfig config = new IotDataBridgeDO.RocketMQConfig();
IotDataBridgeRocketMQConfig config = new IotDataBridgeRocketMQConfig();
config.setNameServer("127.0.0.1:9876");
config.setGroup("test-group");
config.setTopic("test-topic");
@ -99,10 +92,10 @@ public class IotRocketMQDataBridgeExecute extends AbstractCacheableDataBridgeExe
// 4. 执行两次测试验证缓存
log.info("[main][第一次执行,应该会创建新的 producer]");
action.executeRocketMQ(message, config);
action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config));
log.info("[main][第二次执行,应该会复用缓存的 producer]");
action.executeRocketMQ(message, config);
action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config));
}
}

View File

@ -114,6 +114,19 @@
<artifactId>yudao-module-iot-biz</artifactId>
<version>${revision}</version>
</dependency>
<!-- IoT 数据桥梁的执行器所需消息队列。如果您只需要使用 rocketmq 那么则注释掉其它消息队列即可 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- spring boot 配置所需依赖 -->
<dependency>