diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 8937b43..6df3072 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -18,12 +18,6 @@ package org.apache.hadoop.hive.ql.exec; -import com.esotericsoftware.kryo.Kryo; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - import java.beans.DefaultPersistenceDelegate; import java.beans.Encoder; import java.beans.Expression; @@ -53,6 +47,7 @@ import java.util.Arrays; import java.util.Calendar; import java.util.Collection; +import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; import java.util.HashSet; @@ -65,17 +60,18 @@ import java.util.Random; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; import java.util.zip.InflaterInputStream; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - import org.apache.commons.codec.binary.Base64; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; @@ -138,7 +134,6 @@ import org.apache.hadoop.hive.ql.io.RCFile; import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat; import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface; -import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta; import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper; import org.apache.hadoop.hive.ql.io.merge.MergeFileWork; import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateMapper; @@ -211,9 +206,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import com.esotericsoftware.kryo.Kryo; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -2388,36 +2386,32 @@ public static ContentSummary getInputSummary(final Context ctx, MapWork work, Pa PerfLogger perfLogger = SessionState.getPerfLogger(); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.INPUT_SUMMARY); - long[] summary = {0, 0, 0}; - + final long[] summary = {0L, 0L, 0L}; final Set pathNeedProcess = new HashSet<>(); // Since multiple threads could call this method concurrently, locking // this method will avoid number of threads out of control. synchronized (INPUT_SUMMARY_LOCK) { // For each input path, calculate the total size. 
- for (Path path : work.getPathToAliases().keySet()) { - Path p = path; - - if (filter != null && !filter.accept(p)) { + for (final Path path : work.getPathToAliases().keySet()) { + if (path == null) { + continue; + } + if (filter != null && !filter.accept(path)) { continue; } ContentSummary cs = ctx.getCS(path); - if (cs == null) { - if (path == null) { - continue; - } - pathNeedProcess.add(path); - } else { + if (cs != null) { summary[0] += cs.getLength(); summary[1] += cs.getFileCount(); summary[2] += cs.getDirectoryCount(); + } else { + pathNeedProcess.add(path); } } // Process the case when name node call is needed - final Map resultMap = new ConcurrentHashMap(); final ExecutorService executor; int numExecutors = getMaxExecutorsForInputListing(ctx.getConf(), pathNeedProcess.size()); @@ -2429,17 +2423,36 @@ public static ContentSummary getInputSummary(final Context ctx, MapWork work, Pa } else { executor = null; } - ContentSummary cs = getInputSummaryWithPool(ctx, pathNeedProcess, work, summary, executor); + getInputSummaryWithPool(ctx, Collections.unmodifiableSet(pathNeedProcess), + work, summary, executor); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.INPUT_SUMMARY); - return cs; } + return new ContentSummary.Builder().length(summary[0]) + .fileCount(summary[1]).directoryCount(summary[2]).build(); } + /** + * Performs a ContentSummary lookup over a set of paths using 1 or more + * threads. The 'summary' argument is directly modified. + * + * @param ctx + * @param pathNeedProcess + * @param work + * @param summary + * @param executor + * @throws IOException + */ @VisibleForTesting - static ContentSummary getInputSummaryWithPool(final Context ctx, Set pathNeedProcess, MapWork work, - long[] summary, ExecutorService executor) throws IOException { - List> results = new ArrayList>(); - final Map resultMap = new ConcurrentHashMap(); + static void getInputSummaryWithPool(final Context ctx, + final Set pathNeedProcess, final MapWork work, final long[] summary, + final ExecutorService executor) throws IOException { + Preconditions.checkNotNull(ctx); + Preconditions.checkNotNull(pathNeedProcess); + + List> futures = new ArrayList>(pathNeedProcess.size()); + final AtomicLong totalLength = new AtomicLong(0L); + final AtomicLong totalFileCount = new AtomicLong(0L); + final AtomicLong totalDirectoryCount = new AtomicLong(0L); HiveInterruptCallback interrup = HiveInterruptUtils.add(new HiveInterruptCallback() { @Override @@ -2459,9 +2472,7 @@ public void interrupt() { try { Configuration conf = ctx.getConf(); JobConf jobConf = new JobConf(conf); - for (Path path : pathNeedProcess) { - final Path p = path; - final String pathStr = path.toString(); + for (final Path path : pathNeedProcess) { // All threads share the same Configuration and JobConf based on the // assumption that they are thread safe if only read operations are // executed. 
It is not stated in Hadoop's javadoc, the sourcce codes @@ -2472,7 +2483,7 @@ public void interrupt() { final JobConf myJobConf = jobConf; final Map> aliasToWork = work.getAliasToWork(); final Map> pathToAlias = work.getPathToAliases(); - final PartitionDesc partDesc = work.getPathToPartitionInfo().get(p); + final PartitionDesc partDesc = work.getPathToPartitionInfo().get(path); Runnable r = new Runnable() { @Override public void run() { @@ -2482,11 +2493,11 @@ public void run() { InputFormat inputFormatObj = HiveInputFormat.getInputFormatFromCache( inputFormatCls, myJobConf); if (inputFormatObj instanceof ContentSummaryInputFormat) { - ContentSummaryInputFormat cs = (ContentSummaryInputFormat) inputFormatObj; - resultMap.put(pathStr, cs.getContentSummary(p, myJobConf)); + ContentSummaryInputFormat csif = (ContentSummaryInputFormat) inputFormatObj; + final ContentSummary cs = csif.getContentSummary(path, myJobConf); + recordSummary(path, cs); return; } - String metaTableStorage = null; if (partDesc.getTableDesc() != null && partDesc.getTableDesc().getProperties() != null) { @@ -2503,7 +2514,7 @@ public void run() { long total = 0; TableDesc tableDesc = partDesc.getTableDesc(); InputEstimator estimator = (InputEstimator) handler; - for (String alias : HiveFileFormatUtils.doGetAliasesFromPath(pathToAlias, p)) { + for (String alias : HiveFileFormatUtils.doGetAliasesFromPath(pathToAlias, path)) { JobConf jobConf = new JobConf(myJobConf); TableScanOperator scanOp = (TableScanOperator) aliasToWork.get(alias); Utilities.setColumnNameList(jobConf, scanOp, true); @@ -2512,12 +2523,12 @@ public void run() { Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf); total += estimator.estimate(jobConf, scanOp, -1).getTotalLength(); } - resultMap.put(pathStr, new ContentSummary(total, -1, -1)); + recordSummary(path, new ContentSummary(total, -1, -1)); } else { // todo: should nullify summary for non-native tables, // not to be selected as a mapjoin target - FileSystem fs = p.getFileSystem(myConf); - resultMap.put(pathStr, fs.getContentSummary(p)); + FileSystem fs = path.getFileSystem(myConf); + recordSummary(path, fs.getContentSummary(path)); } } catch (Exception e) { // We safely ignore this exception for summary data. @@ -2525,28 +2536,46 @@ public void run() { // usages. The worst case is that IOException will always be // retried for another getInputSummary(), which is fine as // IOException is not considered as a common case. - LOG.info("Cannot get size of {}. Safely ignored.", pathStr); + LOG.info("Cannot get size of {}. Safely ignored.", path); + LOG.debug("Cannot get size of {}. 
Safely ignored.", path, e); } } + + private void recordSummary(final Path p, final ContentSummary cs) { + final long csLength = cs.getLength(); + final long csFileCount = cs.getFileCount(); + final long csDirectoryCount = cs.getDirectoryCount(); + + totalLength.addAndGet(csLength); + totalFileCount.addAndGet(csFileCount); + totalDirectoryCount.addAndGet(csDirectoryCount); + + ctx.addCS(p.toString(), cs); + + LOG.debug( + "Cache Content Summary for {} length: {} file count: {} " + + "directory count: {}", + path, csLength, csFileCount, csDirectoryCount); + } }; if (executor == null) { r.run(); } else { - Future result = executor.submit(r); - results.add(result); + Future future = executor.submit(r); + futures.add(future); } } if (executor != null) { - for (Future result : results) { + for (Future future : futures) { boolean executorDone = false; do { try { - result.get(); + future.get(); executorDone = true; } catch (InterruptedException e) { - LOG.info("Interrupted when waiting threads: ", e); + LOG.info("Interrupted when waiting threads", e); Thread.currentThread().interrupt(); break; } catch (ExecutionException e) { @@ -2557,22 +2586,10 @@ public void run() { executor.shutdown(); } HiveInterruptUtils.checkInterrupted(); - for (Map.Entry entry : resultMap.entrySet()) { - ContentSummary cs = entry.getValue(); - - summary[0] += cs.getLength(); - summary[1] += cs.getFileCount(); - summary[2] += cs.getDirectoryCount(); - - ctx.addCS(entry.getKey(), cs); - if (LOG.isInfoEnabled()) { - LOG.info("Cache Content Summary for {} length: {} file count: {} " + - " directory count: {}", entry.getKey(), cs.getLength(), - cs.getFileCount(), cs.getDirectoryCount()); - } - } - return new ContentSummary(summary[0], summary[1], summary[2]); + summary[0] += totalLength.get(); + summary[1] += totalFileCount.get(); + summary[2] += totalDirectoryCount.get(); } finally { if (executor != null) { executor.shutdownNow(); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestGetInputSummary.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestGetInputSummary.java new file mode 100644 index 0000000..77d6a81 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestGetInputSummary.java @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat; +import org.apache.hadoop.hive.ql.io.HiveInputFormat; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.InputEstimator; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class TestGetInputSummary { + + private static final String TEST_TABLE_NAME = "testTable"; + private static final Path TEST_TABLE_PATH = new Path(TEST_TABLE_NAME); + + private JobConf jobConf; + private Properties properties; + + @Before + public void setup() throws Exception { + // creates scratch directories needed by the Context object + SessionState.start(new HiveConf()); + + this.jobConf = new JobConf(); + this.properties = new Properties(); + + final FileSystem fs = FileSystem.getLocal(jobConf); + fs.delete(TEST_TABLE_PATH, true); + fs.mkdirs(TEST_TABLE_PATH); + } + + @After + public void teardown() throws Exception { + final FileSystem fs = FileSystem.getLocal(jobConf); + fs.delete(TEST_TABLE_PATH, true); + } + + @Test + public void testGetInputSummaryPoolWithCache() throws Exception { + final int BYTES_PER_FILE = 5; + + final Collection testPaths = Arrays.asList(new Path("p1/test.txt"), + new Path("p2/test.txt"), new Path("p3/test.txt"), + new Path("p4/test.txt"), new Path("p5/test.txt")); + + ContentSummary cs = new ContentSummary.Builder().directoryCount(10L) + .fileCount(10L).length(10L).build(); + + Map cache = new LinkedHashMap<>(); + cache.put(new Path("p2"), cs); + + jobConf.setInt( + HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 0); + + ContentSummary summary = runTestGetInputSummary(jobConf, properties, + testPaths, BYTES_PER_FILE, HiveInputFormat.class, cache); + + // The partition paths all contain a single file with 5 bytes of length, + // however, one entry was added to the cache which specifies that the + // partition has 10 directories and 
10 files and these values should + // override the real values since the cache is consulted before looking at + // the actual file system. + final long expectedLength = ((testPaths.size() - 1) * BYTES_PER_FILE) + 10L; + final long expectedFileCount = (testPaths.size() - 1) + 10L; + final long expectedDirCount = (testPaths.size() - 1) + 10L; + + assertEquals(expectedLength, summary.getLength()); + assertEquals(expectedFileCount, summary.getFileCount()); + assertEquals(expectedDirCount, summary.getDirectoryCount()); + } + + @Test + @SuppressWarnings("deprecation") + public void testGetInputSummaryWithMultipleThreads() throws IOException { + final int BYTES_PER_FILE = 5; + + final Collection testPaths = Arrays.asList(new Path("p1/test.txt"), + new Path("p2/test.txt"), new Path("p3/test.txt"), + new Path("p4/test.txt"), new Path("p5/test.txt")); + + jobConf.setInt( + HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 2); + ContentSummary summary = + runTestGetInputSummary(jobConf, properties, testPaths, BYTES_PER_FILE, + HiveInputFormat.class, Collections.emptyMap()); + assertEquals(testPaths.size() * BYTES_PER_FILE, summary.getLength()); + assertEquals(testPaths.size(), summary.getFileCount()); + assertEquals(testPaths.size(), summary.getDirectoryCount()); + + // Test deprecated mapred.dfsclient.parallelism.max + jobConf.setInt( + HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 0); + jobConf.setInt(Utilities.DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX, 2); + summary = runTestGetInputSummary(jobConf, properties, testPaths, + BYTES_PER_FILE, HiveInputFormat.class, Collections.emptyMap()); + assertEquals(testPaths.size() * BYTES_PER_FILE, summary.getLength()); + assertEquals(testPaths.size(), summary.getFileCount()); + assertEquals(testPaths.size(), summary.getDirectoryCount()); + } + + @Test + public void testGetInputSummaryWithInputEstimator() + throws IOException, HiveException { + final int BYTES_PER_FILE = 10; + final int NUM_OF_ROWS = 5; + + final Collection testPaths = Arrays.asList(new Path("p1/test.txt"), + new Path("p2/test.txt"), new Path("p3/test.txt"), + new Path("p4/test.txt"), new Path("p5/test.txt")); + + jobConf.setInt( + HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 2); + + properties.setProperty(hive_metastoreConstants.META_TABLE_STORAGE, + InputEstimatorTestClass.class.getName()); + InputEstimatorTestClass.setEstimation( + new InputEstimator.Estimation(NUM_OF_ROWS, BYTES_PER_FILE)); + + /* + * Let's write more bytes to the files to test that Estimator is actually + * working returning the file size not from the filesystem + */ + ContentSummary summary = + runTestGetInputSummary(jobConf, properties, testPaths, + BYTES_PER_FILE * 2, HiveInputFormat.class, Collections.emptyMap()); + assertEquals(testPaths.size() * BYTES_PER_FILE, summary.getLength()); + + // Current getInputSummary() returns -1 for each file found + assertEquals(testPaths.size() * -1, summary.getFileCount()); + + // Current getInputSummary() returns -1 for each file found + assertEquals(testPaths.size() * -1, summary.getDirectoryCount()); + + // Test deprecated mapred.dfsclient.parallelism.max + jobConf.setInt( + HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 0); + jobConf.setInt( + HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 2); + + properties.setProperty(hive_metastoreConstants.META_TABLE_STORAGE, + InputEstimatorTestClass.class.getName()); + InputEstimatorTestClass.setEstimation( + new InputEstimator.Estimation(NUM_OF_ROWS, 
BYTES_PER_FILE)); + + /* + * Let's write more bytes to the files to test that Estimator is actually + * working returning the file size not from the filesystem + */ + summary = runTestGetInputSummary(jobConf, properties, testPaths, + BYTES_PER_FILE * 2, HiveInputFormat.class, Collections.emptyMap()); + assertEquals(testPaths.size() * BYTES_PER_FILE, summary.getLength()); + + // Current getInputSummary() returns -1 for each file found + assertEquals(testPaths.size() * -1, summary.getFileCount()); + + // Current getInputSummary() returns -1 for each file found + assertEquals(testPaths.size() * -1, summary.getDirectoryCount()); + } + + @Test + public void testGetInputSummaryWithASingleThread() throws IOException { + final int BYTES_PER_FILE = 5; + + // Set to zero threads to disable thread pool + jobConf.setInt( + HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 0); + + final Collection testPaths = Arrays.asList(new Path("p1/test.txt"), + new Path("p2/test.txt"), new Path("p3/test.txt"), + new Path("p4/test.txt"), new Path("p5/test.txt")); + + ContentSummary summary = + runTestGetInputSummary(jobConf, properties, testPaths, BYTES_PER_FILE, + HiveInputFormat.class, Collections.emptyMap()); + assertEquals(testPaths.size() * BYTES_PER_FILE, summary.getLength()); + assertEquals(testPaths.size(), summary.getFileCount()); + assertEquals(testPaths.size(), summary.getDirectoryCount()); + } + + @Test + public void testGetInputSummaryWithContentSummaryInputFormat() + throws IOException { + final int BYTES_PER_FILE = 10; + + final Collection testPaths = Arrays.asList(new Path("p1/test.txt"), + new Path("p2/test.txt"), new Path("p3/test.txt"), + new Path("p4/test.txt"), new Path("p5/test.txt")); + + jobConf.setInt(ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 2); + + ContentSummaryInputFormatTestClass + .setContentSummary(new ContentSummary.Builder().length(BYTES_PER_FILE) + .fileCount(2).directoryCount(1).build()); + + /* + * Write more bytes to the files to test that ContentSummaryInputFormat is + * actually working returning the file size not from the filesystem + */ + ContentSummary summary = runTestGetInputSummary(jobConf, properties, + testPaths, BYTES_PER_FILE * 2, ContentSummaryInputFormatTestClass.class, + Collections.emptyMap()); + assertEquals(testPaths.size() * BYTES_PER_FILE, summary.getLength()); + assertEquals(testPaths.size() * 2, summary.getFileCount()); + assertEquals(testPaths.size(), summary.getDirectoryCount()); + } + + @Test + @SuppressWarnings("unchecked") + public void testGetInputSummaryPool() + throws ExecutionException, InterruptedException, IOException { + ExecutorService pool = mock(ExecutorService.class); + when(pool.submit(any(Runnable.class))).thenReturn(mock(Future.class)); + + Set pathNeedProcess = new HashSet<>(); + pathNeedProcess.add(new Path("dummy-path1")); + pathNeedProcess.add(new Path("dummy-path2")); + pathNeedProcess.add(new Path("dummy-path3")); + + Context context = new Context(jobConf); + + Utilities.getInputSummaryWithPool(context, pathNeedProcess, + mock(MapWork.class), new long[3], pool); + verify(pool, times(3)).submit(any(Runnable.class)); + verify(pool).shutdown(); + verify(pool).shutdownNow(); + } + + @Test + @SuppressWarnings("unchecked") + public void testGetInputSummaryPoolAndFailure() + throws ExecutionException, InterruptedException, IOException { + ExecutorService pool = mock(ExecutorService.class); + when(pool.submit(any(Runnable.class))).thenReturn(mock(Future.class)); + + Set pathNeedProcess = new HashSet<>(); + 
pathNeedProcess.add(new Path("dummy-path1")); + pathNeedProcess.add(new Path("dummy-path2")); + pathNeedProcess.add(new Path("dummy-path3")); + + Context context = new Context(jobConf); + + Utilities.getInputSummaryWithPool(context, pathNeedProcess, + mock(MapWork.class), new long[3], pool); + verify(pool, times(3)).submit(any(Runnable.class)); + verify(pool).shutdown(); + verify(pool).shutdownNow(); + } + + @SuppressWarnings("rawtypes") + private ContentSummary runTestGetInputSummary(JobConf jobConf, + Properties properties, Collection testPaths, int bytesPerFile, + Class inputFormatClass, + Map cache) throws IOException { + + final FileSystem fs = FileSystem.getLocal(jobConf); + + MapWork mapWork = new MapWork(); + Context context = new Context(jobConf); + + for (Map.Entry entry : cache.entrySet()) { + final Path partitionPath = new Path(TEST_TABLE_PATH, entry.getKey()); + context.addCS(partitionPath.toString(), entry.getValue()); + } + + LinkedHashMap pathToPartitionInfo = + new LinkedHashMap<>(); + LinkedHashMap> pathToAliasTable = + new LinkedHashMap<>(); + TableScanOperator scanOp = new TableScanOperator(); + + PartitionDesc partitionDesc = new PartitionDesc( + new TableDesc(inputFormatClass, null, properties), null); + + final byte[] data = new byte[bytesPerFile]; + for (final Path path : testPaths) { + final Path fullPath = new Path(TEST_TABLE_PATH, path); + final Path partitionPath = fullPath.getParent(); + + fs.mkdirs(partitionPath); + + FSDataOutputStream out = fs.create(fullPath); + out.write(data); + out.close(); + + pathToPartitionInfo.put(partitionPath, partitionDesc); + pathToAliasTable.put(partitionPath, + Lists.newArrayList(partitionPath.getName())); + mapWork.getAliasToWork().put(partitionPath.getName(), scanOp); + } + + mapWork.setPathToAliases(pathToAliasTable); + mapWork.setPathToPartitionInfo(pathToPartitionInfo); + + return Utilities.getInputSummary(context, mapWork, null); + } + + @SuppressWarnings("rawtypes") + static class ContentSummaryInputFormatTestClass extends FileInputFormat + implements ContentSummaryInputFormat { + private static ContentSummary summary = + new ContentSummary.Builder().build(); + + public static void setContentSummary(ContentSummary contentSummary) { + summary = contentSummary; + } + + @Override + public RecordReader getRecordReader(InputSplit inputSplit, JobConf jobConf, + Reporter reporter) throws IOException { + return null; + } + + @Override + public ContentSummary getContentSummary(Path p, JobConf job) + throws IOException { + return summary; + } + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java index 90eb45b..305b467 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java @@ -37,12 +37,10 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; -import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -50,20 +48,16 @@ import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.ContentSummary; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.type.Timestamp; 
import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.mr.ExecDriver; import org.apache.hadoop.hive.ql.exec.spark.SparkTask; import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.io.*; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.metadata.InputEstimator; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; @@ -73,18 +67,12 @@ import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MapredWork; -import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFFromUtcTimestamp; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.RecordReader; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -540,46 +528,6 @@ private void runTestGetInputPaths(JobConf jobConf, int numOfPartitions) throws E } @Test - public void testGetInputSummaryPool() throws ExecutionException, InterruptedException, IOException { - ExecutorService pool = mock(ExecutorService.class); - when(pool.submit(any(Runnable.class))).thenReturn(mock(Future.class)); - - Set pathNeedProcess = new HashSet<>(); - pathNeedProcess.add(new Path("dummy-path1")); - pathNeedProcess.add(new Path("dummy-path2")); - pathNeedProcess.add(new Path("dummy-path3")); - - SessionState.start(new HiveConf()); - JobConf jobConf = new JobConf(); - Context context = new Context(jobConf); - - Utilities.getInputSummaryWithPool(context, pathNeedProcess, mock(MapWork.class), new long[3], pool); - verify(pool, times(3)).submit(any(Runnable.class)); - verify(pool).shutdown(); - verify(pool).shutdownNow(); - } - - @Test - public void testGetInputSummaryPoolAndFailure() throws ExecutionException, InterruptedException, IOException { - ExecutorService pool = mock(ExecutorService.class); - when(pool.submit(any(Runnable.class))).thenReturn(mock(Future.class)); - - Set pathNeedProcess = new HashSet<>(); - pathNeedProcess.add(new Path("dummy-path1")); - pathNeedProcess.add(new Path("dummy-path2")); - pathNeedProcess.add(new Path("dummy-path3")); - - SessionState.start(new HiveConf()); - JobConf jobConf = new JobConf(); - Context context = new Context(jobConf); - - Utilities.getInputSummaryWithPool(context, pathNeedProcess, mock(MapWork.class), new long[3], pool); - verify(pool, times(3)).submit(any(Runnable.class)); - verify(pool).shutdown(); - verify(pool).shutdownNow(); - } - - @Test public void testGetInputPathsPool() throws IOException, ExecutionException, InterruptedException { List pathsToAdd = new ArrayList<>(); Path path = new Path("dummy-path"); @@ -630,166 +578,6 @@ public void testGetInputPathsPoolAndFailure() throws IOException, ExecutionExcep verify(pool).shutdownNow(); } - @Test - public void testGetInputSummaryWithASingleThread() throws IOException { - final int 
NUM_PARTITIONS = 5; - final int BYTES_PER_FILE = 5; - - JobConf jobConf = new JobConf(); - Properties properties = new Properties(); - - jobConf.setInt(HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 0); - ContentSummary summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE, HiveInputFormat.class); - assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength()); - assertEquals(NUM_PARTITIONS, summary.getFileCount()); - assertEquals(NUM_PARTITIONS, summary.getDirectoryCount()); - } - - @Test - public void testGetInputSummaryWithMultipleThreads() throws IOException { - final int NUM_PARTITIONS = 5; - final int BYTES_PER_FILE = 5; - - JobConf jobConf = new JobConf(); - Properties properties = new Properties(); - - jobConf.setInt(HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 2); - ContentSummary summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE, HiveInputFormat.class); - assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength()); - assertEquals(NUM_PARTITIONS, summary.getFileCount()); - assertEquals(NUM_PARTITIONS, summary.getDirectoryCount()); - - // Test deprecated mapred.dfsclient.parallelism.max - jobConf.setInt(HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 0); - jobConf.setInt(Utilities.DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX, 2); - summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE, HiveInputFormat.class); - assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength()); - assertEquals(NUM_PARTITIONS, summary.getFileCount()); - assertEquals(NUM_PARTITIONS, summary.getDirectoryCount()); - } - - @Test - public void testGetInputSummaryWithInputEstimator() throws IOException, HiveException { - final int NUM_PARTITIONS = 5; - final int BYTES_PER_FILE = 10; - final int NUM_OF_ROWS = 5; - - JobConf jobConf = new JobConf(); - Properties properties = new Properties(); - - jobConf.setInt(Utilities.DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX, 2); - - properties.setProperty(hive_metastoreConstants.META_TABLE_STORAGE, InputEstimatorTestClass.class.getName()); - InputEstimatorTestClass.setEstimation(new InputEstimator.Estimation(NUM_OF_ROWS, BYTES_PER_FILE)); - - /* Let's write more bytes to the files to test that Estimator is actually working returning the file size not from the filesystem */ - ContentSummary summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE * 2, HiveInputFormat.class); - assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength()); - assertEquals(NUM_PARTITIONS * -1, summary.getFileCount()); // Current getInputSummary() returns -1 for each file found - assertEquals(NUM_PARTITIONS * -1, summary.getDirectoryCount()); // Current getInputSummary() returns -1 for each file found - - // Test deprecated mapred.dfsclient.parallelism.max - jobConf.setInt(HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 0); - jobConf.setInt(HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 2); - - properties.setProperty(hive_metastoreConstants.META_TABLE_STORAGE, InputEstimatorTestClass.class.getName()); - InputEstimatorTestClass.setEstimation(new InputEstimator.Estimation(NUM_OF_ROWS, BYTES_PER_FILE)); - - /* Let's write more bytes to the files to test that Estimator is actually working returning the file size not from the filesystem */ - summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE * 2, HiveInputFormat.class); - 
assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength()); - assertEquals(NUM_PARTITIONS * -1, summary.getFileCount()); // Current getInputSummary() returns -1 for each file found - assertEquals(NUM_PARTITIONS * -1, summary.getDirectoryCount()); // Current getInputSummary() returns -1 for each file found - } - - static class ContentSummaryInputFormatTestClass extends FileInputFormat implements ContentSummaryInputFormat { - private static ContentSummary summary = new ContentSummary.Builder().build(); - - public static void setContentSummary(ContentSummary contentSummary) { - summary = contentSummary; - } - - @Override - public RecordReader getRecordReader(InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException { - return null; - } - - @Override - public ContentSummary getContentSummary(Path p, JobConf job) throws IOException { - return summary; - } - } - - @Test - public void testGetInputSummaryWithContentSummaryInputFormat() throws IOException { - final int NUM_PARTITIONS = 5; - final int BYTES_PER_FILE = 10; - - JobConf jobConf = new JobConf(); - Properties properties = new Properties(); - - jobConf.setInt(Utilities.DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX, 2); - - ContentSummaryInputFormatTestClass.setContentSummary( - new ContentSummary.Builder().length(BYTES_PER_FILE).fileCount(2).directoryCount(1).build()); - - /* Let's write more bytes to the files to test that ContentSummaryInputFormat is actually working returning the file size not from the filesystem */ - ContentSummary summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE * 2, ContentSummaryInputFormatTestClass.class); - assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength()); - assertEquals(NUM_PARTITIONS * 2, summary.getFileCount()); - assertEquals(NUM_PARTITIONS, summary.getDirectoryCount()); - } - - private ContentSummary runTestGetInputSummary(JobConf jobConf, Properties properties, int numOfPartitions, int bytesPerFile, Class inputFormatClass) throws IOException { - // creates scratch directories needed by the Context object - SessionState.start(new HiveConf()); - - MapWork mapWork = new MapWork(); - Context context = new Context(jobConf); - LinkedHashMap pathToPartitionInfo = new LinkedHashMap<>(); - LinkedHashMap> pathToAliasTable = new LinkedHashMap<>(); - TableScanOperator scanOp = new TableScanOperator(); - - PartitionDesc partitionDesc = new PartitionDesc(new TableDesc(inputFormatClass, null, properties), null); - - String testTableName = "testTable"; - - Path testTablePath = new Path(testTableName); - Path[] testPartitionsPaths = new Path[numOfPartitions]; - for (int i=0; i getDependencyCollectionTask(){ return TaskFactory.get(new DependencyCollectionWork()); }
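
Note on the accumulation change in getInputSummaryWithPool(): per-path results are no longer collected in a ConcurrentHashMap keyed by path string. Each worker folds its ContentSummary into shared AtomicLong counters via the new recordSummary() helper, and getInputSummary() builds the returned ContentSummary from those totals with ContentSummary.Builder. The standalone sketch below (illustrative only, not part of the patch; the class name, pool size, timeout, and error handling are placeholders) shows the same pattern against the plain Hadoop FileSystem API:

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class InputSummarySketch {
  public static ContentSummary summarize(Configuration conf, List<Path> paths)
      throws Exception {
    // Shared counters replace the per-path result map; workers only ever
    // add to them, so no further synchronization is needed.
    final AtomicLong length = new AtomicLong(0L);
    final AtomicLong files = new AtomicLong(0L);
    final AtomicLong dirs = new AtomicLong(0L);

    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      for (final Path path : paths) {
        pool.submit(() -> {
          try {
            // One name-node call per path; the result is folded directly
            // into the shared totals.
            FileSystem fs = path.getFileSystem(conf);
            ContentSummary cs = fs.getContentSummary(path);
            length.addAndGet(cs.getLength());
            files.addAndGet(cs.getFileCount());
            dirs.addAndGet(cs.getDirectoryCount());
          } catch (Exception e) {
            // Mirrors the patch: a failed lookup is ignored for summary
            // purposes rather than failing the whole computation.
          }
        });
      }
      pool.shutdown();
      pool.awaitTermination(1, TimeUnit.MINUTES);
    } finally {
      pool.shutdownNow();
    }

    // Build the final summary from the accumulated totals, as the patched
    // getInputSummary() does with its long[] summary array.
    return new ContentSummary.Builder().length(length.get())
        .fileCount(files.get()).directoryCount(dirs.get()).build();
  }
}

This keeps aggregation O(1) in memory regardless of the number of paths and avoids the second pass over a result map that the old code performed after the pool drained.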