diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 36bc08f..b3db9b4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -253,7 +253,6 @@ public static Random randGen = new Random(); - private static final Object INPUT_SUMMARY_LOCK = new Object(); private static final Object ROOT_HDFS_DIR_LOCK = new Object(); @FunctionalInterface @@ -2454,45 +2453,42 @@ public static ContentSummary getInputSummary(final Context ctx, MapWork work, Pa final long[] summary = {0L, 0L, 0L}; final Set pathNeedProcess = new HashSet<>(); - // Since multiple threads could call this method concurrently, locking - // this method will avoid number of threads out of control. - synchronized (INPUT_SUMMARY_LOCK) { - // For each input path, calculate the total size. - for (final Path path : work.getPathToAliases().keySet()) { - if (path == null) { - continue; - } - if (filter != null && !filter.accept(path)) { - continue; - } - - ContentSummary cs = ctx.getCS(path); - if (cs != null) { - summary[0] += cs.getLength(); - summary[1] += cs.getFileCount(); - summary[2] += cs.getDirectoryCount(); - } else { - pathNeedProcess.add(path); - } + // For each input path, calculate the total size. 
+ for (final Path path : work.getPathToAliases().keySet()) { + if (path == null) { + continue; + } + if (filter != null && !filter.accept(path)) { + continue; } - // Process the case when name node call is needed - final ExecutorService executor; - - int numExecutors = getMaxExecutorsForInputListing(ctx.getConf(), pathNeedProcess.size()); - if (numExecutors > 1) { - LOG.info("Using {} threads for getContentSummary", numExecutors); - executor = Executors.newFixedThreadPool(numExecutors, - new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Get-Input-Summary-%d").build()); + ContentSummary cs = ctx.getCS(path); + if (cs != null) { + summary[0] += cs.getLength(); + summary[1] += cs.getFileCount(); + summary[2] += cs.getDirectoryCount(); } else { - LOG.info("Not using thread pool for getContentSummary"); - executor = MoreExecutors.newDirectExecutorService(); + pathNeedProcess.add(path); } - getInputSummaryWithPool(ctx, Collections.unmodifiableSet(pathNeedProcess), - work, summary, executor); - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.INPUT_SUMMARY); } + + // Process the case when name node call is needed + final ExecutorService executor; + + int numExecutors = getMaxExecutorsForInputListing(ctx.getConf(), pathNeedProcess.size()); + if (numExecutors > 1) { + LOG.info("Using {} threads for getContentSummary", numExecutors); + executor = Executors.newFixedThreadPool(numExecutors, + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Get-Input-Summary-%d").build()); + } else { + LOG.info("Not using thread pool for getContentSummary"); + executor = MoreExecutors.newDirectExecutorService(); + } + getInputSummaryWithPool(ctx, Collections.unmodifiableSet(pathNeedProcess), + work, summary, executor); + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.INPUT_SUMMARY); + return new ContentSummary.Builder().length(summary[0]) .fileCount(summary[1]).directoryCount(summary[2]).build(); } diff --git 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hive.ql.lockmgr.zookeeper;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Background daemon thread which deletes ZNodes for Hive locking that have
 * not been used in a long time.
 *
 * <p>Once per day (plus a random back-off so that multiple cleaner instances
 * across the cluster do not all run simultaneously), the thread performs a
 * depth-first walk of the lock namespace and deletes every node that has no
 * children and whose modification time is older than {@link #MAX_IDLE_AGE}.
 *
 * <p>Thread-safety: all mutable traversal state is local to each pass; the
 * injected {@link CuratorFramework} is assumed to be thread-safe (it is,
 * per Curator's documentation). The thread is a low-priority daemon and
 * terminates when interrupted.
 */
public class ZNodeCleaner extends Thread {

  private static final Logger LOG = LoggerFactory.getLogger(ZNodeCleaner.class);

  /** Upper bound on the random start-up delay added before each pass. */
  private static final long MAX_BACKOFF = TimeUnit.MINUTES.toMillis(60L);

  /** How long a childless node must sit unmodified before it is deleted. */
  private static final long MAX_IDLE_AGE = TimeUnit.DAYS.toMillis(14L);

  private final CuratorFramework curatorFramework;
  private final String nameSpace;

  /**
   * @param curatorFramework started Curator client used for all ZooKeeper calls
   * @param nameSpace root ZNode path under which Hive lock nodes live
   */
  public ZNodeCleaner(final CuratorFramework curatorFramework,
      final String nameSpace) {
    super("ZNodeCleaner");
    this.curatorFramework = curatorFramework;
    this.nameSpace = nameSpace;

    // Housekeeping only: must never keep the JVM alive or compete with
    // query-processing threads for CPU.
    super.setDaemon(true);
    super.setPriority(Thread.MIN_PRIORITY);
  }

  @Override
  public void run() {
    while (true) {
      try {
        Thread.sleep(TimeUnit.DAYS.toMillis(1L));

        // There may be other instances of this cleaner running in the cluster.
        // They should not all be running at the same time, so add some random
        // back-off before starting this pass.
        Thread.sleep(ThreadLocalRandom.current().nextLong(MAX_BACKOFF));

        LOG.info("Running ZNode cleaner");
        cleanNodes(ZKPaths.makePath(this.nameSpace, ""));
      } catch (InterruptedException e) {
        // Restore the interrupt flag for any outer observer, then exit.
        Thread.currentThread().interrupt();
        return;
      } catch (Exception e) {
        // Best-effort maintenance: a failed pass (e.g. a node deleted or
        // repopulated concurrently) is logged and retried on the next cycle.
        LOG.warn("Failed to clean all nodes. Will try again later.", e);
      }
    }
  }

  /**
   * Depth-first walk from {@code currentPath}: recurse into all children
   * first, then delete {@code currentPath} itself if it ended up childless
   * and has been idle for at least {@link #MAX_IDLE_AGE}.
   *
   * <p>NOTE(review): the root namespace node itself is eligible for deletion
   * here, and a node that gains a child between the {@code checkExists} and
   * the {@code delete} makes the delete throw — both cases are absorbed by
   * the broad catch in {@link #run()}; confirm this is the intended
   * best-effort behavior.
   *
   * @param currentPath absolute ZNode path to clean
   * @throws Exception on any ZooKeeper/Curator failure; handled by the caller
   */
  private void cleanNodes(final String currentPath) throws Exception {
    final List<String> childNodeNames =
        this.curatorFramework.getChildren().forPath(currentPath);

    for (final String childNodeName : childNodeNames) {
      cleanNodes(ZKPaths.makePath(currentPath, childNodeName));
    }

    // Re-check after recursion: the children we just visited may have been
    // deleted, making this node newly empty.
    final Stat stat = this.curatorFramework.checkExists().forPath(currentPath);
    if (stat != null && stat.getNumChildren() == 0) {
      final long age = System.currentTimeMillis() - stat.getMtime();
      if (age >= MAX_IDLE_AGE) {
        this.curatorFramework.delete().forPath(currentPath);
      }
    }
  }
}