diff --git a/common/src/java/org/apache/hadoop/hive/common/pool/CollectionSizePartitionPolicy.java b/common/src/java/org/apache/hadoop/hive/common/pool/CollectionSizePartitionPolicy.java new file mode 100644 index 0000000..160842f --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/pool/CollectionSizePartitionPolicy.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common.pool; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +/** + * Base partition policy class that requires subclasses to define the number of + * partitions to create based on the size of the target collection. + */ +public abstract class CollectionSizePartitionPolicy implements PartitionPolicy { + + @Override + @SuppressWarnings("unchecked") + public List> partition(final Collection c) { + if (c.isEmpty()) { + return Collections.emptyList(); + } + final int elementCount = c.size(); + final int partitionCount = getPartitionCount(elementCount); + Preconditions + .checkArgument(partitionCount > 0 && partitionCount <= elementCount); + final int elementsPerPartition = + (elementCount + partitionCount - 1) / partitionCount; + final List list = (c instanceof List) ? (List) c : new ArrayList<>(c); + final List partitions = Lists.partition(list, elementsPerPartition); + return (List>) partitions; + } + + /** + * Given the size of a Collection, return the number of partitions (sublists) + * it should be partitioned into. Each sublist is the same size though the + * final list may be smaller. Returned value must be greater than 0 and less + * than or equal to the collectionCount. + * + * @param collectionCount The number of elements in the Collection + * @return The number of partitions to create + */ + protected abstract int getPartitionCount(int collectionCount); + +} diff --git a/common/src/java/org/apache/hadoop/hive/common/pool/LnPartitionPolicy.java b/common/src/java/org/apache/hadoop/hive/common/pool/LnPartitionPolicy.java new file mode 100644 index 0000000..9784080 --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/pool/LnPartitionPolicy.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common.pool; + +import java.util.Collection; +import java.util.List; + +/** + * A policy which partitions a Collection into consecutive sublists. The + * sublists are populated in the order in which they are returned by the + * Collection's iterator. Each sublist is the same size though the final list + * may be smaller. The number of partitions returned is equal to the natural log + * of the size of the Collection. + */ +public class LnPartitionPolicy extends CollectionSizePartitionPolicy { + + private static final LnPartitionPolicy INSTANCE = new LnPartitionPolicy(); + + /** + * Convenience method for static invocation. + * + * @param c The Collection to partition + * @return A list of zero or more sublists + */ + @SuppressWarnings("unchecked") + public static List> createPartitions(Collection c) { + List partitions = INSTANCE.partition(c); + return (List>) partitions; + } + + @Override + protected int getPartitionCount(final int collectionCount) { + return Math.max(1, (int) Math.log(collectionCount)); + } +} diff --git a/common/src/java/org/apache/hadoop/hive/common/pool/PartitionPolicy.java b/common/src/java/org/apache/hadoop/hive/common/pool/PartitionPolicy.java new file mode 100644 index 0000000..050b6de --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/pool/PartitionPolicy.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common.pool; + +import java.util.Collection; +import java.util.List; + +/** + * This interface allows for different policies that partition a Collection + * into zero or more sublists. + */ +public interface PartitionPolicy { + + /** + * Partitions a Collection into sublists. The number and size of each sublist + * will be dependent on the underlying implementation. This interface provides + * no stipulation regarding the order of the items within each sublist. 
+ * + * @param c The collection to partition + * @return A list of zero or more sublists + */ + List> partition(Collection c); + +} diff --git a/common/src/java/org/apache/hadoop/hive/common/pool/ThreadPools.java b/common/src/java/org/apache/hadoop/hive/common/pool/ThreadPools.java new file mode 100644 index 0000000..37165b7 --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/pool/ThreadPools.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common.pool; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.MoreExecutors; + +/** + * A utility class for creating thread pools implemented as + * {@link ExecutorService}. + */ +public final class ThreadPools { + + private static final Logger LOG = LoggerFactory.getLogger(ThreadPools.class); + + private ThreadPools() { + } + + /** + * Creates a thread pool that reuses a fixed number of threads operating off a + * shared unbounded queue where the number of threads in the pool is equal to + * the number of processors available to the Java virtual machine. If + * additional tasks are submitted when all threads are active, they will wait + * in the queue until a thread is available. If any thread terminates due to a + * failure during execution prior to shutdown, a new one will take its place + * if needed to execute subsequent tasks. The threads in the pool will exist + * until it is explicitly shutdown. + * + * @see {@link MoreExecutors#newDirectExecutorService} + * @see {@link Runtime#availableProcessors()} + * @return An ExecutorService, must be explicitly shutdown + */ + public static ExecutorService createFixedThreadPool() { + return createFixedThreadPool(Integer.MAX_VALUE, true); + } + + /** + * Creates a thread pool that reuses a fixed number of threads operating off a + * shared unbounded queue. At any point, at most nThreads threads will be + * actively processing tasks; however, the number of threads in the returned + * ExecutorService is capped at the number of processors available to the + * Java virtual machine. If additional tasks are submitted when all threads + * are active, they will wait in the queue until a thread is available. If any + * thread terminates due to a failure during execution prior to shutdown, a + * new one will take its place if needed to execute subsequent tasks. The + * threads in the pool will exist until it is explicitly shutdown. If + * requesting a thread pool with fewer than one thread (including negative) + * this method will return a DirectExecutorService. This ExecutorService runs + * each task in the thread that invokes execute/submit. 
+ * + * @see {@link MoreExecutors#newDirectExecutorService} + * @see {@link Runtime#availableProcessors()} + * @param nThreads The number of active threads in the thread pool + * @return An ExecutorService, must be explicitly shutdown + */ + public static ExecutorService createFixedThreadPool(final int nThreads) { + return createFixedThreadPool(nThreads, true); + } + + /** + * Creates a thread pool that reuses a fixed number of threads operating off a + * shared unbounded queue. At any point, at most nThreads threads will be + * actively processing tasks. The number of threads may optionally be capped + * at the number of processors available to the Java virtual machine. If + * additional tasks are submitted when all threads are active, they will wait + * in the queue until a thread is available. If any thread terminates due to a + * failure during execution prior to shutdown, a new one will take its place + * if needed to execute subsequent tasks. The threads in the pool will exist + * until it is explicitly shutdown. If requesting a thread pool with fewer + * than one thread (including negative) this method will return a + * DirectExecutorService. This ExecutorService runs each task in the thread + * that invokes execute/submit. + * + * @see {@link MoreExecutors#newDirectExecutorService} + * @see {@link Runtime#availableProcessors()} + * @param nThreads The number of active threads in the thread pool + * @param systemCap Cap the number of active threads at the number of + * processors available to the Java virtual machine. + * @return An ExecutorService, must be explicitly shutdown + */ + public static ExecutorService createFixedThreadPool(final int nThreads, + final boolean systemCap) { + if (nThreads <= 0) { + LOG.debug("Requested {} threads. Returning DirectExecutorService", + nThreads); + return MoreExecutors.newDirectExecutorService(); + } + int threadCount = nThreads; + if (systemCap) { + int sysThreads = Runtime.getRuntime().availableProcessors(); + threadCount = Math.min(sysThreads, threadCount); + } + LOG.debug("Requested {} threads. Limited by CPU count to {} threads", + nThreads, threadCount); + return Executors.newFixedThreadPool(threadCount); + } +} diff --git a/common/src/test/org/apache/hadoop/hive/common/pool/TestLnPartitionPolicy.java b/common/src/test/org/apache/hadoop/hive/common/pool/TestLnPartitionPolicy.java new file mode 100644 index 0000000..22f20e4 --- /dev/null +++ b/common/src/test/org/apache/hadoop/hive/common/pool/TestLnPartitionPolicy.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.common.pool; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import org.junit.Before; +import org.junit.Test; + +/** + * Test suite for {@link LnPartitionPolicy}. + */ +public class TestLnPartitionPolicy { + + private LnPartitionPolicy partitionPolicy; + + @Before + public void setup() { + partitionPolicy = new LnPartitionPolicy(); + } + + @Test + public void testEmptyCollection() { + List> result = + this.partitionPolicy.partition(Collections.emptyList()); + assertEquals(0, result.size()); + } + + @Test + public void testPartitionCollection() { + Collection testData = Arrays.asList(1, 2, 3, 4, 5, 1, 2, 3, 4); + List> result = this.partitionPolicy.partition(testData); + + assertEquals(2, result.size()); + + assertEquals(1, result.get(0).get(0)); + assertEquals(2, result.get(0).get(1)); + assertEquals(3, result.get(0).get(2)); + assertEquals(4, result.get(0).get(3)); + assertEquals(5, result.get(0).get(4)); + + assertEquals(1, result.get(1).get(0)); + assertEquals(2, result.get(1).get(1)); + assertEquals(3, result.get(1).get(2)); + assertEquals(4, result.get(1).get(3)); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java index 5f2539f..4ec4ae8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java @@ -23,26 +23,28 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; -import com.google.common.annotations.VisibleForTesting; - -import org.apache.hadoop.hive.common.StringInternUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.ArrayUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hive.common.StringInternUtils; +import org.apache.hadoop.hive.common.pool.LnPartitionPolicy; +import org.apache.hadoop.hive.common.pool.ThreadPools; import org.apache.hadoop.hive.ql.Driver.LockedDriverState; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -64,6 +66,10 @@ import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.lib.CombineFileSplit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; /** @@ -78,39 +84,34 @@ private static final String CLASS_NAME = CombineHiveInputFormat.class.getName(); public static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); - // max number of threads we can use to check non-combinable paths - private static final int MAX_CHECK_NONCOMBINABLE_THREAD_NUM = 50; - private static final int DEFAULT_NUM_PATH_PER_THREAD = 100; - - private class 
CheckNonCombinablePathCallable implements Callable> { - private final Path[] paths; - private final int start; - private final int length; + private class CheckNonCombinablePathCallable implements Callable> { + private final Collection paths; private final JobConf conf; private final boolean isMerge; + private final Map pathPartitionMap; - public CheckNonCombinablePathCallable( - Path[] paths, int start, int length, JobConf conf, boolean isMerge) { + CheckNonCombinablePathCallable(Collection paths, JobConf conf, + boolean isMerge, Map pathPartitionMap) { this.paths = paths; - this.start = start; - this.length = length; this.conf = conf; this.isMerge = isMerge; + this.pathPartitionMap = pathPartitionMap; } @Override - public Set call() throws Exception { - Set nonCombinablePathIndices = new HashSet(); - for (int i = 0; i < length; i++) { - PartitionDesc part = HiveFileFormatUtils.getFromPathRecursively( - pathToPartitionInfo, paths[i + start], - IOPrepareCache.get().allocatePartitionDescMap()); + public Set call() throws Exception { + Set nonCombinablePaths = new HashSet<>(); + for (final Path path : this.paths) { + LOG.trace("Checking if path is combinable: {}", path); + PartitionDesc part = + HiveFileFormatUtils.getFromPathRecursively(pathPartitionMap, + path, IOPrepareCache.get().allocatePartitionDescMap()); // Use HiveInputFormat if any of the paths is not splittable Class inputFormatClass = part.getInputFileFormatClass(); InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, conf); boolean isAvoidSplitCombine = inputFormat instanceof AvoidSplitCombination && - ((AvoidSplitCombination) inputFormat).shouldSkipCombine(paths[i + start], conf); + ((AvoidSplitCombination) inputFormat).shouldSkipCombine(path, conf); TableDesc tbl = part.getTableDesc(); boolean isMmNonMerge = false; if (tbl != null) { @@ -121,12 +122,12 @@ public CheckNonCombinablePathCallable( } if (isAvoidSplitCombine || isMmNonMerge) { - Utilities.FILE_OP_LOGGER.info("The path [" + paths[i + start] + + Utilities.FILE_OP_LOGGER.info("The path [" + path + "] is being parked for HiveInputFormat.getSplits"); - nonCombinablePathIndices.add(i + start); + nonCombinablePaths.add(path); } } - return nonCombinablePathIndices; + return nonCombinablePaths; } } @@ -346,14 +347,16 @@ public int hashCode() { return splits; } - if (combine.getInputPathsShim(job).length == 0) { + final Path[] shimInputPaths = combine.getInputPathsShim(job); + if (ArrayUtils.isEmpty(shimInputPaths)) { throw new IOException("No input paths specified in job"); } ArrayList result = new ArrayList(); - // combine splits only from same tables and same partitions. Do not combine splits from multiple - // tables or multiple partitions. - Path[] paths = StringInternUtils.internUriStringsInPathArray(combine.getInputPathsShim(job)); + // combine splits only from same tables and same partitions. Do not combine + // splits from multiple tables or multiple partitions. + final Path[] paths = + StringInternUtils.internUriStringsInPathArray(shimInputPaths); List inpDirs = new ArrayList(); List inpFiles = new ArrayList(); @@ -364,7 +367,7 @@ public int hashCode() { for (Path path : paths) { if (lDrvStat != null && lDrvStat.isAborted()) { - throw new IOException("Operation is Canceled. 
"); + throw new IOException("Operation is Canceled"); } PartitionDesc part = HiveFileFormatUtils.getFromPathRecursively( @@ -410,13 +413,13 @@ public int hashCode() { f = poolMap.get(combinePathInputFormat); if (f == null) { f = new CombineFilter(filterPath); - LOG.info("CombineHiveInputSplit creating pool for " + path + - "; using filter path " + filterPath); + LOG.debug("CombineHiveInputSplit: creating pool for {}; " + + "using filter path {}", path, filterPath); combine.createPool(job, f); poolMap.put(combinePathInputFormat, f); } else { - LOG.debug("CombineHiveInputSplit: pool is already created for " + path + - "; using filter path " + filterPath); + LOG.debug("CombineHiveInputSplit: pool is already created for {}; " + + "using filter path {}", path, filterPath); f.addPath(filterPath); } } else { @@ -447,7 +450,7 @@ public int hashCode() { processPaths(job, combine, iss, path); } - if (inpFiles.size() > 0) { + if (!inpFiles.isEmpty()) { // Processing files for (Path filterPath : poolSet) { combine.createPool(job, new CombineFilter(filterPath)); @@ -456,7 +459,7 @@ public int hashCode() { } } - if (mrwork.getNameToSplitSample() != null && !mrwork.getNameToSplitSample().isEmpty()) { + if (MapUtils.isNotEmpty(mrwork.getNameToSplitSample())) { iss = sampleSplits(iss); } @@ -464,35 +467,42 @@ public int hashCode() { CombineHiveInputSplit csplit = new CombineHiveInputSplit(job, is, pathToPartitionInfo); result.add(csplit); } - LOG.debug("Number of splits " + result.size()); - return result.toArray(new InputSplit[result.size()]); + LOG.debug("Number of splits {}", result.size()); + return result.toArray(new InputSplit[0]); } /** * Gets all the path indices that should not be combined */ @VisibleForTesting - public Set getNonCombinablePathIndices(JobConf job, Path[] paths, int numThreads) - throws ExecutionException, InterruptedException { - LOG.info("Total number of paths: " + paths.length + - ", launching " + numThreads + " threads to check non-combinable ones."); - int numPathPerThread = (int) Math.ceil((double) paths.length / numThreads); - - ExecutorService executor = Executors.newFixedThreadPool(numThreads); - List>> futureList = new ArrayList>>(numThreads); + public Collection getNonCombinablePaths(final JobConf job, + final Path[] paths) throws ExecutionException, InterruptedException { + if (ArrayUtils.isEmpty(paths)) { + LOG.debug("List of paths is empty"); + return Collections.emptyList(); + } + + final List> partitions = + LnPartitionPolicy.createPartitions(Arrays.asList(paths)); + final int partitionCount = partitions.size(); + + LOG.info("Check non-combinable Paths. Paths: {}, Threads: {}", + paths.length, partitionCount); + + ExecutorService executor = ThreadPools.createFixedThreadPool(partitionCount); + List>> futureList = new ArrayList<>(partitionCount); try { boolean isMerge = mrwork != null && mrwork.isMergeFromResolver(); - for (int i = 0; i < numThreads; i++) { - int start = i * numPathPerThread; - int length = i != numThreads - 1 ? 
numPathPerThread : paths.length - start; - futureList.add(executor.submit(new CheckNonCombinablePathCallable( - paths, start, length, job, isMerge))); + for (final List partition : partitions) { + futureList.add(executor.submit( + new CheckNonCombinablePathCallable(partition, job, isMerge, + Collections.unmodifiableMap(pathToPartitionInfo)))); } - Set nonCombinablePathIndices = new HashSet(); - for (Future> future : futureList) { - nonCombinablePathIndices.addAll(future.get()); + final Set nonCombinablePaths = new HashSet<>(); + for (Future> future : futureList) { + nonCombinablePaths.addAll(future.get()); } - return nonCombinablePathIndices; + return nonCombinablePaths; } finally { executor.shutdownNow(); } @@ -511,60 +521,44 @@ public int hashCode() { Path[] paths = getInputPaths(job); - List nonCombinablePaths = new ArrayList(paths.length / 2); - List combinablePaths = new ArrayList(paths.length / 2); + // Ordering matters (TreeSet) otherwise qtests break + Collection combinablePaths = new TreeSet<>(Arrays.asList(paths)); + Collection nonCombinablePaths = Collections.emptyList(); - int numThreads = Math.min(MAX_CHECK_NONCOMBINABLE_THREAD_NUM, - (int) Math.ceil((double) paths.length / DEFAULT_NUM_PATH_PER_THREAD)); + try { + nonCombinablePaths = getNonCombinablePaths(job, paths); + combinablePaths.removeAll(nonCombinablePaths); - // This check is necessary because for Spark branch, the result array from - // getInputPaths() above could be empty, and therefore numThreads could be 0. - // In that case, Executors.newFixedThreadPool will fail. - if (numThreads > 0) { - try { - Set nonCombinablePathIndices = getNonCombinablePathIndices(job, paths, numThreads); - for (int i = 0; i < paths.length; i++) { - if (nonCombinablePathIndices.contains(i)) { - nonCombinablePaths.add(paths[i]); - } else { - combinablePaths.add(paths[i]); - } - } - } catch (Exception e) { - LOG.error("Error checking non-combinable path", e); - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS); - throw new IOException(e); - } + LOG.debug("List of combinable paths: {}", combinablePaths); + LOG.debug("List of non-combinable paths: {}", nonCombinablePaths); + } catch (Exception e) { + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS); + throw new IOException("Error checking non-combinable path", e); } // Store the previous value for the path specification - String oldPaths = job.get(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR); - if (LOG.isDebugEnabled()) { - LOG.debug("The received input paths are: [" + oldPaths + - "] against the property " - + org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR); - } + String oldPaths = job + .get(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR); + + LOG.debug("The received input paths are: [{}] against the property {}", + oldPaths, + org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR); // Process the normal splits - if (nonCombinablePaths.size() > 0) { - FileInputFormat.setInputPaths(job, nonCombinablePaths.toArray - (new Path[nonCombinablePaths.size()])); + if (!nonCombinablePaths.isEmpty()) { + FileInputFormat.setInputPaths(job, + nonCombinablePaths.toArray(new Path[0])); InputSplit[] splits = super.getSplits(job, numSplits); - for (InputSplit split : splits) { - result.add(split); - } + result.addAll(Arrays.asList(splits)); } // Process the combine splits - if (combinablePaths.size() > 0) { - FileInputFormat.setInputPaths(job, combinablePaths.toArray - (new Path[combinablePaths.size()])); + if 
(!combinablePaths.isEmpty()) { + FileInputFormat.setInputPaths(job, combinablePaths.toArray(new Path[0])); Map pathToPartitionInfo = this.pathToPartitionInfo != null ? this.pathToPartitionInfo : Utilities.getMapWork(job).getPathToPartitionInfo(); InputSplit[] splits = getCombineSplits(job, numSplits, pathToPartitionInfo); - for (InputSplit split : splits) { - result.add(split); - } + result.addAll(Arrays.asList(splits)); } // Restore the old path information back @@ -584,9 +578,9 @@ public int hashCode() { new HiveInputSplit(new NullRowsInputFormat.DummyInputSplit(paths[0]), ZeroRowsInputFormat.class.getName())); } - LOG.info("Number of all splits " + result.size()); + LOG.info("Number of all splits {}", result.size()); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS); - return result.toArray(new InputSplit[result.size()]); + return result.toArray(new InputSplit[0]); } private void processPaths(JobConf job, CombineFileInputFormatShim combine, @@ -710,7 +704,7 @@ public RecordReader getRecordReader(InputSplit split, JobConf job, inputFormatClassName = hsplit.inputFormatClassName(); inputFormatClass = job.getClassByName(inputFormatClassName); } catch (Exception e) { - throw new IOException("cannot find class " + inputFormatClassName); + throw new IOException("Cannot find class " + inputFormatClassName); } pushProjectionsAndFilters(job, inputFormatClass, hsplit.getPath(0)); @@ -742,15 +736,13 @@ public void addPath(Path p) { // in this TestFilter. @Override public boolean accept(Path path) { - boolean find = false; - while (path != null && !find) { - if(pStrings.contains(path.toUri().getPath())) { - find = true; - break; + while (path != null) { + if (pStrings.contains(path.toUri().getPath())) { + return true; } path = path.getParent(); } - return find; + return false; } @Override @@ -758,7 +750,7 @@ public String toString() { StringBuilder s = new StringBuilder(); s.append("PathFilter: "); for (String pString : pStrings) { - s.append(pString + " "); + s.append(pString).append(' '); } return s.toString(); } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestCombineHiveInputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestCombineHiveInputFormat.java index 07cef93..f58fd26 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestCombineHiveInputFormat.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestCombineHiveInputFormat.java @@ -17,7 +17,12 @@ */ package org.apache.hadoop.hive.ql.io; -import junit.framework.TestCase; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Collection; +import java.util.LinkedHashMap; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -31,58 +36,80 @@ import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.ReflectionUtils; - -import java.io.IOException; -import java.util.LinkedHashMap; -import java.util.Set; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; /** - * Unittest for CombineHiveInputFormat. + * Unit tests for CombineHiveInputFormat. 
*/ -public class TestCombineHiveInputFormat extends TestCase { - public void testAvoidSplitCombination() throws Exception { - Configuration conf = new Configuration(); - JobConf job = new JobConf(conf); - - TableDesc tblDesc = Utilities.defaultTd; - tblDesc.setInputFileFormatClass(TestSkipCombineInputFormat.class); - PartitionDesc partDesc = new PartitionDesc(tblDesc, null); - LinkedHashMap pt = new LinkedHashMap<>(); - pt.put(new Path("/tmp/testfolder1"), partDesc); - pt.put(new Path("/tmp/testfolder2"), partDesc); - MapredWork mrwork = new MapredWork(); - mrwork.getMapWork().setPathToPartitionInfo(pt); - Path mapWorkPath = new Path("/tmp/" + System.getProperty("user.name"), "hive"); - Utilities.setMapRedWork(conf, mrwork, - mapWorkPath); - - try { - Path[] paths = new Path[2]; - paths[0] = new Path("/tmp/testfolder1"); - paths[1] = new Path("/tmp/testfolder2"); - CombineHiveInputFormat combineInputFormat = - ReflectionUtils.newInstance(CombineHiveInputFormat.class, conf); - combineInputFormat.pathToPartitionInfo = - Utilities.getMapWork(conf).getPathToPartitionInfo(); - Set results = combineInputFormat.getNonCombinablePathIndices(job, paths, 2); - assertEquals("Should have both path indices in the results set", 2, results.size()); - } finally { - // Cleanup the mapwork path - FileSystem.get(conf).delete(mapWorkPath, true); - } +public class TestCombineHiveInputFormat { + + private Configuration conf; + private JobConf job; + private Path mapWorkPath; + + @Before + public void setup() { + this.conf = new Configuration(); + this.job = new JobConf(); + this.mapWorkPath = + new Path("/tmp/" + System.getProperty("user.name"), "hive"); + } + + @After + public void teardown() throws IOException { + FileSystem.get(this.conf).delete(mapWorkPath, true); + } + + @Test + @SuppressWarnings({ "rawtypes", "unchecked" }) + public void testAvoidSplitCombination() throws Exception { + TableDesc tblDesc = Utilities.defaultTd; + tblDesc.setInputFileFormatClass(TestSkipCombineInputFormat.class); + + PartitionDesc partDesc = new PartitionDesc(tblDesc, null); + LinkedHashMap pt = new LinkedHashMap<>(); + pt.put(new Path("/tmp/testfolder1"), partDesc); + pt.put(new Path("/tmp/testfolder2"), partDesc); + + MapredWork mrwork = new MapredWork(); + mrwork.getMapWork().setPathToPartitionInfo(pt); + + Utilities.setMapRedWork(conf, mrwork, mapWorkPath); + + final Path[] paths = new Path[] {new Path("/tmp/testfolder1"), + new Path("/tmp/testfolder2")}; + + CombineHiveInputFormat combineInputFormat = + ReflectionUtils.newInstance(CombineHiveInputFormat.class, conf); + combineInputFormat.pathToPartitionInfo = + Utilities.getMapWork(conf).getPathToPartitionInfo(); + + Collection results = + combineInputFormat.getNonCombinablePaths(job, paths); + + assertEquals("Should have both path indices in the results set", 2, + results.size()); + } + + /** + * TestSkipCombineInputFormat. 
+ */ + @SuppressWarnings("rawtypes") + public static class TestSkipCombineInputFormat extends FileInputFormat + implements CombineHiveInputFormat.AvoidSplitCombination { + @Override + public RecordReader getRecordReader(InputSplit inputSplit, JobConf jobConf, + Reporter reporter) throws IOException { + return null; } - public static class TestSkipCombineInputFormat extends FileInputFormat - implements CombineHiveInputFormat.AvoidSplitCombination { - @Override public RecordReader getRecordReader(InputSplit inputSplit, JobConf jobConf, - Reporter reporter) throws IOException { - return null; - } - - @Override public boolean shouldSkipCombine(Path path, Configuration conf) - throws IOException { - // Skip combine for all paths - return true; - } + @Override + public boolean shouldSkipCombine(Path path, Configuration conf) + throws IOException { + // Skip combine for all paths + return true; } + } }
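
For reference, the pattern this patch applies in CombineHiveInputFormat#getNonCombinablePaths can be exercised on its own: split a collection of paths with LnPartitionPolicy, then hand each sublist to a worker from a ThreadPools executor. The sketch below is illustrative only and not part of the patch; the generic signatures (List<List<Path>>, Collection<Path>), the sample path names, and the class name PartitionedCheckSketch are assumptions made for the example.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.pool.LnPartitionPolicy;
import org.apache.hadoop.hive.common.pool.ThreadPools;

public class PartitionedCheckSketch {
  public static void main(String[] args) throws Exception {
    // Nine paths: ln(9) is roughly 2.2, so LnPartitionPolicy yields two
    // sublists of sizes 5 and 4, matching TestLnPartitionPolicy above.
    List<Path> paths = new ArrayList<>();
    for (int i = 0; i < 9; i++) {
      paths.add(new Path("/tmp/input/part-" + i)); // hypothetical paths
    }
    List<List<Path>> partitions = LnPartitionPolicy.createPartitions(paths);

    // One worker per partition, capped at the number of available processors;
    // a request for zero or fewer threads falls back to a direct executor.
    ExecutorService executor =
        ThreadPools.createFixedThreadPool(partitions.size());
    try {
      List<Future<Integer>> futures = new ArrayList<>();
      for (List<Path> partition : partitions) {
        // Stand-in for CheckNonCombinablePathCallable: each task inspects
        // one sublist of paths and returns a result.
        Callable<Integer> task = () -> partition.size();
        futures.add(executor.submit(task));
      }
      int checked = 0;
      for (Future<Integer> future : futures) {
        checked += future.get();
      }
      System.out.println("Checked " + checked + " paths in "
          + partitions.size() + " partitions");
    } finally {
      // The pool does not shut itself down; callers must do so explicitly.
      executor.shutdownNow();
    }
  }
}

Sizing the pool from the number of partitions keeps the thread count proportional to the natural log of the path count, which is what replaces the old MAX_CHECK_NONCOMBINABLE_THREAD_NUM and DEFAULT_NUM_PATH_PER_THREAD constants removed by this patch.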