diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java index 1620ddf..7376792 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java @@ -19,6 +19,7 @@ import java.security.PrivilegedExceptionAction; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CancellationException; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.ExecutorService; @@ -116,8 +117,16 @@ public void onSuccess(Void result) { @Override public void onFailure(Throwable t) { - LOG.error("AMReporter QueueDrainer exited with error", t); - System.exit(-1); + if (t instanceof CancellationException && isShutdown.get()) { + LOG.info("AMReporter QueueDrainer exited as a result of a cancellation after shutdown"); + } else { + LOG.error("AMReporter QueueDrainer exited with error", t); + // Null unless a default handler was installed (LlapDaemon.main installs one). + Thread.UncaughtExceptionHandler handler = Thread.getDefaultUncaughtExceptionHandler(); + if (handler != null) { + handler.uncaughtException(Thread.currentThread(), t); + } + } } }); LOG.info("Started service: " + getName()); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 2f2ccb0..46ec074 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -42,7 +42,7 @@ import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper; import org.apache.log4j.NDC; 
@@ -58,7 +58,7 @@ import org.slf4j.LoggerFactory; -// TODO Convert this to a CompositeService -public class ContainerRunnerImpl extends AbstractService implements ContainerRunner, FragmentCompletionHandler { +// QueryTracker and TaskExecutorService are child services managed by CompositeService. +public class ContainerRunnerImpl extends CompositeService implements ContainerRunner, FragmentCompletionHandler { // TODO Setup a set of threads to process incoming requests. // Make sure requests for a single dag/query are handled by the same thread @@ -88,10 +88,12 @@ public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSi this.localAddress = localAddress; this.queryTracker = new QueryTracker(conf, localDirsBase); + addIfService(queryTracker); this.executorService = new TaskExecutorService(numExecutors, waitQueueSize, enablePreemption); AuxiliaryServiceHelper.setServiceDataIntoEnv( TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID, ByteBuffer.allocate(4).putInt(localShufflePort), localEnv); + addIfService(executorService); // 80% of memory considered for accounted buffers. Rest for objects. // TODO Tune this based on the available size. 
@@ -113,14 +115,14 @@ public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSi ); } - public void serviceInit(Configuration conf) { - queryTracker.init(conf); + public void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); } @Override - public void serviceStart() { + public void serviceStart() throws Exception { // The node id will only be available at this point, since the server has been started in LlapDaemon - queryTracker.start(); + super.serviceStart(); LlapNodeId llapNodeId = LlapNodeId.getInstance(localAddress.get().getHostName(), localAddress.get().getPort()); this.amReporter = new AMReporter(llapNodeId, conf); @@ -130,12 +132,11 @@ public void serviceStart() { @Override protected void serviceStop() throws Exception { + super.serviceStop(); if (amReporter != null) { amReporter.stop(); amReporter = null; } - queryTracker.stop(); - super.serviceStop(); } @Override diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 067e213..5574483 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -38,18 +38,21 @@ import org.apache.hadoop.hive.llap.metrics.MetricsUtils; import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler; import org.apache.hadoop.metrics2.util.MBeans; -import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.JvmPauseMonitor; import org.apache.hadoop.util.StringUtils; -import org.apache.log4j.Logger; +import org.apache.hive.common.util.ShutdownHookManager; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; -public class LlapDaemon extends AbstractService implements ContainerRunner, LlapDaemonMXBean { +public class LlapDaemon extends CompositeService implements ContainerRunner, LlapDaemonMXBean { - private static final Logger LOG = Logger.getLogger(LlapDaemon.class); + private static final Logger LOG = LoggerFactory.getLogger(LlapDaemon.class); private final Configuration shuffleHandlerConf; private final LlapDaemonProtocolServerImpl server; @@ -57,7 +60,7 @@ private final LlapRegistryService registry; private final LlapWebServices webServices; private final AtomicLong numSubmissions = new AtomicLong(0); - private JvmPauseMonitor pauseMonitor; + private final JvmPauseMonitor pauseMonitor; private final ObjectName llapDaemonInfoBean; private final LlapDaemonExecutorMetrics metrics; @@ -129,9 +132,8 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor // Less frequently set parameter, not passing in as a param. int numHandlers = daemonConf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_NUM_HANDLERS, LlapConfiguration.LLAP_DAEMON_RPC_NUM_HANDLERS_DEFAULT); - this.server = new LlapDaemonProtocolServerImpl(numHandlers, this, address, rpcPort); - // Initialize the metric system + // Initialize the metrics system LlapMetricsSystem.initialize("LlapDaemon"); this.pauseMonitor = new JvmPauseMonitor(daemonConf); pauseMonitor.start(); @@ -144,6 +146,9 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor LOG.info("Started LlapMetricsSystem with displayName: " + displayName + " sessionId: " + sessionId); + this.server = new LlapDaemonProtocolServerImpl(numHandlers, this, address, rpcPort); + addIfService(server); + this.containerRunner = new ContainerRunnerImpl(daemonConf, numExecutors, waitQueueSize, @@ -153,9 +158,12 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor address, executorMemoryBytes, metrics); + addIfService(containerRunner); this.registry = new 
LlapRegistryService(true); + addIfService(registry); + this.webServices = new LlapWebServices(); + addIfService(webServices); } private void printAsciiArt() { @@ -173,34 +181,24 @@ private void printAsciiArt() { } @Override - public void serviceInit(Configuration conf) { - server.init(conf); - containerRunner.init(conf); - registry.init(conf); - webServices.init(conf); + public void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); LlapIoProxy.setDaemon(true); LlapIoProxy.initializeLlapIo(conf); } @Override public void serviceStart() throws Exception { ShuffleHandler.initializeAndStart(shuffleHandlerConf); - server.start(); - containerRunner.start(); - registry.start(); - registry.registerWorker(); - webServices.start(); + // Start shuffle before the child services: the registry child registers this worker when it starts. + super.serviceStart(); } public void serviceStop() throws Exception { - // TODO Shutdown LlapIO + super.serviceStop(); shutdown(); - containerRunner.stop(); - server.stop(); - registry.unregisterWorker(); - registry.stop(); - webServices.stop(); ShuffleHandler.shutdown(); + LOG.info("LlapDaemon shutdown complete"); } public void shutdown() { @@ -221,6 +219,7 @@ public void shutdown() { } public static void main(String[] args) throws Exception { + Thread.setDefaultUncaughtExceptionHandler(new LlapDaemonUncaughtExceptionHandler()); LlapDaemon llapDaemon = null; try { // Cache settings will need to be setup in llap-daemon-site.xml - since the daemons don't read hive-site.xml @@ -246,6 +245,9 @@ public static void main(String[] args) throws Exception { cacheMemoryBytes, localDirs, rpcPort, shufflePort); + LOG.info("Adding shutdown hook for LlapDaemon"); + ShutdownHookManager.addShutdownHook(new CompositeServiceShutdownHook(llapDaemon), 1); + llapDaemon.init(daemonConf); llapDaemon.start(); LOG.info("Started LlapDaemon"); @@ -332,5 +334,37 @@ public long getMaxJvmMemory() { return maxJvmMemory; } + private static class LlapDaemonUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { + @Override + public void 
uncaughtException(Thread t, Throwable e) { + LOG.info("UncaughtExceptionHandler invoked"); + if(ShutdownHookManager.isShutdownInProgress()) { + LOG.warn("Thread {} threw a Throwable, but we are shutting down, so ignoring this", t, e); + } else if(e instanceof Error) { + try { + LOG.error("Thread {} threw an Error. Shutting down now...", t, e); + } catch (Throwable err) { + //We don't want to not exit because of an issue with logging + } + if(e instanceof OutOfMemoryError) { + //After catching an OOM java says it is undefined behavior, so don't + //even try to clean up or we can get stuck on shutdown. + try { + System.err.println("Halting due to Out Of Memory Error..."); + e.printStackTrace(); + } catch (Throwable err) { + //Again, we don't want to skip the exit because of logging issues. + } + ExitUtil.halt(-1); + } else { + ExitUtil.terminate(-1); + } + } else { + LOG.error("Thread {} threw an Exception. Shutting down now...", t, e); + ExitUtil.terminate(-1); + } + } + } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index b3e5f74..ff7fb29 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -33,6 +33,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler; +import org.apache.hadoop.service.AbstractService; import org.apache.tez.runtime.task.EndReason; import org.apache.tez.runtime.task.TaskRunner2Result; import org.slf4j.Logger; @@ -65,7 +66,7 @@ * Task executor service can be shut down which will terminated all running tasks and reject all * new tasks. Shutting down of the task executor service can be done gracefully or immediately.
 */ -public class TaskExecutorService implements Scheduler { +public class TaskExecutorService extends AbstractService implements Scheduler { private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorService.class); @@ -94,12 +95,14 @@ private final Object lock = new Object(); public TaskExecutorService(int numExecutors, int waitQueueSize, boolean enablePreemption) { + super(TaskExecutorService.class.getSimpleName()); this.waitQueue = new EvictingPriorityBlockingQueue<>(new WaitQueueComparator(), waitQueueSize); this.threadPoolExecutor = new ThreadPoolExecutor(numExecutors, // core pool size numExecutors, // max pool size 1, TimeUnit.MINUTES, new SynchronousQueue(), // direct hand-off - new ThreadFactoryBuilder().setNameFormat(TASK_EXECUTOR_THREAD_NAME_FORMAT).build()); + new ThreadFactoryBuilder().setDaemon(true).setNameFormat(TASK_EXECUTOR_THREAD_NAME_FORMAT) + .build()); this.executorService = MoreExecutors.listeningDecorator(threadPoolExecutor); this.preemptionQueue = new PriorityBlockingQueue<>(numExecutors, new PreemptionQueueComparator()); @@ -124,6 +127,11 @@ public TaskExecutorService(int numExecutors, int waitQueueSize, boolean enablePr + ", enablePreemption=" + enablePreemption); } + @Override + public void serviceStop() { + shutDown(false); + } + /** * Worker that takes tasks from wait queue and schedule it for execution. @@ -149,7 +157,6 @@ public void run() { } continue; } - // if the task cannot finish and if no slots are available then don't schedule it.
boolean shouldWait = false; if (task.getTaskRunnerCallable().canFinish()) { @@ -216,6 +223,11 @@ public void onSuccess(Object result) { @Override public void onFailure(Throwable t) { LOG.error("Wait queue scheduler worker exited with failure!", t); + // Null unless a default handler was installed (LlapDaemon.main installs one). + Thread.UncaughtExceptionHandler handler = Thread.getDefaultUncaughtExceptionHandler(); + if (handler != null) { + handler.uncaughtException(Thread.currentThread(), t); + } } } @@ -443,7 +455,6 @@ private void updatePreemptionListAndNotify(EndReason reason) { } - // TODO: llap daemon should call this to gracefully shutdown the task executor service public void shutDown(boolean awaitTermination) { if (!isShutdown.getAndSet(true)) { if (awaitTermination) { @@ -461,6 +472,7 @@ public void shutDown(boolean awaitTermination) { } executorService.shutdownNow(); waitQueueExecutorService.shutdownNow(); + executionCompletionExecutorService.shutdownNow(); } } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java index f727e09..d3647d0 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java @@ -51,24 +51,34 @@ public void serviceStart() throws Exception { if (this.registry != null) { this.registry.start(); } + if (isDaemon) { + registerWorker(); + } } @Override public void serviceStop() throws Exception { - if (this.registry != null) { - this.registry.start(); - } else { - LOG.warn("Stopping non-existent registry service"); - } + try { + if (isDaemon) { + unregisterWorker(); + } + } finally { + // Stop the registry client even if unregistering this worker failed. + if (this.registry != null) { + this.registry.stop(); + } else { + LOG.warn("Stopping non-existent registry service"); + } + } } - public void registerWorker() throws IOException { + private void registerWorker() throws IOException { if (this.registry != null) { this.registry.register(); } } - public void unregisterWorker() throws IOException { + private void unregisterWorker() throws IOException { if (this.registry != null) { this.registry.unregister(); } diff
--git llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java index 761aefe..40daeec 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java @@ -110,7 +110,7 @@ private LlapIoImpl(Configuration conf) throws IOException { // Arbitrary thread pool. Listening is used for unhandled errors for now (TODO: remove?) int numThreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_IO_THREADPOOL_SIZE); executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numThreads, - new ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-%d").build())); + new ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-%d").setDaemon(true).build())); // TODO: this should depends on input format and be in a map, or something. this.cvp = new OrcColumnVectorProducer(metadataCache, orcCache, cache, conf, cacheMetrics, @@ -148,5 +148,6 @@ public void close() { MBeans.unregister(buddyAllocatorMXBean); buddyAllocatorMXBean = null; } + executor.shutdownNow(); } }