diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index a598ccc..49b03a4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -29,10 +29,12 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -108,7 +110,7 @@ public CheckNonCombinablePathCallable(Path[] paths, int start, int length, JobCo
             LOG.debug("The path [" + paths[i + start] +
                 "] is being parked for HiveInputFormat.getSplits");
           }
-          nonCombinablePathIndices.add(i);
+          nonCombinablePathIndices.add(i + start);
         }
       }
       return nonCombinablePathIndices;
@@ -451,6 +453,35 @@ public int hashCode() {
   }
 
   /**
+   * Gets all the path indices that should not be combined.
+   */
+  @VisibleForTesting
+  public Set<Integer> getNonCombinablePathIndices(JobConf job, Path[] paths, int numThreads)
+      throws ExecutionException, InterruptedException {
+    LOG.info("Total number of paths: " + paths.length +
+        ", launching " + numThreads + " threads to check non-combinable ones.");
+    int numPathPerThread = (int) Math.ceil((double) paths.length / numThreads);
+
+    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+    List<Future<Set<Integer>>> futureList = new ArrayList<Future<Set<Integer>>>(numThreads);
+    try {
+      for (int i = 0; i < numThreads; i++) {
+        int start = i * numPathPerThread;
+        int length = i != numThreads - 1 ? numPathPerThread : paths.length - start;
+        futureList.add(executor.submit(
+            new CheckNonCombinablePathCallable(paths, start, length, job)));
+      }
+      Set<Integer> nonCombinablePathIndices = new HashSet<Integer>();
+      for (Future<Set<Integer>> future : futureList) {
+        nonCombinablePathIndices.addAll(future.get());
+      }
+      return nonCombinablePathIndices;
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
+  /**
    * Create Hive splits based on CombineFileSplit.
    */
   @Override
@@ -468,27 +499,13 @@ public int hashCode() {
     int numThreads = Math.min(MAX_CHECK_NONCOMBINABLE_THREAD_NUM,
         (int) Math.ceil((double) paths.length / DEFAULT_NUM_PATH_PER_THREAD));
-    int numPathPerThread = (int) Math.ceil((double) paths.length / numThreads);
 
     // This check is necessary because for Spark branch, the result array from
     // getInputPaths() above could be empty, and therefore numThreads could be 0.
     // In that case, Executors.newFixedThreadPool will fail.
     if (numThreads > 0) {
-      LOG.info("Total number of paths: " + paths.length +
-          ", launching " + numThreads + " threads to check non-combinable ones.");
-      ExecutorService executor = Executors.newFixedThreadPool(numThreads);
-      List<Future<Set<Integer>>> futureList = new ArrayList<Future<Set<Integer>>>(numThreads);
       try {
-        for (int i = 0; i < numThreads; i++) {
-          int start = i * numPathPerThread;
-          int length = i != numThreads - 1 ?
-              numPathPerThread : paths.length - start;
-          futureList.add(executor.submit(
-              new CheckNonCombinablePathCallable(paths, start, length, job)));
-        }
-        Set<Integer> nonCombinablePathIndices = new HashSet<Integer>();
-        for (Future<Set<Integer>> future : futureList) {
-          nonCombinablePathIndices.addAll(future.get());
-        }
+        Set<Integer> nonCombinablePathIndices =
+            getNonCombinablePathIndices(job, paths, numThreads);
         for (int i = 0; i < paths.length; i++) {
           if (nonCombinablePathIndices.contains(i)) {
             nonCombinablePaths.add(paths[i]);
@@ -500,8 +517,6 @@ public int hashCode() {
         LOG.error("Error checking non-combinable path", e);
         perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
         throw new IOException(e);
-      } finally {
-        executor.shutdownNow();
       }
     }
 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestCombineHiveInputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestCombineHiveInputFormat.java
new file mode 100644
index 0000000..9b8a519
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestCombineHiveInputFormat.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Set;
+
+/**
+ * Unit test for CombineHiveInputFormat.
+ */
+public class TestCombineHiveInputFormat extends TestCase {
+  public void testAvoidSplitCombination() throws Exception {
+    Configuration conf = new Configuration();
+    JobConf job = new JobConf(conf);
+
+    TableDesc tblDesc = Utilities.defaultTd;
+    tblDesc.setInputFileFormatClass(TestSkipCombineInputFormat.class);
+    PartitionDesc partDesc = new PartitionDesc(tblDesc, null);
+    LinkedHashMap<String, PartitionDesc> pt = new LinkedHashMap<String, PartitionDesc>();
+    pt.put("/tmp/testfolder1", partDesc);
+    pt.put("/tmp/testfolder2", partDesc);
+    MapredWork mrwork = new MapredWork();
+    mrwork.getMapWork().setPathToPartitionInfo(pt);
+    Path mapWorkPath = new Path("/tmp/" + System.getProperty("user.name"), "hive");
+    Utilities.setMapRedWork(conf, mrwork,
+        mapWorkPath);
+
+    try {
+      Path[] paths = new Path[2];
+      paths[0] = new Path("/tmp/testfolder1");
+      paths[1] = new Path("/tmp/testfolder2");
+      CombineHiveInputFormat combineInputFormat =
+          ReflectionUtils.newInstance(CombineHiveInputFormat.class, conf);
+      combineInputFormat.pathToPartitionInfo =
+          Utilities.getMapWork(conf).getPathToPartitionInfo();
+      Set<Integer> results = combineInputFormat.getNonCombinablePathIndices(job, paths, 2);
+      assertEquals("Should have both path indices in the results set", 2, results.size());
+    } finally {
+      // Clean up the map work path
+      FileSystem.get(conf).delete(mapWorkPath, true);
+    }
+  }
+
+  public static class TestSkipCombineInputFormat extends FileInputFormat
+      implements CombineHiveInputFormat.AvoidSplitCombination {
+    @Override public RecordReader getRecordReader(InputSplit inputSplit, JobConf jobConf,
+        Reporter reporter) throws IOException {
+      return null;
+    }
+
+    @Override public boolean shouldSkipCombine(Path path, Configuration conf)
+        throws IOException {
+      // Skip combine for all paths
+      return true;
+    }
+  }
+}
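
Reviewer note, not part of the patch: the one-token change in CheckNonCombinablePathCallable is the actual bug fix. Each callable scans the slice paths[start .. start+length) but must report *global* path indices; returning the loop-local i shifted every index from a non-first chunk back by start, so getSplits could park the wrong paths for HiveInputFormat and still combine the ones that asked to be skipped. For example, with 20 paths and 2 threads, numPathPerThread = 10 and thread 1 covers [10, 20), so a non-combinable path at global index 12 was reported as index 2. The sketch below (a standalone illustration with hypothetical names, not Hive code) reproduces the off-by-start behavior and the fix:

```java
import java.util.HashSet;
import java.util.Set;

// Standalone sketch of the indexing bug fixed above (hypothetical names).
// A worker checks the slice paths[start .. start+length) and must return
// the *global* indices of the paths that should not be combined.
public class IndexOffsetDemo {
  static Set<Integer> check(boolean[] nonCombinable, int start, int length, boolean buggy) {
    Set<Integer> indices = new HashSet<Integer>();
    for (int i = 0; i < length; i++) {
      if (nonCombinable[i + start]) {
        // The fix: report i + start, not the loop-local i.
        indices.add(buggy ? i : i + start);
      }
    }
    return indices;
  }

  public static void main(String[] args) {
    boolean[] nonCombinable = new boolean[20];
    nonCombinable[12] = true; // only path 12 should avoid combination
    // Second worker of two: start = 10, length = 10, mirroring how
    // getNonCombinablePathIndices slices the path array per thread.
    System.out.println(check(nonCombinable, 10, 10, true));  // buggy: [2]
    System.out.println(check(nonCombinable, 10, 10, false)); // fixed: [12]
  }
}
```

Extracting the thread-pool loop into getNonCombinablePathIndices (annotated @VisibleForTesting) is what lets the new unit test exercise this logic directly instead of driving a full getSplits run, and moving executor.shutdownNow() into the helper's finally block preserves the old cleanup behavior.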