使用ThreadPoolTaskScheduler动态修改调度时间
2024-09-03 08:29:47
用 SchedulingConfigurer 接口只能统一修改所有任务的调度时间;如果要对每个 job 分开控制,有多少个 job 就要写多少个实现,比较麻烦。
配置线程池ThreadPoolTaskScheduler
/**
 * Spring configuration that exposes the shared {@link ThreadPoolTaskScheduler}
 * used to run the dynamically scheduled jobs.
 */
@Configuration
public class JobConfig {

    /**
     * Builds the scheduler bean registered under the name {@code taskExecutor}.
     *
     * @return a scheduler with 20 threads whose names start with {@code taskExecutor-};
     *         it is configured to wait for running tasks on shutdown, for at most 300 seconds
     */
    @Bean("taskExecutor")
    public ThreadPoolTaskScheduler taskExecutor() {
        final ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        // Worker threads: pool size and recognisable thread names.
        scheduler.setThreadNamePrefix("taskExecutor-");
        scheduler.setPoolSize(20);
        // Shutdown policy: let in-flight tasks finish, bounded by a timeout.
        scheduler.setAwaitTerminationSeconds(300);
        scheduler.setWaitForTasksToCompleteOnShutdown(true);
        return scheduler;
    }
}
封装实现
@Component
@Slf4j
public class Jobs {
/**
* key josId
* value ScheduledFuture object
*/
private final ConcurrentHashMap<Integer, ScheduledFuture> jobsMap = new ConcurrentHashMap<>(8);
private final ApplicationContext applicationContext;
private final ThreadPoolTaskScheduler threadPoolTaskScheduler;
private final EcoScheduleJobMapper scheduleJobMapper;
public Jobs(ApplicationContext applicationContext, EcoScheduleJobMapper scheduleJobMapper, ThreadPoolTaskScheduler threadPoolTaskScheduler) {
this.applicationContext = applicationContext;
this.scheduleJobMapper = scheduleJobMapper;
this.threadPoolTaskScheduler = threadPoolTaskScheduler;
init();
}
public Map<Integer, ScheduledFuture> getJobsMap() {
return jobsMap;
}
public void start(Integer jobId) {
if (jobsMap.containsKey(jobId)) {
log.debug("job:{} is running", jobId);
// running...
return;
}
executeJob(selectJobsDetail(jobId));
}
public void stop(Integer jobId) {
ScheduledFuture scheduledFuture = jobsMap.get(jobId);
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
log.debug("job cancel:{}", scheduledFuture);
} else
log.debug("jobId:{} is not running ", jobId);
}
private EcoScheduleJob selectJobsDetail(Integer jobId) {
return scheduleJobMapper.selectByPrimaryKey(jobId);
}
// 修改corn配置,先取消再重新启动
public void reset(Integer jobId) {
ScheduledFuture scheduledFuture = jobsMap.get(jobId);
EcoScheduleJob scheduleJob = selectJobsDetail(jobId);
if (Objects.nonNull(scheduledFuture)) {
scheduledFuture.cancel(true);
log.debug("job:{} cancel success", scheduleJob);
}
if (Objects.nonNull(scheduleJob)) {
executeJob(scheduleJob);
log.debug("job:{} start success", scheduleJob);
}
}
// 启动初始化启动所有job
private void init() {
// 从数据库获取任务信息
List<EcoScheduleJob> jobs = scheduleJobMapper.selectJobs();
if (CollectionUtils.isNotEmpty(jobs)) {
jobs.forEach(this::executeJob);
}
}
private void executeJob(EcoScheduleJob job) {
Runnable runnable = null;
try {
Class<?> clazz = Class.forName(job.getClassName());
Constructor<?>[] declaredConstructors = clazz.getDeclaredConstructors();
if (!Arrays.isNullOrEmpty(declaredConstructors)) {
// 只取一个构造器
if (declaredConstructors.length > 1) {
log.warn("{} 存在多个构造器,只能有一个", clazz);
return;
}
Constructor<?> constructor = declaredConstructors[0];
Class<?>[] parameterTypes = constructor.getParameterTypes();
if (parameterTypes.length == 0) {
log.warn("{} 无参构造器", clazz);
return;
}
Class<?> parameterType = parameterTypes[0];
Object bean = applicationContext.getBean(parameterType);
runnable = (Runnable) constructor.newInstance(bean);
}
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
log.error("任务调度失败:{}", e);
}
if (Objects.nonNull(runnable)) {
ScheduledFuture<?> schedule = threadPoolTaskScheduler.schedule(runnable, new CronTrigger(job.getCron()));
jobsMap.put(job.getJobId(), schedule);
log.debug("job:{} is running...", job);
} else {
log.error("任务调度失败 job Class is null");
}
}
}
pojo
/**
 * Row of the {@code eco_schedule_job} table: one schedulable job definition.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Table(name = "`eco_schedule_job`")
@Data
public class EcoScheduleJob implements Serializable {
/**
* Primary key.
*/
@Id
@Column(name = "`job_id`")
private Integer jobId;
/**
* Class name of the job; it is passed to {@code Class.forName} when the job is scheduled.
*/
@Column(name = "`class_name`")
private String className;
/**
* Cron expression that drives the job's trigger.
*/
@Column(name = "`cron`")
private String cron;
/**
* Status: 1 = disabled, 0 = enabled.
*/
@Column(name = "`status`")
private String status;
}
controller
/**
 * HTTP endpoints under {@code /job} for inspecting and controlling scheduled jobs.
 * Every endpoint delegates to {@link ScheduleJobService}.
 */
@RequestMapping("/job")
@RestController
public class ScheduleJobController {

    @Reference
    private ScheduleJobService scheduleJobService;

    /**
     * {@code GET /job/status.do}
     *
     * @return whatever {@code ScheduleJobService.status()} reports
     */
    @GetMapping("/status.do")
    public Object status() {
        final Object report = scheduleJobService.status();
        return report;
    }

    /**
     * {@code GET /job/start.do}: asks the service to start a job.
     *
     * @param jobId id of the job to start
     * @return {@code Response.OK()}
     */
    @GetMapping("/start.do")
    public Object start(@RequestParam Integer jobId) {
        this.scheduleJobService.start(jobId);
        return Response.OK();
    }

    /**
     * {@code GET /job/stop.do}: asks the service to stop a job.
     *
     * @param jobId id of the job to stop
     * @return {@code Response.OK()}
     */
    @GetMapping("/stop.do")
    public Object stop(@RequestParam Integer jobId) {
        this.scheduleJobService.stop(jobId);
        return Response.OK();
    }

    /**
     * {@code GET /job/update.do}: asks the service to change a job's cron expression.
     *
     * @param jobId id of the job to update
     * @param cron  new cron expression
     * @return {@code Response.OK()}
     */
    @GetMapping("/update.do")
    public Object update(@RequestParam Integer jobId, @RequestParam String cron) {
        this.scheduleJobService.update(jobId, cron);
        return Response.OK();
    }
}
service
/**
 * Service behind the job endpoints: persists cron changes through
 * {@link EcoScheduleJobMapper} and drives the in-memory {@link Jobs} registry.
 */
@Service
public class ScheduleJobServiceImpl implements ScheduleJobService {

    private final EcoScheduleJobMapper ecoScheduleJobMapper;
    private final Jobs jobs;

    public ScheduleJobServiceImpl(EcoScheduleJobMapper ecoScheduleJobMapper , Jobs jobs) {
        this.ecoScheduleJobMapper = ecoScheduleJobMapper;
        this.jobs = jobs;
    }

    /**
     * Stores the new cron expression for the job, then reschedules it.
     *
     * @param jobId primary key of the job
     * @param cron  new cron expression
     */
    @Override
    public void update(Integer jobId, String cron) {
        EcoScheduleJob scheduleJob = new EcoScheduleJob();
        scheduleJob.setJobId(jobId);
        scheduleJob.setCron(cron);
        ecoScheduleJobMapper.updateByPrimaryKeySelective(scheduleJob);
        jobs.reset(jobId);
    }

    /** Stops the job with the given id. */
    @Override
    public void stop(Integer jobId) {
        jobs.stop(jobId);
    }

    /** Starts the job with the given id. */
    @Override
    public void start(Integer jobId) {
        jobs.start(jobId);
    }

    /**
     * Lists the definitions of all jobs currently held in the {@link Jobs} registry.
     *
     * @return the matching job rows, or an empty list when the registry is empty
     */
    @Override
    public List<EcoScheduleJob> status() {
        Set<Integer> integers = jobs.getJobsMap().keySet();
        if (integers.isEmpty()) {
            // With no ids the joined string is empty; selectByIds("") presumably renders an
            // empty IN () clause and fails, so skip the query (verify against the mapper).
            return java.util.Collections.emptyList();
        }
        String join = StringUtils.join(integers, ",");
        return ecoScheduleJobMapper.selectByIds(join);
    }
}
最新文章
- 兼容javascript和C#的RSA加密解密算法,对web提交的数据进行加密传输
- Android监听系统短信数据库变化-提取短信内容
- python selenium中使用ddt进行数据驱动测试
- 【转载】为什么V8引擎这么快?
- zabbix 3.0 安装 ubuntu环境
- org.apache.cxf.transport.servlet.CXFServlet cannot be cast to javax.servlet.Servlet
- IIS 7.0 下 httpMoudle 失效的问题
- EF如何正确的进行实体中修改
- (转载)Linux中cp直接覆盖不提示的方法
- CSDN CODE平台,中国版Github简要使用说明
- C#实现无边框窗体点击任务栏图标正常最小化和还原
- 多线程面试题系列(8):经典线程同步 信号量Semaphore
- spring mvc 中自定义404页面在IE中无法显示favicon.ico问题的解决方法。
- Chocolatey 和 Redis windows安装记录
- 关于springMVC中component-scan的问题以及springmvc.xml整理
- DataPipeline | 享物说产品负责人夏凯:数据驱动的用户增长实战
- [CodeForces - 614B] B - Gena's Code
- 【亲测有效】Win10家庭版Microsoft Edge页面出现乱码的两种解决方案及gpedit.msc命令无法使用的解决策略
- PHP 数值处理的几种常用的方法
- SQL AND & OR 运算符