From 8d0caaa16c68e9fe9718f524a2145051c4aed46c Mon Sep 17 00:00:00 2001 From: YunaiV Date: Tue, 4 Feb 2025 00:12:08 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90=E5=8A=9F=E8=83=BD=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=E3=80=91IoT=EF=BC=9A=E5=9F=BA=E4=BA=8E=20Quartz=20=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0=20IotSchedulerManager=EF=BC=81=E4=B8=BA=E4=BA=86?= =?UTF-8?q?=E5=85=BC=E5=AE=B9=20boot=20=E5=92=8C=20cloud=EF=BC=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dal/dataobject/rule/IotRuleSceneDO.java | 12 +- .../job/config/IotJobConfiguration.java | 24 +++ .../job/core/IotSchedulerManager.java | 191 ++++++++++++++++++ .../service/rule/IotRuleSceneServiceImpl.java | 29 +++ .../rule/action/IotRuleSceneAction.java | 8 +- .../action/IotRuleSceneDataBridgeAction.java | 6 +- 6 files changed, 260 insertions(+), 10 deletions(-) create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/job/config/IotJobConfiguration.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/job/core/IotSchedulerManager.java diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotRuleSceneDO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotRuleSceneDO.java index 88df1946e0..3c6ae6288e 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotRuleSceneDO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotRuleSceneDO.java @@ -60,8 +60,6 @@ public class IotRuleSceneDO extends TenantBaseDO { @TableField(typeHandler = JacksonTypeHandler.class) private List triggers; - // TODO @芋艿:需要调研下 https://help.aliyun.com/zh/iot/user-guide/scene-orchestration-1?spm=a2c4g.11186623.help-menu-30520.d_2_4_5_0.45413908fxCSVa - /** * 执行器数组 */ @@ -97,7 +95,7 @@ public class IotRuleSceneDO extends TenantBaseDO { /** * 触发条件数组 * - * 当 {@link #type} 为 {@link IotRuleSceneTriggerTypeEnum#DEVICE} 时,必填 + * 必填:当 {@link #type} 为 {@link IotRuleSceneTriggerTypeEnum#DEVICE} 时 * 条件与条件之间,是“或”的关系 */ private List conditions; @@ -105,7 +103,7 @@ public class IotRuleSceneDO extends TenantBaseDO { /** * CRON 表达式 * - * 当 {@link #type} 为 {@link IotRuleSceneTriggerTypeEnum#TIMER} 时,必填 + * 必填:当 {@link #type} 为 {@link IotRuleSceneTriggerTypeEnum#TIMER} 时 */ private String cronExpression; @@ -185,15 +183,15 @@ public class IotRuleSceneDO extends TenantBaseDO { /** * 设备控制 * - * 当 {@link #type} 为 {@link IotRuleSceneActionTypeEnum#DEVICE_CONTROL} 时,必填 + * 必填:当 {@link #type} 为 {@link IotRuleSceneActionTypeEnum#DEVICE_CONTROL} 时 */ private ActionDeviceControl deviceControl; /** * 数据桥接编号 * - * 当 {@link #type} 为 {@link IotRuleSceneActionTypeEnum#DATA_BRIDGE} 时,必填 - * TODO 芋艿:关联 + * 必填:当 {@link #type} 为 {@link IotRuleSceneActionTypeEnum#DATA_BRIDGE} 时 + * 关联:{@link IotDataBridgeDO#getId()} */ private Long dataBridgeId; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/job/config/IotJobConfiguration.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/job/config/IotJobConfiguration.java new file mode 100644 index 0000000000..7cd6f0961a --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/job/config/IotJobConfiguration.java @@ -0,0 +1,24 @@ +package cn.iocoder.yudao.module.iot.framework.job.config; + +import cn.iocoder.yudao.module.iot.framework.job.core.IotSchedulerManager; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import javax.sql.DataSource; + +/** + * IoT 模块的 Job 自动配置类 + * + * @author 芋道源码 + */ +@Configuration +public class IotJobConfiguration { + + @Bean(initMethod = "start", destroyMethod = "stop") + public IotSchedulerManager iotSchedulerManager(DataSource dataSource, + ApplicationContext applicationContext) { + return new IotSchedulerManager(dataSource, applicationContext); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/job/core/IotSchedulerManager.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/job/core/IotSchedulerManager.java new file mode 100644 index 0000000000..89eaaed1f4 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/job/core/IotSchedulerManager.java @@ -0,0 +1,191 @@ +package cn.iocoder.yudao.module.iot.framework.job.core; + +import cn.iocoder.yudao.framework.quartz.core.enums.JobDataKeyEnum; +import lombok.extern.slf4j.Slf4j; +import org.quartz.*; +import org.springframework.context.ApplicationContext; +import org.springframework.scheduling.quartz.SchedulerFactoryBean; +import org.springframework.scheduling.quartz.SpringBeanJobFactory; + +import javax.sql.DataSource; +import java.util.Map; +import java.util.Properties; + +/** + * IoT 模块的 Scheduler 管理类,基于 Quartz 实现 + * + * 疑问:为什么 IoT 模块不复用全局的 SchedulerManager 呢? + * 回复:yudao-cloud 项目,使用的是 XXL-Job 作为调度中心,无法动态添加任务。 + * + * @author 芋道源码 + */ +@Slf4j +public class IotSchedulerManager { + + private static final String SCHEDULER_NAME = "iotScheduler"; + + private final SchedulerFactoryBean schedulerFactoryBean; + + private Scheduler scheduler; + + public IotSchedulerManager(DataSource dataSource, + ApplicationContext applicationContext) { + // 1. 参考 SchedulerFactoryBean 类 + SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean(); + SpringBeanJobFactory jobFactory = new SpringBeanJobFactory(); + jobFactory.setApplicationContext(applicationContext); + schedulerFactoryBean.setJobFactory(jobFactory); + schedulerFactoryBean.setAutoStartup(true); + schedulerFactoryBean.setSchedulerName(SCHEDULER_NAME); + schedulerFactoryBean.setDataSource(dataSource); + schedulerFactoryBean.setWaitForJobsToCompleteOnShutdown(true); + Properties properties = new Properties(); + schedulerFactoryBean.setQuartzProperties(properties); + // 2. 参考 application-local.yaml 配置文件 + // 2.1 Scheduler 相关配置 + properties.put("org.quartz.scheduler.instanceName", SCHEDULER_NAME); + properties.put("org.quartz.scheduler.instanceId", "AUTO"); + // 2.2 JobStore 相关配置 + properties.put("org.quartz.jobStore.class", "org.springframework.scheduling.quartz.LocalDataSourceJobStore"); + properties.put("org.quartz.jobStore.isClustered", "true"); + properties.put("org.quartz.jobStore.clusterCheckinInterval", "15000"); + properties.put("org.quartz.jobStore.misfireThreshold", "60000"); + // 2.3 线程池相关配置 + properties.put("org.quartz.threadPool.threadCount", "25"); + properties.put("org.quartz.threadPool.threadPriority", "5"); + properties.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool"); + this.schedulerFactoryBean = schedulerFactoryBean; + } + + public void start() throws Exception { + log.info("[start][Scheduler 初始化开始]"); + // 初始化 + schedulerFactoryBean.afterPropertiesSet(); + schedulerFactoryBean.start(); + // 获得 Scheduler 对象 + this.scheduler = schedulerFactoryBean.getScheduler(); + log.info("[start][Scheduler 初始化完成]"); + } + + public void stop() { + log.info("[stop][Scheduler 关闭开始]"); + schedulerFactoryBean.stop(); + this.scheduler = null; + log.info("[stop][Scheduler 关闭完成]"); + } + + // ========== 参考 SchedulerManager 实现 ========== + + /** + * 添加或更新 Job 到 Quartz 中 + * + * @param jobClass 任务处理器的类 + * @param jobHandlerName 任务处理器的名字 + * @param cronExpression CRON 表达式 + * @param jobDataMap 任务数据 + * @throws SchedulerException 添加异常 + */ + public void addOrUpdateJob(Class jobClass, String jobHandlerName, + String cronExpression, Map jobDataMap) + throws SchedulerException { + if (scheduler.checkExists(new JobKey(jobHandlerName))) { + this.updateJob(jobHandlerName, cronExpression); + } else { + this.addJob(jobClass, jobHandlerName, cronExpression, jobDataMap); + } + } + + /** + * 添加 Job 到 Quartz 中 + * + * @param jobClass 任务处理器的类 + * @param jobHandlerName 任务处理器的名字 + * @param cronExpression CRON 表达式 + * @param jobDataMap 任务数据 + * @throws SchedulerException 添加异常 + */ + public void addJob(Class jobClass, String jobHandlerName, + String cronExpression, Map jobDataMap) + throws SchedulerException { + // 创建 JobDetail 对象 + JobDetail jobDetail = JobBuilder.newJob(jobClass) + .usingJobData(new JobDataMap(jobDataMap)) + .withIdentity(jobHandlerName).build(); + // 创建 Trigger 对象 + Trigger trigger = this.buildTrigger(jobHandlerName, cronExpression); + // 新增 Job 调度 + scheduler.scheduleJob(jobDetail, trigger); + } + + /** + * 更新 Job 到 Quartz + * + * @param jobHandlerName 任务处理器的名字 + * @param cronExpression CRON 表达式 + * @throws SchedulerException 更新异常 + */ + public void updateJob(String jobHandlerName, String cronExpression) + throws SchedulerException { + // 创建新 Trigger 对象 + Trigger newTrigger = this.buildTrigger(jobHandlerName, cronExpression); + // 修改调度 + scheduler.rescheduleJob(new TriggerKey(jobHandlerName), newTrigger); + } + + /** + * 删除 Quartz 中的 Job + * + * @param jobHandlerName 任务处理器的名字 + * @throws SchedulerException 删除异常 + */ + public void deleteJob(String jobHandlerName) throws SchedulerException { + // 暂停 Trigger 对象 + scheduler.pauseTrigger(new TriggerKey(jobHandlerName)); + // 取消并删除 Job 调度 + scheduler.unscheduleJob(new TriggerKey(jobHandlerName)); + scheduler.deleteJob(new JobKey(jobHandlerName)); + } + + /** + * 暂停 Quartz 中的 Job + * + * @param jobHandlerName 任务处理器的名字 + * @throws SchedulerException 暂停异常 + */ + public void pauseJob(String jobHandlerName) throws SchedulerException { + scheduler.pauseJob(new JobKey(jobHandlerName)); + } + + /** + * 启动 Quartz 中的 Job + * + * @param jobHandlerName 任务处理器的名字 + * @throws SchedulerException 启动异常 + */ + public void resumeJob(String jobHandlerName) throws SchedulerException { + scheduler.resumeJob(new JobKey(jobHandlerName)); + scheduler.resumeTrigger(new TriggerKey(jobHandlerName)); + } + + /** + * 立即触发一次 Quartz 中的 Job + * + * @param jobHandlerName 任务处理器的名字 + * @throws SchedulerException 触发异常 + */ + public void triggerJob(String jobHandlerName) + throws SchedulerException { + // 触发任务 + JobDataMap data = new JobDataMap(); + data.put(JobDataKeyEnum.JOB_HANDLER_NAME.name(), jobHandlerName); + scheduler.triggerJob(new JobKey(jobHandlerName), data); + } + + private Trigger buildTrigger(String jobHandlerName, String cronExpression) { + return TriggerBuilder.newTrigger() + .withIdentity(jobHandlerName) + .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)) + .build(); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/IotRuleSceneServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/IotRuleSceneServiceImpl.java index fe9cd636b0..71c080d6a0 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/IotRuleSceneServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/IotRuleSceneServiceImpl.java @@ -23,6 +23,11 @@ import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.service.rule.action.IotRuleSceneAction; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; +import org.quartz.JobKey; +import org.quartz.Scheduler; +import org.quartz.SchedulerException; +import org.quartz.TriggerKey; +import org.quartz.impl.StdSchedulerFactory; import org.springframework.stereotype.Service; import org.springframework.validation.annotation.Validated; @@ -330,4 +335,28 @@ public class IotRuleSceneServiceImpl implements IotRuleSceneService { }); } + // TODO @芋艿:测试思路代码,记得删除!!! + // 1. Job 类:IotRuleSceneJob + // 2. 参数:id + // 3. jobHandlerName:IotRuleSceneJob + id + + // 新增:addJob + // 修改:不存在 addJob、存在 updateJob + // 有 + 禁用:1)存在、停止;2)不存在:不处理;TODO 测试:直接暂停,是否可以???(结论:可以) + // 有 + 开启:1)存在,更新;2)不存在,新增; + // 无 + 禁用、开启:1)存在,删除;TODO 测试:直接删除???(结论:可以) + + public static void main2(String[] args) throws SchedulerException { +// System.out.println(QuartzJobBean.class); + Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler(); + scheduler.start(); + + String jobHandlerName = "123"; + // 暂停 Trigger 对象 + scheduler.pauseTrigger(new TriggerKey(jobHandlerName)); + // 取消并删除 Job 调度 + scheduler.unscheduleJob(new TriggerKey(jobHandlerName)); + scheduler.deleteJob(new JobKey(jobHandlerName)); + } + } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneAction.java index 04020c1760..1cb17c9b44 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneAction.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneAction.java @@ -4,6 +4,8 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotRuleSceneDO; import cn.iocoder.yudao.module.iot.enums.rule.IotRuleSceneActionTypeEnum; import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; +import javax.annotation.Nullable; + /** * IOT 规则场景的场景执行器接口 * @@ -14,10 +16,12 @@ public interface IotRuleSceneAction { /** * 执行场景 * - * @param message 消息 + * @param message 消息,允许空 + * 1. 空的情况:定时触发 + * 2. 非空的情况:设备触发 * @param config 配置 */ - void execute(IotDeviceMessage message, IotRuleSceneDO.ActionConfig config); + void execute(@Nullable IotDeviceMessage message, IotRuleSceneDO.ActionConfig config); /** * 获得类型 diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java index eb50fb5df1..f7f3747e78 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java @@ -43,7 +43,11 @@ public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction { @Override public void execute(IotDeviceMessage message, IotRuleSceneDO.ActionConfig config) { - // 1. 获得数据桥梁 + // 1.1 如果消息为空,直接返回 + if (message == null) { + return; + } + // 1.2 获得数据桥梁 Assert.notNull(config.getDataBridgeId(), "数据桥梁编号不能为空"); IotDataBridgeDO dataBridge = dataBridgeService.getIotDataBridge(config.getDataBridgeId()); if (dataBridge == null || dataBridge.getConfig() == null) {