【功能新增】IoT:基于 Quartz 实现 IotSchedulerManager!为了兼容 boot 和 cloud!

This commit is contained in:
YunaiV 2025-02-04 00:12:08 +08:00
parent 2109449a89
commit 8d0caaa16c
6 changed files with 260 additions and 10 deletions

View File

@ -60,8 +60,6 @@ public class IotRuleSceneDO extends TenantBaseDO {
@TableField(typeHandler = JacksonTypeHandler.class)
private List<TriggerConfig> triggers;
// TODO @芋艿需要调研下 https://help.aliyun.com/zh/iot/user-guide/scene-orchestration-1?spm=a2c4g.11186623.help-menu-30520.d_2_4_5_0.45413908fxCSVa
/**
* 执行器数组
*/
@ -97,7 +95,7 @@ public class IotRuleSceneDO extends TenantBaseDO {
/**
* 触发条件数组
*
* {@link #type} {@link IotRuleSceneTriggerTypeEnum#DEVICE} 必填
* 必填 {@link #type} {@link IotRuleSceneTriggerTypeEnum#DEVICE}
* 条件与条件之间的关系
*/
private List<TriggerCondition> conditions;
@ -105,7 +103,7 @@ public class IotRuleSceneDO extends TenantBaseDO {
/**
* CRON 表达式
*
* {@link #type} {@link IotRuleSceneTriggerTypeEnum#TIMER} 必填
* 必填 {@link #type} {@link IotRuleSceneTriggerTypeEnum#TIMER}
*/
private String cronExpression;
@ -185,15 +183,15 @@ public class IotRuleSceneDO extends TenantBaseDO {
/**
* 设备控制
*
* {@link #type} {@link IotRuleSceneActionTypeEnum#DEVICE_CONTROL} 必填
* 必填 {@link #type} {@link IotRuleSceneActionTypeEnum#DEVICE_CONTROL}
*/
private ActionDeviceControl deviceControl;
/**
* 数据桥接编号
*
* {@link #type} {@link IotRuleSceneActionTypeEnum#DATA_BRIDGE} 必填
* TODO 芋艿关联
* 必填 {@link #type} {@link IotRuleSceneActionTypeEnum#DATA_BRIDGE}
* 关联{@link IotDataBridgeDO#getId()}
*/
private Long dataBridgeId;

View File

@ -0,0 +1,24 @@
package cn.iocoder.yudao.module.iot.framework.job.config;
import cn.iocoder.yudao.module.iot.framework.job.core.IotSchedulerManager;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
/**
* IoT 模块的 Job 自动配置类
*
* @author 芋道源码
*/
@Configuration
public class IotJobConfiguration {
@Bean(initMethod = "start", destroyMethod = "stop")
public IotSchedulerManager iotSchedulerManager(DataSource dataSource,
ApplicationContext applicationContext) {
return new IotSchedulerManager(dataSource, applicationContext);
}
}

View File

@ -0,0 +1,191 @@
package cn.iocoder.yudao.module.iot.framework.job.core;
import cn.iocoder.yudao.framework.quartz.core.enums.JobDataKeyEnum;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.scheduling.quartz.SpringBeanJobFactory;
import javax.sql.DataSource;
import java.util.Map;
import java.util.Properties;
/**
* IoT 模块的 Scheduler 管理类基于 Quartz 实现
*
* 疑问为什么 IoT 模块不复用全局的 SchedulerManager
* 回复yudao-cloud 项目使用的是 XXL-Job 作为调度中心无法动态添加任务
*
* @author 芋道源码
*/
@Slf4j
public class IotSchedulerManager {
private static final String SCHEDULER_NAME = "iotScheduler";
private final SchedulerFactoryBean schedulerFactoryBean;
private Scheduler scheduler;
public IotSchedulerManager(DataSource dataSource,
ApplicationContext applicationContext) {
// 1. 参考 SchedulerFactoryBean
SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
SpringBeanJobFactory jobFactory = new SpringBeanJobFactory();
jobFactory.setApplicationContext(applicationContext);
schedulerFactoryBean.setJobFactory(jobFactory);
schedulerFactoryBean.setAutoStartup(true);
schedulerFactoryBean.setSchedulerName(SCHEDULER_NAME);
schedulerFactoryBean.setDataSource(dataSource);
schedulerFactoryBean.setWaitForJobsToCompleteOnShutdown(true);
Properties properties = new Properties();
schedulerFactoryBean.setQuartzProperties(properties);
// 2. 参考 application-local.yaml 配置文件
// 2.1 Scheduler 相关配置
properties.put("org.quartz.scheduler.instanceName", SCHEDULER_NAME);
properties.put("org.quartz.scheduler.instanceId", "AUTO");
// 2.2 JobStore 相关配置
properties.put("org.quartz.jobStore.class", "org.springframework.scheduling.quartz.LocalDataSourceJobStore");
properties.put("org.quartz.jobStore.isClustered", "true");
properties.put("org.quartz.jobStore.clusterCheckinInterval", "15000");
properties.put("org.quartz.jobStore.misfireThreshold", "60000");
// 2.3 线程池相关配置
properties.put("org.quartz.threadPool.threadCount", "25");
properties.put("org.quartz.threadPool.threadPriority", "5");
properties.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
this.schedulerFactoryBean = schedulerFactoryBean;
}
public void start() throws Exception {
log.info("[start][Scheduler 初始化开始]");
// 初始化
schedulerFactoryBean.afterPropertiesSet();
schedulerFactoryBean.start();
// 获得 Scheduler 对象
this.scheduler = schedulerFactoryBean.getScheduler();
log.info("[start][Scheduler 初始化完成]");
}
public void stop() {
log.info("[stop][Scheduler 关闭开始]");
schedulerFactoryBean.stop();
this.scheduler = null;
log.info("[stop][Scheduler 关闭完成]");
}
// ========== 参考 SchedulerManager 实现 ==========
/**
* 添加或更新 Job Quartz
*
* @param jobClass 任务处理器的类
* @param jobHandlerName 任务处理器的名字
* @param cronExpression CRON 表达式
* @param jobDataMap 任务数据
* @throws SchedulerException 添加异常
*/
public void addOrUpdateJob(Class <? extends Job> jobClass, String jobHandlerName,
String cronExpression, Map<String, Object> jobDataMap)
throws SchedulerException {
if (scheduler.checkExists(new JobKey(jobHandlerName))) {
this.updateJob(jobHandlerName, cronExpression);
} else {
this.addJob(jobClass, jobHandlerName, cronExpression, jobDataMap);
}
}
/**
* 添加 Job Quartz
*
* @param jobClass 任务处理器的类
* @param jobHandlerName 任务处理器的名字
* @param cronExpression CRON 表达式
* @param jobDataMap 任务数据
* @throws SchedulerException 添加异常
*/
public void addJob(Class <? extends Job> jobClass, String jobHandlerName,
String cronExpression, Map<String, Object> jobDataMap)
throws SchedulerException {
// 创建 JobDetail 对象
JobDetail jobDetail = JobBuilder.newJob(jobClass)
.usingJobData(new JobDataMap(jobDataMap))
.withIdentity(jobHandlerName).build();
// 创建 Trigger 对象
Trigger trigger = this.buildTrigger(jobHandlerName, cronExpression);
// 新增 Job 调度
scheduler.scheduleJob(jobDetail, trigger);
}
/**
* 更新 Job Quartz
*
* @param jobHandlerName 任务处理器的名字
* @param cronExpression CRON 表达式
* @throws SchedulerException 更新异常
*/
public void updateJob(String jobHandlerName, String cronExpression)
throws SchedulerException {
// 创建新 Trigger 对象
Trigger newTrigger = this.buildTrigger(jobHandlerName, cronExpression);
// 修改调度
scheduler.rescheduleJob(new TriggerKey(jobHandlerName), newTrigger);
}
/**
* 删除 Quartz 中的 Job
*
* @param jobHandlerName 任务处理器的名字
* @throws SchedulerException 删除异常
*/
public void deleteJob(String jobHandlerName) throws SchedulerException {
// 暂停 Trigger 对象
scheduler.pauseTrigger(new TriggerKey(jobHandlerName));
// 取消并删除 Job 调度
scheduler.unscheduleJob(new TriggerKey(jobHandlerName));
scheduler.deleteJob(new JobKey(jobHandlerName));
}
/**
* 暂停 Quartz 中的 Job
*
* @param jobHandlerName 任务处理器的名字
* @throws SchedulerException 暂停异常
*/
public void pauseJob(String jobHandlerName) throws SchedulerException {
scheduler.pauseJob(new JobKey(jobHandlerName));
}
/**
* 启动 Quartz 中的 Job
*
* @param jobHandlerName 任务处理器的名字
* @throws SchedulerException 启动异常
*/
public void resumeJob(String jobHandlerName) throws SchedulerException {
scheduler.resumeJob(new JobKey(jobHandlerName));
scheduler.resumeTrigger(new TriggerKey(jobHandlerName));
}
/**
* 立即触发一次 Quartz 中的 Job
*
* @param jobHandlerName 任务处理器的名字
* @throws SchedulerException 触发异常
*/
public void triggerJob(String jobHandlerName)
throws SchedulerException {
// 触发任务
JobDataMap data = new JobDataMap();
data.put(JobDataKeyEnum.JOB_HANDLER_NAME.name(), jobHandlerName);
scheduler.triggerJob(new JobKey(jobHandlerName), data);
}
private Trigger buildTrigger(String jobHandlerName, String cronExpression) {
return TriggerBuilder.newTrigger()
.withIdentity(jobHandlerName)
.withSchedule(CronScheduleBuilder.cronSchedule(cronExpression))
.build();
}
}

View File

@ -23,6 +23,11 @@ import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.service.rule.action.IotRuleSceneAction;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.TriggerKey;
import org.quartz.impl.StdSchedulerFactory;
import org.springframework.stereotype.Service;
import org.springframework.validation.annotation.Validated;
@ -330,4 +335,28 @@ public class IotRuleSceneServiceImpl implements IotRuleSceneService {
});
}
// TODO @芋艿测试思路代码记得删除
// 1. Job IotRuleSceneJob
// 2. 参数id
// 3. jobHandlerNameIotRuleSceneJob + id
// 新增addJob
// 修改不存在 addJob存在 updateJob
// + 禁用1存在停止2不存在不处理TODO 测试直接暂停是否可以结论可以
// + 开启1存在更新2不存在新增
// + 禁用开启1存在删除TODO 测试直接删除结论可以
public static void main2(String[] args) throws SchedulerException {
// System.out.println(QuartzJobBean.class);
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
scheduler.start();
String jobHandlerName = "123";
// 暂停 Trigger 对象
scheduler.pauseTrigger(new TriggerKey(jobHandlerName));
// 取消并删除 Job 调度
scheduler.unscheduleJob(new TriggerKey(jobHandlerName));
scheduler.deleteJob(new JobKey(jobHandlerName));
}
}

View File

@ -4,6 +4,8 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotRuleSceneDO;
import cn.iocoder.yudao.module.iot.enums.rule.IotRuleSceneActionTypeEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import javax.annotation.Nullable;
/**
* IOT 规则场景的场景执行器接口
*
@ -14,10 +16,12 @@ public interface IotRuleSceneAction {
/**
* 执行场景
*
* @param message 消息
* @param message 消息允许空
* 1. 空的情况定时触发
* 2. 非空的情况设备触发
* @param config 配置
*/
void execute(IotDeviceMessage message, IotRuleSceneDO.ActionConfig config);
void execute(@Nullable IotDeviceMessage message, IotRuleSceneDO.ActionConfig config);
/**
* 获得类型

View File

@ -43,7 +43,11 @@ public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction {
@Override
public void execute(IotDeviceMessage message, IotRuleSceneDO.ActionConfig config) {
// 1. 获得数据桥梁
// 1.1 如果消息为空直接返回
if (message == null) {
return;
}
// 1.2 获得数据桥梁
Assert.notNull(config.getDataBridgeId(), "数据桥梁编号不能为空");
IotDataBridgeDO dataBridge = dataBridgeService.getIotDataBridge(config.getDataBridgeId());
if (dataBridge == null || dataBridge.getConfig() == null) {