diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index d0f6451..a801151 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -18,12 +18,6 @@ package org.apache.hadoop.hive.ql.exec;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 import java.beans.DefaultPersistenceDelegate;
 import java.beans.Encoder;
 import java.beans.Expression;
@@ -53,6 +47,7 @@ import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -65,17 +60,18 @@ import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.zip.Deflater;
 import java.util.zip.DeflaterOutputStream;
 import java.util.zip.InflaterInputStream;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
@@ -138,7 +134,6 @@ import org.apache.hadoop.hive.ql.io.RCFile;
 import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
 import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
-import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
 import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateMapper;
@@ -211,9 +206,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import com.esotericsoftware.kryo.Kryo;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
@@ -2385,36 +2383,32 @@ public static ContentSummary getInputSummary(final Context ctx, MapWork work, Pa
     PerfLogger perfLogger = SessionState.getPerfLogger();
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.INPUT_SUMMARY);
 
-    long[] summary = {0, 0, 0};
-
+    final long[] summary = {0L, 0L, 0L};
     final Set<Path> pathNeedProcess = new HashSet<>();
 
     // Since multiple threads could call this method concurrently, locking
     // this method will avoid number of threads out of control.
     synchronized (INPUT_SUMMARY_LOCK) {
       // For each input path, calculate the total size.
-      for (Path path : work.getPathToAliases().keySet()) {
-        Path p = path;
-
-        if (filter != null && !filter.accept(p)) {
+      for (final Path path : work.getPathToAliases().keySet()) {
+        if (path == null) {
+          continue;
+        }
+        if (filter != null && !filter.accept(path)) {
           continue;
         }
 
         ContentSummary cs = ctx.getCS(path);
-        if (cs == null) {
-          if (path == null) {
-            continue;
-          }
-          pathNeedProcess.add(path);
-        } else {
+        if (cs != null) {
           summary[0] += cs.getLength();
           summary[1] += cs.getFileCount();
           summary[2] += cs.getDirectoryCount();
+        } else {
+          pathNeedProcess.add(path);
+        }
       }
 
       // Process the case when name node call is needed
-      final Map<String, ContentSummary> resultMap = new ConcurrentHashMap<String, ContentSummary>();
       final ExecutorService executor;
       int numExecutors = getMaxExecutorsForInputListing(ctx.getConf(), pathNeedProcess.size());
@@ -2426,17 +2420,36 @@ public static ContentSummary getInputSummary(final Context ctx, MapWork work, Pa
       } else {
         executor = null;
       }
-      ContentSummary cs = getInputSummaryWithPool(ctx, pathNeedProcess, work, summary, executor);
+      getInputSummaryWithPool(ctx, Collections.unmodifiableSet(pathNeedProcess),
+          work, summary, executor);
       perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.INPUT_SUMMARY);
-      return cs;
     }
+    return new ContentSummary.Builder().length(summary[0])
+        .fileCount(summary[1]).directoryCount(summary[2]).build();
   }
 
+  /**
+   * Performs ContentSummary lookups over a set of paths using one or more
+   * threads. The 'summary' argument is modified in place.
+   *
+   * @param ctx context used to look up and cache the ContentSummary of each path
+   * @param pathNeedProcess paths that still require a ContentSummary lookup
+   * @param work map work providing alias and partition information for the paths
+   * @param summary three-element array (length, file count, directory count) updated in place
+   * @param executor executor service for the lookups, or null to run them on the calling thread
+   * @throws IOException if the summary collection fails
+   */
   @VisibleForTesting
-  static ContentSummary getInputSummaryWithPool(final Context ctx, Set<Path> pathNeedProcess, MapWork work,
-      long[] summary, ExecutorService executor) throws IOException {
-    List<Future<?>> results = new ArrayList<Future<?>>();
-    final Map<String, ContentSummary> resultMap = new ConcurrentHashMap<String, ContentSummary>();
+  static void getInputSummaryWithPool(final Context ctx,
+      final Set<Path> pathNeedProcess, final MapWork work, final long[] summary,
+      final ExecutorService executor) throws IOException {
+    Preconditions.checkNotNull(ctx);
+    Preconditions.checkNotNull(pathNeedProcess);
+
+    List<Future<?>> futures = new ArrayList<Future<?>>();
+    final AtomicLong totalLength = new AtomicLong(0L);
+    final AtomicLong totalFileCount = new AtomicLong(0L);
+    final AtomicLong totalDirectoryCount = new AtomicLong(0L);
 
     HiveInterruptCallback interrup = HiveInterruptUtils.add(new HiveInterruptCallback() {
       @Override
@@ -2456,20 +2469,18 @@ public void interrupt() {
     try {
       Configuration conf = ctx.getConf();
       JobConf jobConf = new JobConf(conf);
-      for (Path path : pathNeedProcess) {
-        final Path p = path;
-        final String pathStr = path.toString();
+      for (final Path path : pathNeedProcess) {
         // All threads share the same Configuration and JobConf based on the
         // assumption that they are thread safe if only read operations are
        // executed. It is not stated in Hadoop's javadoc, the sourcce codes
        // clearly showed that they made efforts for it and we believe it is
-        // thread safe. Will revisit this piece of codes if we find the assumption
-        // is not correct.
+        // thread safe. Will revisit this piece of code if we find the
+        // assumption is not correct.
        final Configuration myConf = conf;
        final JobConf myJobConf = jobConf;
        final Map<String, Operator<? extends OperatorDesc>> aliasToWork = work.getAliasToWork();
        final Map<Path, ArrayList<String>> pathToAlias = work.getPathToAliases();
-        final PartitionDesc partDesc = work.getPathToPartitionInfo().get(p);
+        final PartitionDesc partDesc = work.getPathToPartitionInfo().get(path);
         Runnable r = new Runnable() {
           @Override
           public void run() {
@@ -2479,11 +2490,11 @@ public void run() {
               InputFormat inputFormatObj = HiveInputFormat.getInputFormatFromCache(
                   inputFormatCls, myJobConf);
               if (inputFormatObj instanceof ContentSummaryInputFormat) {
-                ContentSummaryInputFormat cs = (ContentSummaryInputFormat) inputFormatObj;
-                resultMap.put(pathStr, cs.getContentSummary(p, myJobConf));
+                ContentSummaryInputFormat csif = (ContentSummaryInputFormat) inputFormatObj;
+                final ContentSummary cs = csif.getContentSummary(path, myJobConf);
+                recordSummary(path, cs);
                 return;
               }
-
               String metaTableStorage = null;
               if (partDesc.getTableDesc() != null &&
                   partDesc.getTableDesc().getProperties() != null) {
@@ -2500,7 +2511,8 @@ public void run() {
                 long total = 0;
                 TableDesc tableDesc = partDesc.getTableDesc();
                 InputEstimator estimator = (InputEstimator) handler;
-                for (String alias : HiveFileFormatUtils.doGetAliasesFromPath(pathToAlias, p)) {
+                for (String alias : HiveFileFormatUtils
+                    .doGetAliasesFromPath(pathToAlias, path)) {
                   JobConf jobConf = new JobConf(myJobConf);
                   TableScanOperator scanOp = (TableScanOperator) aliasToWork.get(alias);
                   Utilities.setColumnNameList(jobConf, scanOp, true);
@@ -2509,12 +2521,12 @@ public void run() {
                   Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf);
                   total += estimator.estimate(jobConf, scanOp, -1).getTotalLength();
                 }
-                resultMap.put(pathStr, new ContentSummary(total, -1, -1));
+                recordSummary(path, new ContentSummary(total, -1, -1));
               } else {
                 // todo: should nullify summary for non-native tables,
                 // not to be selected as a mapjoin target
-                FileSystem fs = p.getFileSystem(myConf);
-                resultMap.put(pathStr, fs.getContentSummary(p));
+                FileSystem fs = path.getFileSystem(myConf);
+                recordSummary(path, fs.getContentSummary(path));
               }
             } catch (Exception e) {
               // We safely ignore this exception for summary data.
@@ -2522,28 +2534,45 @@ public void run() {
               // usages. The worst case is that IOException will always be
               // retried for another getInputSummary(), which is fine as
               // IOException is not considered as a common case.
-              LOG.info("Cannot get size of {}. Safely ignored.", pathStr);
Safely ignored.", path); } } + + private void recordSummary(final Path p, final ContentSummary cs) { + final long csLength = cs.getLength(); + final long csFileCount = cs.getFileCount(); + final long csDirectoryCount = cs.getDirectoryCount(); + + totalLength.addAndGet(csLength); + totalFileCount.addAndGet(csFileCount); + totalDirectoryCount.addAndGet(csDirectoryCount); + + ctx.addCS(p.toString(), cs); + + LOG.debug( + "Cache Content Summary for {} length: {} file count: {} " + + "directory count: {}", + path, csLength, csFileCount, csDirectoryCount); + } }; if (executor == null) { r.run(); } else { - Future result = executor.submit(r); - results.add(result); + Future future = executor.submit(r); + futures.add(future); } } if (executor != null) { - for (Future result : results) { + for (Future future : futures) { boolean executorDone = false; do { try { - result.get(); + future.get(); executorDone = true; } catch (InterruptedException e) { - LOG.info("Interrupted when waiting threads: ", e); + LOG.info("Interrupted when waiting threads", e); Thread.currentThread().interrupt(); break; } catch (ExecutionException e) { @@ -2554,22 +2583,10 @@ public void run() { executor.shutdown(); } HiveInterruptUtils.checkInterrupted(); - for (Map.Entry entry : resultMap.entrySet()) { - ContentSummary cs = entry.getValue(); - - summary[0] += cs.getLength(); - summary[1] += cs.getFileCount(); - summary[2] += cs.getDirectoryCount(); - - ctx.addCS(entry.getKey(), cs); - if (LOG.isInfoEnabled()) { - LOG.info("Cache Content Summary for {} length: {} file count: {} " + - " directory count: {}", entry.getKey(), cs.getLength(), - cs.getFileCount(), cs.getDirectoryCount()); - } - } - return new ContentSummary(summary[0], summary[1], summary[2]); + summary[0] += totalLength.get(); + summary[1] += totalFileCount.get(); + summary[2] += totalDirectoryCount.get(); } finally { if (executor != null) { executor.shutdownNow();