diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 291ca7d..6cb2acb 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3081,6 +3081,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal LLAP_DAEMON_NUM_EXECUTORS("hive.llap.daemon.num.executors", 4, "Number of executors to use in LLAP daemon; essentially, the number of tasks that can be\n" + "executed in parallel.", "llap.daemon.num.executors"), + LLAP_DAEMON_AM_REPORTER_MAX_THREADS("hive.llap.daemon.am-reporter.max.threads", 4, + "Maximum number of threads to be used for AM reporter", "llap.daemon.am-reporter.max.threads"), LLAP_DAEMON_RPC_PORT("hive.llap.daemon.rpc.port", 0, "The LLAP daemon RPC port.", "llap.daemon.rpc.port. A value of 0 indicates a dynamic port"), LLAP_DAEMON_MEMORY_PER_INSTANCE_MB("hive.llap.daemon.memory.per.instance.mb", 4096, diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java index 027d8eb..0502e01 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java @@ -40,6 +40,8 @@ import java.util.concurrent.Delayed; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -118,15 +120,23 @@ volatile ListenableFuture queueLookupFuture; private final DaemonId daemonId; - public AMReporter(AtomicReference localAddress, - QueryFailedHandler queryFailedHandler, Configuration conf, DaemonId daemonId) { + public AMReporter(int numExecutors, int maxThreads, AtomicReference + localAddress, QueryFailedHandler queryFailedHandler, Configuration conf, DaemonId daemonId) { super(AMReporter.class.getName()); this.localAddress = localAddress; this.queryFailedHandler = queryFailedHandler; this.conf = conf; this.daemonId = daemonId; - ExecutorService rawExecutor = Executors.newCachedThreadPool( - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AMReporter %d").build()); + if (maxThreads < numExecutors) { + maxThreads = numExecutors; + LOG.warn("maxThreads={} is less than numExecutors={}. Setting maxThreads=numExecutors", + maxThreads, numExecutors); + } + ExecutorService rawExecutor = + new ThreadPoolExecutor(numExecutors, maxThreads, + 60L, TimeUnit.SECONDS, + new LinkedBlockingQueue(), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AMReporter %d").build()); this.executor = MoreExecutors.listeningDecorator(rawExecutor); ExecutorService rawExecutor2 = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AMReporterQueueDrainer").build()); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 519bfbd..cca6bc6 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -254,7 +254,9 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor LOG.info("Started LlapMetricsSystem with displayName: " + displayName + " sessionId: " + sessionId); - this.amReporter = new AMReporter(srvAddress, new QueryFailedHandlerProxy(), daemonConf, daemonId); + int maxAmReporterThreads = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_AM_REPORTER_MAX_THREADS); + this.amReporter = new AMReporter(numExecutors, maxAmReporterThreads, srvAddress, + new QueryFailedHandlerProxy(), daemonConf, daemonId); SecretManager sm = null; if (UserGroupInformation.isSecurityEnabled()) {