diff --git llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java index 8100ece..fff4142 100644 --- llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java +++ llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java @@ -44,8 +44,8 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstance; @@ -66,10 +66,12 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class LlapTaskSchedulerService extends TaskSchedulerService { - private static final Log LOG = LogFactory.getLog(LlapTaskSchedulerService.class); + private static final Logger LOG = LoggerFactory.getLogger(LlapTaskSchedulerService.class); private final ExecutorService appCallbackExecutor; private final TaskSchedulerAppCallback appClientDelegate; @@ -208,7 +210,29 @@ public void serviceStart() throws IOException { writeLock.lock(); try { nodeEnablerFuture = nodeEnabledExecutor.submit(nodeEnablerCallable); + Futures.addCallback(nodeEnablerFuture, new FutureCallback() { + @Override + public void onSuccess(Void result) { + LOG.info("NodeEnabledThread exited"); + } + + @Override + public void onFailure(Throwable t) { + LOG.warn("NodeEnabledThread exited with error", t); + } + }); schedulerFuture = schedulerExecutor.submit(schedulerCallable); + Futures.addCallback(schedulerFuture, new FutureCallback() { + @Override + public void onSuccess(Void result) { + LOG.info("SchedulerThread exited"); + } + + @Override + public void onFailure(Throwable t) { + LOG.warn("SchedulerThread exited with error", t); + } + }); registry.start(); activeInstances = registry.getInstances(); for (ServiceInstance inst : activeInstances.getAll().values()) { @@ -910,6 +934,7 @@ public boolean hadCommFailure() { public boolean canAcceptTask() { boolean result = !hadCommFailure && !disabled && serviceInstance.isAlive() &&(numSchedulableTasks == -1 || ((numSchedulableTasks - numScheduledTasks) > 0)); + LOG.info("canAcceptTask={}, numScheduledTasks={}, numSchedulableTasks={}, hadCommFailure={}, disabled={}, serviceInstance.isAlive={}", result, numScheduledTasks, numSchedulableTasks, hadCommFailure, disabled, serviceInstance.isAlive()); return result; }