diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index f5ad3a882b..069e5f9504 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -628,6 +628,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "This value must not contain any special character used in HDFS URI (e.g., ':', '%', '/' etc). \n" + "The user has to be aware that the dynamic partition value should not contain this value to avoid confusions."), DEFAULT_ZOOKEEPER_PARTITION_NAME("hive.lockmgr.zookeeper.default.partition.name", "__HIVE_DEFAULT_ZOOKEEPER_PARTITION__", ""), + HIVE_EXEC_SCHEDULED_POOL_NUM_THREADS("hive.exec.scheduler.num.threads", 10, + new RangeValidator(1, Integer.MAX_VALUE), "Number of threads to use commonly for scheduled work"), // Whether to show a link to the most failed task + debugging tips SHOW_JOB_FAIL_DEBUG_INFO("hive.exec.show.job.failure.debug.info", true, diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SmallTableCache.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SmallTableCache.java index 6bfccd6aec..4d01c58d0a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SmallTableCache.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SmallTableCache.java @@ -21,21 +21,18 @@ import com.google.common.base.Ticker; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer; +import org.apache.hadoop.hive.ql.util.SchedulerThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import 
java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; -import static org.apache.commons.lang3.math.NumberUtils.INTEGER_ONE; import static org.apache.commons.lang3.math.NumberUtils.INTEGER_ZERO; public class SmallTableCache { @@ -90,7 +87,6 @@ public static MapJoinTableContainer get(String key, Callable cacheL1; private final Cache cacheL2; - private final ScheduledExecutorService cleanupService; SmallTableLocalCache() { this(Ticker.systemTicker()); @@ -98,14 +94,11 @@ public static MapJoinTableContainer get(String key, Callable { - cleanup(); - }, INTEGER_ZERO, MAINTENANCE_THREAD_CLEANUP_PERIOD, TimeUnit.SECONDS); + SchedulerThreadPool.getInstance().scheduleAtFixedRate( + this::cleanup, INTEGER_ZERO, MAINTENANCE_THREAD_CLEANUP_PERIOD, TimeUnit.SECONDS); } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java index e2f3a11ffa..40d2adcc0c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java @@ -20,13 +20,12 @@ import java.io.IOException; import java.util.Map; import java.util.Set; -import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import com.google.common.collect.Sets; +import org.apache.hadoop.hive.ql.util.SchedulerThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -178,14 +177,14 @@ private void startTimeoutThread() { TimeUnit.MILLISECONDS); long sessionTimeoutPeriod = conf.getTimeVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_PERIOD, TimeUnit.MILLISECONDS); - ScheduledExecutorService es = Executors.newSingleThreadScheduledExecutor(); // Schedules a thread that does the following: iterates through all 
the active SparkSessions // and calls #triggerTimeout(long) on each one. If #triggerTimeout(long) returns true, then // the SparkSession is removed from the set of active sessions managed by this class. - timeoutFuture = es.scheduleAtFixedRate(() -> createdSessions.stream() - .filter(sparkSession -> sparkSession.triggerTimeout(sessionTimeout)) - .forEach(createdSessions::remove), - 0, sessionTimeoutPeriod, TimeUnit.MILLISECONDS); + timeoutFuture = SchedulerThreadPool.getInstance().scheduleAtFixedRate( + () -> createdSessions.stream() + .filter(sparkSession -> sparkSession.triggerTimeout(sessionTimeout)) + .forEach(createdSessions::remove), + 0, sessionTimeoutPeriod, TimeUnit.MILLISECONDS); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/PerPoolTriggerValidatorRunnable.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/PerPoolTriggerValidatorRunnable.java index 14a688ee38..26d2d01a5c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/PerPoolTriggerValidatorRunnable.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/PerPoolTriggerValidatorRunnable.java @@ -18,12 +18,9 @@ import java.util.HashMap; import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.hive.ql.util.SchedulerThreadPool; import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider; import org.apache.hadoop.hive.ql.wm.TriggerActionHandler; import org.slf4j.Logger; @@ -48,16 +45,12 @@ @Override public void run() { try { - ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("PoolValidator %d").build(); - ScheduledExecutorService validatorExecutorService = Executors - .newScheduledThreadPool(sessionTriggerProviders.size(), threadFactory); for (Map.Entry entry : sessionTriggerProviders.entrySet()) { String poolName = 
entry.getKey(); if (!poolValidators.containsKey(poolName)) { LOG.info("Creating trigger validator for pool: {}", poolName); TriggerValidatorRunnable poolValidator = new TriggerValidatorRunnable(entry.getValue(), triggerActionHandler); - validatorExecutorService.scheduleWithFixedDelay(poolValidator, triggerValidationIntervalMs, + SchedulerThreadPool.getInstance().scheduleWithFixedDelay(poolValidator, triggerValidationIntervalMs, triggerValidationIntervalMs, TimeUnit.MILLISECONDS); poolValidators.put(poolName, poolValidator); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java index d3748edb86..0b5f0bf2c4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java @@ -20,8 +20,7 @@ import java.io.IOException; import java.net.URISyntaxException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -29,11 +28,11 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.ql.util.SchedulerThreadPool; import org.apache.hadoop.hive.registry.impl.TezAmInstance; import org.apache.tez.dag.api.TezException; import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * TezSession that is aware of the session pool, and also keeps track of expiration and use. 
@@ -61,24 +60,24 @@ } public static abstract class AbstractTriggerValidator { - private ScheduledExecutorService scheduledExecutorService = null; + private ScheduledFuture triggerValidator; + abstract Runnable getTriggerValidatorRunnable(); void startTriggerValidator(long triggerValidationIntervalMs) { - if (scheduledExecutorService == null) { - scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TriggerValidator").build()); + if (triggerValidator == null) { Runnable triggerValidatorRunnable = getTriggerValidatorRunnable(); - scheduledExecutorService.scheduleWithFixedDelay(triggerValidatorRunnable, triggerValidationIntervalMs, - triggerValidationIntervalMs, TimeUnit.MILLISECONDS); + triggerValidator = SchedulerThreadPool.getInstance().scheduleWithFixedDelay( + triggerValidatorRunnable, triggerValidationIntervalMs, + triggerValidationIntervalMs, TimeUnit.MILLISECONDS); LOG.info("Started trigger validator with interval: {} ms", triggerValidationIntervalMs); } } void stopTriggerValidator() { - if (scheduledExecutorService != null) { - scheduledExecutorService.shutdownNow(); - scheduledExecutorService = null; + if (triggerValidator != null) { + triggerValidator.cancel(true); + triggerValidator = null; LOG.info("Stopped trigger validator"); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java index 4592f5ec34..6999f5d726 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java @@ -68,6 +68,7 @@ import org.apache.hadoop.hive.ql.parse.ParseUtils; import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.util.SchedulerThreadPool; import org.apache.hadoop.hive.serde2.SerDeException; import 
org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; @@ -76,7 +77,6 @@ import org.joda.time.Interval; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.collect.ImmutableList; @@ -99,7 +99,7 @@ new ConcurrentHashMap>(); /* Whether the cache has been initialized or not. */ - private AtomicBoolean initialized = new AtomicBoolean(false); + private final AtomicBoolean initialized = new AtomicBoolean(false); private HiveMaterializedViewsRegistry() { } @@ -144,13 +144,10 @@ public void init(Hive db) { LOG.info("Using dummy materialized views registry"); } else { // We initialize the cache - long period = HiveConf.getTimeVar(db.getConf(), ConfVars.HIVE_SERVER2_MATERIALIZED_VIEWS_REGISTRY_REFRESH, TimeUnit.SECONDS); - ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("HiveMaterializedViewsRegistry-%d") - .build()); - pool.scheduleAtFixedRate(new Loader(db), 0, period, TimeUnit.SECONDS); + long period = HiveConf.getTimeVar( + db.getConf(), ConfVars.HIVE_SERVER2_MATERIALIZED_VIEWS_REGISTRY_REFRESH, TimeUnit.SECONDS); + SchedulerThreadPool.getInstance().scheduleAtFixedRate( + new Loader(db), 0, period, TimeUnit.SECONDS); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java index 010f00c7d3..06e2cd2cab 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java @@ -20,10 +20,7 @@ import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; -import 
java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -32,10 +29,9 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.util.SchedulerThreadPool; import org.apache.hadoop.util.ReflectionUtils; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +41,6 @@ private static NotificationEventPoll instance; Configuration conf; - ScheduledExecutorService executorService; List eventConsumers = new ArrayList<>(); ScheduledFuture pollFuture; long lastCheckedEventId; @@ -101,13 +96,7 @@ private NotificationEventPoll(Configuration conf) throws Exception { LOG.info("Initializing lastCheckedEventId to {}", lastCheckedEventId); // Start the scheduled poll task - ThreadFactory threadFactory = - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("NotificationEventPoll %d") - .build(); - executorService = Executors.newSingleThreadScheduledExecutor(threadFactory); - pollFuture = executorService.scheduleAtFixedRate(new Poller(), + pollFuture = SchedulerThreadPool.getInstance().scheduleAtFixedRate(new Poller(), pollInterval, pollInterval, TimeUnit.MILLISECONDS); } @@ -116,10 +105,6 @@ private void stop() { pollFuture.cancel(true); pollFuture = null; } - if (executorService != null) { - executorService.shutdown(); - executorService = null; - } } class Poller implements Runnable { diff --git ql/src/java/org/apache/hadoop/hive/ql/util/SchedulerThreadPool.java ql/src/java/org/apache/hadoop/hive/ql/util/SchedulerThreadPool.java new file mode 100644 index 0000000000..9dc5794490 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/util/SchedulerThreadPool.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.util; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import 
org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; + +/** + * Utility singleton class to manage all the scheduler threads. + */ +public class SchedulerThreadPool { + + private static volatile ScheduledExecutorService pool; + + /** + * Initialize the thread pool with configuration. + */ + public static void initialize(Configuration conf) { + if (pool == null) { + synchronized (SchedulerThreadPool.class) { + if (pool == null) { + ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Hive-exec Scheduled Worker %d").build(); + pool = Executors.newScheduledThreadPool(HiveConf.getIntVar(conf, + HiveConf.ConfVars.HIVE_EXEC_SCHEDULED_POOL_NUM_THREADS), threadFactory); + } + } + } + } + + public static ScheduledExecutorService getInstance() { + if (pool == null) { + // allow initialization with default pool size because of arbitrary hive-exec.jar usages + initialize(new HiveConf()); + } + return pool; + } + + public static void shutdown() { + if (pool != null) { + synchronized (SchedulerThreadPool.class) { + if (pool != null) { + pool.shutdown(); + pool = null; + } + } + } + } +} diff --git service/src/java/org/apache/hive/service/server/HiveServer2.java service/src/java/org/apache/hive/service/server/HiveServer2.java index d600f3a1fb..09fad714ad 100644 --- service/src/java/org/apache/hive/service/server/HiveServer2.java +++ service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -30,7 +30,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -41,7 +40,6 @@ import org.apache.commons.cli.Options; import 
org.apache.commons.cli.ParseException; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.ACLProvider; import org.apache.curator.framework.api.BackgroundCallback; @@ -86,6 +84,7 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.txn.compactor.CompactorThread; import org.apache.hadoop.hive.ql.txn.compactor.Worker; +import org.apache.hadoop.hive.ql.util.SchedulerThreadPool; import org.apache.hadoop.hive.registry.impl.ZookeeperUtils; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.Utils; @@ -666,9 +665,14 @@ public String getServerHost() throws Exception { @Override public synchronized void start() { super.start(); + + HiveConf hiveConf = getHiveConf(); + + // Initialize the server-wide pool of scheduler workers + SchedulerThreadPool.initialize(hiveConf); + // If we're supporting dynamic service discovery, we'll add the service uri for this // HiveServer2 instance to Zookeeper as a znode. 
- HiveConf hiveConf = getHiveConf(); if (!serviceDiscovery || !activePassiveHA) { allowClientSessions(); } @@ -949,6 +953,8 @@ public synchronized void stop() { if (zKClientForPrivSync != null) { zKClientForPrivSync.close(); } + + SchedulerThreadPool.shutdown(); } private void shutdownExecutor(final ExecutorService leaderActionsExecutorService) { @@ -970,12 +976,7 @@ private void shutdownExecutor(final ExecutorService leaderActionsExecutorService @VisibleForTesting public static void scheduleClearDanglingScratchDir(HiveConf hiveConf, int initialWaitInSec) { if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR)) { - ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( - new BasicThreadFactory.Builder() - .namingPattern("cleardanglingscratchdir-%d") - .daemon(true) - .build()); - executor.scheduleAtFixedRate(new ClearDanglingScratchDir(false, false, false, + SchedulerThreadPool.getInstance().scheduleAtFixedRate(new ClearDanglingScratchDir(false, false, false, HiveConf.getVar(hiveConf, HiveConf.ConfVars.SCRATCHDIR), hiveConf), initialWaitInSec, HiveConf.getTimeVar(hiveConf, ConfVars.HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR_INTERVAL, TimeUnit.SECONDS), TimeUnit.SECONDS);