概述

最近在后台用到了线程池,之前的工作中现网也因为线程池的使用不当引发过多次的问题,因此抽时间总结了一下线程池 相关的内容,以备后用。

详解

源码走读

初始化及提交任务

线程池采用了池化的技术来实现线程的复用,并且还实现了队列用于保存提交的任务,从而将线程和任务进行了解偶。 我们先来看一下jdk给我们提供的线程池的实现,并具体的说明一下线程池的参数,代码如下:

1
2
3
4
5
6
7
8
9
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
.....
}
  • corePoolSize:核心线程池
  • maximumPoolSize:最大线程池
  • keepAliveTime:超出核心线程数量之后的线程在没有使用的情况下的存活时间
  • unit:存活时间的单位
  • workQueue:提交任务的保存队列(注意并不是所有提交的任务一定会入队列)
  • threadFactory:创建新线程的工厂类
  • handler:饱和策略(当阻塞队列中已经满并且线程池中线程的数量达到最大线程池的情况下,对于新提交的任务的处理策略)

接下来看一下任务提交的整个过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

在解释上面代码之前,有必要先看一下集合核心的字段及方法,上面的代码中有一个核心的字段ctl,该字段是一个AtomicInteger类型的字段,是一个32位的变量,其中 低29位表示线程池中线程的数量,高三位则代表了线程池的状态,用来表征线程池状态的状态如下:

  • RUNNING = -1 << COUNT_BITS:高三位是111,低29位是0
  • SHUTDOWN = 0 << COUNT_BITS:高三位是000,低29位是0
  • STOP = 1 << COUNT_BITS:高三位是001,低29位是0
  • TIDYING = 2 << COUNT_BITS:高三位是010,低29位是0
  • TERMINATED = 3 << COUNT_BITS:高三位是100,低29位是0

上面可以看到的是running -> terminated是一个依次递增的过程。

比较重要的方法如下:

  • int runStateOf(int c) { return c & CAPACITY; }:CAPACITY高三位是1,其余是0,用来获取线程的状态信息
  • int workerCountOf(int c) { return c & CAPACITY; }:高三位是0,其余为1,用于获取线程池中线程数量的信息
  • int ctlOf(int rs, int wc) { return rs | wc; }:用于将高三位rs(runStatus)和低29位wc(workerCount)组合成ctl变量

在介绍完这些基础知识之后,我们来看一下,任务提交的过程中线程池框架的处理过程是什么样子的:

  • 如果线程池中线程的数量小于corePoolSize,这个时候就会创建一个新的线程来执行任务
  • 如果线程池中运行的线程的数量超过了corePoolSize则会判断当前线程池的状态,如果线程池处于运行状态,并且能够将任务添加到队列中 则会进行一次doubleCheck来看一下线程池是否处于运行状态,如果线程池已经不在运行的状态,则会尝试将任务移出队列,如果移出成功则会 使用提供的reject策略进行处理任务,否则会判断当前线程的数量是否为0,如果是0的话,则会通过提交一个空任务来创建一个线程。
  • 如果队列已经满了,则会将线程扩充到maxPoolSize来执行任务
  • 如果线程池中线程的数量已经到达了maxPoolSize,则会使用reject策略进行处理任务

下面我们简要的通过一个流程图来演示一下新任务提交的过程中,线程池是如何处理的: // TODO

增加线程:addWorker

上面的过程中我们看到线程池中核心的方法就是addWorker、reject,因此接下来我们来看一下线程池添加线程的过程 (说明一下这里addWorker有两个参数,其中firstTask是该worker执行的第一个任务,一旦线程创建成功之后就会直接从 队列中获取任务了,就不会再在这里传参数来构建worker并执行任务了),具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

........
}

上述代码中retry部分的判断比较复杂,线程池状态处于非运行状态并且线程终止、任务异常、队列为空任何一个条件满足的情况下则添加任务失败。 关于这个判断的话我们可以大致的总结一下(把括号内的条件拆解会比较好理解一点):

  • 当线程池处于或者更高的shutdown状态的时候,我们就不应该往队列中添加任务了
  • 如果线程池处于更高的状态并且提交的任务不是null的情况下,则说明此时线程池已经关闭了,因此也不能再往线程池中添加任务了
  • 如果线程池处于更高的状态并且线程池中的队列也已经为空了,我们也不应该提交任务了,这一步的分析要回到上面execute的方法中, 上面的execute的方法中我们会看到当线程池中线程的数量为空的时候是会通过提交一个空的任务来创建一个线程,并从queue中获取任务, 也就说明了线程池正常运行状态的时候肯定不是空,如果线程池中的线程已经是空的了,说明可能正在进行的操作是线程的销毁,因此这种 情况下也不应该新建线程,所以直接return false了。

如果不是上面这些情况的话,我们则会通过cas的操作来增加一个worker:compareAndIncrementWorkerCount(c),这里新增一个worker也是需要 一定条件的,具体如下:

  • 只有线程池中线程的数量少于CAPACITY并且线程的数量少于核心线程或者最大线程的时候才会创建,需要注意这里到底是小于核心线程 还是最大线程的数量是和传入的参数相关的。

上面的整个过程还只是针对线程池中线程的数量做自加的操作,到此为止线程池中线程的数量并没有发生实质的操作,接下来才是 真正的创建线程的过程,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;

上面的代码中可以看到首先定义了两个标记位:workerStarted、workerAdded,分别用来表征线程成功启动、创建线程成功。 接下来会首先创建一个worker对象,worker对象的创建伴随着一个线程的创建,具体可以参见worker的构造函数。我们可以获取worker里面的 线程。接下来我们会获取当前线程池的一把锁,这个是可重入锁,意味着一个线程可以向该线程池中添加多次任务(在当前线程抢到了锁的情况下)。 接下来我们会获取线程池的状态,并判断(或):

  • 线程池处于运行的状态
  • 线程处于shutdown的状态(此时queue中可能还有没有处理完的任务,这种情况下是要求创建没有任务的线程来消费剩下的任务的)

上面的条件二者满足其一的情况下则可以向线程池中添加新的线程,如果添加线程成功了则可以开始执行任务了,如果执行任务中失败了, 或者压根创建线程都没有成功的情况下则我们还应该还原现场:将线程的数量减一。

这里值得注意的是worker有点类似于一个包装器,用来包装任务和线程,并且其实现了AQS框架的lock操作。

拒绝策略:reject

在上面的代码中我们可以看到在一定的情况下我们需要拒绝新加入的任务,总结概括一下拒绝的策略,如下:

  • 任务队列满的时候

接下来我们通过具体的代码来分析一下拒绝的流程:

1
2
3
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}

如上,这里拒绝的过程相对来说就比较简单了,直接是调用我们传入(或是缺省默认的情况)的拒绝策略来执行了。

到这里还缺失了一个步骤没有看到:当线程池中线程的数量大于corepoolsize之后,队列还没有满的情况下需要将新的任务添加到队列中, 当队列满之后就会创建新的线程来执行任务,那么加入队列的任务是什么时候被线程池中的线程执行的呢?这些都要从ThreadPoolExecutor 的内部类Worker来说起了。在介绍worker之前,先来简单的对其做一下介绍: worker实现了AQS框架,并继承了Runnable接口,我们知道 AQS的作用主要是实现加解锁的工具,既然是继承了AQS,那么worker肯定是实现了自己的加解锁的过程,之所以要实现加解锁是为了防止 在任务执行的过程中,线程接收到中断的信号,那么这种情况下会导致任务的执行失败,通过加一个不可重入的排它锁,当一个任务在执行的 时候,外部想要中断这个任务就必须要获取锁,而现实的情况是锁不可重入且是排它的,因此就必须等待当前任务执行完毕。那么什么情况 下会发生中断任务的请求呢?这个也可以在源码的注释中找到答案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
 /* When a new task is submitted in method {@link #execute(Runnable)},
* and fewer than corePoolSize threads are running, a new thread is
* created to handle the request, even if other worker threads are
* idle. If there are more than corePoolSize but less than
* maximumPoolSize threads running, a new thread will be created only
* if the queue is full. By setting corePoolSize and maximumPoolSize
* the same, you create a fixed-size thread pool. By setting
* maximumPoolSize to an essentially unbounded value such as {@code
* Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary
* number of concurrent tasks. Most typically, core and maximum pool
* sizes are set only upon construction, but they may also be changed
* dynamically using {@link #setCorePoolSize} and {@link
* #setMaximumPoolSize}. </dd>
*/


public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
if (workerCountOf(ctl.get()) > maximumPoolSize)
interruptIdleWorkers();
}

上面代码的注释中可以清晰的看到我们是可以在线程池处于运行状态的时候动态的调整线程池的大小的,而这个动态调整的一个过程 就会触发中断的信号,查看其他核心参数的调整也都可以看到中断的过程。接下来看一下Work的构造函数:

1
2
3
4
5
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

上面的介绍中我们可以知道,task和worker都实现了Runnable的接口,再看一下worker的构造函数,是否会发现这其实就是一个代理的过程呢? 这里需要简单的介绍一下worker的构造函数:

  • state:state是一个计数器,用来表征AQS的同步状态,也即是当前worker是否已经加锁了
  • firstTask:用户提交的任务,这里需要注意,虽然task实现了Runnable接口,但是并不是会使用firstTask来完成线程的创建的, 其只是用来执行任务的,可能这句话不太好理解,不过线程池的目标就是用来复用线程的,如果我们用task来创建线程,那么势必造成 再来一个任务就没有办法复用这个task所创建的线程了。
  • thread:线程池中真正执行任务的线程

当任务被执行的时候,最终会走到runWorker方法中,这里代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

上面的过程中可以看到,执行任务的时候会首先对worker进行unlock,此时任务还没有执行,因此是可以中断的状态,接下来就进入一个while循环了, 可以看到该while循环是不停的获取任务的过程,一旦获取到任务,接下来就会对当前的worker进行上锁,并执行任务,如果任务执行失败就会 抛出异常,并最终执行processWorkerExit(w, completedAbruptly);方法。这里还有两个过程需要分析:getTaskprocessWorkerExit ,我们接下来分别看一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

上面就是获取任务的整个过程,这里首先会线程池状态是否异常,如果是则进入一个销毁线程池的过程,具体参见decrementWorkerCount, 接下来会判断线程池中线程的数量是否超过允许的最大值或者获取任务是否超时。接下来进入一个任务获取的过程。

最后是任务执行完之后一个退出的过程,该过程是线程池中没有任务的情况下线程池中线程的销毁策略,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private void processWorkerExit(Worker w, boolean completedAbruptly) {

// 任务异常终止的情况下需要将worker的数量-1
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();

// 移除worker
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}

// 除了任务执行完之后,队列中存放的任务为空的情况下,什么情况下还会进行线程的销毁呢?
// 那就是整个线程池都销毁的时候,因此这一步是判断是否有必要终止线程池
tryTerminate();

int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 执行任务的过程中有任何失败导致线程数量(并不是真正线程的个数)回退,就会通过添加一个初始任务为空的worker
addWorker(null, false);
}
}

线程池的种类

上面我们从源码的层面上分析了一下线程池的原理,接下来看一下当前较常用的线程池工具类,这些常用的线程池的生成是在Executors 里面,具体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}


public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

上面是从源代码中摘抄的(除了上面的这些个种类的线程池,还包含了ForkJoin类型的线程池),从上面的定义中可以看到这些个 种类的线程池都是通过ThreadPoolExecutor来生成的,唯一的区分是线程池的参数会有所不同:

  • newFixedThreadPool:这里我们可以看到当前线程池的核心线程池大小和最大线程池大小是一致的,使用的队列是一个无界的阻塞队列
  • newCachedThreadPool:这里是一个缓存线程池,可以看到这里的核心线程池是0,并且接收的队列是一个同步阻塞队列,试想一下如果我们 使用这种线程池,如果我只提交一个任务,那么由于核心线程池中线程的数量是0,将会导致firstTask无法被触发,最终会导致没有办法消费队列中的 任务。这个问题看起来很奇怪,根本原因出在我们所使用的用于接收任务的队列上面SynchronousQueue,下面会针对队列进行分析。
  • ScheduledThreadPoolExecutor:创建一个定长的线程池,DelayedWorkQueue也是一个阻塞队列

线程池中的队列(此处存疑)

在介绍上面线程池种类的时候有提到线程池使用的阻塞队列,我们可以先对阻塞队列进行一个分类:

  • 有界队列:顾名思义,有界队列是队列的大小是有限制的,常用的有FIFO的ArrayBlockQueue、具备优先级的PriorityBlockingQueue
  • 无界队列:队列的大小不受限制,这个可能会导致积压大量的任务并最终引发OOM,常见的有LinkedBlockingQueue
  • 同步移交队列:这是一个比较特殊的队列,SynchronizedQueue并不是一个队列,而是线程之间的一种移交策略,当我们将任务 放入队列时候必须要有一个线程等待接收这个任务。只有在使用无界队列或者有饱和策略(常见的就是由提交线程处理任务)的时候才使用这个队列。

关于队列的还有很多东西可以讲,后面会专门的抽出一个小结来说明线程池中的队列。

饱和策略

所谓饱和策略是指当我们没有办法往队列中添加任务的时候,针对新提交的任务的处理策略,具体可以参考如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }

/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}

/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }

/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}

/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }

/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}

/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }

/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}

上面的代码中可以看到,饱和策略大致可以分为四种:

  • CallerRunsPolicy:这种方式我们是直接使用的r.run的方式,因此可以知道这种方式其实是使用的阻塞提交者的方式来执行任务的
  • AbortPolicy:抛出异常并终止执行的策略
  • DiscardOldestPolicy:丢弃最旧的任务的策略
  • DiscardPolicy:简单的丢弃策略,什么都不做

案例分析

由于我们是做监控业务的,会有告警数据的处理,有一些告警策略需要针对某一种或者某一个类型的监控对象进行连续检测,由于我们 生成的异常数据是写入到kafka,而做这种连续检测的是放在后台进行检测,我们的后台是部署了多个server端,不过由于需要连续检测 ,并且由于消息入队列的时候是没有key值的,因此kafka的分区只有一个,所以我们的后端其实是采用主备的方式进行消费数据的, 这里我们使用到了线程池去消费,最初的时候我们使用的是一个无界队列,这导致我们的服务端隔一段时间会出现告警的积压,并且 后端出现OOM,因此我们后面采用的是有界队列,并且是丢弃旧的数据的策略,毕竟告警有一定的时效性,才算稳定现网。

不过这也明显存在一个缺陷,因此可以采用的策略是:

  • 使用实时计算组件通过groupByKey,并增加kafka的分区来进行处理
  • 增加kafka的分区,并将数据按照一定的规则写入redis,后台实现redis key值的切分完成并行的处理数据

小结

每天进步一点点,加油!

cached pool 能够创建线程的原因

TODO scheduler取消调度线程的问题?????remove无效