diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 95c5c0e..92ef462 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2660,6 +2660,11 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "Sleep duration while waiting to retry connection failures to the AM from the daemon for\n" +
         "the general keep-alive thread (milliseconds).", "llap.am.liveness.connection.sleep-between-retries-millis"),
+    LLAP_DAEMON_SCHEDULER_TIMEOUT_SECONDS(
+        "hive.llap.daemon.scheduler.timeout.seconds", "60s",
+        new TimeValidator(TimeUnit.SECONDS),
+        "Amount of time to wait before failing the query when there are no llap daemons running\n" +
+        "(alive) in the cluster.", "llap.daemon.scheduler.timeout.seconds"),
     LLAP_DAEMON_NUM_EXECUTORS("hive.llap.daemon.num.executors", 4,
         "Number of executors to use in LLAP daemon; essentially, the number of tasks that can be\n" +
         "executed in parallel.", "llap.daemon.num.executors"),
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
index 73f94f3..99ead9b 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
@@ -55,4 +55,10 @@
    */
   public Set<ServiceInstance> getByHost(String host);
 
+  /**
+   * Get the number of instances currently available.
+   *
+   * @return number of instances
+   */
+  public int size();
 }
\ No newline at end of file
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
index 8cace8f..440363a 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -221,6 +221,11 @@ public ServiceInstance getInstance(String name) {
       }
       return byHost;
     }
+
+    @Override
+    public int size() {
+      return instances.size();
+    }
   }
 
   @Override
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index ba38fb8..33ee8ed 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -478,6 +478,11 @@ public ServiceInstance getInstance(String name) {
       }
       return byHost;
     }
+
+    @Override
+    public int size() {
+      return instancesCache.getCurrentData().size();
+    }
   }
 
   private class InstanceStateChangeListener implements PathChildrenCacheListener {
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index b57ae1a..66e5e15 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -74,6 +74,7 @@
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.serviceplugins.api.TaskScheduler;
 import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
@@ -129,7 +130,7 @@ public int compare(Priority o1, Priority o2) {
 
   private final Lock scheduleLock = new ReentrantLock();
   private final Condition scheduleCondition = scheduleLock.newCondition();
-  private final AtomicBoolean pendingScheduleInvodations = new AtomicBoolean(false);
+  private final AtomicBoolean pendingScheduleInvocations = new AtomicBoolean(false);
   private final ListeningExecutorService schedulerExecutor;
   private final SchedulerCallable schedulerCallable = new SchedulerCallable();
 
@@ -151,6 +152,14 @@ public int compare(Priority o1, Priority o2) {
   // Per Executor Thread
   private final Resource resourcePerExecutor;
 
+  // If there are no live nodes in the cluster and this timeout elapses, the query is failed.
+  private final long scheduleTimeout;
+  private final Lock timeoutLock = new ReentrantLock();
+  private final Condition noNodesCondition = timeoutLock.newCondition();
+  private final ListeningExecutorService schedulerTimeoutExecutor;
+  private final SchedulerTimeoutMonitor schedulerTimeoutMonitor = new SchedulerTimeoutMonitor();
+  private volatile ListenableFuture<Void> schedulerTimeoutMonitorFuture;
+
   private final LlapRegistryService registry = new LlapRegistryService(false);
 
   private volatile ListenableFuture<Void> nodeEnablerFuture;
@@ -200,6 +209,13 @@ public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock
       this.forceLocation = false;
     }
 
+    this.scheduleTimeout = HiveConf.getTimeVar(conf,
+        ConfVars.LLAP_DAEMON_SCHEDULER_TIMEOUT_SECONDS, TimeUnit.MILLISECONDS);
+    ExecutorService schedulerTimeoutThreadPool = Executors.newFixedThreadPool(1,
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapTaskSchedulerTimeoutMonitor")
+            .build());
+    schedulerTimeoutExecutor = MoreExecutors.listeningDecorator(schedulerTimeoutThreadPool);
+
     int memoryPerExecutor = (int) (memoryPerInstance / (float) executorsPerInstance);
     int coresPerExecutor = (int) (coresPerInstance / (float) executorsPerInstance);
     this.resourcePerExecutor = Resource.newInstance(memoryPerExecutor, coresPerExecutor);
@@ -258,6 +274,18 @@ public void onFailure(Throwable t) {
         LOG.warn("SchedulerThread exited with error", t);
       }
     });
+    schedulerTimeoutMonitorFuture = schedulerTimeoutExecutor.submit(schedulerTimeoutMonitor);
+    Futures.addCallback(schedulerTimeoutMonitorFuture, new FutureCallback<Void>() {
+      @Override
+      public void onSuccess(Void result) {
+        LOG.info("SchedulerTimeoutMonitorThread exited");
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        LOG.warn("SchedulerTimeoutMonitorThread exited with error", t);
+      }
+    });
     registry.start();
     registry.registerStateChangeListener(new NodeStateChangeListener());
     activeInstances = registry.getInstances();
@@ -291,6 +319,16 @@ public void onRemove(final ServiceInstance serviceInstance) {
       // FIXME: disabling this for now
       // instanceToNodeMap.remove(serviceInstance.getWorkerIdentity());
       LOG.info("Removed node with identity: {}", serviceInstance.getWorkerIdentity());
+      // If there are no more nodes, signal the timeout monitor to start its timer.
+      if (activeInstances.size() == 0) {
+        try {
+          timeoutLock.lock();
+          LOG.info("No node found. Signalling scheduler timeout monitor thread to start timer.");
+          schedulerTimeoutMonitor.startTimerAndSignal();
+        } finally {
+          timeoutLock.unlock();
+        }
+      }
     }
   }
 
@@ -305,6 +343,12 @@ public void shutdown() {
       }
       nodeEnabledExecutor.shutdownNow();
 
+      schedulerTimeoutMonitor.shutdown();
+      if (schedulerTimeoutMonitorFuture != null) {
+        schedulerTimeoutMonitorFuture.cancel(true);
+      }
+      schedulerTimeoutExecutor.shutdownNow();
+
       schedulerCallable.shutdown();
       if (schedulerFuture != null) {
         schedulerFuture.cancel(true);
@@ -567,13 +611,6 @@ private SelectHostResult selectHost(TaskInfo request) {
     String[] requestedHosts = request.requestedHosts;
     readLock.lock(); // Read-lock. Not updating any stats at the moment.
     try {
-      // Check if any hosts are active.
-      if (getAvailableResources().getMemory() <= 0) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Refreshing instances since total memory is 0");
-        }
-      }
-
       // If there's no memory available, fail
       if (getTotalResources().getMemory() <= 0) {
         return SELECT_HOST_RESULT_INADEQUATE_TOTAL_CAPACITY;
@@ -657,6 +694,16 @@ private void scanForNodeChanges() {
 
   private void addNode(ServiceInstance inst, NodeInfo node) {
     LOG.info("Adding node: " + inst);
+    // We have just added a new node. Signal the timeout monitor to reset its timer.
+    if (activeInstances.size() == 1) {
+      try {
+        timeoutLock.lock();
+        LOG.info("New node added. Signalling scheduler timeout monitor thread to stop timer.");
+        schedulerTimeoutMonitor.stopTimerAndSignal();
+      } finally {
+        timeoutLock.unlock();
+      }
+    }
     instanceToNodeMap.put(inst.getWorkerIdentity(), node);
     // Trigger scheduling since a new node became available.
     trySchedulingPendingTasks();
@@ -794,6 +841,16 @@ protected void schedulePendingTasks() {
         Iterator<TaskInfo> taskIter = taskListAtPriority.iterator();
         boolean scheduledAllAtPriority = true;
         while (taskIter.hasNext()) {
+          if (activeInstances.size() == 0) {
+            try {
+              timeoutLock.lock();
+              LOG.info("No daemons running before scheduling pending tasks."
+                  + " Signalling scheduler timeout monitor thread to start timer.");
+              schedulerTimeoutMonitor.startTimerAndSignal();
+            } finally {
+              timeoutLock.unlock();
+            }
+          }
 
           // TODO Optimization: Add a check to see if there's any capacity available. No point in
           // walking through all active nodes, if they don't have potential capacity.
@@ -1052,13 +1109,66 @@ public void shutdown() {
 
   private void trySchedulingPendingTasks() {
     scheduleLock.lock();
     try {
-      pendingScheduleInvodations.set(true);
+      pendingScheduleInvocations.set(true);
       scheduleCondition.signal();
     } finally {
       scheduleLock.unlock();
     }
   }
 
+  private class SchedulerTimeoutMonitor implements Callable<Void> {
+    private AtomicBoolean isShutdown = new AtomicBoolean(false);
+    private AtomicBoolean startTimer = new AtomicBoolean(false);
+
+    @Override
+    public Void call() throws Exception {
+      while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
+        try {
+          timeoutLock.lock();
+
+          LOG.info("Scheduler timeout monitor waiting on no-nodes condition..");
+          // Wait until signalled to start the timed wait.
+          noNodesCondition.await();
+
+          while (activeInstances.size() == 0 && startTimer.get()) {
+            LOG.info("No instances found. Scheduler timeout of " + scheduleTimeout + " ms has started.");
"); + final boolean notified = noNodesCondition.await(scheduleTimeout, TimeUnit.MILLISECONDS); + // to prevent spurious wake up and nodes being added in-between check again + if (!notified && activeInstances.size() == 0) { + LOG.info("Reporting SERVICE_UNAVAILABLE error as no instances are running"); + getContext().reportError(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE, + "No LLAP Daemons are running", getContext().getCurrentDagInfo()); + } + startTimer.set(false); + } + } finally { + timeoutLock.unlock(); + } + } + return null; + } + + // Call this first, then send in an interrupt to the thread. + public void shutdown() { + isShutdown.set(true); + } + + public void startTimerAndSignal() { + // signal only if previous state is false + if (startTimer.compareAndSet(false, true)) { + noNodesCondition.signal(); + } + } + + public void stopTimerAndSignal() { + // signal only if previous state is true + if (startTimer.compareAndSet(true, false)) { + noNodesCondition.signal(); + } + } + } + private class SchedulerCallable implements Callable { private AtomicBoolean isShutdown = new AtomicBoolean(false); @@ -1067,7 +1177,7 @@ public Void call() { while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) { scheduleLock.lock(); try { - while (!pendingScheduleInvodations.get()) { + while (!pendingScheduleInvocations.get()) { scheduleCondition.await(); } } catch (InterruptedException e) { @@ -1086,7 +1196,7 @@ public Void call() { // will be handled in the next run. // A new request may come in right after this is set to false, but before the actual scheduling. // This will be handled in this run, but will cause an immediate run after, which is harmless. - pendingScheduleInvodations.set(false); + pendingScheduleInvocations.set(false); // Schedule outside of the scheduleLock - which should only be used to wait on the condition. schedulePendingTasks(); }