@Command( name = "middleManager", description = "Runs a Middle Manager, this is a \"task\" node used as part of the remote indexing service, see https://druid.apache.org/docs/latest/design/middlemanager.html for a description" )
public class CliMiddleManager extends ServerRunnable
这也就是说,当我们想要运行 middleManager 的时候,传递给 Main 类 main 函数的参数应该是 server middleManager(即完整命令形如 Main server middleManager)。类比其他的 process,
我们也就很好理解了,具体代码如下:
// Registers each Druid server process as a subcommand of the "server" group:
// e.g. the args "server middleManager" select CliMiddleManager.
List<Class<? extends Runnable>> serverCommands = Arrays.asList( CliCoordinator.class, CliHistorical.class, CliBroker.class, CliOverlord.class, CliIndexer.class, CliMiddleManager.class, CliRouter.class );
// a "group" can be thought of as the command-line prefix
builder.withGroup("server") .withDescription("Run one of the Druid server types.") .withDefaultCommand(Help.class) .withCommands(serverCommands);
// Stand-alone utilities are registered under the "tools" group.
List<Class<? extends Runnable>> toolCommands = Arrays.asList( DruidJsonValidator.class, PullDependencies.class, CreateTables.class, DumpSegment.class, ResetCluster.class, ValidateSegments.class, ExportMetadata.class );
builder.withGroup("tools") .withDescription("Various tools for working with Druid") .withDefaultCommand(Help.class) .withCommands(toolCommands);
// Hadoop batch indexing is exposed under the "index" group.
builder.withGroup("index") .withDescription("Run indexing for druid") .withDefaultCommand(Help.class) .withCommands(CliHadoopIndexer.class);
// Internal-only processes (peons, the internal hadoop indexer) under "internal".
builder.withGroup("internal") .withDescription("Processes that Druid runs \"internally\", you should rarely use these directly") .withDefaultCommand(Help.class) .withCommands(CliPeon.class, CliInternalHadoopIndexer.class);
// Parse the CLI args into the selected command object.
final Runnable command = cli.parse(args);
if (!(command instanceof Help)) { // Hack to work around Help not liking being injected
  // inject the members of the selected command object via Guice
  injector.injectMembers(command);
}
// Ultimately invokes the chosen process's run method (e.g. the middleManager's).
command.run();
if (startupLoggingConfig.isLogProperties()) { final Set<String> maskProperties = Sets.newHashSet(startupLoggingConfig.getMaskProperties()); final Properties props = injector.getInstance(Properties.class);
// @Provides注解是用来生成java bean的一种方式,这里也就是生成整个工程所需要的lifecycle对象
@Provides @LazySingleton
public Lifecycle getLifecycle(final Injector injector)
{
  final Key<Set<KeyHolder>> keyHolderKey = Key.get(new TypeLiteral<Set<KeyHolder>>(){}, Names.named("lifecycle"));
  final Set<KeyHolder> eagerClasses = injector.getInstance(keyHolderKey);

  Lifecycle lifecycle = new Lifecycle("module")
  {
    @Override
    // 这里是所有的组件最终的入口,可以看到调用了lifecycle的start方法之后该实例化的对象都实例化了
    public void start() throws Exception
    {
      for (KeyHolder<?> holder : eagerClasses) {
        // 初始化工厂中的对象
        injector.getInstance(holder.getKey()); // Pull the key so as to "eagerly" load up the class.
      }
      super.start();
    }
  };
/**
 * Guice provider for the embedded Jetty {@link Server}. Resolves the optional
 * SslContextFactory binding and the TLS certificate checker from the injector,
 * then delegates all construction to {@link #makeAndInitializeServer}.
 */
@Provides
@LazySingleton
public Server getServer(
    final Injector injector,
    final Lifecycle lifecycle,
    @Self final DruidNode node,
    final ServerConfig config,
    final TLSServerConfig TLSServerConfig
)
{
  // getExistingBinding (unlike getInstance) does not create a just-in-time
  // binding, so this is null when no SslContextFactory has been bound.
  final Binding<SslContextFactory> sslContextFactoryBinding =
      injector.getExistingBinding(Key.get(SslContextFactory.class));
  final TLSCertificateChecker certificateChecker = injector.getInstance(TLSCertificateChecker.class);
  return makeAndInitializeServer(
      injector,
      lifecycle,
      node,
      config,
      TLSServerConfig,
      sslContextFactoryBinding,
      certificateChecker
  );
}
// Builds and configures the embedded Jetty Server: thread-pool sizing, daemon scheduler,
// and the plaintext HTTP connector. (Excerpt — the method continues past this excerpt.)
static Server makeAndInitializeServer(
    Injector injector,
    Lifecycle lifecycle,
    DruidNode node,
    ServerConfig config,
    TLSServerConfig tlsServerConfig,
    Binding<SslContextFactory> sslContextFactoryBinding,
    TLSCertificateChecker certificateChecker
)
{
  // adjusting to make config.getNumThreads() mean, "number of threads
  // that concurrently handle the requests".
  int numServerThreads = config.getNumThreads() + getMaxJettyAcceptorsSelectorsNum(node);

  final QueuedThreadPool threadPool;
  if (config.getQueueSize() == Integer.MAX_VALUE) {
    // No queue bound configured: fixed-size pool with Jetty's default queue.
    threadPool = new QueuedThreadPool();
    threadPool.setMinThreads(numServerThreads);
    threadPool.setMaxThreads(numServerThreads);
  } else {
    // Bounded queue so requests are rejected rather than piling up without limit.
    threadPool = new QueuedThreadPool(
        numServerThreads,
        numServerThreads,
        60000, // same default is used in other case when threadPool = new QueuedThreadPool()
        new LinkedBlockingQueue<>(config.getQueueSize())
    );
  }
  // Daemon threads so Jetty's pool does not keep the JVM alive on shutdown.
  threadPool.setDaemon(true);

  final Server server = new Server(threadPool);

  // Without this bean set, the default ScheduledExecutorScheduler runs as non-daemon, causing lifecycle hooks to fail
  // to fire on main exit. Related bug: https://github.com/apache/druid/pull/1627
  server.addBean(new ScheduledExecutorScheduler("JettyScheduler", true), true);

  final List<ServerConnector> serverConnectors = new ArrayList<>();

  if (node.isEnablePlaintextPort()) {
    log.info("Creating http connector with port [%d]", node.getPlaintextPort());
    HttpConfiguration httpConfiguration = new HttpConfiguration();
    if (config.isEnableForwardedRequestCustomizer()) {
      // Honor X-Forwarded-* headers when Druid sits behind a reverse proxy.
      httpConfiguration.addCustomizer(new ForwardedRequestCustomizer());
    }
    httpConfiguration.setRequestHeaderSize(config.getMaxRequestHeaderSize());
    final ServerConnector connector = new ServerConnector(server, new HttpConnectionFactory(httpConfiguration));
    if (node.isBindOnHost()) {
      connector.setHost(node.getHost());
    }
    connector.setPort(node.getPlaintextPort());
    serverConnectors.add(connector);
  }
  // ..... (elided in this excerpt — presumably the TLS/HTTPS connector setup; not shown here)

  final ServerConnector[] connectors = new ServerConnector[serverConnectors.size()];
  int index = 0;
  for (ServerConnector connector : serverConnectors) {
    connectors[index++] = connector;
    connector.setIdleTimeout(Ints.checkedCast(config.getMaxIdleTime().toStandardDuration().getMillis()));
    // workaround suggested in -
    // https://bugs.eclipse.org/bugs/show_bug.cgi?id=435322#c66 for jetty half open connection issues during failovers
    connector.setAcceptorPriorityDelta(-1);

    List<ConnectionFactory> monitoredConnFactories = new ArrayList<>();
    for (ConnectionFactory cf : connector.getConnectionFactories()) {
      // we only want to monitor the first connection factory, since it will pass the connection to subsequent
      // connection factories (in this case HTTP/1.1 after the connection is unencrypted for SSL)
      if (cf.getProtocol().equals(connector.getDefaultProtocol())) {
        monitoredConnFactories.add(new JettyMonitoringConnectionFactory(cf, ACTIVE_CONNECTIONS));
      } else {
        monitoredConnFactories.add(cf);
      }
    }
    connector.setConnectionFactories(monitoredConnFactories);
  }
stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.CONNECTING_TO_STREAM); // TODO 这里猜测应该就是低版本的kafka连接不上存在问题的原因 if (!updatePartitionDataFromStream() && !stateManager.isAtLeastOneSuccessfulRun()) { return; // if we can't connect to the stream and this is the first run, stop and wait to retry the connection }
// if supervisor is not suspended, ensure required tasks are running // if suspended, ensure tasks have been requested to gracefully stop if (!spec.isSuspended()) { log.info("[%s] supervisor is running.", dataSource);
private void createNewTasks() throws JsonProcessingException
{
  ......
  // 由于是新创建的任务,因此这里的任务数量必定是0
  // iterate through all the current task groups and make sure each one has the desired number of replica tasks
  boolean createdTask = false;
  for (Entry<Integer, TaskGroup> entry : activelyReadingTaskGroups.entrySet()) {
    .....
    if (ioConfig.getReplicas() > taskGroup.tasks.size()) {
      ......
      createTasksForGroup(groupId, ioConfig.getReplicas() - taskGroup.tasks.size());
      createdTask = true;
    }
  }
if (createdTask && firstRunTime.isBeforeNow()) { // Schedule a run event after a short delay to update our internal data structures with the new tasks that were // just created. This is mainly for the benefit of the status API in situations where the run period is lengthy. scheduledExec.schedule(buildRunTask(), 5000, TimeUnit.MILLISECONDS); } }
// Sensible order to start stuff: final Lifecycle leaderLifecycle = new Lifecycle("task-master"); if (leaderLifecycleRef.getAndSet(leaderLifecycle) != null) { log.makeAlert("TaskMaster set a new Lifecycle without the old one being cleared! Race condition") .emit(); }
privatevoidmanage()throws InterruptedException { log.info("Beginning management in %s.", config.getStartDelay()); Thread.sleep(config.getStartDelay().getMillis());
// Ignore return value- we'll get the IDs and futures from getKnownTasks later. taskRunner.restore();
while (active) { giant.lock();
try { // Task futures available from the taskRunner final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = new HashMap<>(); for (final TaskRunnerWorkItem workItem : taskRunner.getKnownTasks()) { runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult()); } // Attain futures for all active tasks (assuming they are ready to run). // Copy tasks list, as notifyStatus may modify it. for (final Task task : ImmutableList.copyOf(tasks)) { if (!taskFutures.containsKey(task.getId())) { final ListenableFuture<TaskStatus> runnerTaskFuture; if (runnerTaskFutures.containsKey(task.getId())) { runnerTaskFuture = runnerTaskFutures.get(task.getId()); } else { // Task should be running, so run it. finalboolean taskIsReady; try { taskIsReady = task.isReady(taskActionClientFactory.create(task)); } catch (Exception e) { log.warn(e, "Exception thrown during isReady for task: %s", task.getId()); notifyStatus(task, TaskStatus.failure(task.getId()), "failed because of exception[%s]", e.getClass()); continue; } if (taskIsReady) { log.info("Asking taskRunner to run: %s", task.getId()); runnerTaskFuture = taskRunner.run(task); } else { continue; } } taskFutures.put(task.getId(), attachCallbacks(task, runnerTaskFuture)); } elseif (isTaskPending(task)) { // if the taskFutures contain this task and this task is pending, also let the taskRunner // to run it to guarantee it will be assigned to run // see https://github.com/apache/druid/pull/6991 taskRunner.run(task); } } // Kill tasks that shouldn't be running final Set<String> tasksToKill = Sets.difference( runnerTaskFutures.keySet(), ImmutableSet.copyOf( Lists.transform( tasks, new Function<Task, Object>() { @Override public String apply(Task task) { return task.getId(); } } ) ) ); if (!tasksToKill.isEmpty()) { log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size()); for (final String taskId : tasksToKill) { try { taskRunner.shutdown( taskId, "task is not in runnerTaskFutures[%s]", runnerTaskFutures.keySet() 
); } catch (Exception e) { log.warn(e, "TaskRunner failed to clean up task: %s", taskId); } } } // awaitNanos because management may become necessary without this condition signalling, // due to e.g. tasks becoming ready when other folks mess with the TaskLockbox. managementMayBeNecessary.awaitNanos(MANAGEMENT_WAIT_TIMEOUT_NANOS); } finally { giant.unlock(); } } }
privatebooleanannounceTask( final Task task, final ZkWorker theZkWorker, final RemoteTaskRunnerWorkItem taskRunnerWorkItem )throws Exception { final String worker = theZkWorker.getWorker().getHost(); synchronized (statusLock) { if (!zkWorkers.containsKey(worker) || lazyWorkers.containsKey(worker)) { // the worker might have been killed or marked as lazy log.info("Not assigning task to already removed worker[%s]", worker); returnfalse; } log.info("Coordinator asking Worker[%s] to add task[%s]", worker, task.getId());
RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(task.getId()); if (workItem == null) { log.makeAlert("WTF?! Got a null work item from pending tasks?! How can this be?!") .addData("taskId", task.getId()) .emit(); returnfalse; }
RemoteTaskRunnerWorkItem newWorkItem = workItem.withWorker(theZkWorker.getWorker(), null); runningTasks.put(task.getId(), newWorkItem); log.info("Task %s switched from pending to running (on [%s])", task.getId(), newWorkItem.getWorker().getHost()); TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.running(task.getId()));
// Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running // on a worker - this avoids overflowing a worker with tasks Stopwatch timeoutStopwatch = Stopwatch.createStarted(); while (!isWorkerRunningTask(theZkWorker, task.getId())) { finallong waitMs = config.getTaskAssignmentTimeout().toStandardDuration().getMillis(); statusLock.wait(waitMs); long elapsed = timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS); if (elapsed >= waitMs) { log.makeAlert( "Task assignment timed out on worker [%s], never ran task [%s]! Timeout: (%s >= %s)!", worker, task.getId(), elapsed, config.getTaskAssignmentTimeout() ).emit(); taskComplete(taskRunnerWorkItem, theZkWorker, TaskStatus.failure(task.getId())); break; } } returntrue; } }