diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java index 0f8ff66..ad17144 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java @@ -711,7 +711,8 @@ public ServiceInstance getInstance(String name) { @Override public int size() { - return instancesCache.getCurrentData().size(); + // not using the path child cache here as there could be more than 1 path per host (worker and slot znodes) + return nodeToInstanceCache.size(); } } diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index cfcf0f0..c086b83 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -47,6 +47,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -181,6 +182,7 @@ public int compare(Priority o1, Priority o2) { private final ScheduledExecutorService scheduledLoggingExecutor; private final SchedulerTimeoutMonitor timeoutMonitor; private ScheduledFuture timeoutFuture; + private AtomicReference> timeoutFutureRef; private final LlapRegistryService registry = new LlapRegistryService(false); @@ -242,6 +244,7 @@ public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapTaskSchedulerTimeoutMonitor") .build()); this.timeoutFuture = null; + this.timeoutFutureRef = new AtomicReference<>(null); this.scheduledLoggingExecutor = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapTaskSchedulerTimedLogThread") @@ -385,6 +388,7 @@ private void startTimeoutMonitor() { if ((timeoutFuture == null || (timeoutFuture != null && timeoutFuture.isDone())) && activeInstances.size() == 0) { timeoutFuture = timeoutExecutor.schedule(timeoutMonitor, timeout, TimeUnit.MILLISECONDS); + timeoutFutureRef.set(timeoutFuture); LOG.info("Scheduled timeout monitor task to run after {} ms", timeout); } else { LOG.info("Timeout monitor task not started. Timeout future state: {}, #instances: {}", @@ -399,6 +403,7 @@ private void stopTimeoutMonitor() { timeoutLock.lock(); try { if (timeoutFuture != null && activeInstances.size() != 0 && timeoutFuture.cancel(false)) { + timeoutFutureRef.set(null); LOG.info("Stopped timeout monitor task"); } else { LOG.info("Timeout monitor task not stopped. Timeout future state: {}, #instances: {}", @@ -910,7 +915,7 @@ private SelectHostResult randomSelection(final List nodesWithFreeSlots private void addNode(NodeInfo node, ServiceInstance serviceInstance) { // we have just added a new node. Signal timeout monitor to reset timer - if (activeInstances.size() == 1) { + if (activeInstances.size() != 0 && timeoutFutureRef.get() != null) { LOG.info("New node added. Signalling scheduler timeout monitor thread to stop timer."); stopTimeoutMonitor(); }