概述

工作的过程中有使用过quartz进行作业的调度,通常情况下我们是可以使用jdk自带的调度来实现一些简单的调度,不过 当需要使用比较高级的调度功能的时候,jdk自带的调度功能就没有办法满足了,比如我们希望只有周末或者月末不进行调度,节假日 不进行调度等。使用之余自然也对其调度的原理比较感兴趣,因此花了点时间来进行学习源码。

详解

quickstart

我们首先通过一个示例来演示一下quartz如何实现作业的调度,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.hw;

import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;

public class QuickStart {

public static void main(String[] args) throws Exception {

Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
JobDetail jobDetail = JobBuilder.newJob(StartJob.class).withIdentity("quick start", "wes").build();
Trigger trigger = TriggerBuilder.newTrigger().withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(10)).startNow().build();
scheduler.start();
scheduler.scheduleJob(jobDetail, trigger);
Thread.currentThread().join();
}

public static class StartJob implements Job {

@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
System.out.println("quick start!");
}
}
}

上面的代码最终是以10s中执行一次StartJob,具体输出如下:

1
2
3
4
5
quick start!
quick start!
quick start!
quick start!
quick start!

框架下的实体,及实体之间的关系

在介绍quartz之前我们先来看一下其中存在的一些实体概念:

  • JobDetail表示一个具体的可执行的调度程序,Job 是这个可执行程调度程序所要执行的内容,另外 JobDetail 还包含了这个任务调度的方案和策略。
  • Trigger代表一个调度参数的配置,什么时候去调。
  • Scheduler代表一个调度容器,一个调度容器中可以注册多个 JobDetail 和 Trigger。当 Trigger 与 JobDetail 组合,就可以被 Scheduler 容器调度了。

下面我们用一张图来演示一下这些实体之间的关系(此图来源于网上),如下:

上图中最核心的应该就是scheduler了,这是整个调度的核心,触发调度的核心就是scheduler.start(),接下来的分析就从这里开始。

quartz调度的实现

scheduler.start()方法执行会进入如下代码:

1
2
3
4
5
private QuartzScheduler sched;

public void start() throws SchedulerException {
sched.start();
}

上面的代码我们看到stdScheduler会调用QuartzScheduler的start方法,这可以说明我们的StdScheduler其实是QuartzScheduler的一个代理。 接下来我们看一下QuartzScheduler的start方法执行了什么操作吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void start() throws SchedulerException {

....
// 在启动QuartzScheduler之前,通知到当前调度器的listener
notifySchedulerListenersStarting();

....
// 将pause标志位改为false
schedThread.togglePause(false);

...
// 通知listener,scheduler完成启动
notifySchedulerListenersStarted();
}

上面的代码我省略了部分不必要的代码,将流程大致分为三个步骤,分别是:

  • 通知当前scheduler的listener该scheduler开始启动
  • 将调度线程的pause标志位改为false
  • 最后在完成启动的之后再次通知对应的listener已经完成调度器的启动

这里的listener和tomcat的listener机制很相似,其为框架提供了一种可扩展的机制。上面代码中的核心步骤就是 schedThread.togglePause(false)这一行,接下来我们跟进一下,看这一步究竟做了什么工作吧:

1
2
3
4
5
6
7
8
9
10
11
void togglePause(boolean pause) {
synchronized (sigLock) {
paused = pause;

if (paused) {
signalSchedulingChange(0);
} else {
sigLock.notifyAll();
}
}
}

上面的代码中我们首先会获取当前对象的一个锁,然后sigLock来实现多线程的同步操作。在初始阶段,代码的执行会走到下面sigLock.notifyAll(); ,这一步的操作就是通知所有等待在该对象上的线程进行一些操作,我们接下来看一下在该对象上等待的线程在被唤醒前和唤醒后分别执行了什么操作吧, 接下来我们就不绕弯子了,直接进入正题QuartzScheduler的run方法,代码有点长:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
public void run() {
int acquiresFailed = 0;

// 等待scheduler将pause置为false,这个是通过scheduler.start方法来设定的
while (!halted.get()) {
try {
// check if we're supposed to pause...
synchronized (sigLock) {
while (paused && !halted.get()) {
try {
// wait until togglePause(false) is called...
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}

// reset failure counter when paused, so that we don't
// wait again after unpausing
acquiresFailed = 0;
}

if (halted.get()) {
break;
}
}

.....

// 阻塞的方式获取当前可用的线程的数量,如果获取不到则会一直阻塞
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...

List<OperableTrigger> triggers;

long now = System.currentTimeMillis();

clearSignaledSchedulingChange();
try {
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
acquiresFailed = 0;
if (log.isDebugEnabled())
log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
}

if (triggers != null && !triggers.isEmpty()) {

.....

// set triggers to 'executing'
List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();

boolean goAhead = true;
synchronized(sigLock) {
goAhead = !halted.get();
}
// 获取当前trigers对应的作业的详细信息
if(goAhead) {
try {
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
if(res != null)
bndles = res;
}
......
}

for (int i = 0; i < bndles.size(); i++) {
TriggerFiredResult result = bndles.get(i);
TriggerFiredBundle bndle = result.getTriggerFiredBundle();
Exception exception = result.getException();

.....

// it's possible to get 'null' if the triggers was paused,
// blocked, or other similar occurrences that prevent it being
// fired at this time... or if the scheduler was shutdown (halted)
if (bndle == null) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
// 使用JobShell封装代执行的作业
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
// 将作业丢到线程池中执行并返回,注意这里并不是阻塞调用,有返回值主要是为了判断将作业提交到线程池的时候是否有异常
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}

}

continue; // while (!halted)
}
}
.....
}

上面代码整体有点长,这里我删除了不算太重要的内容,从流程上来看的话也还算比较简单,代码中已经备注了相关的信息:

  • 等待scheduler将pause置为false,这个是通过scheduler.start方法来设定的
  • 阻塞的方式获取当前可用的线程的数量,如果获取不到则会一直阻塞
  • 获取当前triggers,以及其对应的作业的详细信息
  • 使用JobShell封装待执行的作业,并将该作业丢到线程之中执行

上面分析了scheduler启动的过程,在上面这个过程中的最后是将作业提交到线程池中进行执行,具体作业是怎么执行的我们还没有看到, 接下来我们继续跟进代码来分析一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public boolean runInThread(Runnable runnable) {
if (runnable == null) {
return false;
}

synchronized (nextRunnableLock) {
.....
// 从availWorkers获取对应的资源用于执行任务
if (!isShutdown) {
WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
busyWorkers.add(wt);
wt.run(runnable);
}
nextRunnableLock.notifyAll();
handoffPending = false;
}

return true;
}

上面的代码中我们可以看到有availWorkers和busyWorkers两个列表,这种模式在池化资源的时候非常有用,总结其目的无非就是 实现线程的复用和线程体量的控制,作业的具体执行还在于提交的Runnable的run方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public void run() {
qs.addInternalSchedulerListener(this);

try {
OperableTrigger trigger = (OperableTrigger) jec.getTrigger();
JobDetail jobDetail = jec.getJobDetail();

do {

JobExecutionException jobExEx = null;
Job job = jec.getJobInstance();

......
// execute the job
try {
log.debug("Calling execute on job " + jobDetail.getKey());
job.execute(jec);
endTime = System.currentTimeMillis();
}

jec.setJobRunTime(endTime - startTime);

// notify all job listeners
if (!notifyJobListenersComplete(jec, jobExEx)) {
break;
}

....

try {
complete(true);
} ....

qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);
break;
} while (true);

} finally {
qs.removeInternalSchedulerListener(this);
}
}

在上面的代码中具体的流程可以大致分为几个步骤:

  • 使用jobFactory通过反射的方式获取job实例
  • 执行job对象的execute方法
  • 通知作业监听器当前作业完成情况

到此整个作业调度的流程我们大致看完了,不过quartz实现的作业调度的方式远不止这么简单,其本身是具备分布式调度的能力,至于如何实现分布式 调度,那肯定是需要协调者存在的,通常可以使用数据库,另外作业相关的信息也是可以持久化到数据库中的,不过这些暂时没有用到,因此也就不整理了。 另外分布式的调度也有基于quartz基础之上实现的,业内使用的比较广泛的应该就是elastic-job了,其中协调者使用的是zookeeper来完成的。接下来 我们来演示一个稍微复杂的例子,通过quartz来构建一个DAG(列表简化版的DAG)

DAG的实现

基于quartz实现时间层面上的调度显然是没有问题的,不过如果job之间有依赖关系的话应该如何实现呢?这个在具体的实施方案上有两种, 一种是将作业的dag存放在jobDataMap上,通过构建起始节点并将DAG传入起始节点,并通过JobExecutionContext在作业之间传播DAG即可, 如下:

JobDataMap实现DAG

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package com.hw;

import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;

/**
*
* 基于jobDataMap 实现作业之间有依赖关系的调度
*/
public class JobDataMapDAG {



public static void main(String[] args) throws Exception {

Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();

/**
* 这一步是模拟一个作业转换图,当前是job之间的关系的绑定,还可以使用触发trigger的方式,使用trigger来完成多个作业的触发
*/
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put(StartJob.class.getName(), TransformJob.class.getName());
jobDataMap.put(TransformJob.class.getName(), EndJob.class.getName());

JobDetail startJob = JobBuilder.newJob(StartJob.class)
.withDescription("this is description")
.withIdentity("hello", "wes")
.usingJobData(jobDataMap)
.build();

Trigger trigger = TriggerBuilder.newTrigger()
.withDescription("this is trigger")
.withIdentity("trigger", "wes")
.withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(20))
.startNow()
.build();

scheduler.scheduleJob(startJob, trigger);
scheduler.start();

Thread.currentThread().join();
}

public static class StartJob implements Job {

@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {

System.out.print("start ---------------> ");

// 完成start job之后调度到下面一个作业中
String nextJob = jobExecutionContext.getJobDetail().getJobDataMap().getString(this.getClass().getName());
try {
((Job)(Class.forName(nextJob).newInstance())).execute(jobExecutionContext);
} catch (Exception e) {
e.printStackTrace();
}
}
}

public static class TransformJob implements Job {

@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
System.out.print("transform ---------------> ");

// 完成transform job之后调度到下面一个作业中
String nextJob = jobExecutionContext.getJobDetail().getJobDataMap().getString(this.getClass().getName());
try {
((Job)(Class.forName(nextJob).newInstance())).execute(jobExecutionContext);
} catch (Exception e) {
e.printStackTrace();
}
}
}

public static class EndJob implements Job {

@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {

System.out.println("end");
}
}
}

程序的最终输出如下:

1
2
3
4
5
6
7
8
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
start ---------------> transform ---------------> end
start ---------------> transform ---------------> end
start ---------------> transform ---------------> end
start ---------------> transform ---------------> end
start ---------------> transform ---------------> end
start ---------------> transform ---------------> end

上面这种方式会存在一个问题,那就是一个父节点对应多个子节点的情况下这种方式的调度是串行的的,无法同时执行多个后继节点的作业调度, 不过这个问题是可以通过开启多个线程来实现子节点任务的并发执行。一种更好的方式是job执行完之后,触发后继节点的trigger,由trigger 对后继节点进行点火操作,这样可以复用框架层面上给我们提供的线程池的能力。另外,如果我们要kill一个作业,是不是也会考虑大量的善后工作呢? 既然如此,我们最好还是通过框架层面上给我们提供的工具来实现这种作业的调度为好。这里由于时间关系暂时不深究了,另外,构建DAG也可以使用 listener来实现。

小结

quartz作为业内一款优秀的定时调度框架,应用范围很广,因此阅读一下源码还是很有必要的,上面的构建DAG的例子就真真的是我们工程的一个需求, 当时是要实现ETL工具的开发,由于时间关系,这里不再赘述,后面看工作需要有时间再补齐吧。