Bound the LLAP AMReporter thread pool.

Adds the setting hive.llap.daemon.am-reporter.max.threads (default 4) and
replaces the cached thread pool in AMReporter with a ThreadPoolExecutor sized
from that setting. LlapDaemon reads the setting and passes it, together with
numExecutors, to the AMReporter constructor.

In the AMReporter constructor the warning is logged before maxThreads is
clamped, so it reports the configured value. The pool uses maxThreads as its
core size with core-thread timeout enabled, because a ThreadPoolExecutor backed
by an unbounded queue never grows past its core size.

diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 291ca7d..10b5ca3 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3081,6 +3081,10 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal
     LLAP_DAEMON_NUM_EXECUTORS("hive.llap.daemon.num.executors", 4,
       "Number of executors to use in LLAP daemon; essentially, the number of tasks that can be\n" +
       "executed in parallel.", "llap.daemon.num.executors"),
+    LLAP_DAEMON_AM_REPORTER_MAX_THREADS("hive.llap.daemon.am-reporter.max.threads", 4,
+      "Maximum number of threads to be used for AM reporter. If this is lower than number of\n" +
+      "executors in llap daemon, it would be set to number of executors at runtime.",
+      "llap.daemon.am-reporter.max.threads"),
     LLAP_DAEMON_RPC_PORT("hive.llap.daemon.rpc.port", 0, "The LLAP daemon RPC port.",
       "llap.daemon.rpc.port. A value of 0 indicates a dynamic port"),
     LLAP_DAEMON_MEMORY_PER_INSTANCE_MB("hive.llap.daemon.memory.per.instance.mb", 4096,
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index 027d8eb..93237e6 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -40,6 +40,8 @@
 import java.util.concurrent.Delayed;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -90,8 +92,6 @@
   Ignore exceptions when communicating with the AM.
   At a later point, report back saying the AM is dead so that tasks can be removed from the running queue.
 
-  Use a cachedThreadPool so that a few AMs going down does not affect other AppMasters.
-
   Race: When a task completes - it sends out it's message via the regular TaskReporter. The AM after this may run another DAG,
   or may die. This may need to be consolidated with the LlapTaskReporter. Try ensuring there's no race between the two.
 
@@ -118,15 +118,27 @@
   volatile ListenableFuture queueLookupFuture;
   private final DaemonId daemonId;
 
-  public AMReporter(AtomicReference localAddress,
-      QueryFailedHandler queryFailedHandler, Configuration conf, DaemonId daemonId) {
+  public AMReporter(int numExecutors, int maxThreads, AtomicReference
+    localAddress, QueryFailedHandler queryFailedHandler, Configuration conf, DaemonId daemonId) {
     super(AMReporter.class.getName());
     this.localAddress = localAddress;
     this.queryFailedHandler = queryFailedHandler;
     this.conf = conf;
     this.daemonId = daemonId;
-    ExecutorService rawExecutor = Executors.newCachedThreadPool(
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AMReporter %d").build());
+    if (maxThreads < numExecutors) {
+      // Log before clamping so the warning reports the configured value, not the adjusted one.
+      LOG.warn("maxThreads={} is less than numExecutors={}. Setting maxThreads=numExecutors",
+          maxThreads, numExecutors);
+      maxThreads = numExecutors;
+    }
+    // With an unbounded work queue a ThreadPoolExecutor never grows past its core size, so the
+    // core size is set to maxThreads and idle core threads are allowed to expire after 60s.
+    ThreadPoolExecutor rawExecutor =
+        new ThreadPoolExecutor(maxThreads, maxThreads,
+            60L, TimeUnit.SECONDS,
+            new LinkedBlockingQueue(),
+            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AMReporter %d").build());
+    rawExecutor.allowCoreThreadTimeOut(true);
     this.executor = MoreExecutors.listeningDecorator(rawExecutor);
     ExecutorService rawExecutor2 = Executors.newFixedThreadPool(1,
         new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AMReporterQueueDrainer").build());
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 519bfbd..cca6bc6 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -254,7 +254,9 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor
     LOG.info("Started LlapMetricsSystem with displayName: " + displayName +
         " sessionId: " + sessionId);
 
-    this.amReporter = new AMReporter(srvAddress, new QueryFailedHandlerProxy(), daemonConf, daemonId);
+    int maxAmReporterThreads = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_AM_REPORTER_MAX_THREADS);
+    this.amReporter = new AMReporter(numExecutors, maxAmReporterThreads, srvAddress,
+        new QueryFailedHandlerProxy(), daemonConf, daemonId);
 
     SecretManager sm = null;
     if (UserGroupInformation.isSecurityEnabled()) {