diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index cca6bc6..9435ca9 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -380,6 +380,12 @@ public void serviceStart() throws Exception { } getConfig().setInt(ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT.varname, LlapOutputFormatService.get().getPort()); + // Ensure this is set in the config so that the AM can read it. + getConfig() + .setIfUnset(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname, + ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE + .getDefaultValue()); + this.registry.init(getConfig()); this.registry.start(); LOG.info( diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index 3c0a661..dc594a2 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -134,9 +134,9 @@ public int compare(Priority o1, Priority o2) { @VisibleForTesting final DelayQueue delayedTaskQueue = new DelayQueue<>(); + private volatile boolean dagRunning = false; private final ContainerFactory containerFactory; - private final Random random = new Random(); @VisibleForTesting final Clock clock; @@ -167,20 +167,14 @@ public int compare(Priority o1, Priority o2) { private final NodeBlacklistConf nodeBlacklistConf; private final LocalityDelayConf localityDelayConf; - // Per daemon - private final int memoryPerInstance; - private final int coresPerInstance; - private final int executorsPerInstance; - private final int numSchedulableTasksPerNode; - // Per Executor Thread - private final Resource resourcePerExecutor; // when there are 
no live nodes in the cluster and this timeout elapses the query is failed private final long timeout; private final Lock timeoutLock = new ReentrantLock(); private final ScheduledExecutorService timeoutExecutor; + private final ScheduledExecutorService scheduledLoggingExecutor; private final SchedulerTimeoutMonitor timeoutMonitor; private ScheduledFuture timeoutFuture; @@ -221,9 +215,6 @@ public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock // TODO HIVE-13483 Get all of these properties from the registry. This will need to take care of different instances // publishing potentially different values when we support changing configurations dynamically. // For now, this can simply be fetched from a single registry instance. - this.memoryPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB); - this.coresPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE); - this.executorsPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS); this.nodeBlacklistConf = new NodeBlacklistConf( HiveConf.getTimeVar(conf, ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS, TimeUnit.MILLISECONDS), @@ -247,9 +238,9 @@ public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock .build()); this.timeoutFuture = null; - int memoryPerExecutor = (int) (memoryPerInstance / (float) executorsPerInstance); - int coresPerExecutor = (int) (coresPerInstance / (float) executorsPerInstance); - this.resourcePerExecutor = Resource.newInstance(memoryPerExecutor, coresPerExecutor); + this.scheduledLoggingExecutor = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapTaskSchedulerTimedLogThread") + .build()); String instanceId = HiveConf.getTrimmedVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS); @@ -280,19 +271,16 @@ public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock String sessionId = 
conf.get("llap.daemon.metrics.sessionid"); // TODO: Not sure about the use of this. Should we instead use workerIdentity as sessionId? this.metrics = LlapTaskSchedulerMetrics.create(displayName, sessionId); - this.metrics.setNumExecutors(executorsPerInstance); - this.metrics.setMemoryPerInstance(memoryPerInstance * 1024L * 1024L); - this.metrics.setCpuCoresPerInstance(coresPerExecutor); } else { this.metrics = null; this.pauseMonitor = null; } - LOG.info("Running with configuration: " + "memoryPerInstance=" + memoryPerInstance - + ", vCoresPerInstance=" + coresPerInstance + ", executorsPerInstance=" - + executorsPerInstance + ", resourcePerInstanceInferred=" + resourcePerExecutor - + ", nodeBlacklistConf=" + nodeBlacklistConf - + ", localityDelayMs=" + localityDelayMs); + String hostsString = HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS); + LOG.info( + "Running with configuration: hosts={}, numSchedulableTasksPerNode={}, nodeBlacklistConf={}, localityConf={}", + hostsString, numSchedulableTasksPerNode, nodeBlacklistConf, localityDelayConf); + } @Override @@ -304,6 +292,21 @@ public void initialize() { public void start() throws IOException { writeLock.lock(); try { + scheduledLoggingExecutor.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + // Repeat every 10s: a one-shot schedule() fires once right after start and never reports a later dag. + readLock.lock(); + try { + if (dagRunning) { + LOG.info("Stats for current dag: {}", dagStats); + } + } finally { + readLock.unlock(); + } + } + }, 10000L, 10000L, TimeUnit.MILLISECONDS); + nodeEnablerFuture = nodeEnabledExecutor.submit(nodeEnablerCallable); Futures.addCallback(nodeEnablerFuture, new LoggingFutureCallback("NodeEnablerThread", LOG)); @@ -344,12 +347,6 @@ public void onUpdate(ServiceInstance serviceInstance) { LOG.warn( "Not expecing Updates from the registry. Received update for instance={}. Ignoring", serviceInstance); -// Replacing NodeInfo means we end up discarding whatever state was known about that node. 
-// instanceToNodeMap.put(serviceInstance.getWorkerIdentity(), new NodeInfo(serviceInstance, -// nodeBlacklistConf, clock, numSchedulableTasksPerNode, metrics)); -// -// -// LOG.info("Updated node with identity: {}", serviceInstance.getWorkerIdentity()); } @Override @@ -408,6 +405,8 @@ public void shutdown() { writeLock.lock(); try { if (!this.isStopped.getAndSet(true)) { + scheduledLoggingExecutor.shutdownNow(); + nodeEnablerCallable.shutdown(); if (nodeEnablerFuture != null) { nodeEnablerFuture.cancel(true); @@ -526,7 +525,13 @@ public void dagComplete() { if (metrics != null) { metrics.incrCompletedDagCount(); } - dagStats = new StatsPerDag(); + writeLock.lock(); + try { + dagRunning = false; + dagStats = new StatsPerDag(); + } finally { + writeLock.unlock(); + } // TODO Cleanup pending tasks etc, so that the next dag is not affected. } @@ -554,6 +559,7 @@ public void allocateTask(Object task, Resource capability, String[] hosts, Strin priority, capability, Arrays.toString(hosts)); writeLock.lock(); try { + dagRunning = true; dagStats.registerTaskRequest(hosts, racks); } finally { writeLock.unlock(); @@ -573,6 +579,7 @@ public void allocateTask(Object task, Resource capability, ContainerId container priority, capability, containerId); writeLock.lock(); try { + dagRunning = true; dagStats.registerTaskRequest(null, null); } finally { writeLock.unlock(); @@ -621,7 +628,6 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd taskInfo.getState(), endReason); // Re-enable the node if preempted if (taskInfo.getState() == TaskInfo.State.PREEMPTED) { - LOG.info("Processing deallocateTask for {} which was preempted, EndReason={}", task, endReason); unregisterPendingPreemption(taskInfo.assignedNode.getHost()); nodeInfo.registerUnsuccessfulTaskEnd(true); if (nodeInfo.isDisabled()) { @@ -1051,7 +1057,10 @@ protected void schedulePendingTasks() { } taskInfo.triedAssigningTask(); ScheduleResult scheduleResult = scheduleTask(taskInfo, 
totalResource); - LOG.info("ScheduleResult for Task: {} = {}", taskInfo, scheduleResult); + if (LOG.isDebugEnabled()) { + LOG.debug("ScheduleResult for Task: {} = {}", taskInfo, + scheduleResult); + } if (scheduleResult == ScheduleResult.SCHEDULED) { taskIter.remove(); } else { @@ -1185,13 +1194,13 @@ private ScheduleResult scheduleTask(TaskInfo taskInfo, Resource totalResource) { if (selectHostResult.scheduleResult == ScheduleResult.SCHEDULED) { NodeInfo nodeInfo = selectHostResult.nodeInfo; Container container = - containerFactory.createContainer(resourcePerExecutor, taskInfo.priority, + containerFactory.createContainer(nodeInfo.getResourcePerExecutor(), taskInfo.priority, nodeInfo.getHost(), nodeInfo.getRpcPort(), nodeInfo.getServiceAddress()); writeLock.lock(); // While updating local structures try { - LOG.info("Assigned task={} on node={}, to container={} on node={}", + LOG.info("Assigned task={} on node={}, to container={}", taskInfo, nodeInfo.toShortString(), container.getId()); dagStats.registerTaskAllocated(taskInfo.requestedHosts, taskInfo.requestedRacks, nodeInfo.getHost()); @@ -1498,6 +1507,7 @@ public void shutdown() { private int numScheduledTasks = 0; private final int numSchedulableTasks; private final LlapTaskSchedulerMetrics metrics; + private final Resource resourcePerExecutor; /** * Create a NodeInfo bound to a service instance @@ -1516,6 +1526,11 @@ public void shutdown() { this.clock = clock; this.metrics = metrics; + int numVcores = serviceInstance.getResource().getVirtualCores(); + int memoryPerInstance = serviceInstance.getResource().getMemory(); + int memoryPerExecutor = (int)(memoryPerInstance / (double) numVcores); + resourcePerExecutor = Resource.newInstance(memoryPerExecutor, 1); + if (numSchedulableTasksConf == 0) { int pendingQueueuCapacity = 0; String pendingQueueCapacityString = serviceInstance.getProperties() @@ -1526,7 +1541,7 @@ public void shutdown() { if (pendingQueueCapacityString != null) { pendingQueueuCapacity = 
Integer.parseInt(pendingQueueCapacityString); } - this.numSchedulableTasks = serviceInstance.getResource().getVirtualCores() + pendingQueueuCapacity; + this.numSchedulableTasks = numVcores + pendingQueueuCapacity; } else { this.numSchedulableTasks = numSchedulableTasksConf; LOG.info("Setting up node: " + serviceInstance + " with schedulableCapacity=" + this.numSchedulableTasks); @@ -1553,6 +1568,10 @@ String getServiceAddress() { return serviceInstance.getServicesAddress(); } + public Resource getResourcePerExecutor() { + return resourcePerExecutor; + } + void enableNode() { expireTimeMillis = -1; disabled = false; @@ -1634,21 +1653,36 @@ boolean hadCommFailure() { return hadCommFailure; } + int canAcceptCounter = 0; /* Returning true does not guarantee that the task will run, considering other queries may be running in the system. Also depends upon the capacity usage configuration */ boolean canAcceptTask() { boolean result = !hadCommFailure && !disabled &&(numSchedulableTasks == -1 || ((numSchedulableTasks - numScheduledTasks) > 0)); - if (LOG.isDebugEnabled()) { - LOG.debug("Node[" + serviceInstance.getHost() + ":" + serviceInstance.getRpcPort() + ", " + - serviceInstance.getWorkerIdentity() + "]: " + - "canAcceptTask={}, numScheduledTasks={}, numSchedulableTasks={}, hadCommFailure={}, disabled={}", - result, numScheduledTasks, numSchedulableTasks, hadCommFailure, disabled); + if (LOG.isTraceEnabled()) { + LOG.trace(constructCanAcceptLogResult(result)); + } + canAcceptCounter++; // count every call; the INFO line below is a 1-in-10000 sample + if (canAcceptCounter >= 10000) { + LOG.info(constructCanAcceptLogResult(result)); + canAcceptCounter = 0; + } return result; } + String constructCanAcceptLogResult(boolean result) { + StringBuilder sb = new StringBuilder(); + sb.append("Node[").append(serviceInstance.getHost()).append(":").append(serviceInstance.getRpcPort()) + .append(", ").append(serviceInstance.getWorkerIdentity()).append("]: ") + .append("canAcceptTask=").append(result) + .append(", 
numScheduledTasks=").append(numScheduledTasks) + .append(", numSchedulableTasks=").append(numSchedulableTasks) + .append(", hadCommFailure=").append(hadCommFailure) + .append(", disabled=").append(disabled); + return sb.toString(); + } + @Override public long getDelay(TimeUnit unit) { return unit.convert(expireTimeMillis - clock.getTime(), TimeUnit.MILLISECONDS); @@ -1911,7 +1945,7 @@ public String toString() { ", priority=" + priority + ", startTime=" + startTime + ", containerId=" + containerId + - ", assignedNode=" + (assignedNode == null ? "" : assignedNode.toShortString()) + + (assignedNode != null ? ", assignedNode=" + assignedNode.toShortString() : "") + ", uniqueId=" + uniqueId + ", localityDelayTimeout=" + localityDelayTimeout + '}';