diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index c98f1ca..3ec235b 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2485,6 +2485,11 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { new TimeValidator(TimeUnit.MILLISECONDS), "Amount of time to wait on connection failures to the AM from an LLAP daemon before\n" + "considering the AM to be dead.", "llap.am.liveness.connection.timeout-millis"), + LLAP_DAEMON_SCHEDULER_NO_NODES_ALIVE_TIMEOUT_MS( + "hive.llap.daemon.scheduler.no.nodes.alive.timeout.ms", "60000ms", + new TimeValidator(TimeUnit.MILLISECONDS), + "Amount of time to wait before failing the query when there are no llap daemons running\n" + + "(alive) in the cluster.", "llap.daemon.scheduler.no.nodes.alive.timeout.ms"), // Not used yet - since the Writable RPC engine does not support this policy. 
LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MS( "hive.llap.am.liveness.connection.sleep.between.retries.ms", "2000ms", diff --git a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java index e76615e..f79cafa 100644 --- a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java +++ b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java @@ -151,6 +151,15 @@ public int compare(Priority o1, Priority o2) { // Per Executor Thread private final Resource resourcePerExecutor; + // when there are no live nodes in the cluster and this timeout elapses the query is failed + private final long scheduleTimeout; + private final Lock timeoutLock = new ReentrantLock(); + private final Condition noNodesCondition = timeoutLock.newCondition(); + private final Condition someNodesCondition = timeoutLock.newCondition(); + private final ListeningExecutorService schedulerTimeoutExecutor; + private final SchedulerTimeoutMonitor schedulerTimeoutMonitor = new SchedulerTimeoutMonitor(); + private volatile ListenableFuture schedulerTimeoutMonitorFuture; + private final LlapRegistryService registry = new LlapRegistryService(false); private volatile ListenableFuture nodeEnablerFuture; @@ -200,6 +209,13 @@ public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock this.forceLocation = false; } + this.scheduleTimeout = HiveConf.getTimeVar(conf, + ConfVars.LLAP_DAEMON_SCHEDULER_NO_NODES_ALIVE_TIMEOUT_MS, TimeUnit.MILLISECONDS); + ExecutorService schedulerTimeoutThreadPool = Executors.newFixedThreadPool(1, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapTaskSchedulerTimeoutMonitor") + .build()); + schedulerTimeoutExecutor = MoreExecutors.listeningDecorator(schedulerTimeoutThreadPool); + int memoryPerExecutor = (int) (memoryPerInstance / (float) executorsPerInstance); int coresPerExecutor = (int) 
(coresPerInstance / (float) executorsPerInstance); this.resourcePerExecutor = Resource.newInstance(memoryPerExecutor, coresPerExecutor); @@ -258,6 +274,18 @@ public void onFailure(Throwable t) { LOG.warn("SchedulerThread exited with error", t); } }); + schedulerTimeoutMonitorFuture = schedulerTimeoutExecutor.submit(schedulerTimeoutMonitor); + Futures.addCallback(schedulerTimeoutMonitorFuture, new FutureCallback<Void>() { + @Override + public void onSuccess(Void result) { + LOG.info("SchedulerTimeoutMonitorThread exited"); + } + + @Override + public void onFailure(Throwable t) { + LOG.warn("SchedulerTimeoutMonitorThread exited with error", t); + } + }); registry.start(); registry.setServiceInstanceStateChangeListener(new NodeStateChangeListener()); activeInstances = registry.getInstances(); @@ -290,6 +318,16 @@ public void onUpdate(final ServiceInstance serviceInstance) { public void onRemove(final ServiceInstance serviceInstance) { instanceToNodeMap.remove(serviceInstance.getWorkerIdentity()); LOG.info("Removed node with identity: {}", serviceInstance.getWorkerIdentity()); + // there are no more nodes. Signal timeout monitor to start timer + timeoutLock.lock(); + try { + if (instanceToNodeMap.isEmpty()) { + LOG.info("No node found. Signalling scheduler timeout monitor thread to start timer."); + someNodesCondition.signal(); + } + } finally { + timeoutLock.unlock(); + } } } @@ -304,6 +342,12 @@ public void shutdown() { } nodeEnabledExecutor.shutdownNow(); + schedulerTimeoutMonitor.shutdown(); + if (schedulerTimeoutMonitorFuture != null) { + schedulerTimeoutMonitorFuture.cancel(true); + } + schedulerTimeoutExecutor.shutdownNow(); + schedulerCallable.shutdown(); if (schedulerFuture != null) { schedulerFuture.cancel(true); @@ -559,13 +603,6 @@ private SelectHostResult selectHost(TaskInfo request) { String[] requestedHosts = request.requestedHosts; readLock.lock(); // Read-lock. Not updating any stats at the moment. try { - // Check if any hosts are active.
- if (getAvailableResources().getMemory() <= 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("Refreshing instances since total memory is 0"); - } - } - // If there's no memory available, fail if (getTotalResources().getMemory() <= 0) { return SELECT_HOST_RESULT_INADEQUATE_TOTAL_CAPACITY; @@ -639,7 +676,7 @@ private void scanForNodeChanges() { if (inst.isAlive() && instanceToNodeMap.containsKey(inst.getWorkerIdentity()) == false) { /* that's a good node, not added to the allocations yet */ LOG.info("Found a new node: " + inst + "."); - addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode)); + addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode)); } } } finally { @@ -650,6 +687,16 @@ private void scanForNodeChanges() { private void addNode(ServiceInstance inst, NodeInfo node) { LOG.info("Adding node: " + inst); instanceToNodeMap.put(inst.getWorkerIdentity(), node); + // we have just added a new node. Signal timeout monitor to reset timer + timeoutLock.lock(); + try { + if (instanceToNodeMap.size() == 1) { + LOG.info("New node added. Signalling scheduler timeout monitor thread to reset timer."); + noNodesCondition.signal(); + } + } finally { + timeoutLock.unlock(); + } // Trigger scheduling since a new node became available.
trySchedulingPendingTasks(); } @@ -1051,6 +1098,37 @@ private void trySchedulingPendingTasks() { } } + private class SchedulerTimeoutMonitor implements Callable<Void> { + private final AtomicBoolean isShutdown = new AtomicBoolean(false); + + @Override + public Void call() throws Exception { + while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) { + timeoutLock.lock(); + try { + while (!instanceToNodeMap.isEmpty()) { + someNodesCondition.await(); + } + while (instanceToNodeMap.isEmpty()) { + final boolean notified = noNodesCondition.await(scheduleTimeout, TimeUnit.MILLISECONDS); + // to prevent spurious wake up check again for size of the map + if (!notified && instanceToNodeMap.isEmpty()) { + getContext().onError(new RuntimeException("No LLAP daemons are running")); + } + } + } finally { + timeoutLock.unlock(); + } + } + return null; + } + + // Call this first, then send in an interrupt to the thread. + public void shutdown() { + isShutdown.set(true); + } + } + private class SchedulerCallable implements Callable { private AtomicBoolean isShutdown = new AtomicBoolean(false);