diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a490ae67f4..14f885e5eb 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -746,6 +746,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal
         "This value must not contain any special character used in HDFS URI (e.g., ':', '%', '/' etc). \n" +
         "The user has to be aware that the dynamic partition value should not contain this value to avoid confusions."),
     DEFAULT_ZOOKEEPER_PARTITION_NAME("hive.lockmgr.zookeeper.default.partition.name", "__HIVE_DEFAULT_ZOOKEEPER_PARTITION__", ""),
+    HIVE_EXEC_SCHEDULED_POOL_NUM_THREADS("hive.exec.scheduler.num.threads", 10,
+        new RangeValidator(1, Integer.MAX_VALUE), "Number of threads in the common pool used for scheduled background tasks"),
     // Whether to show a link to the most failed task + debugging tips
     SHOW_JOB_FAIL_DEBUG_INFO("hive.exec.show.job.failure.debug.info", true,
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SmallTableCache.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SmallTableCache.java
index 6bfccd6aec..4d01c58d0a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SmallTableCache.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SmallTableCache.java
@@ -21,21 +21,18 @@
 import com.google.common.base.Ticker;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
+import org.apache.hadoop.hive.ql.util.SchedulerThreadPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 
-import static org.apache.commons.lang3.math.NumberUtils.INTEGER_ONE;
 import static org.apache.commons.lang3.math.NumberUtils.INTEGER_ZERO;
 
 public class SmallTableCache {
@@ -90,7 +87,6 @@ public static MapJoinTableContainer get(String key, Callable
     private final Cache cacheL1;
     private final Cache cacheL2;
-    private final ScheduledExecutorService cleanupService;
 
     SmallTableLocalCache() {
       this(Ticker.systemTicker());
@@ -98,14 +94,11 @@ public static MapJoinTableContainer get(String key, Callable
-      cleanupService.scheduleAtFixedRate(() -> {
-        cleanup();
-      }, INTEGER_ZERO, MAINTENANCE_THREAD_CLEANUP_PERIOD, TimeUnit.SECONDS);
+      SchedulerThreadPool.getInstance().scheduleAtFixedRate(
+          this::cleanup, INTEGER_ZERO, MAINTENANCE_THREAD_CLEANUP_PERIOD, TimeUnit.SECONDS);
     }
 
     /**
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
index e2f3a11ffa..40d2adcc0c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
@@ -20,13 +20,12 @@
 import java.io.IOException;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Sets;
+import org.apache.hadoop.hive.ql.util.SchedulerThreadPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -178,14 +177,14 @@ private void startTimeoutThread() {
         TimeUnit.MILLISECONDS);
     long sessionTimeoutPeriod = conf.getTimeVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_PERIOD,
         TimeUnit.MILLISECONDS);
-    ScheduledExecutorService es = Executors.newSingleThreadScheduledExecutor();
     // Schedules a thread that does the following: iterates through all the active SparkSessions
     // and calls #triggerTimeout(long) on each one. If #triggerTimeout(long) returns true, then
     // the SparkSession is removed from the set of active sessions managed by this class.
-    timeoutFuture = es.scheduleAtFixedRate(() -> createdSessions.stream()
-        .filter(sparkSession -> sparkSession.triggerTimeout(sessionTimeout))
-        .forEach(createdSessions::remove),
-        0, sessionTimeoutPeriod, TimeUnit.MILLISECONDS);
+    timeoutFuture = SchedulerThreadPool.getInstance().scheduleAtFixedRate(
+        () -> createdSessions.stream()
+            .filter(sparkSession -> sparkSession.triggerTimeout(sessionTimeout))
+            .forEach(createdSessions::remove),
+        0, sessionTimeoutPeriod, TimeUnit.MILLISECONDS);
   }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/PerPoolTriggerValidatorRunnable.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/PerPoolTriggerValidatorRunnable.java
index 14a688ee38..26d2d01a5c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/PerPoolTriggerValidatorRunnable.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/PerPoolTriggerValidatorRunnable.java
@@ -18,12 +18,9 @@
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hive.ql.util.SchedulerThreadPool;
 import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
 import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
 import org.slf4j.Logger;
@@ -48,16 +45,12 @@
   @Override
   public void run() {
     try {
-      ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
-          .setNameFormat("PoolValidator %d").build();
-      ScheduledExecutorService validatorExecutorService = Executors
-          .newScheduledThreadPool(sessionTriggerProviders.size(), threadFactory);
      for (Map.Entry entry : sessionTriggerProviders.entrySet()) {
        String poolName = entry.getKey();
        if (!poolValidators.containsKey(poolName)) {
          LOG.info("Creating trigger validator for pool: {}", poolName);
          TriggerValidatorRunnable poolValidator = new TriggerValidatorRunnable(entry.getValue(), triggerActionHandler);
-          validatorExecutorService.scheduleWithFixedDelay(poolValidator, triggerValidationIntervalMs,
+          SchedulerThreadPool.getInstance().scheduleWithFixedDelay(poolValidator, triggerValidationIntervalMs,
              triggerValidationIntervalMs, TimeUnit.MILLISECONDS);
          poolValidators.put(poolName, poolValidator);
        }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
index d3748edb86..0b5f0bf2c4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
@@ -20,8 +20,7 @@
 import java.io.IOException;
 import java.net.URISyntaxException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -29,11 +28,11 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.ql.util.SchedulerThreadPool;
 import org.apache.hadoop.hive.registry.impl.TezAmInstance;
 import org.apache.tez.dag.api.TezException;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * TezSession that is aware of the session pool, and also keeps track of expiration and use.
@@ -61,24 +60,23 @@
   }
 
   public static abstract class AbstractTriggerValidator {
-    private ScheduledExecutorService scheduledExecutorService = null;
+    private ScheduledFuture triggerValidator;
+
     abstract Runnable getTriggerValidatorRunnable();
 
     void startTriggerValidator(long triggerValidationIntervalMs) {
-      if (scheduledExecutorService == null) {
-        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
-            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TriggerValidator").build());
+      if (triggerValidator == null) {
         Runnable triggerValidatorRunnable = getTriggerValidatorRunnable();
-        scheduledExecutorService.scheduleWithFixedDelay(triggerValidatorRunnable, triggerValidationIntervalMs,
-            triggerValidationIntervalMs, TimeUnit.MILLISECONDS);
+        triggerValidator = SchedulerThreadPool.getInstance().scheduleWithFixedDelay(
+            triggerValidatorRunnable, triggerValidationIntervalMs,
+            triggerValidationIntervalMs, TimeUnit.MILLISECONDS);
         LOG.info("Started trigger validator with interval: {} ms", triggerValidationIntervalMs);
       }
     }
 
     void stopTriggerValidator() {
-      if (scheduledExecutorService != null) {
-        scheduledExecutorService.shutdownNow();
-        scheduledExecutorService = null;
+      if (triggerValidator != null) {
+        triggerValidator.cancel(true);
         LOG.info("Stopped trigger validator");
       }
     }
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
index 994b5d86cc..cdf7b82fec 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
@@ -24,8 +24,6 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -67,6 +65,7 @@
 import org.apache.hadoop.hive.ql.parse.ParseUtils;
 import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.util.SchedulerThreadPool;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
@@ -75,7 +74,6 @@
 import org.joda.time.Interval;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import com.google.common.collect.ImmutableList;
 
@@ -146,13 +144,10 @@ public void init(Hive db) {
       LOG.info("Using dummy materialized views registry");
     } else {
       // We initialize the cache
-      long period = HiveConf.getTimeVar(db.getConf(), ConfVars.HIVE_SERVER2_MATERIALIZED_VIEWS_REGISTRY_REFRESH,
-          TimeUnit.SECONDS);
-      ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor(
-          new ThreadFactoryBuilder()
-              .setDaemon(true)
-              .setNameFormat("HiveMaterializedViewsRegistry-%d")
-              .build());
-      pool.scheduleAtFixedRate(new Loader(db), 0, period, TimeUnit.SECONDS);
+      long period = HiveConf.getTimeVar(
+          db.getConf(), ConfVars.HIVE_SERVER2_MATERIALIZED_VIEWS_REGISTRY_REFRESH, TimeUnit.SECONDS);
+      SchedulerThreadPool.getInstance().scheduleAtFixedRate(
+          new Loader(db), 0, period, TimeUnit.SECONDS);
     }
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java
index 010f00c7d3..06e2cd2cab 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java
@@ -20,10 +20,7 @@
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -32,10 +29,9 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.util.SchedulerThreadPool;
 import org.apache.hadoop.util.ReflectionUtils;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,7 +41,6 @@
   private static NotificationEventPoll instance;
   Configuration conf;
-  ScheduledExecutorService executorService;
   List eventConsumers = new ArrayList<>();
   ScheduledFuture pollFuture;
   long lastCheckedEventId;
 
@@ -101,13 +96,7 @@ private NotificationEventPoll(Configuration conf) throws Exception {
     LOG.info("Initializing lastCheckedEventId to {}", lastCheckedEventId);
 
     // Start the scheduled poll task
-    ThreadFactory threadFactory =
-        new ThreadFactoryBuilder()
-            .setDaemon(true)
-            .setNameFormat("NotificationEventPoll %d")
-            .build();
-    executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
-    pollFuture = executorService.scheduleAtFixedRate(new Poller(),
+    pollFuture = SchedulerThreadPool.getInstance().scheduleAtFixedRate(new Poller(),
         pollInterval, pollInterval, TimeUnit.MILLISECONDS);
   }
 
@@ -116,10 +105,6 @@ private void stop() {
       pollFuture.cancel(true);
       pollFuture = null;
     }
-    if (executorService != null) {
-      executorService.shutdown();
-      executorService = null;
-    }
   }
 
   class Poller implements Runnable {
diff --git ql/src/java/org/apache/hadoop/hive/ql/util/SchedulerThreadPool.java ql/src/java/org/apache/hadoop/hive/ql/util/SchedulerThreadPool.java
new file mode 100644
index 0000000000..96ae7a1add
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/util/SchedulerThreadPool.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.util;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Utility singleton that owns the shared pool of daemon threads used for scheduled background work.
+ */
+public class SchedulerThreadPool {
+
+  private static volatile ScheduledExecutorService pool;
+
+  /**
+   * Initializes the shared thread pool, sizing it from the given configuration.
+   */
+  public static void initialize(Configuration conf) {
+    if (pool == null) {
+      synchronized (SchedulerThreadPool.class) {
+        if (pool == null) {
+          ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+              .setNameFormat("Hive-exec Scheduled Worker %d").build();
+          pool = Executors.newScheduledThreadPool(HiveConf.getIntVar(conf,
+              HiveConf.ConfVars.HIVE_EXEC_SCHEDULED_POOL_NUM_THREADS), threadFactory);
+        }
+      }
+    }
+  }
+
+  public static ScheduledExecutorService getInstance() {
+    if (pool == null) {
+      // hive-exec.jar may be used outside HiveServer2, where initialize() is never called; fall back to a default-sized pool
+      initialize(new HiveConf());
+    }
+    return pool;
+  }
+
+  public static void shutdown() {
+    if (pool != null) {
+      synchronized (SchedulerThreadPool.class) {
+        if (pool != null) {
+          pool.shutdown();
+          pool = null;
+        }
+      }
+    }
+  }
+}
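For reference, a minimal sketch of how call sites are expected to use the new shared pool, assuming only the API added above; the example class name and the task body are illustrative and not part of the patch:

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.util.SchedulerThreadPool;

public class SchedulerThreadPoolExample {
  public static void main(String[] args) throws InterruptedException {
    // Size the pool from hive.exec.scheduler.num.threads; if initialize() is never
    // called, getInstance() falls back to a default HiveConf on first use.
    SchedulerThreadPool.initialize(new HiveConf());

    // Schedule a periodic task on the shared daemon pool.
    ScheduledFuture<?> tick = SchedulerThreadPool.getInstance().scheduleAtFixedRate(
        () -> System.out.println("periodic maintenance tick"), 0, 10, TimeUnit.SECONDS);

    Thread.sleep(25000);

    // Callers cancel their own task; only server shutdown stops the pool itself.
    tick.cancel(false);
    SchedulerThreadPool.shutdown();
  }
}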
diff --git service/src/java/org/apache/hive/service/cli/session/SessionManager.java service/src/java/org/apache/hive/service/cli/session/SessionManager.java
index 11f8efdd20..11a53e6d78 100644
--- service/src/java/org/apache/hive/service/cli/session/SessionManager.java
+++ service/src/java/org/apache/hive/service/cli/session/SessionManager.java
@@ -27,12 +27,13 @@
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.LongAdder;
 
 import com.google.common.base.Predicate;
@@ -50,6 +45,7 @@
 import org.apache.hadoop.hive.ql.cleanup.CleanupService;
 import org.apache.hadoop.hive.ql.cleanup.EventualCleanupService;
 import org.apache.hadoop.hive.ql.hooks.HookUtils;
+import org.apache.hadoop.hive.ql.util.SchedulerThreadPool;
 import org.apache.hive.service.CompositeService;
 import org.apache.hive.service.cli.HiveSQLException;
 import org.apache.hive.service.cli.SessionHandle;
@@ -98,6 +94,7 @@
   private long checkInterval;
   private long sessionTimeout;
   private boolean checkOperation;
+  private ScheduledFuture timeoutCheckerTask;
 
   private volatile boolean shutdown;
   // The HiveServer2 instance running this service
@@ -288,7 +285,7 @@ private void initOperationLogRootDir() {
   public synchronized void start() {
     super.start();
     if (checkInterval > 0) {
-      startTimeoutChecker();
+      startTimeoutChecker(Math.max(checkInterval, 3000L)); // minimum 3 seconds
     }
   }
 
@@ -296,60 +293,15 @@ public CleanupService getCleanupService() {
     return cleanupService;
   }
 
-  private final Object timeoutCheckerLock = new Object();
-
-  private void startTimeoutChecker() {
-    final long interval = Math.max(checkInterval, 3000l); // minimum 3 seconds
-    final Runnable timeoutChecker = new Runnable() {
-      @Override
-      public void run() {
-        sleepFor(interval);
-        while (!shutdown) {
-          long current = System.currentTimeMillis();
-          for (HiveSession session : new ArrayList(handleToSession.values())) {
-            if (shutdown) {
-              break;
-            }
-            if (sessionTimeout > 0 && session.getLastAccessTime() + sessionTimeout <= current
-                && (!checkOperation || session.getNoOperationTime() > sessionTimeout)) {
-              SessionHandle handle = session.getSessionHandle();
-              LOG.warn("Session " + handle + " is Timed-out (last access : " +
-                  new Date(session.getLastAccessTime()) + ") and will be closed");
-              try {
-                closeSession(handle);
-              } catch (HiveSQLException e) {
-                LOG.warn("Exception is thrown closing session " + handle, e);
-              } finally {
-                Metrics metrics = MetricsFactory.getInstance();
-                if (metrics != null) {
-                  metrics.incrementCounter(MetricsConstant.HS2_ABANDONED_SESSIONS);
-                }
-              }
-            } else {
-              session.closeExpiredOperations();
-            }
-          }
-          sleepFor(interval);
-        }
-      }
-
-      private void sleepFor(long interval) {
-        synchronized (timeoutCheckerLock) {
-          try {
-            timeoutCheckerLock.wait(interval);
-          } catch (InterruptedException e) {
-            // Ignore, and break.
-          }
-        }
-      }
-    };
-    backgroundOperationPool.execute(timeoutChecker);
+  private void startTimeoutChecker(long interval) {
+    timeoutCheckerTask = SchedulerThreadPool.getInstance().scheduleAtFixedRate(
+        new SessionIdleTimeoutChecker(), interval, interval, TimeUnit.MILLISECONDS);
   }
 
   private void shutdownTimeoutChecker() {
     shutdown = true;
-    synchronized (timeoutCheckerLock) {
-      timeoutCheckerLock.notify();
+    if (timeoutCheckerTask != null) {
+      timeoutCheckerTask.cancel(false);
     }
   }
 
@@ -763,5 +715,36 @@ public void allowSessions(boolean b) {
     this.allowSessions = b;
   }
+
+  private class SessionIdleTimeoutChecker implements Runnable {
+
+    @Override
+    public void run() {
+      long current = System.currentTimeMillis();
+      for (HiveSession session : handleToSession.values()) {
+        if (shutdown) {
+          break;
+        }
+        if (sessionTimeout > 0 && session.getLastAccessTime() + sessionTimeout <= current
+            && (!checkOperation || session.getNoOperationTime() > sessionTimeout)) {
+          SessionHandle handle = session.getSessionHandle();
+          LOG.warn("Session " + handle + " is Timed-out (last access : " +
+              new Date(session.getLastAccessTime()) + ") and will be closed");
+          try {
+            closeSession(handle);
+          } catch (HiveSQLException e) {
+            LOG.warn("Exception is thrown closing session " + handle, e);
+          } finally {
+            Metrics metrics = MetricsFactory.getInstance();
+            if (metrics != null) {
+              metrics.incrementCounter(MetricsConstant.HS2_ABANDONED_SESSIONS);
+            }
+          }
+        } else {
+          session.closeExpiredOperations();
+        }
+      }
+    }
+  }
 }
diff --git service/src/java/org/apache/hive/service/server/HiveServer2.java service/src/java/org/apache/hive/service/server/HiveServer2.java
index 2bf25055ce..5ca75c9cbd 100644
--- service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -30,7 +30,6 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -41,7 +40,6 @@
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.ACLProvider;
 import org.apache.curator.framework.api.BackgroundCallback;
@@ -88,6 +86,7 @@
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.txn.compactor.CompactorThread;
 import org.apache.hadoop.hive.ql.txn.compactor.Worker;
+import org.apache.hadoop.hive.ql.util.SchedulerThreadPool;
 import org.apache.hadoop.hive.registry.impl.ZookeeperUtils;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.Utils;
@@ -670,9 +669,14 @@ public String getServerHost() throws Exception {
   @Override
   public synchronized void start() {
     super.start();
+
+    HiveConf hiveConf = getHiveConf();
+
+    // Initialize the server-wide pool of scheduler workers
+    SchedulerThreadPool.initialize(hiveConf);
+
     // If we're supporting dynamic service discovery, we'll add the service uri for this
     // HiveServer2 instance to Zookeeper as a znode.
-    HiveConf hiveConf = getHiveConf();
     if (!serviceDiscovery || !activePassiveHA) {
       allowClientSessions();
     }
@@ -955,6 +959,8 @@ public synchronized void stop() {
     if (zKClientForPrivSync != null) {
       zKClientForPrivSync.close();
     }
+
+    SchedulerThreadPool.shutdown();
   }
 
   private void shutdownExecutor(final ExecutorService leaderActionsExecutorService) {
@@ -976,12 +982,7 @@ private void shutdownExecutor(final ExecutorService leaderActionsExecutorService
   @VisibleForTesting
   public static void scheduleClearDanglingScratchDir(HiveConf hiveConf, int initialWaitInSec) {
     if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR)) {
-      ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
-          new BasicThreadFactory.Builder()
-              .namingPattern("cleardanglingscratchdir-%d")
-              .daemon(true)
-              .build());
-      executor.scheduleAtFixedRate(new ClearDanglingScratchDir(false, false, false,
+      SchedulerThreadPool.getInstance().scheduleAtFixedRate(new ClearDanglingScratchDir(false, false, false,
           HiveConf.getVar(hiveConf, HiveConf.ConfVars.SCRATCHDIR), hiveConf), initialWaitInSec,
           HiveConf.getTimeVar(hiveConf, ConfVars.HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR_INTERVAL,
              TimeUnit.SECONDS), TimeUnit.SECONDS);
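The recurring pattern applied in the converted call sites (TezSessionPoolSession, NotificationEventPoll, SessionManager) is: stop owning a private ScheduledExecutorService, keep only the ScheduledFuture returned by the shared pool, and cancel that task on stop. A hypothetical component following the same pattern might look like this; the class and method names below are illustrative, not part of the patch:

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hive.ql.util.SchedulerThreadPool;

class PeriodicMaintenance {
  // Previously each component owned its own ScheduledExecutorService;
  // now it only keeps a handle to its task on the shared pool.
  private ScheduledFuture<?> task;

  synchronized void start(long intervalMs) {
    if (task == null) {
      task = SchedulerThreadPool.getInstance().scheduleWithFixedDelay(
          this::runOnce, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }
  }

  synchronized void stop() {
    if (task != null) {
      // Cancel only this task; the shared pool stays up for other users.
      task.cancel(true);
      // Resetting the handle allows a later restart.
      task = null;
    }
  }

  private void runOnce() {
    // periodic work goes here
  }
}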