diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index a7770b4e53..3a046f2a95 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -258,7 +258,6 @@
   @Deprecated
   protected static final String DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX =
       "mapred.dfsclient.parallelism.max";
-  private static final Object INPUT_SUMMARY_LOCK = new Object();
   private static final Object ROOT_HDFS_DIR_LOCK = new Object();
 
   @FunctionalInterface
@@ -2342,55 +2341,55 @@ static int getMaxExecutorsForInputListing(final Configuration conf, int inputLoc
    * @return the summary of all the input paths.
    * @throws IOException
    */
-  public static ContentSummary getInputSummary(final Context ctx, MapWork work, PathFilter filter)
-      throws IOException {
+  public static ContentSummary getInputSummary(final Context ctx, MapWork work,
+      PathFilter filter) throws IOException {
     PerfLogger perfLogger = SessionState.getPerfLogger();
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.INPUT_SUMMARY);
 
-    final long[] summary = {0L, 0L, 0L};
+    final long[] summary = { 0L, 0L, 0L };
     final Set<Path> pathNeedProcess = new HashSet<>();
 
-    // Since multiple threads could call this method concurrently, locking
-    // this method will avoid number of threads out of control.
-    synchronized (INPUT_SUMMARY_LOCK) {
-      // For each input path, calculate the total size.
-      for (final Path path : work.getPathToAliases().keySet()) {
-        if (path == null) {
-          continue;
-        }
-        if (filter != null && !filter.accept(path)) {
-          continue;
-        }
-
-        ContentSummary cs = ctx.getCS(path);
-        if (cs != null) {
-          summary[0] += cs.getLength();
-          summary[1] += cs.getFileCount();
-          summary[2] += cs.getDirectoryCount();
-        } else {
-          pathNeedProcess.add(path);
-        }
+    // For each input path, calculate the total size.
+    for (final Path path : work.getPathToAliases().keySet()) {
+      if (path == null) {
+        continue;
+      }
+      if (filter != null && !filter.accept(path)) {
+        continue;
       }
 
-      // Process the case when name node call is needed
-      final ExecutorService executor;
-
-      int numExecutors = getMaxExecutorsForInputListing(ctx.getConf(), pathNeedProcess.size());
-      if (numExecutors > 1) {
-        LOG.info("Using {} threads for getContentSummary", numExecutors);
-        executor = Executors.newFixedThreadPool(numExecutors,
-            new ThreadFactoryBuilder().setDaemon(true)
-                .setNameFormat("Get-Input-Summary-%d").build());
+      ContentSummary cs = ctx.getCS(path);
+      if (cs != null) {
+        summary[0] += cs.getLength();
+        summary[1] += cs.getFileCount();
+        summary[2] += cs.getDirectoryCount();
       } else {
-        LOG.info("Not using thread pool for getContentSummary");
-        executor = MoreExecutors.newDirectExecutorService();
+        pathNeedProcess.add(path);
       }
-      getInputSummaryWithPool(ctx, Collections.unmodifiableSet(pathNeedProcess),
-          work, summary, executor);
-      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.INPUT_SUMMARY);
     }
-    return new ContentSummary.Builder().length(summary[0])
-        .fileCount(summary[1]).directoryCount(summary[2]).build();
+
+    // Process the case when name node call is needed
+    final ExecutorService executor;
+
+    int numExecutors =
+        getMaxExecutorsForInputListing(ctx.getConf(), pathNeedProcess.size());
+    if (numExecutors > 1) {
+      LOG.info("Using {} threads for getContentSummary", numExecutors);
+      executor =
+          Executors.newFixedThreadPool(numExecutors, new ThreadFactoryBuilder()
+              .setDaemon(true).setNameFormat("Get-Input-Summary-%d").build());
+    } else {
+      LOG.info("Not using thread pool for getContentSummary");
+      executor = MoreExecutors.newDirectExecutorService();
+    }
+
+    getInputSummaryWithPool(ctx, Collections.unmodifiableSet(pathNeedProcess),
+        work, summary, executor);
+
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.INPUT_SUMMARY);
+
+    return new ContentSummary.Builder().length(summary[0]).fileCount(summary[1])
+        .directoryCount(summary[2]).build();
   }
 
   /**