diff --git a/common/src/java/org/apache/hadoop/hive/common/pool/CollectionSizePartitionPolicy.java b/common/src/java/org/apache/hadoop/hive/common/pool/CollectionSizePartitionPolicy.java
new file mode 100644
index 0000000..160842f
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/pool/CollectionSizePartitionPolicy.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.pool;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Base partition policy class that requires subclasses to define the number of
+ * partitions to create based on the size of the target collection.
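+ * <p>For example (an illustrative subclass, not part of this patch), a policy
+ * that splits any non-empty collection into at most two sublists:
+ * <pre>{@code
+ * public class HalvingPartitionPolicy extends CollectionSizePartitionPolicy {
+ *   protected int getPartitionCount(int collectionCount) {
+ *     return Math.min(2, collectionCount);
+ *   }
+ * }
+ * }</pre>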
+ */
+public abstract class CollectionSizePartitionPolicy implements PartitionPolicy {
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T> List<List<T>> partition(final Collection<T> c) {
+    if (c.isEmpty()) {
+      return Collections.emptyList();
+    }
+    final int elementCount = c.size();
+    final int partitionCount = getPartitionCount(elementCount);
+    Preconditions
+        .checkArgument(partitionCount > 0 && partitionCount <= elementCount);
+    final int elementsPerPartition =
+        (elementCount + partitionCount - 1) / partitionCount;
+    final List<T> list = (c instanceof List) ? (List<T>) c : new ArrayList<>(c);
+    final List<?> partitions = Lists.partition(list, elementsPerPartition);
+    return (List<List<T>>) partitions;
+  }
+
+  /**
+   * Given the size of a Collection, return the number of partitions (sublists)
+   * it should be partitioned into. Each sublist is the same size, though the
+   * final one may be smaller. The returned value must be greater than 0 and
+   * less than or equal to collectionCount.
+   *
+   * @param collectionCount The number of elements in the Collection
+   * @return The number of partitions to create
+   */
+  protected abstract int getPartitionCount(int collectionCount);
+
+}
diff --git a/common/src/java/org/apache/hadoop/hive/common/pool/LnPartitionPolicy.java b/common/src/java/org/apache/hadoop/hive/common/pool/LnPartitionPolicy.java
new file mode 100644
index 0000000..43e4c2c
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/pool/LnPartitionPolicy.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.pool;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * A policy which partitions a Collection into consecutive sublists. The
+ * sublists are populated in the order in which they are returned by the
+ * Collection's iterator. Each sublist is the same size, though the final one
+ * may be smaller. The number of partitions returned is equal to the natural
+ * logarithm of the size of the Collection, with a minimum of one partition.
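+ * <p>For example, a Collection of 9 elements yields
+ * {@code max(1, (int) Math.log(9)) = 2} partitions of 5 and 4 elements, and a
+ * Collection of 1,000 elements yields {@code (int) Math.log(1000) = 6}
+ * partitions.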
+ */
+public class LnPartitionPolicy extends CollectionSizePartitionPolicy {
+
+  private static final LnPartitionPolicy INSTANCE = new LnPartitionPolicy();
+
+  /**
+   * Convenience method for static invocation.
+   *
+   * @param c The Collection to partition
+   * @return A list of zero or more sublists
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> List<List<T>> createPartitions(Collection<T> c) {
+    List<?> partitions = INSTANCE.partition(c);
+    return (List<List<T>>) partitions;
+  }
+
+  @Override
+  protected int getPartitionCount(final int collectionCount) {
+    return Math.max(1, (int) Math.log(collectionCount));
+  }
+}
diff --git a/common/src/java/org/apache/hadoop/hive/common/pool/PartitionPolicy.java b/common/src/java/org/apache/hadoop/hive/common/pool/PartitionPolicy.java
new file mode 100644
index 0000000..050b6de
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/pool/PartitionPolicy.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.pool;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * This interface allows for different policies that partition a Collection
+ * into zero or more sublists.
+ */
+public interface PartitionPolicy {
+
+  /**
+   * Partitions a Collection into sublists. The number and size of the sublists
+   * depend on the underlying implementation. This interface provides no
+   * stipulation regarding the order of the items within each sublist.
+   *
+   * @param c The collection to partition
+   * @return A list of zero or more sublists
+   */
+  <T> List<List<T>> partition(Collection<T> c);
+
+}
diff --git a/common/src/java/org/apache/hadoop/hive/common/pool/ThreadPools.java b/common/src/java/org/apache/hadoop/hive/common/pool/ThreadPools.java
new file mode 100644
index 0000000..cda380e
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/pool/ThreadPools.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.pool;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.MoreExecutors;
+
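+/**
+ * Factory methods for creating {@link ExecutorService} instances. Unlike
+ * {@link Executors#newFixedThreadPool(int)}, requesting zero or fewer threads
+ * returns a direct (caller-runs) executor service instead of throwing, and the
+ * pool size may optionally be capped at the number of available processors.
+ * For example, {@code createFixedThreadPool(8)} returns a pool of at most
+ * eight threads, and fewer on machines with fewer than eight cores.
+ */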
+public class ThreadPools {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ThreadPools.class);
+
+  public static ExecutorService createFixedThreadPool(final int nThreads) {
+    return createFixedThreadPool(nThreads, true);
+  }
+
+  public static ExecutorService createFixedThreadPool(final int nThreads,
+      final boolean systemCap) {
+    if (nThreads <= 0) {
+      LOG.debug("Requested {} threads. Returning DirectExecutorService",
+          nThreads);
+      return MoreExecutors.newDirectExecutorService();
+    }
+    int threadCount = nThreads;
+    if (systemCap) {
+      int sysThreads = Runtime.getRuntime().availableProcessors();
+      threadCount = Math.min(sysThreads, threadCount);
+    }
+    LOG.debug("Requested {} threads. Creating fixed thread pool with {} threads",
+        nThreads, threadCount);
+    return Executors.newFixedThreadPool(threadCount);
+  }
+}
diff --git a/common/src/test/org/apache/hadoop/hive/common/pool/TestLnPartitionPolicy.java b/common/src/test/org/apache/hadoop/hive/common/pool/TestLnPartitionPolicy.java
new file mode 100644
index 0000000..14aa60f
--- /dev/null
+++ b/common/src/test/org/apache/hadoop/hive/common/pool/TestLnPartitionPolicy.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.pool;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestLnPartitionPolicy {
+
+  private LnPartitionPolicy partitionPolicy;
+
+  @Before
+  public void setup() {
+    partitionPolicy = new LnPartitionPolicy();
+  }
+
+  @Test
+  public void testEmptyCollection() {
+    List<List<Object>> result =
+        this.partitionPolicy.partition(Collections.emptyList());
+    assertEquals(0, result.size());
+  }
+
+  @Test
+  public void testPartitionCollection() {
+    Collection<Integer> testData = Arrays.asList(1, 2, 3, 4, 5, 1, 2, 3, 4);
+    List<List<Integer>> result = this.partitionPolicy.partition(testData);
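+
+    // ln(9) is roughly 2.2, so the policy creates two partitions; with
+    // ceil(9 / 2) = 5 elements per partition, the sublist sizes are 5 and 4.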
+    assertEquals(2, result.size());
+
+    assertEquals(1, result.get(0).get(0));
+    assertEquals(2, result.get(0).get(1));
+    assertEquals(3, result.get(0).get(2));
+    assertEquals(4, result.get(0).get(3));
+    assertEquals(5, result.get(0).get(4));
+
+    assertEquals(1, result.get(1).get(0));
+    assertEquals(2, result.get(1).get(1));
+    assertEquals(3, result.get(1).get(2));
+    assertEquals(4, result.get(1).get(3));
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index 5f2539f..3cc8749 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -23,6 +23,8 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -31,18 +33,16 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.hadoop.hive.common.StringInternUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.common.StringInternUtils;
+import org.apache.hadoop.hive.common.pool.LnPartitionPolicy;
+import org.apache.hadoop.hive.common.pool.ThreadPools;
 import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -64,6 +64,10 @@
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.lib.CombineFileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -78,39 +82,31 @@
   private static final String CLASS_NAME = CombineHiveInputFormat.class.getName();
   public static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
 
-  // max number of threads we can use to check non-combinable paths
-  private static final int MAX_CHECK_NONCOMBINABLE_THREAD_NUM = 50;
-  private static final int DEFAULT_NUM_PATH_PER_THREAD = 100;
-
-  private class CheckNonCombinablePathCallable implements Callable<Set<Integer>> {
-    private final Path[] paths;
-    private final int start;
-    private final int length;
+  private class CheckNonCombinablePathCallable
+      implements Callable<Set<Path>> {
+    private final Collection<Path> paths;
     private final JobConf conf;
     private final boolean isMerge;
 
-    public CheckNonCombinablePathCallable(
-        Path[] paths, int start, int length, JobConf conf, boolean isMerge) {
+    public CheckNonCombinablePathCallable(Collection<Path> paths, JobConf conf,
+        boolean isMerge) {
       this.paths = paths;
-      this.start = start;
-      this.length = length;
       this.conf = conf;
       this.isMerge = isMerge;
     }
 
     @Override
-    public Set<Integer> call() throws Exception {
-      Set<Integer> nonCombinablePathIndices = new HashSet<Integer>();
-      for (int i = 0; i < length; i++) {
-        PartitionDesc part = HiveFileFormatUtils.getFromPathRecursively(
-            pathToPartitionInfo, paths[i + start],
-            IOPrepareCache.get().allocatePartitionDescMap());
+    public Set<Path> call() throws Exception {
+      Set<Path> nonCombinablePaths = new HashSet<>();
+      for (final Path path : this.paths) {
+        PartitionDesc part =
+            HiveFileFormatUtils.getFromPathRecursively(pathToPartitionInfo,
+                path, IOPrepareCache.get().allocatePartitionDescMap());
         // Use HiveInputFormat if any of the paths is not splittable
         Class inputFormatClass = part.getInputFileFormatClass();
         InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, conf);
         boolean isAvoidSplitCombine = inputFormat instanceof AvoidSplitCombination &&
-            ((AvoidSplitCombination) inputFormat).shouldSkipCombine(paths[i + start], conf);
+            ((AvoidSplitCombination) inputFormat).shouldSkipCombine(path, conf);
         TableDesc tbl = part.getTableDesc();
         boolean isMmNonMerge = false;
         if (tbl != null) {
@@ -121,9 +117,9 @@
         }
 
         if (isAvoidSplitCombine || isMmNonMerge) {
-          Utilities.FILE_OP_LOGGER.info("The path [" + paths[i + start] +
+          Utilities.FILE_OP_LOGGER.info("The path [" + path +
               "] is being parked for HiveInputFormat.getSplits");
-          nonCombinablePathIndices.add(i + start);
+          nonCombinablePaths.add(path);
         }
       }
-      return nonCombinablePathIndices;
+      return nonCombinablePaths;
@@ -472,24 +468,30 @@ public int hashCode() {
-   * Gets all the path indices that should not be combined
+   * Gets all the paths that should not be combined
    */
   @VisibleForTesting
-  public Set<Integer> getNonCombinablePathIndices(JobConf job, Path[] paths, int numThreads)
-      throws ExecutionException, InterruptedException {
-    LOG.info("Total number of paths: " + paths.length +
-        ", launching " + numThreads + " threads to check non-combinable ones.");
-    int numPathPerThread = (int) Math.ceil((double) paths.length / numThreads);
-
-    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
-    List<Future<Set<Integer>>> futureList = new ArrayList<Future<Set<Integer>>>(numThreads);
+  public Collection<Path> getNonCombinablePaths(final JobConf job,
+      final Path[] paths) throws ExecutionException, InterruptedException {
+    if (ArrayUtils.isEmpty(paths)) {
+      LOG.debug("List of paths is empty");
+      return Collections.emptyList();
+    }
+
+    final List<List<Path>> partitions =
+        LnPartitionPolicy.createPartitions(Arrays.asList(paths));
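+    // The ln-based policy keeps the number of partitions (and thus worker
+    // threads, capped at the CPU count) low as the path count grows,
+    // e.g. 1,000 paths -> 6 partitions.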
+    final int partitionCount = partitions.size();
+
+    LOG.info("Checking for non-combinable paths. Count: {}, Partitions: {}",
+        paths.length, partitionCount);
+
+    ExecutorService executor = ThreadPools.createFixedThreadPool(partitionCount);
+    List<Future<Set<Path>>> futureList = new ArrayList<>(partitionCount);
     try {
       boolean isMerge = mrwork != null && mrwork.isMergeFromResolver();
-      for (int i = 0; i < numThreads; i++) {
-        int start = i * numPathPerThread;
-        int length = i != numThreads - 1 ? numPathPerThread : paths.length - start;
-        futureList.add(executor.submit(new CheckNonCombinablePathCallable(
-            paths, start, length, job, isMerge)));
+      for (final List<Path> partition : partitions) {
+        futureList.add(executor.submit(
+            new CheckNonCombinablePathCallable(partition, job, isMerge)));
       }
-      Set<Integer> nonCombinablePathIndices = new HashSet<Integer>();
-      for (Future<Set<Integer>> future : futureList) {
-        nonCombinablePathIndices.addAll(future.get());
-      }
-      return nonCombinablePathIndices;
+      Set<Path> nonCombinablePaths = new HashSet<>();
+      for (Future<Set<Path>> future : futureList) {
+        nonCombinablePaths.addAll(future.get());
+      }
+      return nonCombinablePaths;
@@ -511,30 +513,16 @@ public int hashCode() {
 
     Path[] paths = getInputPaths(job);
 
-    List<Path> nonCombinablePaths = new ArrayList<Path>(paths.length / 2);
-    List<Path> combinablePaths = new ArrayList<Path>(paths.length / 2);
+    Collection<Path> combinablePaths = new HashSet<>(Arrays.asList(paths));
+    Collection<Path> nonCombinablePaths = Collections.emptyList();
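+    // Assume every input path is combinable, then subtract any paths the
+    // check below identifies as non-combinable.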
 
-    int numThreads = Math.min(MAX_CHECK_NONCOMBINABLE_THREAD_NUM,
-        (int) Math.ceil((double) paths.length / DEFAULT_NUM_PATH_PER_THREAD));
-
-    // This check is necessary because for Spark branch, the result array from
-    // getInputPaths() above could be empty, and therefore numThreads could be 0.
-    // In that case, Executors.newFixedThreadPool will fail.
-    if (numThreads > 0) {
-      try {
-        Set<Integer> nonCombinablePathIndices = getNonCombinablePathIndices(job, paths, numThreads);
-        for (int i = 0; i < paths.length; i++) {
-          if (nonCombinablePathIndices.contains(i)) {
-            nonCombinablePaths.add(paths[i]);
-          } else {
-            combinablePaths.add(paths[i]);
-          }
-        }
-      } catch (Exception e) {
-        LOG.error("Error checking non-combinable path", e);
-        perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
-        throw new IOException(e);
-      }
+    try {
+      nonCombinablePaths = getNonCombinablePaths(job, paths);
+      combinablePaths.removeAll(nonCombinablePaths);
+    } catch (Exception e) {
+      LOG.error("Error checking non-combinable path", e);
+      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
+      throw new IOException(e);
     }
 
     // Store the previous value for the path specification
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestCombineHiveInputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestCombineHiveInputFormat.java
index 07cef93..d15578a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestCombineHiveInputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestCombineHiveInputFormat.java
@@ -17,7 +17,12 @@
  */
 package org.apache.hadoop.hive.ql.io;
 
-import junit.framework.TestCase;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -31,58 +36,77 @@
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.ReflectionUtils;
-
-import java.io.IOException;
-import java.util.LinkedHashMap;
-import java.util.Set;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
 /**
- * Unittest for CombineHiveInputFormat.
+ * Unit tests for CombineHiveInputFormat.
  */
-public class TestCombineHiveInputFormat extends TestCase {
-  public void testAvoidSplitCombination() throws Exception {
-    Configuration conf = new Configuration();
-    JobConf job = new JobConf(conf);
-
-    TableDesc tblDesc = Utilities.defaultTd;
-    tblDesc.setInputFileFormatClass(TestSkipCombineInputFormat.class);
-    PartitionDesc partDesc = new PartitionDesc(tblDesc, null);
-    LinkedHashMap<Path, PartitionDesc> pt = new LinkedHashMap<>();
-    pt.put(new Path("/tmp/testfolder1"), partDesc);
-    pt.put(new Path("/tmp/testfolder2"), partDesc);
-    MapredWork mrwork = new MapredWork();
-    mrwork.getMapWork().setPathToPartitionInfo(pt);
-    Path mapWorkPath = new Path("/tmp/" + System.getProperty("user.name"), "hive");
-    Utilities.setMapRedWork(conf, mrwork,
-        mapWorkPath);
-
-    try {
-      Path[] paths = new Path[2];
-      paths[0] = new Path("/tmp/testfolder1");
-      paths[1] = new Path("/tmp/testfolder2");
-      CombineHiveInputFormat combineInputFormat =
-          ReflectionUtils.newInstance(CombineHiveInputFormat.class, conf);
-      combineInputFormat.pathToPartitionInfo =
-          Utilities.getMapWork(conf).getPathToPartitionInfo();
-      Set<Integer> results = combineInputFormat.getNonCombinablePathIndices(job, paths, 2);
-      assertEquals("Should have both path indices in the results set", 2, results.size());
-    } finally {
-      // Cleanup the mapwork path
-      FileSystem.get(conf).delete(mapWorkPath, true);
-    }
+public class TestCombineHiveInputFormat {
+
+  private Configuration conf;
+  private JobConf job;
+  private Path mapWorkPath;
+
+  @Before
+  public void setup() {
+    this.conf = new Configuration();
+    this.job = new JobConf();
+    this.mapWorkPath =
+        new Path("/tmp/" + System.getProperty("user.name"), "hive");
+  }
+
+  @After
+  public void teardown() throws IOException {
+    FileSystem.get(this.conf).delete(mapWorkPath, true);
+  }
+
+  @Test
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public void testAvoidSplitCombination() throws Exception {
+    TableDesc tblDesc = Utilities.defaultTd;
+    tblDesc.setInputFileFormatClass(TestSkipCombineInputFormat.class);
+
+    PartitionDesc partDesc = new PartitionDesc(tblDesc, null);
+    LinkedHashMap<Path, PartitionDesc> pt = new LinkedHashMap<>();
+    pt.put(new Path("/tmp/testfolder1"), partDesc);
+    pt.put(new Path("/tmp/testfolder2"), partDesc);
+
+    MapredWork mrwork = new MapredWork();
+    mrwork.getMapWork().setPathToPartitionInfo(pt);
+
+    Utilities.setMapRedWork(conf, mrwork, mapWorkPath);
+
+    final Path[] paths = new Path[] { new Path("/tmp/testfolder1"),
+        new Path("/tmp/testfolder2") };
+
+    CombineHiveInputFormat combineInputFormat =
+        ReflectionUtils.newInstance(CombineHiveInputFormat.class, conf);
+    combineInputFormat.pathToPartitionInfo =
+        Utilities.getMapWork(conf).getPathToPartitionInfo();
+
+    Collection<Path> results =
+        combineInputFormat.getNonCombinablePaths(job, paths);
+
+    assertEquals("Should have both paths in the results set", 2,
+        results.size());
+  }
+
+  @SuppressWarnings("rawtypes")
+  public static class TestSkipCombineInputFormat extends FileInputFormat
+      implements CombineHiveInputFormat.AvoidSplitCombination {
+    @Override
+    public RecordReader getRecordReader(InputSplit inputSplit, JobConf jobConf,
+        Reporter reporter) throws IOException {
+      return null;
     }
-
-  public static class TestSkipCombineInputFormat extends FileInputFormat
-      implements CombineHiveInputFormat.AvoidSplitCombination {
-    @Override public RecordReader getRecordReader(InputSplit inputSplit, JobConf jobConf,
-        Reporter reporter) throws IOException {
-      return null;
-    }
-
-    @Override public boolean shouldSkipCombine(Path path, Configuration conf)
-        throws IOException {
-      // Skip combine for all paths
-      return true;
-    }
+    @Override
+    public boolean shouldSkipCombine(Path path, Configuration conf)
+        throws IOException {
+      // Skip combine for all paths
+      return true;
     }
   }
 }