diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 1e8a389..cce6dc6 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -4365,6 +4365,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "If the query timeout is also set by thrift API call, the smaller one will be taken."), HIVE_EXEC_INPUT_LISTING_MAX_THREADS("hive.exec.input.listing.max.threads", 0, new SizeValidator(0L, true, 1024L, true), "Maximum number of threads that Hive uses to list file information from file systems (recommended > 1 for blobstore)."), + HIVE_EXEC_INPUT_SUMMARY_MAX_THREADS("hive.exec.input.summary.max.threads", 1, new SizeValidator(1L, true, 1024L, true), + "Maximum number of threads that Hive uses to fetch content summaries of input paths from file systems."), HIVE_QUERY_REEXECUTION_ENABLED("hive.query.reexecution.enabled", true, "Enable query reexecutions"), diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/InputSummaryService.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/InputSummaryService.java new file mode 100644 index 0000000..fa1aad7 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/InputSummaryService.java @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.hive.ql.exec; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hive.common.HiveInterruptCallback; +import org.apache.hadoop.hive.common.HiveInterruptUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat; +import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; +import org.apache.hadoop.hive.ql.io.HiveInputFormat; +import org.apache.hadoop.hive.ql.log.PerfLogger; +import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; +import org.apache.hadoop.hive.ql.metadata.HiveUtils; +import org.apache.hadoop.hive.ql.metadata.InputEstimator; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.ql.plan.PlanUtils; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * A class responsible for fetching {@link ContentSummary} for {@link Path}s in + * an asynchronous manner from a single thread pool that all requests share. + */ +public final class InputSummaryService { + + private static final Logger LOG = + LoggerFactory.getLogger(InputSummaryService.class); + + private static final String CLASS_NAME = InputSummaryService.class.getName(); + + public static final InputSummaryService INSTANCE = new InputSummaryService(); + + @Deprecated + protected static final String DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX = + "mapred.dfsclient.parallelism.max"; + + private ExecutorService executorService; + + /** + * Private constructor. + */ + private InputSummaryService() { + HiveConf hiveConf = new HiveConf(); + int nThreads = getThreadCountForInputSummaryListing(hiveConf); + LOG.info("Using {} threads for fetching path input summary", nThreads); + executorService = + Executors.newFixedThreadPool(nThreads, new ThreadFactoryBuilder() + .setDaemon(true).setNameFormat("Get-Input-Summary-%d").build()); + } + + /** + * Calculate the total size of input files for a given {@link MapWork} plan. + * + * @param ctx the hadoop job context + * @param work map reduce job plan + * @param filter filter to apply to the input paths before calculating size + * @return the summary of all the input paths. 
+ * @throws IOException if an error occurs while fetching the summaries + */ + public ContentSummary getInputSummary(final Context ctx, MapWork work, + PathFilter filter) throws IOException { + PerfLogger perfLogger = SessionState.getPerfLogger(); + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.INPUT_SUMMARY); + + final long[] summary = {0L, 0L, 0L}; + + final Collection inputPaths = work.getPathToAliases().keySet(); + final List pathsNeedProcess = new ArrayList<>(256); + + // For each input path, calculate the total size. + for (final Path path : inputPaths) { + if (path != null && (filter == null || filter.accept(path))) { + ContentSummary cs = ctx.getCS(path); + if (cs == null) { + LOG.debug("No ContentSummary information loaded for: {}", path); + pathsNeedProcess.add(path); + } else { + summary[0] += cs.getLength(); + summary[1] += cs.getFileCount(); + summary[2] += cs.getDirectoryCount(); + } + } + } + + // Process the case when a NameNode call is needed + ContentSummary cs = + getInputSummaryWithPool(ctx, pathsNeedProcess, work, summary); + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.INPUT_SUMMARY); + + return cs; + } + + @VisibleForTesting + ContentSummary getInputSummaryWithPool(final Context ctx, + final Collection pathsNeedProcess, MapWork work, + final long[] summary) throws IOException { + + HiveInterruptCallback interrup = + HiveInterruptUtils.add(new HiveInterruptCallback() { + @Override + public void interrupt() { + for (Path path : pathsNeedProcess) { + try { + path.getFileSystem(ctx.getConf()).close(); + } catch (IOException ignore) { + LOG.debug("Failed to close filesystem", ignore); + } + } + executorService.shutdownNow(); + } + }); + + // A mapping of each Path to its corresponding ContentSummary + List>> futures = + new ArrayList<>(pathsNeedProcess.size()); + + List> results = + new ArrayList<>(pathsNeedProcess.size()); + + try { + Configuration conf = ctx.getConf(); + JobConf jobConf = new JobConf(conf); + for (final Path path : pathsNeedProcess) { + // All threads share the same Configuration and JobConf based on the + // assumption that they are thread safe if only read operations are + // executed. Hadoop's javadoc does not state this, but the source code + // clearly shows an effort to make it so, and we believe the assumption + // holds. Revisit this code if the assumption turns out to be incorrect.
+ Future> result = executorService + .submit(new InputSummaryFetcher(path, conf, jobConf, work)); + futures.add(result); + } + + LOG.debug("Waiting on all ContentSummary tasks to finish"); + for (final Future> future : futures) { + try { + results.add(future.get()); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + LOG.info("Error while waiting on input summary", ie); + } catch (ExecutionException ee) { + throw new IOException("Error while fetching input summary", ee); + } + } + + HiveInterruptUtils.checkInterrupted(); + + LOG.debug("Combining ContentSummary results"); + for (Map result : results) { + for (Map.Entry entry : result.entrySet()) { + String pathStr = entry.getKey(); + ContentSummary cs = entry.getValue(); + + summary[0] += cs.getLength(); + summary[1] += cs.getFileCount(); + summary[2] += cs.getDirectoryCount(); + + LOG.info( + "Cache ContentSummary for {} length: {} file count: {} " + + " directory count: {}", + pathStr, cs.getLength(), cs.getFileCount(), + cs.getDirectoryCount()); + + ctx.addCS(pathStr, cs); + } + } + ContentSummary cs = new ContentSummary.Builder().length(summary[0]) + .fileCount(summary[1]).directoryCount(summary[2]).build(); + LOG.debug("Final ContentSummary: {}", cs); + return cs; + } finally { + HiveInterruptUtils.remove(interrup); + } + } + + /** + * The Callable responsible for fetching the {@link Path} + * {@link ContentSummary}. + */ + private static class InputSummaryFetcher + implements Callable> { + + private final Path path; + private final Configuration conf; + private final JobConf jobConf; + private final MapWork mapWork; + private final PartitionDesc partDesc; + + InputSummaryFetcher(final Path path, final Configuration conf, + final JobConf jobConf, final MapWork mapWork) { + this.path = path; + this.conf = conf; + this.jobConf = jobConf; + this.mapWork = mapWork; + this.partDesc = mapWork.getPathToPartitionInfo().get(path); + } + + @Override + @SuppressWarnings("rawtypes") + public Map call() throws Exception { + final String pathStr = path.toString(); + try { + Class inputFormatCls = + partDesc.getInputFileFormatClass(); + InputFormat inputFormatObj = + HiveInputFormat.getInputFormatFromCache(inputFormatCls, jobConf); + if (inputFormatObj instanceof ContentSummaryInputFormat) { + ContentSummaryInputFormat csif = + (ContentSummaryInputFormat) inputFormatObj; + ContentSummary cs = csif.getContentSummary(path, jobConf); + return Collections.singletonMap(pathStr, cs); + } + + String metaTableStorage = null; + if (partDesc.getTableDesc() != null + && partDesc.getTableDesc().getProperties() != null) { + metaTableStorage = partDesc.getTableDesc().getProperties() + .getProperty(hive_metastoreConstants.META_TABLE_STORAGE, null); + } + if (partDesc.getProperties() != null) { + metaTableStorage = partDesc.getProperties().getProperty( + hive_metastoreConstants.META_TABLE_STORAGE, metaTableStorage); + } + + HiveStorageHandler handler = + HiveUtils.getStorageHandler(conf, metaTableStorage); + if (handler instanceof InputEstimator) { + long total = 0L; + TableDesc tableDesc = partDesc.getTableDesc(); + InputEstimator estimator = (InputEstimator) handler; + for (String alias : HiveFileFormatUtils + .doGetAliasesFromPath(mapWork.getPathToAliases(), path)) { + JobConf jobConfCpy = new JobConf(jobConf); + TableScanOperator scanOp = + (TableScanOperator) mapWork.getAliasToWork().get(alias); + Utilities.setColumnNameList(jobConfCpy, scanOp, true); + Utilities.setColumnTypeList(jobConfCpy, scanOp, true); + 
PlanUtils.configureInputJobPropertiesForStorageHandler(tableDesc); + Utilities.copyTableJobPropertiesToConf(tableDesc, jobConfCpy); + total += + estimator.estimate(jobConfCpy, scanOp, -1).getTotalLength(); + } + return Collections.singletonMap(pathStr, new ContentSummary.Builder() + .length(total).fileCount(-1L).directoryCount(-1L).build()); + } else { + // todo: should nullify summary for non-native tables, + // not to be selected as a mapjoin target + FileSystem fs = path.getFileSystem(conf); + return Collections.singletonMap(pathStr, fs.getContentSummary(path)); + } + } catch (Exception e) { + // We safely ignore this exception for summary data. + // We do not update the cache to protect it from polluting other + // usages. The worst case is that IOException will always be + // retried for another getInputSummary(), which is fine as + // IOException is not considered as a common case. + LOG.debug("Cannot get size of {}. Safely ignored.", pathStr, e); + } + return Collections.emptyMap(); + } + + } + + /** + * Returns the maximum number of threads to use for fetching file + * information. It checks both HIVE_EXEC_INPUT_SUMMARY_MAX_THREADS and + * DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX. + * + * @param conf Configuration object to get the maximum number of threads. + * @return The maximum number of threads to use. + */ + @VisibleForTesting + int getThreadCountForInputSummaryListing(final Configuration conf) { + + int listingMaxThreads = + HiveConf.getIntVar(conf, ConfVars.HIVE_EXEC_INPUT_SUMMARY_MAX_THREADS); + + int depListingMaxThreads = + conf.getInt(DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX, + ConfVars.HIVE_EXEC_INPUT_SUMMARY_MAX_THREADS.defaultIntVal); + + if (depListingMaxThreads > listingMaxThreads) { + // DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX must be removed in the + // next Hive version (probably 3.0). If the deprecated value is higher + // than HIVE_EXEC_INPUT_SUMMARY_MAX_THREADS, assume that is what the user + // intended to use. + LOG.warn("Deprecated configuration is used: {}.
Please use {}", + DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX, + ConfVars.HIVE_EXEC_INPUT_SUMMARY_MAX_THREADS.varname); + listingMaxThreads = depListingMaxThreads; + } + + return listingMaxThreads; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 80478ca..57302e0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -65,7 +65,6 @@ import java.util.Random; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.regex.Matcher; @@ -96,8 +95,6 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.common.BlobStorageUtils; import org.apache.hadoop.hive.common.FileUtils; -import org.apache.hadoop.hive.common.HiveInterruptCallback; -import org.apache.hadoop.hive.common.HiveInterruptUtils; import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.StatsSetupConst; @@ -108,7 +105,6 @@ import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Order; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.Driver.LockedDriverState; @@ -126,10 +122,8 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; -import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat; import org.apache.hadoop.hive.ql.io.IOConstants; @@ -138,16 +132,12 @@ import org.apache.hadoop.hive.ql.io.RCFile; import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat; import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface; -import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta; import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper; import org.apache.hadoop.hive.ql.io.merge.MergeFileWork; import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateMapper; import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateWork; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; -import org.apache.hadoop.hive.ql.metadata.HiveUtils; -import org.apache.hadoop.hive.ql.metadata.InputEstimator; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.SemanticException; @@ -160,7 +150,6 @@ import org.apache.hadoop.hive.ql.plan.MergeJoinWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; -import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.TableDesc; import 
org.apache.hadoop.hive.ql.plan.api.Adjacency; @@ -213,7 +202,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; - /** * Utilities. * @@ -2357,216 +2345,6 @@ static int getMaxExecutorsForInputListing(final Configuration conf, int inputLoc return maxExecutors; } - /** - * Calculate the total size of input files. - * - * @param ctx - * the hadoop job context - * @param work - * map reduce job plan - * @param filter - * filter to apply to the input paths before calculating size - * @return the summary of all the input paths. - * @throws IOException - */ - public static ContentSummary getInputSummary(final Context ctx, MapWork work, PathFilter filter) - throws IOException { - PerfLogger perfLogger = SessionState.getPerfLogger(); - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.INPUT_SUMMARY); - - long[] summary = {0, 0, 0}; - - final Set pathNeedProcess = new HashSet<>(); - - // Since multiple threads could call this method concurrently, locking - // this method will avoid number of threads out of control. - synchronized (INPUT_SUMMARY_LOCK) { - // For each input path, calculate the total size. - for (Path path : work.getPathToAliases().keySet()) { - Path p = path; - - if (filter != null && !filter.accept(p)) { - continue; - } - - ContentSummary cs = ctx.getCS(path); - if (cs == null) { - if (path == null) { - continue; - } - pathNeedProcess.add(path); - } else { - summary[0] += cs.getLength(); - summary[1] += cs.getFileCount(); - summary[2] += cs.getDirectoryCount(); - } - } - - // Process the case when name node call is needed - final Map resultMap = new ConcurrentHashMap(); - final ExecutorService executor; - - int numExecutors = getMaxExecutorsForInputListing(ctx.getConf(), pathNeedProcess.size()); - if (numExecutors > 1) { - LOG.info("Using {} threads for getContentSummary", numExecutors); - executor = Executors.newFixedThreadPool(numExecutors, - new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Get-Input-Summary-%d").build()); - } else { - executor = null; - } - ContentSummary cs = getInputSummaryWithPool(ctx, pathNeedProcess, work, summary, executor); - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.INPUT_SUMMARY); - return cs; - } - } - - @VisibleForTesting - static ContentSummary getInputSummaryWithPool(final Context ctx, Set pathNeedProcess, MapWork work, - long[] summary, ExecutorService executor) throws IOException { - List> results = new ArrayList>(); - final Map resultMap = new ConcurrentHashMap(); - - HiveInterruptCallback interrup = HiveInterruptUtils.add(new HiveInterruptCallback() { - @Override - public void interrupt() { - for (Path path : pathNeedProcess) { - try { - path.getFileSystem(ctx.getConf()).close(); - } catch (IOException ignore) { - LOG.debug("Failed to close filesystem", ignore); - } - } - if (executor != null) { - executor.shutdownNow(); - } - } - }); - try { - Configuration conf = ctx.getConf(); - JobConf jobConf = new JobConf(conf); - for (Path path : pathNeedProcess) { - final Path p = path; - final String pathStr = path.toString(); - // All threads share the same Configuration and JobConf based on the - // assumption that they are thread safe if only read operations are - // executed. It is not stated in Hadoop's javadoc, the sourcce codes - // clearly showed that they made efforts for it and we believe it is - // thread safe. Will revisit this piece of codes if we find the assumption - // is not correct. 
- final Configuration myConf = conf; - final JobConf myJobConf = jobConf; - final Map> aliasToWork = work.getAliasToWork(); - final Map> pathToAlias = work.getPathToAliases(); - final PartitionDesc partDesc = work.getPathToPartitionInfo().get(p); - Runnable r = new Runnable() { - @Override - public void run() { - try { - Class inputFormatCls = partDesc - .getInputFileFormatClass(); - InputFormat inputFormatObj = HiveInputFormat.getInputFormatFromCache( - inputFormatCls, myJobConf); - if (inputFormatObj instanceof ContentSummaryInputFormat) { - ContentSummaryInputFormat cs = (ContentSummaryInputFormat) inputFormatObj; - resultMap.put(pathStr, cs.getContentSummary(p, myJobConf)); - return; - } - - String metaTableStorage = null; - if (partDesc.getTableDesc() != null && - partDesc.getTableDesc().getProperties() != null) { - metaTableStorage = partDesc.getTableDesc().getProperties() - .getProperty(hive_metastoreConstants.META_TABLE_STORAGE, null); - } - if (partDesc.getProperties() != null) { - metaTableStorage = partDesc.getProperties() - .getProperty(hive_metastoreConstants.META_TABLE_STORAGE, metaTableStorage); - } - - HiveStorageHandler handler = HiveUtils.getStorageHandler(myConf, metaTableStorage); - if (handler instanceof InputEstimator) { - long total = 0; - TableDesc tableDesc = partDesc.getTableDesc(); - InputEstimator estimator = (InputEstimator) handler; - for (String alias : HiveFileFormatUtils.doGetAliasesFromPath(pathToAlias, p)) { - JobConf jobConf = new JobConf(myJobConf); - TableScanOperator scanOp = (TableScanOperator) aliasToWork.get(alias); - Utilities.setColumnNameList(jobConf, scanOp, true); - Utilities.setColumnTypeList(jobConf, scanOp, true); - PlanUtils.configureInputJobPropertiesForStorageHandler(tableDesc); - Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf); - total += estimator.estimate(jobConf, scanOp, -1).getTotalLength(); - } - resultMap.put(pathStr, new ContentSummary(total, -1, -1)); - } else { - // todo: should nullify summary for non-native tables, - // not to be selected as a mapjoin target - FileSystem fs = p.getFileSystem(myConf); - resultMap.put(pathStr, fs.getContentSummary(p)); - } - } catch (Exception e) { - // We safely ignore this exception for summary data. - // We don't update the cache to protect it from polluting other - // usages. The worst case is that IOException will always be - // retried for another getInputSummary(), which is fine as - // IOException is not considered as a common case. - LOG.info("Cannot get size of {}. 
Safely ignored.", pathStr); - } - } - }; - - if (executor == null) { - r.run(); - } else { - Future result = executor.submit(r); - results.add(result); - } - } - - if (executor != null) { - for (Future result : results) { - boolean executorDone = false; - do { - try { - result.get(); - executorDone = true; - } catch (InterruptedException e) { - LOG.info("Interrupted when waiting threads: ", e); - Thread.currentThread().interrupt(); - break; - } catch (ExecutionException e) { - throw new IOException(e); - } - } while (!executorDone); - } - executor.shutdown(); - } - HiveInterruptUtils.checkInterrupted(); - for (Map.Entry entry : resultMap.entrySet()) { - ContentSummary cs = entry.getValue(); - - summary[0] += cs.getLength(); - summary[1] += cs.getFileCount(); - summary[2] += cs.getDirectoryCount(); - - ctx.addCS(entry.getKey(), cs); - if (LOG.isInfoEnabled()) { - LOG.info("Cache Content Summary for {} length: {} file count: {} " + - " directory count: {}", entry.getKey(), cs.getLength(), - cs.getFileCount(), cs.getDirectoryCount()); - } - } - - return new ContentSummary(summary[0], summary[1], summary[2]); - } finally { - if (executor != null) { - executor.shutdownNow(); - } - HiveInterruptUtils.remove(interrup); - } - } - public static long sumOf(Map aliasToSize, Set aliases) { return sumOfExcept(aliasToSize, aliases, null); } @@ -3289,7 +3067,7 @@ public static String getVertexCounterName(String counter, String vertexName) { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.INPUT_PATHS); Set pathsProcessed = new HashSet(); - List pathsToAdd = new LinkedList(); + List pathsToAdd = new ArrayList(); LockedDriverState lDrvStat = LockedDriverState.getLockedDriverState(); // AliasToWork contains all the aliases Collection aliasToWork = work.getAliasToWork().keySet(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java index a71faf8..16891bd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.exec.InputSummaryService; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -101,7 +102,8 @@ public int execute(DriverContext driverContext) { conf.getBoolVar(HiveConf.ConfVars.LOCALMODEAUTO)) { if (inputSummary == null) { - inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null); + inputSummary = InputSummaryService.INSTANCE + .getInputSummary(driverContext.getCtx(), work.getMapWork(), null); } // set the values of totalInputFileSize and totalInputNumFiles, estimating them @@ -423,7 +425,8 @@ private void setNumberOfReducers() throws IOException { + reducers); } else { if (inputSummary == null) { - inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null); + inputSummary = InputSummaryService.INSTANCE + .getInputSummary(driverContext.getCtx(), work.getMapWork(), null); } int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(), work.isFinalMapRed()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java index 
e7e9d55..7ee272f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java @@ -28,8 +28,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.ConditionalTask; +import org.apache.hadoop.hive.ql.exec.InputSummaryService; import org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; import org.apache.hadoop.hive.ql.lib.Dispatcher; import org.apache.hadoop.hive.ql.lib.Node; @@ -124,7 +124,8 @@ public long getTotalKnownInputSize(Context context, MapWork currWork, try { // go over all the input paths, and calculate a known total size, known // size for each input alias. - Utilities.getInputSummary(context, currWork, null).getLength(); + InputSummaryService.INSTANCE.getInputSummary(context, currWork, null) + .getLength(); // set alias to size mapping, this can be used to determine if one table // is chosen as big table, what's the total size of left tables, which diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java index 9f4a201..7a3795c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.ConditionalTask; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.InputSummaryService; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; @@ -202,8 +203,8 @@ public boolean accept(Path file) { boolean hasNonLocalJob = false; for (ExecDriver mrtask : mrtasks) { try { - ContentSummary inputSummary = Utilities.getInputSummary - (ctx, mrtask.getWork().getMapWork(), p); + ContentSummary inputSummary = InputSummaryService.INSTANCE + .getInputSummary(ctx, mrtask.getWork().getMapWork(), p); int numReducers = getNumberOfReducers(mrtask.getWork(), conf); long estimatedInput; diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestInputSummaryService.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestInputSummaryService.java new file mode 100644 index 0000000..062f112 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestInputSummaryService.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Properties; + +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat; +import org.apache.hadoop.hive.ql.io.HiveInputFormat; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.InputEstimator; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class TestInputSummaryService { + + @Test + public void testGetInputSummaryWithASingleThread() throws IOException { + final int NUM_PARTITIONS = 5; + final int BYTES_PER_FILE = 5; + + JobConf jobConf = new JobConf(); + Properties properties = new Properties(); + + jobConf.setInt( + HiveConf.ConfVars.HIVE_EXEC_INPUT_SUMMARY_MAX_THREADS.varname, 0); + ContentSummary summary = runTestGetInputSummary(jobConf, properties, + NUM_PARTITIONS, BYTES_PER_FILE, HiveInputFormat.class); + assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength()); + assertEquals(NUM_PARTITIONS, summary.getFileCount()); + assertEquals(NUM_PARTITIONS, summary.getDirectoryCount()); + } + + @Test + public void testGetInputSummaryWithMultipleThreads() throws IOException { + final int NUM_PARTITIONS = 5; + final int BYTES_PER_FILE = 5; + + JobConf jobConf = new JobConf(); + Properties properties = new Properties(); + + jobConf.setInt( + HiveConf.ConfVars.HIVE_EXEC_INPUT_SUMMARY_MAX_THREADS.varname, 2); + ContentSummary summary = runTestGetInputSummary(jobConf, properties, + NUM_PARTITIONS, BYTES_PER_FILE, HiveInputFormat.class); + assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength()); + assertEquals(NUM_PARTITIONS, summary.getFileCount()); + assertEquals(NUM_PARTITIONS, summary.getDirectoryCount()); + + // Test deprecated mapred.dfsclient.parallelism.max + jobConf.setInt( + HiveConf.ConfVars.HIVE_EXEC_INPUT_SUMMARY_MAX_THREADS.varname, 0); + jobConf.setInt(Utilities.DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX, 2); + summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, + BYTES_PER_FILE, HiveInputFormat.class); + assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength()); + assertEquals(NUM_PARTITIONS, summary.getFileCount()); + assertEquals(NUM_PARTITIONS, summary.getDirectoryCount()); + } + + @Test + public void testGetInputSummaryWithInputEstimator() + throws IOException, HiveException { + final int NUM_PARTITIONS = 5; + final int BYTES_PER_FILE = 10; + final int NUM_OF_ROWS = 5; + + JobConf jobConf = new JobConf(); + Properties properties = new Properties(); + + jobConf.setInt(Utilities.DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX, 2); + + 
properties.setProperty(hive_metastoreConstants.META_TABLE_STORAGE, + InputEstimatorTestClass.class.getName()); + InputEstimatorTestClass.setEstimation( + new InputEstimator.Estimation(NUM_OF_ROWS, BYTES_PER_FILE)); + + /* + * Let's write more bytes to the files to test that Estimator is actually + * working returning the file size not from the filesystem + */ + ContentSummary summary = runTestGetInputSummary(jobConf, properties, + NUM_PARTITIONS, BYTES_PER_FILE * 2, HiveInputFormat.class); + assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength()); + assertEquals(NUM_PARTITIONS * -1, summary.getFileCount()); // Current + // getInputSummary() + // returns -1 for + // each file + // found + assertEquals(NUM_PARTITIONS * -1, summary.getDirectoryCount()); // Current + // getInputSummary() + // returns + // -1 for + // each file + // found + + // Test deprecated mapred.dfsclient.parallelism.max + jobConf.setInt( + HiveConf.ConfVars.HIVE_EXEC_INPUT_SUMMARY_MAX_THREADS.varname, 1); + + properties.setProperty(hive_metastoreConstants.META_TABLE_STORAGE, + InputEstimatorTestClass.class.getName()); + InputEstimatorTestClass.setEstimation( + new InputEstimator.Estimation(NUM_OF_ROWS, BYTES_PER_FILE)); + + /* + * Let's write more bytes to the files to test that Estimator is actually + * working returning the file size not from the filesystem + */ + summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, + BYTES_PER_FILE * 2, HiveInputFormat.class); + assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength()); + assertEquals(NUM_PARTITIONS * -1, summary.getFileCount()); // Current + // getInputSummary() + // returns -1 for + // each file + // found + assertEquals(NUM_PARTITIONS * -1, summary.getDirectoryCount()); // Current + // getInputSummary() + // returns + // -1 for + // each file + // found + } + + static class ContentSummaryInputFormatTestClass extends FileInputFormat + implements ContentSummaryInputFormat { + private static ContentSummary summary = + new ContentSummary.Builder().build(); + + public static void setContentSummary(ContentSummary contentSummary) { + summary = contentSummary; + } + + @Override + public RecordReader getRecordReader(InputSplit inputSplit, JobConf jobConf, + Reporter reporter) throws IOException { + return null; + } + + @Override + public ContentSummary getContentSummary(Path p, JobConf job) + throws IOException { + return summary; + } + } + + @Test + public void testGetInputSummaryWithContentSummaryInputFormat() + throws IOException { + final int NUM_PARTITIONS = 5; + final int BYTES_PER_FILE = 10; + + JobConf jobConf = new JobConf(); + Properties properties = new Properties(); + + jobConf.setInt(Utilities.DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX, 2); + + ContentSummaryInputFormatTestClass + .setContentSummary(new ContentSummary.Builder().length(BYTES_PER_FILE) + .fileCount(2).directoryCount(1).build()); + + /* + * Let's write more bytes to the files to test that + * ContentSummaryInputFormat is actually working returning the file size not + * from the filesystem + */ + ContentSummary summary = + runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, + BYTES_PER_FILE * 2, ContentSummaryInputFormatTestClass.class); + assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength()); + assertEquals(NUM_PARTITIONS * 2, summary.getFileCount()); + assertEquals(NUM_PARTITIONS, summary.getDirectoryCount()); + } + + private ContentSummary runTestGetInputSummary(JobConf jobConf, + Properties properties, int numOfPartitions, int 
bytesPerFile, + Class inputFormatClass) throws IOException { + // creates scratch directories needed by the Context object + SessionState.start(new HiveConf()); + + MapWork mapWork = new MapWork(); + Context context = new Context(jobConf); + LinkedHashMap pathToPartitionInfo = + new LinkedHashMap<>(); + LinkedHashMap> pathToAliasTable = + new LinkedHashMap<>(); + TableScanOperator scanOp = new TableScanOperator(); + + PartitionDesc partitionDesc = new PartitionDesc( + new TableDesc(inputFormatClass, null, properties), null); + + String testTableName = "testTable"; + + Path testTablePath = new Path(testTableName); + Path[] testPartitionsPaths = new Path[numOfPartitions]; + for (int i = 0; i < numOfPartitions; i++) { + String testPartitionName = "p=" + 1; + testPartitionsPaths[i] = new Path(testTablePath, "p=" + i); + + pathToPartitionInfo.put(testPartitionsPaths[i], partitionDesc); + + pathToAliasTable.put(testPartitionsPaths[i], + Lists.newArrayList(testPartitionName)); + + mapWork.getAliasToWork().put(testPartitionName, scanOp); + } + + mapWork.setPathToAliases(pathToAliasTable); + mapWork.setPathToPartitionInfo(pathToPartitionInfo); + + FileSystem fs = FileSystem.getLocal(jobConf); + try { + fs.mkdirs(testTablePath); + byte[] data = new byte[bytesPerFile]; + + for (int i = 0; i < numOfPartitions; i++) { + fs.mkdirs(testPartitionsPaths[i]); + FSDataOutputStream out = + fs.create(new Path(testPartitionsPaths[i], "test1.txt")); + out.write(data); + out.close(); + } + + return InputSummaryService.INSTANCE.getInputSummary(context, mapWork, + null); + } finally { + if (fs.exists(testTablePath)) { + fs.delete(testTablePath, true); + } + } + } + +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java index fdc268c..b4ff22b 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java @@ -38,12 +38,10 @@ import java.io.Serializable; import java.sql.Timestamp; import java.util.ArrayList; -import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -51,19 +49,15 @@ import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.ContentSummary; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.mr.ExecDriver; import org.apache.hadoop.hive.ql.exec.spark.SparkTask; import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.io.*; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.metadata.InputEstimator; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; @@ -73,18 +67,12 @@ import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MapredWork; -import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import 
org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFFromUtcTimestamp; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.RecordReader; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -541,46 +529,6 @@ private void runTestGetInputPaths(JobConf jobConf, int numOfPartitions) throws E } @Test - public void testGetInputSummaryPool() throws ExecutionException, InterruptedException, IOException { - ExecutorService pool = mock(ExecutorService.class); - when(pool.submit(any(Runnable.class))).thenReturn(mock(Future.class)); - - Set pathNeedProcess = new HashSet<>(); - pathNeedProcess.add(new Path("dummy-path1")); - pathNeedProcess.add(new Path("dummy-path2")); - pathNeedProcess.add(new Path("dummy-path3")); - - SessionState.start(new HiveConf()); - JobConf jobConf = new JobConf(); - Context context = new Context(jobConf); - - Utilities.getInputSummaryWithPool(context, pathNeedProcess, mock(MapWork.class), new long[3], pool); - verify(pool, times(3)).submit(any(Runnable.class)); - verify(pool).shutdown(); - verify(pool).shutdownNow(); - } - - @Test - public void testGetInputSummaryPoolAndFailure() throws ExecutionException, InterruptedException, IOException { - ExecutorService pool = mock(ExecutorService.class); - when(pool.submit(any(Runnable.class))).thenReturn(mock(Future.class)); - - Set pathNeedProcess = new HashSet<>(); - pathNeedProcess.add(new Path("dummy-path1")); - pathNeedProcess.add(new Path("dummy-path2")); - pathNeedProcess.add(new Path("dummy-path3")); - - SessionState.start(new HiveConf()); - JobConf jobConf = new JobConf(); - Context context = new Context(jobConf); - - Utilities.getInputSummaryWithPool(context, pathNeedProcess, mock(MapWork.class), new long[3], pool); - verify(pool, times(3)).submit(any(Runnable.class)); - verify(pool).shutdown(); - verify(pool).shutdownNow(); - } - - @Test public void testGetInputPathsPool() throws IOException, ExecutionException, InterruptedException { List pathsToAdd = new ArrayList<>(); Path path = new Path("dummy-path"); @@ -631,165 +579,6 @@ public void testGetInputPathsPoolAndFailure() throws IOException, ExecutionExcep verify(pool).shutdownNow(); } - @Test - public void testGetInputSummaryWithASingleThread() throws IOException { - final int NUM_PARTITIONS = 5; - final int BYTES_PER_FILE = 5; - - JobConf jobConf = new JobConf(); - Properties properties = new Properties(); - - jobConf.setInt(HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 0); - ContentSummary summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE, HiveInputFormat.class); - assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength()); - assertEquals(NUM_PARTITIONS, summary.getFileCount()); - assertEquals(NUM_PARTITIONS, summary.getDirectoryCount()); - } - - @Test - public void testGetInputSummaryWithMultipleThreads() throws IOException { - final int NUM_PARTITIONS = 5; - final int BYTES_PER_FILE = 5; - - JobConf jobConf = new JobConf(); - Properties properties = new Properties(); - - jobConf.setInt(HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 2); - ContentSummary summary = runTestGetInputSummary(jobConf, properties, 
NUM_PARTITIONS, BYTES_PER_FILE, HiveInputFormat.class); - assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength()); - assertEquals(NUM_PARTITIONS, summary.getFileCount()); - assertEquals(NUM_PARTITIONS, summary.getDirectoryCount()); - - // Test deprecated mapred.dfsclient.parallelism.max - jobConf.setInt(HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 0); - jobConf.setInt(Utilities.DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX, 2); - summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE, HiveInputFormat.class); - assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength()); - assertEquals(NUM_PARTITIONS, summary.getFileCount()); - assertEquals(NUM_PARTITIONS, summary.getDirectoryCount()); - } - - @Test - public void testGetInputSummaryWithInputEstimator() throws IOException, HiveException { - final int NUM_PARTITIONS = 5; - final int BYTES_PER_FILE = 10; - final int NUM_OF_ROWS = 5; - - JobConf jobConf = new JobConf(); - Properties properties = new Properties(); - - jobConf.setInt(Utilities.DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX, 2); - - properties.setProperty(hive_metastoreConstants.META_TABLE_STORAGE, InputEstimatorTestClass.class.getName()); - InputEstimatorTestClass.setEstimation(new InputEstimator.Estimation(NUM_OF_ROWS, BYTES_PER_FILE)); - - /* Let's write more bytes to the files to test that Estimator is actually working returning the file size not from the filesystem */ - ContentSummary summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE * 2, HiveInputFormat.class); - assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength()); - assertEquals(NUM_PARTITIONS * -1, summary.getFileCount()); // Current getInputSummary() returns -1 for each file found - assertEquals(NUM_PARTITIONS * -1, summary.getDirectoryCount()); // Current getInputSummary() returns -1 for each file found - - // Test deprecated mapred.dfsclient.parallelism.max - jobConf.setInt(HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 0); - jobConf.setInt(HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 2); - - properties.setProperty(hive_metastoreConstants.META_TABLE_STORAGE, InputEstimatorTestClass.class.getName()); - InputEstimatorTestClass.setEstimation(new InputEstimator.Estimation(NUM_OF_ROWS, BYTES_PER_FILE)); - - /* Let's write more bytes to the files to test that Estimator is actually working returning the file size not from the filesystem */ - summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE * 2, HiveInputFormat.class); - assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength()); - assertEquals(NUM_PARTITIONS * -1, summary.getFileCount()); // Current getInputSummary() returns -1 for each file found - assertEquals(NUM_PARTITIONS * -1, summary.getDirectoryCount()); // Current getInputSummary() returns -1 for each file found - } - - static class ContentSummaryInputFormatTestClass extends FileInputFormat implements ContentSummaryInputFormat { - private static ContentSummary summary = new ContentSummary.Builder().build(); - - public static void setContentSummary(ContentSummary contentSummary) { - summary = contentSummary; - } - - @Override - public RecordReader getRecordReader(InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException { - return null; - } - - @Override - public ContentSummary getContentSummary(Path p, JobConf job) throws IOException { - return summary; - } - } - - @Test - public void 
testGetInputSummaryWithContentSummaryInputFormat() throws IOException { - final int NUM_PARTITIONS = 5; - final int BYTES_PER_FILE = 10; - - JobConf jobConf = new JobConf(); - Properties properties = new Properties(); - - jobConf.setInt(Utilities.DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX, 2); - - ContentSummaryInputFormatTestClass.setContentSummary( - new ContentSummary.Builder().length(BYTES_PER_FILE).fileCount(2).directoryCount(1).build()); - - /* Let's write more bytes to the files to test that ContentSummaryInputFormat is actually working returning the file size not from the filesystem */ - ContentSummary summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE * 2, ContentSummaryInputFormatTestClass.class); - assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength()); - assertEquals(NUM_PARTITIONS * 2, summary.getFileCount()); - assertEquals(NUM_PARTITIONS, summary.getDirectoryCount()); - } - - private ContentSummary runTestGetInputSummary(JobConf jobConf, Properties properties, int numOfPartitions, int bytesPerFile, Class inputFormatClass) throws IOException { - // creates scratch directories needed by the Context object - SessionState.start(new HiveConf()); - - MapWork mapWork = new MapWork(); - Context context = new Context(jobConf); - LinkedHashMap pathToPartitionInfo = new LinkedHashMap<>(); - LinkedHashMap> pathToAliasTable = new LinkedHashMap<>(); - TableScanOperator scanOp = new TableScanOperator(); - - PartitionDesc partitionDesc = new PartitionDesc(new TableDesc(inputFormatClass, null, properties), null); - - String testTableName = "testTable"; - - Path testTablePath = new Path(testTableName); - Path[] testPartitionsPaths = new Path[numOfPartitions]; - for (int i=0; i getDependencyCollectionTask(){ return TaskFactory.get(new DependencyCollectionWork());
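
For reference, a minimal usage sketch of the new service, mirroring the call sites changed in this patch (not part of the patch itself; the Context and MapWork arguments are placeholders for whatever the caller already holds):

import java.io.IOException;

import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.InputSummaryService;
import org.apache.hadoop.hive.ql.plan.MapWork;

final class InputSummaryUsageSketch {
  // Summarize every input path of the MapWork; a null PathFilter means no
  // paths are excluded. All callers share the singleton's daemon thread pool,
  // sized by hive.exec.input.summary.max.threads.
  static long totalInputLength(Context ctx, MapWork mapWork) throws IOException {
    ContentSummary summary =
        InputSummaryService.INSTANCE.getInputSummary(ctx, mapWork, null);
    return summary.getLength();
  }
}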