概述

监控也做了一段时间了,短短续续接触过一系列的存储与计算相关的组件,机缘巧合接触到了druid(并不是阿里的数据库连接池工具), 这是一款用于OLAP的存储组件,鉴于其高效的查询效率,在初步了解、使用以及弄清楚整体的架构之后,也对其源码产生了较大的兴趣, 由于官网上对其使用、介绍也是比较全面的,因此在此略讲这些东西,看后面有时间再补齐。

架构

下图是druid数据流图

对于上图简要的介绍一下:

  • router是用来分发请求的
  • broker是用来将查询结果合并的,通常建议可以和router合并在一起
  • middlemanager和historical是用来存放未发布的和已经发布的数据的
  • overlaod是用来保障高可用的
  • coordinator是用来管理segment(数据)的,包括了数据的发布、均衡等操作

官方推荐的部署架构如下图所示:

源码走读

在深入了解每一个模块之前,我们有必要对整个工程的启动、运行流程有一定的了解,不然就是老虎吃天,无从下口。

启动

万物皆有入口,哈哈哈!那么druid的入口在哪里呢?查看启动的脚本就会发现,入口就在Main类的main方法,多么自然,在程序中万物的运行始于 main函数。从代码开始看起吧!为了方便理解,我是将代码分成多段进行讲解了,具体的的步骤:

构建builder对象

构建builder对象,该对象是管理我们在工程中使用到的命令行的工具箱,我们可以将其想像成一个箱子。通俗点来讲, 我假设大家都是用过git工具的,这个builder对象就有点类似于git命令,git包含git remote 、git add等工具, 因此也可以将git看作一个箱子,这个箱子里面包含了各种各样的工具可以让我们使用,代码如下:

1
2
3
4
final Cli.CliBuilder<Runnable> builder = Cli.builder("druid");
builder.withDescription("Druid command-line runner.")
.withDefaultCommand(Help.class)
.withCommands(Help.class, Version.class);

添加分组

给我们的builder对象添加各种命令,在直接添加命令之前,这里先对其做了分组的工作,还是以git为例, git remoteremote其实就是一个分组的作用,其下面包含的有git remote add、git remote update 等命令,正如我们在上面的代码中看到的那样,druid也对命令做了多个分组: server、tools、index、internal, 也就是说我们要使用调用server下的某一个process的时候,需要加上server组名才可以调通。那么server能调用的命令 包含那些呢?要知道分组下的命令可以直接到对应的类上面看其@Command注解即可,我们还是以middleManager为例:

1
2
3
4
5
@Command(
name = "middleManager",
description = "Runs a Middle Manager, this is a \"task\" node used as part of the remote indexing service, see https://druid.apache.org/docs/latest/design/middlemanager.html for a description"
)
public class CliMiddleManager extends ServerRunnable

这也就是说,当我们想要运行middleManager的时候,向main函数传递的参数应该是 main server middleManager,类比其他的process 我们也就很好理解了,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
List<Class<? extends Runnable>> serverCommands = Arrays.asList(
CliCoordinator.class,
CliHistorical.class,
CliBroker.class,
CliOverlord.class,
CliIndexer.class,
CliMiddleManager.class,
CliRouter.class
);
// group可以认为是命令行的前缀
builder.withGroup("server")
.withDescription("Run one of the Druid server types.")
.withDefaultCommand(Help.class)
.withCommands(serverCommands);

List<Class<? extends Runnable>> toolCommands = Arrays.asList(
DruidJsonValidator.class,
PullDependencies.class,
CreateTables.class,
DumpSegment.class,
ResetCluster.class,
ValidateSegments.class,
ExportMetadata.class
);
builder.withGroup("tools")
.withDescription("Various tools for working with Druid")
.withDefaultCommand(Help.class)
.withCommands(toolCommands);

builder.withGroup("index")
.withDescription("Run indexing for druid")
.withDefaultCommand(Help.class)
.withCommands(CliHadoopIndexer.class);

builder.withGroup("internal")
.withDescription("Processes that Druid runs \"internally\", you should rarely use these directly")
.withDefaultCommand(Help.class)
.withCommands(CliPeon.class, CliInternalHadoopIndexer.class);

构建工厂

构建一个Injector,这里使用到了guice这款谷歌开源的IOC框架,相信用过spring的人都知道或者了解IOC的机制了,这里就不对IOC讨论了。 简单的类比一下guice中的概念和spring中的概念,injector可以类比为spring中的ApplicationContext,在spring中我们可以使用context来获取 某一个对象,那么injector也必然是可以用来获取某一个对象了!是的。injector的主要作用就是存放各种java bean。

1
final Injector injector = GuiceInjectors.makeStartupInjector();

获取扩展配置信息

获取ExtensionsConfig对象,这里是druid给我们提供的扩展机制,在main函数中,该类主要适用于命令行工具的扩展,具体代码如下:

1
2
3
4
5
6
7
8
9
10
final ExtensionsConfig config = injector.getInstance(ExtensionsConfig.class);
// 加载CliCommandCreator对应的实现类,默认情况下应该是没有的,这里加载也是采用了SPI的机制
final Collection<CliCommandCreator> extensionCommands = Initialization.getFromExtensions(
config,
CliCommandCreator.class
);
for (CliCommandCreator creator : extensionCommands) {
creator.addCommands(builder);
}
final Cli<Runnable> cli = builder.build();

我们进CliCommandCreator看一下会发现是一个接口,并且没有任何实现,代码如下:

1
2
3
4
public interface CliCommandCreator
{
void addCommands(Cli.CliBuilder builder);
}

那为什么还要循环遍历这个集合呢?看的一脸懵逼吧,这里是java里面给我们提供的SPI机制来扩展命令行的,不过默认并没有任何实现,因此该段代码可以略过。

解析命令并执行process

解析命令并运行相关的process,代码如下:

1
2
3
4
5
6
7
final Runnable command = cli.parse(args);
if (!(command instanceof Help)) { // Hack to work around Help not liking being injected
// 这里会将command对应的对象进行注入进来
injector.injectMembers(command);
}
// 这里最终是会调用对应的组件,如middlemanager的run方法的
command.run();

到此位置我们将main函数的主要流程分析清楚了,接下来我们就从server组里的middlemanager这个command来看一下吧。 在main方法的最后,我们看到了command.run()这个方法的调用,那么这个方法最终会在哪里触发执行呢?这就和我们在 使用命令行的时候server后面跟的参数有关系了。还是以middlemanager为例来分析吧(其他的都一样)。

执行process

在讲解代码流程之前,我们先看一下该process对应的类图: 可以看到command.run()方法最终应该是会触发serverRunnable处的run方法的执行,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void run()
{
// 构建本实例所需要的工厂对象,用于后面生成对象
final Injector injector = makeInjector();
final Lifecycle lifecycle = initLifecycle(injector);

try {
lifecycle.join();
}
catch (Exception e) {
throw new RuntimeException(e);
}
}

上面代码可以分为3步:

  • 构建当前process需要的injetor对象,可能有的人会比较疑惑,前面构建了一个injector,这里又构建了一个, 那么这两个injector有没有必要合并成一个,这样两个injector中的java bean共享一个上下文,这里 完全没必要担心,跟进代码你会发现,这里生成的injector就是使用上面生成的baseInjector来生成的, 这也就是说,上文中构建的injector缺少特定的process所需要的java bean,这里就是在原来的injector 基础之上又装配了当前这个process所需要的java bean。
  • 初始化injector的生命周期,这里的作用是实例化injector中非lazy类型的java bean,启动整个工程(可以认为是一个核心了)
  • 等待程序运行结束

接下来我们还是继续分析makeInjector()具体执行了什么操作吧,如下:

1
2
3
4
5
6
7
8
9
public Injector makeInjector()
{
try {
return Initialization.makeInjectorWithModules(baseInjector, getModules());
}
catch (Exception e) {
throw new RuntimeException(e);
}
}

可以看到上面的方法就是给baseInjector注入process所需要的功能java bean。这里的baseInjector是什么呢?

1
2
3
4
5
@Inject
public void configure(Injector injector)
{
this.baseInjector = injector;
}

可以看到这里的baseInjector是使用guice的注解装配进来的injector,也就是我们最早定义的injector。 点进去getModules()可以发现这是一个接口,嗯,没错这里是使用到了模板模式!真正的实现放到了对应的process中去了。 进入makeInjectorWithModules方法可以看到使用了Guice.createInjector(Modules.override(oldmodules).with(newmodules)) 这种方式来获取新的injector。在将特定process的java bean装配到injector之后,接下来就是要执行程序的初始化了,也就是该创建的 对象创建,该开启的端口开启就完事了,我们来看一下具体的代码吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public Lifecycle initLifecycle(Injector injector)
{
try {
// 获取lifecycle实例,该实例会初始化工程所必须的对象
final Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
final StartupLoggingConfig startupLoggingConfig = injector.getInstance(StartupLoggingConfig.class);

Long directSizeBytes = null;
try {
directSizeBytes = JvmUtils.getRuntimeInfo().getDirectMemorySizeBytes();
}
catch (UnsupportedOperationException ignore) {
// querying direct memory is not supported
}

log.info(
"Starting up with processors[%,d], memory[%,d], maxMemory[%,d]%s. Properties follow.",
JvmUtils.getRuntimeInfo().getAvailableProcessors(),
JvmUtils.getRuntimeInfo().getTotalHeapSizeBytes(),
JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes(),
directSizeBytes != null ? StringUtils.format(", directMemory[%,d]", directSizeBytes) : ""
);

if (startupLoggingConfig.isLogProperties()) {
final Set<String> maskProperties = Sets.newHashSet(startupLoggingConfig.getMaskProperties());
final Properties props = injector.getInstance(Properties.class);

for (String propertyName : Ordering.natural().sortedCopy(props.stringPropertyNames())) {
String property = props.getProperty(propertyName);
for (String masked : maskProperties) {
if (propertyName.contains(masked)) {
property = "<masked>";
break;
}
}
log.info("* %s: %s", propertyName, property);
}
}

try {
lifecycle.start();
}
catch (Throwable t) {
log.error(t, "Error when starting up. Failing.");
System.exit(1);
}

return lifecycle;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}

上面的代码比较长,不过总的来说只有两行有效代码: final Lifecycle lifecycle = injector.getInstance(Lifecycle.class)lifecycle.start() 这两行代码分别是从injector中获取lifecycle对象,调用lifecycle对象的start方法, 这里我们点进去LifeCycle的类,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Lifecycle
{
........
public void start() throws Exception
{
startStopLock.lock();
try {
if (!state.get().equals(State.NOT_STARTED)) {
throw new ISE("Already started");
}
if (!state.compareAndSet(State.NOT_STARTED, State.RUNNING)) {
throw new ISE("stop() is called concurrently with start()");
}
for (Map.Entry<Stage, ? extends List<Handler>> e : handlers.entrySet()) {
currStage = e.getKey();
log.info("Starting lifecycle [%s] stage [%s]", name, currStage.name());
for (Handler handler : e.getValue()) {
handler.start();
}
}
log.info("Successfully started lifecycle [%s]", name);
}
finally {
startStopLock.unlock();
}
}
.........

如果你是看到了上面的代码,那么恭喜你,你找错方向了!仔细看上面,我们是调用了injector.getInstance(Lifecycle.class) 来获取lifecycle对象,也就你要从工厂中获取该对象,很明显上面的这个类并没有使用guice提供的 注解来将其装配到injector中,因此这里并不是我们想要的代码。那么真正的实现在哪里呢?在LifeCycleModule 类中,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// @provides注解是用来生成java bean的一种方式,这里也就是生成整个工程所需要的lifecycle对象
@Provides @LazySingleton
public Lifecycle getLifecycle(final Injector injector)
{
final Key<Set<KeyHolder>> keyHolderKey = Key.get(new TypeLiteral<Set<KeyHolder>>(){}, Names.named("lifecycle"));
final Set<KeyHolder> eagerClasses = injector.getInstance(keyHolderKey);

Lifecycle lifecycle = new Lifecycle("module")
{
@Override
// 这里是所有的组件最终的入口,可以看到调用了lifecycle的start方法之后该实例化的对象都实例化了
public void start() throws Exception
{
for (KeyHolder<?> holder : eagerClasses) {
// 初始化工厂中的对象
injector.getInstance(holder.getKey()); // Pull the key so as to "eagerly" load up the class.
}
super.start();
}
};

// 这里是继续装配lifecycle对象
initScope.setLifecycle(lifecycle);
normalScope.setLifecycle(lifecycle);
serverScope.setLifecycle(lifecycle);
annoucementsScope.setLifecycle(lifecycle);

return lifecycle;
}

上面才是lifecycle.start方法执行的操作,这一步我们看到是有一个for循环的操作,该操作就是实例化我们当前process需要的 对象了,最终会调用join方法等待当前的process退出。这里首先是使用Key.get(new TypeLiteral<Set<KeyHolder>>(){}, Names.named("lifecycle")) 来获取一系列的Key值,这些Key值是Set<KeyHolder>类型的,并且被@Names("lifecycle")注解了的,guice中的Key值是什么概念呢?可以认为是spring中的 beanName,在spring中我们是可以通过beanName的方式来直接从容器中获取bean的,因此这里也是为了下面获取bean的而做的准备。不过我们可能会很好奇,这些个 被@Names("lifecycle")标注的过程是在哪里呢?这个就要返回到上面makeInjector方法了,在创建injector的时候,我们有提到过getModules方法,不过 是一笔略过的,现在就看一下这些个依赖是如何在启动的过程中被加进来的吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected List<? extends Module> getModules()
{
// Server.class和SelfDiscoveryResource.class注册到了生命周期的管理,会在项目启动的时候就实例化对应的对象
return ImmutableList.of(
new Module()
{
@Override
public void configure(Binder binder)
{
.......
LifecycleModule.register(binder, Server.class);

.......
LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class));
}

这里我们看到对于CliMiddleManager这个process来说,在装配自定义的Module的时候,是注册了两个类的,我们来看一下这个注册的过程是什么样子的吧:

1
2
3
4
5
6
7
8
9
public static void registerKey(Binder binder, Key<?> key)
{
getEagerBinder(binder).addBinding().toInstance(new KeyHolder<Object>(key));
}

private static Multibinder<KeyHolder> getEagerBinder(Binder binder)
{
return Multibinder.newSetBinder(binder, KeyHolder.class, Names.named("lifecycle"));
}

上面就是注册过程中调用的方法链了,这里还是来解释一下:Multibinder.newSetBinder(binder, KeyHolder.class, Names.named("lifecycle")) .addBinding().toInstance(new KeyHolder<Object>(key)),整个方法的调用的话就如同上面这个样子,这里的含义是会将多个类都 存放到Set中来,每调用一次注册的方法,就会生成一个new KeyHolder<Object>(key)来持有我们希望在process初始化的过程中实例化的对象。

我们现在再回到上文看一下就不难发现Set<KeyHolder> eagerClasses = injector.getInstance(keyHolderKey)获取到的就是上面我们注册类进来的时候 生成的集合,不过这个集合现在持有的并不是实例化之后的对象,而是Set<KeyHolder>,这里的KeyHolder是会持有一个Class的,我们通过遍历这个集合类就 可以完成当前process在启动之处就需要实例化的类了。

geModules方法中我们看到,当前的process注册的类中有一个Server类,这个类必然会在lifecycle的start方法中被实例化,不过实例化的类在哪里呢?在JettyServerModule这个类中 提供了具体的实现,我们来看一下实例化一个Server对象的时候究竟发生了什么吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Provides
@LazySingleton
public Server getServer(
final Injector injector,
final Lifecycle lifecycle,
@Self final DruidNode node,
final ServerConfig config,
final TLSServerConfig TLSServerConfig
)
{
return makeAndInitializeServer(
injector,
lifecycle,
node,
config,
TLSServerConfig,
injector.getExistingBinding(Key.get(SslContextFactory.class)),
injector.getInstance(TLSCertificateChecker.class)
);
}

这里我们看到Server的创建依赖其他的五个对象,这些类在前面的module中都是已经被注入到injector中了,因此当我们获取Server对象的时候,这些 对象也就会通过深度遍历的方式给依次注入进来,这里我们也不做过多的讨论了,就看一下创建Server的过程吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
static Server makeAndInitializeServer(
Injector injector,
Lifecycle lifecycle,
DruidNode node,
ServerConfig config,
TLSServerConfig tlsServerConfig,
Binding<SslContextFactory> sslContextFactoryBinding,
TLSCertificateChecker certificateChecker
)
{
// adjusting to make config.getNumThreads() mean, "number of threads
// that concurrently handle the requests".
int numServerThreads = config.getNumThreads() + getMaxJettyAcceptorsSelectorsNum(node);

final QueuedThreadPool threadPool;
if (config.getQueueSize() == Integer.MAX_VALUE) {
threadPool = new QueuedThreadPool();
threadPool.setMinThreads(numServerThreads);
threadPool.setMaxThreads(numServerThreads);
} else {
threadPool = new QueuedThreadPool(
numServerThreads,
numServerThreads,
60000, // same default is used in other case when threadPool = new QueuedThreadPool()
new LinkedBlockingQueue<>(config.getQueueSize())
);
}

threadPool.setDaemon(true);

final Server server = new Server(threadPool);

// Without this bean set, the default ScheduledExecutorScheduler runs as non-daemon, causing lifecycle hooks to fail
// to fire on main exit. Related bug: https://github.com/apache/druid/pull/1627
server.addBean(new ScheduledExecutorScheduler("JettyScheduler", true), true);

final List<ServerConnector> serverConnectors = new ArrayList<>();

if (node.isEnablePlaintextPort()) {
log.info("Creating http connector with port [%d]", node.getPlaintextPort());
HttpConfiguration httpConfiguration = new HttpConfiguration();
if (config.isEnableForwardedRequestCustomizer()) {
httpConfiguration.addCustomizer(new ForwardedRequestCustomizer());
}

httpConfiguration.setRequestHeaderSize(config.getMaxRequestHeaderSize());
final ServerConnector connector = new ServerConnector(server, new HttpConnectionFactory(httpConfiguration));
if (node.isBindOnHost()) {
connector.setHost(node.getHost());
}
connector.setPort(node.getPlaintextPort());
serverConnectors.add(connector);
}

.....

final ServerConnector[] connectors = new ServerConnector[serverConnectors.size()];
int index = 0;
for (ServerConnector connector : serverConnectors) {
connectors[index++] = connector;
connector.setIdleTimeout(Ints.checkedCast(config.getMaxIdleTime().toStandardDuration().getMillis()));
// workaround suggested in -
// https://bugs.eclipse.org/bugs/show_bug.cgi?id=435322#c66 for jetty half open connection issues during failovers
connector.setAcceptorPriorityDelta(-1);

List<ConnectionFactory> monitoredConnFactories = new ArrayList<>();
for (ConnectionFactory cf : connector.getConnectionFactories()) {
// we only want to monitor the first connection factory, since it will pass the connection to subsequent
// connection factories (in this case HTTP/1.1 after the connection is unencrypted for SSL)
if (cf.getProtocol().equals(connector.getDefaultProtocol())) {
monitoredConnFactories.add(new JettyMonitoringConnectionFactory(cf, ACTIVE_CONNECTIONS));
} else {
monitoredConnFactories.add(cf);
}
}
connector.setConnectionFactories(monitoredConnFactories);
}

server.setConnectors(connectors);
final long gracefulStop = config.getGracefulShutdownTimeout().toStandardDuration().getMillis();
if (gracefulStop > 0) {
server.setStopTimeout(gracefulStop);
}
server.addLifeCycleListener(new LifeCycle.Listener()
{
@Override
public void lifeCycleStarting(LifeCycle event)
{
log.debug("Jetty lifecycle starting [%s]", event.getClass());
}

@Override
public void lifeCycleStarted(LifeCycle event)
{
log.debug("Jetty lifeycle started [%s]", event.getClass());
}

@Override
public void lifeCycleFailure(LifeCycle event, Throwable cause)
{
log.error(cause, "Jetty lifecycle event failed [%s]", event.getClass());
}

@Override
public void lifeCycleStopping(LifeCycle event)
{
log.debug("Jetty lifecycle stopping [%s]", event.getClass());
}

@Override
public void lifeCycleStopped(LifeCycle event)
{
log.debug("Jetty lifecycle stopped [%s]", event.getClass());
}
});

// initialize server
JettyServerInitializer initializer = injector.getInstance(JettyServerInitializer.class);
try {
initializer.initialize(server, injector);
}
catch (Exception e) {
throw new RE(e, "server initialization exception");
}

lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
log.debug("Starting Jetty Server...");
server.start();
if (node.isEnableTlsPort()) {
// Perform validation
Preconditions.checkNotNull(sslContextFactory);
final SSLEngine sslEngine = sslContextFactory.newSSLEngine();
if (sslEngine.getEnabledCipherSuites() == null || sslEngine.getEnabledCipherSuites().length == 0) {
throw new ISE(
"No supported cipher suites found, supported suites [%s], configured suites include list: [%s] exclude list: [%s]",
Arrays.toString(sslEngine.getSupportedCipherSuites()),
tlsServerConfig.getIncludeCipherSuites(),
tlsServerConfig.getExcludeCipherSuites()
);
}
if (sslEngine.getEnabledProtocols() == null || sslEngine.getEnabledProtocols().length == 0) {
throw new ISE(
"No supported protocols found, supported protocols [%s], configured protocols include list: [%s] exclude list: [%s]",
Arrays.toString(sslEngine.getSupportedProtocols()),
tlsServerConfig.getIncludeProtocols(),
tlsServerConfig.getExcludeProtocols()
);
}
}
}

@Override
public void stop()
{
try {
final long unannounceDelay = config.getUnannouncePropagationDelay().toStandardDuration().getMillis();
if (unannounceDelay > 0) {
log.info("Sleeping %s ms for unannouncement to propagate.", unannounceDelay);
Thread.sleep(unannounceDelay);
} else {
log.debug("Skipping unannounce wait.");
}
log.debug("Stopping Jetty Server...");
server.stop();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RE(e, "Interrupted waiting for jetty shutdown.");
}
catch (Exception e) {
log.warn(e, "Unable to stop Jetty server.");
}
}
},
Lifecycle.Stage.SERVER
);

return server;
}

这段代码比较长,不过总结下来也就分以下三个步骤:

  • 创建web server:包含了配置server处理请求所需的线程池,server处理请求的逻辑connector,server在整个生命周期中需要的一些个listener(这个在tomcat中是比较常见的) ,以及会在initializer.initialize(server, injector);方法中初始化server拦截的请求、需要的filter等。
  • 添加handler到lifecycle的handlers中去,这一步的作用是一个hook的作用,也就是说把server的start方法挂到了lifecycle的start方法中了,这样当我们调用lifecycle的start方法 启动整个应用之后,自然而然我们的server也就启动了,我们在添加的handler中也可以看到server.start();方法的调用
  • 返回server实例

到此为止,我们看到了lifecycle的start方法,带动了server的创建,其中server的创建又配置了handler,这些个handler的一次调用是在哪里呢?是在parent的lifecycle中,如下:

说明一下,在整个process启动的过程中并不是只添加了一个server的handler,还有CuratorFramework,也就是负责和zookeeper保持通信的客户端 ,如果想要知道全部的handler,可以直接搜lifecycle.addHandler就可以了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void start() throws Exception
{
startStopLock.lock();
try {
.....
for (Map.Entry<Stage, ? extends List<Handler>> e : handlers.entrySet()) {
currStage = e.getKey();
log.info("Starting lifecycle [%s] stage [%s]", name, currStage.name());
for (Handler handler : e.getValue()) {
handler.start();
}
}
log.info("Successfully started lifecycle [%s]", name);
}
finally {
startStopLock.unlock();
}
}

分析到这里我们就看到了完整的process的启动,带动了web server的启动。不过我们好像并没有看到这个web server可以处理的请求是怎么样给加进来的( 虽然我们在上面看到了配置web server拦截所有的请求,不过这只是默认的servlet而已,并不是我们在应用中自定义的resource)。添加自定义resource 是在JettyServerModule#configureServlets方法中加进来的,我们具体看一下这个过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected void configureServlets()
{
Binder binder = binder();
.....
binder.bind(GuiceContainer.class).to(DruidGuiceContainer.class);
binder.bind(DruidGuiceContainer.class).in(Scopes.SINGLETON);
.....

serve("/*").with(DruidGuiceContainer.class);

Jerseys.addResource(binder, StatusResource.class);
binder.bind(StatusResource.class).in(LazySingleton.class);
.....
}

在这里我么看到serve("/*").with(DruidGuiceContainer.class)这么一行代码,查看DruidGuiceContainer的类图: 可以发现该类本身就是一个servlet,不过这里还是先想一下DruidGuiceContainer对象从何而来?如果你认为是通过无参的构造函数 反射获得的话,那么很明显又跑题了,要知道我们整个应用都是基于guice来完成的,因此,理所当然的,我们的DruidGuiceContainer 也必然是通过容器获取到的,证明就在上面两行代码,现在我们进到这个类里面来看一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static class DruidGuiceContainer extends GuiceContainer
{
private final Set<Class<?>> resources;

@Inject
public DruidGuiceContainer(
Injector injector,
@JSR311Resource Set<Class<?>> resources
)
{
super(injector);
this.resources = resources;
}

@Override
protected ResourceConfig getDefaultResourceConfig(
Map<String, Object> props, WebConfig webConfig
)
{
return new DefaultResourceConfig(resources);
}
}

如上,可以看到构造函数上用@Inject来标注了,说明是采用构造器注入的方式来创建该对象的,构造器包含了两个参数,很明显第一个参数 就是我们在main函数中生成的injector,这个injector在整个工程中传来传去,目的就是构造一个统一的上下文。而第二个参数就比较关键了, 也是扩展web工程的关键,其所代表的含义是,DruidGuiceContainer需要一组使用了@JSR311Resource标注的类的集合来创建,这 也标志着我们可以将某些类以@JSR311Resource的方式注入到injector中,这些被注解标注了的并且注入到injector里面的类最终将会 作为DruidGuiceContainer这个servlet的初始化参数,servlet在拦截到请求之后会根据请求的参数路由到对应的resources进行处理。

接下来我们就来随便找一个例子来看一下这些个resource是怎么样装配到servlet上来的吧,我们在process的代码中基本上是随处可见这样的代码: Jerseys.addResource(binder, ShuffleResource.class);,我们继续跟下去:

1
2
3
4
5
6
7
public static void addResource(Binder binder, Class<?> resourceClazz)
{
LOG.debug("Adding Jersey resource: " + resourceClazz.getName());
Multibinder.newSetBinder(binder, new TypeLiteral<Class<?>>() {}, JSR311Resource.class)
.addBinding()
.toInstance(resourceClazz);
}

如上所示,可以看到这里是将我们指定的resourceClazz通过Multibinder注入到了injector中,而且在注入的过程中添加了JSR311Resource.class 的注解,那么根据上面的分析可以知道,这个resourceClazz将会作为一个resource,最终通过servlet的路由匹配到响应的请求。

至此,关于整个process启动的过程中web相关的模块我们已经基本理清了。接下来我们就来通过process所支持的功能来具体的分析一下process的具体 实现吧。

overlord源码走读

overlord控制着摄入任务的分配工作,这个分配工作是在提交摄入规则的时候触发的。

Overlord的故事从什么地方说起呢?前面我们看到了MiddleManager模块的web工程启动的过程,这一块的逻辑是通用的,因此就不再赘述了,不过可以看到 Lifecycle在工程中的作用,那就是一个钩子,在Overlord中也有类似的模块,在程序启动的时候会触发执行,不过在Overlord的modules中是使用注解来hook的,下面是 SupervisorManager中的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@LifecycleStart
public void start()
{
// 从数据库中加载对应的数据完成supervisor的启动
Preconditions.checkState(!started, "SupervisorManager already started");
log.info("Loading stored supervisors from database");

synchronized (lock) {
Map<String, SupervisorSpec> supervisors = metadataSupervisorManager.getLatest();
for (Map.Entry<String, SupervisorSpec> supervisor : supervisors.entrySet()) {
final SupervisorSpec spec = supervisor.getValue();
if (!(spec instanceof NoopSupervisorSpec)) {
try {
createAndStartSupervisorInternal(spec, false);
}
catch (Exception ex) {
log.error(ex, "Failed to start supervisor: [%s]", spec.getId());
}
}
}

started = true;
}
}

上面的代码逻辑分成两块:

  • 获取数据库中存放的摄入规则
  • 根据获取到的摄入规则创建supervisor

这里我们还是按照之前的策略来纵深穿插看一下这一块的逻辑吧,获取摄入规则是使用metadataSupervisorManager这个对象获取的, 这个对象是在构造函数中通过guice装配进来的:

1
2
3
4
5
@Inject
public SupervisorManager(MetadataSupervisorManager metadataSupervisorManager)
{
this.metadataSupervisorManager = metadataSupervisorManager;
}

多插一句,这里MetadataSupervisorManager注入到injector的方式并不是通过前面看到的bind注入进来的,而是通过SPI的机制注入进来的, 具体的代码可以跟进一下在Initialization#makeInjectorWithModules方法:

1
2
3
4
final ExtensionsConfig config = baseInjector.getInstance(ExtensionsConfig.class);
for (DruidModule module : Initialization.getFromExtensions(config, DruidModule.class)) {
extensionModules.addModule(module);
}

思路已经提供了,这一块的逻辑如果有时间的话后面会补齐,不过也已经很简单明了了。

我们继续看一下metadataSupervisorManager.getLatest()执行了什么操作:

1
2
3
4
5
6
7
8
9
handle.createQuery(
StringUtils.format(
"SELECT r.spec_id, r.payload "
+ "FROM %1$s r "
+ "INNER JOIN(SELECT spec_id, max(id) as id FROM %1$s GROUP BY spec_id) latest "
+ "ON r.id = latest.id",
getSupervisorsTable()
)
)

如上所示,分析这条sql语句可以看的出来,这里是将最新的摄入规则给取出来了,这里的摄入规则就是我们提交任务之前的json体。 接下来就是supervisor的创建了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private boolean createAndStartSupervisorInternal(SupervisorSpec spec, boolean persistSpec)
{
String id = spec.getId();
if (supervisors.containsKey(id)) {
return false;
}

if (persistSpec) {
// 保存摄入规则的元数据
metadataSupervisorManager.insert(id, spec);
}

Supervisor supervisor;
try {
// 创建supervisor对象
supervisor = spec.createSupervisor();
supervisor.start();
}
catch (Exception e) {
// Supervisor creation or start failed write tombstone only when trying to start a new supervisor
if (persistSpec) {
metadataSupervisorManager.insert(id, new NoopSupervisorSpec(null, spec.getDataSources()));
}
throw new RuntimeException(e);
}

supervisors.put(id, Pair.of(supervisor, spec));
return true;
}

上面也分成了几个过程:

  • 获取该摄入规则的id(摄入规则的id是用datasource来区分的
  • 根据是否需要持久化摄入规则来确定将这些摄入规则持久化(很明显我们刚刚从数据库中取出来,肯定是不需要持久化的)
  • 创建并启动Supervisor,这里会根据摄入规则数据源类型的不同来创建不同类型的supervisor,并将supervisor相关的信息保存在内存中

我们接下来看一下start方法的调用过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public void start()
{
synchronized (stateChangeLock) {
Preconditions.checkState(!lifecycleStarted, "already started");
Preconditions.checkState(!exec.isShutdown(), "already stopped");

// Try normal initialization first, if that fails then schedule periodic initialization retries
try {
tryInit();
}
catch (Exception e) {
if (!started) {
log.warn(
"First initialization attempt failed for SeekableStreamSupervisor[%s], starting retries...",
dataSource
);

exec.submit(
() -> {
try {
RetryUtils.retry(
() -> {
tryInit();
return 0;
},
(throwable) -> !started,
0,
MAX_INITIALIZATION_RETRIES,
null,
null
);
}
catch (Exception e2) {
log.makeAlert(
"Failed to initialize after %s retries, aborting. Please resubmit the supervisor spec to restart this supervisor [%s]",
MAX_INITIALIZATION_RETRIES,
supervisorId
).emit();
throw new RuntimeException(e2);
}
}
);
}
}
lifecycleStarted = true;
}
}

上面这一坨代码其实就做了一件事,那就是调用tryInit()方法,只不过在调用抛出异常的时候增加了失败重试的机制。我们来看一下tryInit() 做了什么操作吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@VisibleForTesting
public void tryInit()
{
synchronized (stateChangeLock) {
if (started) {
log.warn("Supervisor was already started, skipping init");
return;
}

if (stopped) {
log.warn("Supervisor was already stopped, skipping init.");
return;
}

try {
recordSupplier = setupRecordSupplier();

exec.submit(
() -> {
try {
long pollTimeout = Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS);
while (!Thread.currentThread().isInterrupted() && !stopped) {
final Notice notice = notices.poll(pollTimeout, TimeUnit.MILLISECONDS);
if (notice == null) {
continue;
}

try {
// 这一步才是真正的触发任务执行的调用
notice.handle();
}
catch (Throwable e) {
stateManager.recordThrowableEvent(e);
log.makeAlert(e, "SeekableStreamSupervisor[%s] failed to handle notice", dataSource)
.addData("noticeClass", notice.getClass().getSimpleName())
.emit();
}
}
}
catch (InterruptedException e) {
stateManager.recordThrowableEvent(e);
log.info("SeekableStreamSupervisor[%s] interrupted, exiting", dataSource);
}
}
);
firstRunTime = DateTimes.nowUtc().plus(ioConfig.getStartDelay());
scheduledExec.scheduleAtFixedRate(
buildRunTask(),
ioConfig.getStartDelay().getMillis(),
Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS),
TimeUnit.MILLISECONDS
);

scheduleReporting(reportingExec);

started = true;
.........
}
}

上面的过程依次是:

  • 检查supervisor状态的合法性
  • 配置消费数据的客户端的属性(对于kafka类型的摄入规则来说,这一步就是创建consumer,我们接下来会具体分析一下)
  • 构建运行的任务并提交运行,接着使用摄入规则中定义的周期周期性的定时调度来构建运行的任务

这里我们看到有创建recordSupplier = setupRecordSupplier();,不过这也是一个模板方法,真正的实现我们就挑一个kafka的吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected RecordSupplier<Integer, Long> setupRecordSupplier()
{
return new KafkaRecordSupplier(spec.getIoConfig().getConsumerProperties(), sortingMapper);
}

public KafkaRecordSupplier(
Map<String, Object> consumerProperties,
ObjectMapper sortingMapper
)
{
this(getKafkaConsumer(sortingMapper, consumerProperties));
}

@VisibleForTesting
public KafkaRecordSupplier(
KafkaConsumer<byte[], byte[]> consumer
)
{
this.consumer = consumer;
}

如上所示,recordSupplier里面是封装了一个kafkaConsumer,到此为止,我们也就不再过多的深究了。

接下来我们看一下构建的任务吧,具体入口就是上面的notice.handle(),跟进下来代码最终会走到下面这里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private Runnable buildRunTask()
{
return () -> notices.add(new RunNotice());
}

public void runInternal()
{
try {
possiblyRegisterListener();

stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.CONNECTING_TO_STREAM);
// TODO 这里猜测应该就是低版本的kafka连接不上存在问题的原因
if (!updatePartitionDataFromStream() && !stateManager.isAtLeastOneSuccessfulRun()) {
return; // if we can't connect to the stream and this is the first run, stop and wait to retry the connection
}

stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.DISCOVERING_INITIAL_TASKS);
discoverTasks();

updateTaskStatus();

checkTaskDuration();

checkPendingCompletionTasks();

checkCurrentTaskState();

// if supervisor is not suspended, ensure required tasks are running
// if suspended, ensure tasks have been requested to gracefully stop
if (!spec.isSuspended()) {
log.info("[%s] supervisor is running.", dataSource);

stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.CREATING_TASKS);
createNewTasks();
} else {
log.info("[%s] supervisor is suspended.", dataSource);
gracefulShutdownInternal();
}

.......
}

上面就是根据摄入规则运行任务的整个过程,这里我们没有提到的是通过http的方式提交任务的过程,不过这个也不是太难,具体可以查看 http接口的方式找到入口,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response specPost(final SupervisorSpec spec, @Context final HttpServletRequest req)
{
return asLeaderWithSupervisorManager(
manager -> {
.....
// 这一步会创建或者更新supervisor
manager.createOrUpdateAndStartSupervisor(spec);
return Response.ok(ImmutableMap.of("id", spec.getId())).build();
}
);
}

这里就不过多的赘述http的请求流程了,总而言之一句话,通过http的方式创建任务最终也会走到上面的代码逻辑中。在上面的runInternal方法 中我们可以看到有一行createNewTasks的代码,创建任务最终就是进入到这个方法中。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void createNewTasks() throws JsonProcessingException
{
......
// 由于是新创建的任务,因此这里的任务数量必定是0
// iterate through all the current task groups and make sure each one has the desired number of replica tasks
boolean createdTask = false;
for (Entry<Integer, TaskGroup> entry : activelyReadingTaskGroups.entrySet()) {
.....
if (ioConfig.getReplicas() > taskGroup.tasks.size()) {
......
createTasksForGroup(groupId, ioConfig.getReplicas() - taskGroup.tasks.size());
createdTask = true;
}
}

if (createdTask && firstRunTime.isBeforeNow()) {
// Schedule a run event after a short delay to update our internal data structures with the new tasks that were
// just created. This is mainly for the benefit of the status API in situations where the run period is lengthy.
scheduledExec.schedule(buildRunTask(), 5000, TimeUnit.MILLISECONDS);
}
}

上面的代码中我们省略了不必要的代码,可以看到创建任务走到createTasksForGroup这个方法中了,点开这个方法代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
private void createTasksForGroup(int groupId, int replicas)
throws JsonProcessingException
{
TaskGroup group = activelyReadingTaskGroups.get(groupId);
Map<PartitionIdType, SequenceOffsetType> startPartitions = group.startingSequences;
Map<PartitionIdType, SequenceOffsetType> endPartitions = new HashMap<>();
for (PartitionIdType partition : startPartitions.keySet()) {
endPartitions.put(partition, getEndOfPartitionMarker());
}
Set<PartitionIdType> exclusiveStartSequenceNumberPartitions = activelyReadingTaskGroups
.get(groupId)
.exclusiveStartSequenceNumberPartitions;

DateTime minimumMessageTime = group.minimumMessageTime.orNull();
DateTime maximumMessageTime = group.maximumMessageTime.orNull();

SeekableStreamIndexTaskIOConfig newIoConfig = createTaskIoConfig(
groupId,
startPartitions,
endPartitions,
group.baseSequenceName,
minimumMessageTime,
maximumMessageTime,
exclusiveStartSequenceNumberPartitions,
ioConfig
);

List<SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>> taskList = createIndexTasks(
replicas,
group.baseSequenceName,
sortingMapper,
group.checkpointSequences,
newIoConfig,
taskTuningConfig,
rowIngestionMetersFactory
);

for (SeekableStreamIndexTask indexTask : taskList) {
Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
if (taskQueue.isPresent()) {
try {
// 添加task的时候如果有异常则说明添加失败,添加成功之后,会通过定期调度来执行znode节点的创建
taskQueue.get().add(indexTask);
}
catch (EntryExistsException e) {
stateManager.recordThrowableEvent(e);
log.error("Tried to add task [%s] but it already exists", indexTask.getId());
}
} else {
log.error("Failed to get task queue because I'm not the leader!");
}
}
}

上面的代码中可以看到首先是摄入任务的一些正常配置,接下来会获取到taskMaster的taskQueue,这里就是新建任务的核心内容了, 也是整个业务的关键,其主要思想就是通过taskQueue实现任务的生产和消费,这个也算是一种很常用的模式了,生产的话就是将新建的任务 存放到队列中,消费的话就是将任务从队列中取出来,并跟新到zookeeper节点下面,催生peon进程了,上面我们看到了生产任务的过程, 消费任务的话要从taskMatser看起了,不过taskMaster也仅仅是任务的入口,并不是真正消费任务的地方,其中的核心代码就是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
this.leadershipListener = new DruidLeaderSelector.Listener()
{
@Override
public void becomeLeader()
{
giant.lock();

// I AM THE MASTER OF THE UNIVERSE.
log.info("By the power of Grayskull, I have the power!");

try {
taskLockbox.syncFromStorage();
taskRunner = runnerFactory.build();
taskQueue = new TaskQueue(
taskLockConfig,
taskQueueConfig,
taskStorage,
taskRunner,
taskActionClientFactory,
taskLockbox,
emitter
);

// Sensible order to start stuff:
final Lifecycle leaderLifecycle = new Lifecycle("task-master");
if (leaderLifecycleRef.getAndSet(leaderLifecycle) != null) {
log.makeAlert("TaskMaster set a new Lifecycle without the old one being cleared! Race condition")
.emit();
}

leaderLifecycle.addManagedInstance(taskRunner);
leaderLifecycle.addManagedInstance(taskQueue);
leaderLifecycle.addManagedInstance(supervisorManager);
leaderLifecycle.addManagedInstance(overlordHelperManager);

leaderLifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start()
{
initialized = true;
serviceAnnouncer.announce(node);
}

@Override
public void stop()
{
serviceAnnouncer.unannounce(node);
}
}
);

leaderLifecycle.start();
}
catch (Exception e) {
throw new RuntimeException(e);
}
finally {
giant.unlock();
}
}

@Override
public void stopBeingLeader()
{
giant.lock();
try {
initialized = false;
final Lifecycle leaderLifecycle = leaderLifecycleRef.getAndSet(null);

if (leaderLifecycle != null) {
leaderLifecycle.stop();
}
}
finally {
giant.unlock();
}
}
}

上面的代码中我们可以看到taskMaster首先是使用了zookeeper进行选主操作,一旦选出来之后,就会执行一些初始化的操作,这些初始化的 操作是leaderLifecycle.addManagedInstance完成的,上文也有解释leaderLifecycle像是一个钩子,会自动的初始化一些操作, 这里我们可以跟进一下TaskQueue的start方法,具体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@LifecycleStart
public void start()
{
giant.lock();

try {
Preconditions.checkState(!active, "queue must be stopped");
active = true;
syncFromStorage();
managerExec.submit(
new Runnable()
{
@Override
public void run()
{
while (true) {
try {
manage();
break;
}
......
);
ScheduledExecutors.scheduleAtFixedRate(
storageSyncExec,
config.getStorageSyncRate(),
new Callable<ScheduledExecutors.Signal>()
{
@Override
public ScheduledExecutors.Signal call()
{
try {
syncFromStorage();
}
......
}
}
);
managementMayBeNecessary.signalAll();
}
finally {
giant.unlock();
}
}

上面的代码中我们看到主要是两块的业务逻辑manage()、syncFromStorage(),当前我们是跟进任务的创建,因此下面的代码可以暂时搁置。 我们看一下manage执行了什么操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
private void manage() throws InterruptedException
{
log.info("Beginning management in %s.", config.getStartDelay());
Thread.sleep(config.getStartDelay().getMillis());

// Ignore return value- we'll get the IDs and futures from getKnownTasks later.
taskRunner.restore();

while (active) {
giant.lock();

try {
// Task futures available from the taskRunner
final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = new HashMap<>();
for (final TaskRunnerWorkItem workItem : taskRunner.getKnownTasks()) {
runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult());
}
// Attain futures for all active tasks (assuming they are ready to run).
// Copy tasks list, as notifyStatus may modify it.
for (final Task task : ImmutableList.copyOf(tasks)) {
if (!taskFutures.containsKey(task.getId())) {
final ListenableFuture<TaskStatus> runnerTaskFuture;
if (runnerTaskFutures.containsKey(task.getId())) {
runnerTaskFuture = runnerTaskFutures.get(task.getId());
} else {
// Task should be running, so run it.
final boolean taskIsReady;
try {
taskIsReady = task.isReady(taskActionClientFactory.create(task));
}
catch (Exception e) {
log.warn(e, "Exception thrown during isReady for task: %s", task.getId());
notifyStatus(task, TaskStatus.failure(task.getId()), "failed because of exception[%s]", e.getClass());
continue;
}
if (taskIsReady) {
log.info("Asking taskRunner to run: %s", task.getId());
runnerTaskFuture = taskRunner.run(task);
} else {
continue;
}
}
taskFutures.put(task.getId(), attachCallbacks(task, runnerTaskFuture));
} else if (isTaskPending(task)) {
// if the taskFutures contain this task and this task is pending, also let the taskRunner
// to run it to guarantee it will be assigned to run
// see https://github.com/apache/druid/pull/6991
taskRunner.run(task);
}
}
// Kill tasks that shouldn't be running
final Set<String> tasksToKill = Sets.difference(
runnerTaskFutures.keySet(),
ImmutableSet.copyOf(
Lists.transform(
tasks,
new Function<Task, Object>()
{
@Override
public String apply(Task task)
{
return task.getId();
}
}
)
)
);
if (!tasksToKill.isEmpty()) {
log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size());
for (final String taskId : tasksToKill) {
try {
taskRunner.shutdown(
taskId,
"task is not in runnerTaskFutures[%s]",
runnerTaskFutures.keySet()
);
}
catch (Exception e) {
log.warn(e, "TaskRunner failed to clean up task: %s", taskId);
}
}
}
// awaitNanos because management may become necessary without this condition signalling,
// due to e.g. tasks becoming ready when other folks mess with the TaskLockbox.
managementMayBeNecessary.awaitNanos(MANAGEMENT_WAIT_TIMEOUT_NANOS);
}
finally {
giant.unlock();
}
}
}

上面的代码是一个死循环,主要的操作就是taskRunner.run(task);这一行代码,这里的taskRunner是一个接口,我们可以 任选一个实现累来看一下任务创建的过程是怎么样的,这里我选了RemoteTaskRunner,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override
public ListenableFuture<TaskStatus> run(final Task task)
{
final RemoteTaskRunnerWorkItem completeTask, runningTask, pendingTask;
if ((pendingTask = pendingTasks.get(task.getId())) != null) {
log.info("Assigned a task[%s] that is already pending!", task.getId());
runPendingTasks();
return pendingTask.getResult();
} else if ((runningTask = runningTasks.get(task.getId())) != null) {
ZkWorker zkWorker = findWorkerRunningTask(task.getId());
if (zkWorker == null) {
log.warn("Told to run task[%s], but no worker has started running it yet.", task.getId());
} else {
log.info("Task[%s] already running on %s.", task.getId(), zkWorker.getWorker().getHost());
TaskAnnouncement announcement = zkWorker.getRunningTasks().get(task.getId());
if (announcement.getTaskStatus().isComplete()) {
taskComplete(runningTask, zkWorker, announcement.getTaskStatus());
}
}
return runningTask.getResult();
} else if ((completeTask = completeTasks.get(task.getId())) != null) {
return completeTask.getResult();
} else {
return addPendingTask(task).getResult();
}
}

上面有一行代码runPendingTasks,跟进该代码可以发现有一行代码tryAssignTask(task, taskRunnerWorkItem),该过程是一个阻塞的过程, 核心代码也只有一行:announceTask,该方法的目的就是在zookeeper上发布一个任务,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
private boolean announceTask(
final Task task,
final ZkWorker theZkWorker,
final RemoteTaskRunnerWorkItem taskRunnerWorkItem
) throws Exception
{
final String worker = theZkWorker.getWorker().getHost();
synchronized (statusLock) {
if (!zkWorkers.containsKey(worker) || lazyWorkers.containsKey(worker)) {
// the worker might have been killed or marked as lazy
log.info("Not assigning task to already removed worker[%s]", worker);
return false;
}
log.info("Coordinator asking Worker[%s] to add task[%s]", worker, task.getId());

/**
* 创建znode
*/
CuratorUtils.createIfNotExists(
cf,
JOINER.join(indexerZkConfig.getTasksPath(), worker, task.getId()),
CreateMode.EPHEMERAL,
jsonMapper.writeValueAsBytes(task),
config.getMaxZnodeBytes()
);

RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(task.getId());
if (workItem == null) {
log.makeAlert("WTF?! Got a null work item from pending tasks?! How can this be?!")
.addData("taskId", task.getId())
.emit();
return false;
}

RemoteTaskRunnerWorkItem newWorkItem = workItem.withWorker(theZkWorker.getWorker(), null);
runningTasks.put(task.getId(), newWorkItem);
log.info("Task %s switched from pending to running (on [%s])", task.getId(), newWorkItem.getWorker().getHost());
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.running(task.getId()));

// Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running
// on a worker - this avoids overflowing a worker with tasks
Stopwatch timeoutStopwatch = Stopwatch.createStarted();
while (!isWorkerRunningTask(theZkWorker, task.getId())) {
final long waitMs = config.getTaskAssignmentTimeout().toStandardDuration().getMillis();
statusLock.wait(waitMs);
long elapsed = timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS);
if (elapsed >= waitMs) {
log.makeAlert(
"Task assignment timed out on worker [%s], never ran task [%s]! Timeout: (%s >= %s)!",
worker,
task.getId(),
elapsed,
config.getTaskAssignmentTimeout()
).emit();
taskComplete(taskRunnerWorkItem, theZkWorker, TaskStatus.failure(task.getId()));
break;
}
}
return true;
}
}

至此,提交摄入任务并在zookeeper上面发布任务的正流程就结束了,不过这个时候任务并没有被peon进程领取并执行,另外还包含一些细节,如 taskMaster故障之后重新选举任务的重建过程是怎么样的。这些就留着以后有时间再来分析了吧。

broker源码走读

上面的过程中我们简单的看了一下提交摄入任务的整体流程,也看到了任务是如何在zookeeper上进行发布的,也就是数据的摄入是如何执行的, 另外一个核心的问题就是数据的查询是什么流程了,这个我们可以采用同样的方式,从http接口的方式来分析具体的实现,具体入口如下:

MiddleManager源码走读

在真正走读代码之前我们还是先回顾一下MiddleManager的功能,并通过功能作为入口一点一点的深入代码的解析。首先从官网上我们知道middleManager 是用来摄入数据的,数据的摄入又分为以下几个步骤:

  • 对于一个append类型的任务来说,会调用Overload上的allocate接口来生成segment的唯一标识,对于一个覆盖写的任务来说这是通过锁定一个时间区间并 创建一个新的版本号来实现的
  • 如果是一个realtime类型的摄入任务的话,segment马上就可以查到了,但是当前的segment并未发布
  • 当完成了当前segment数据的摄入之后,将会把segment发不到深度存储中,并且记录当前segment的元数据,如果是realtime类型的task,那么还会 等待Historical进程来加载数据,否则的话segment会马上存在与historical进程所在的节点上

知道了MiddleManager的功能之后,我们就可以通过跟踪代码来看一下具体的流程了。

小结

时间关系,关于druid启动的流程我们的分析就到此为止了,后面会专门针对具体的process来查看具体的功能代码。

Druid中用到的一些技术

问题记录:

1、生产环境中realtime的内存设置为1G,结果发现对于两个字段的groupby操作经常会将实时摄入的节点打卦, 后面经过每秒上报数据评估以及SQL查询的条件估算实际,对应时间区间的内存大小 为3G,因此调整之后解决了查询数据量将节点打卦的情况,但是依然留存一个问题,就是查询效率,因为 在定制sql的时候,采用了两个字段的groupBy的操作,结果发现,相较于一个字段的group by, 两个字段进行groupby明显要慢很多。后面查阅资料发现,单个字段的groupBY会被优化成topn查询,因此,我 们的业务将需要进行两个字段的groupby采用transform的方式在摄入的时候将其拼接成一个字段,这样针对这一个字段进行 groupBY,效率明显提升上去了,大致从原来的20s提升为2s。

另外在实际操作的过程中,由于我们数据摄入是从kafka中获取到的,而kafka中消息是一个宽消息,对应为null或者为空 字符串的字段在数据摄入的时候会被转换成0(转换的过程中会抛出大量的process error,但是结果并不会受到影响), 由于查询需要,我这边需要把非0的数据过滤出来,因此在sql查询的时候是通过!=0的方式来过滤的,结果很奇怪,查询结果中 竟然包含了为0 的选项,因此我又试了一下>0的选项,结果发现这次筛选出来的数据完全满足要求了,最后我又筛选了=0的选项, 从整体的条数来判断,=0的数据量也是对的上的,因此,对于>0的选项还不清楚为什么这种查询条件会把=0的数据给查询出来。 有待跟进