diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
index 5f7b20b..57d9449 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
@@ -32,7 +32,6 @@
 import java.util.TreeMap;
 import java.util.TreeSet;
 
-import com.google.common.collect.LinkedListMultimap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -48,9 +47,9 @@
 import org.apache.hadoop.mapred.split.TezGroupedSplit;
 import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexLocationHint;
@@ -69,6 +68,7 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.HashMultimap;
+import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
@@ -275,7 +275,7 @@ public void onRootVertexInitialized(String inputName, InputDescriptor inputDescr
       InputSplit[] inputSplitArray =
           (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0]));
       Multimap<Integer, InputSplit> groupedSplit =
-          grouper.generateGroupedSplits(jobConf, conf, inputSplitArray, waves,
+          grouper.generateGroupedSplits(jobConf, inputSplitArray, waves,
              availableSlots, inputName, mainWorkName.isEmpty());
       if (mainWorkName.isEmpty() == false) {
         Multimap<Integer, InputSplit> singleBucketToGroupedSplit =
@@ -283,7 +283,7 @@ public void onRootVertexInitialized(String inputName, InputDescriptor inputDescr
         singleBucketToGroupedSplit.putAll(key, groupedSplit.values());
         groupedSplit =
             grouper.group(jobConf, singleBucketToGroupedSplit, availableSlots,
-                HiveConf.getFloatVar(conf, HiveConf.ConfVars.TEZ_SMB_NUMBER_WAVES));
+                HiveConf.getFloatVar(conf, HiveConf.ConfVars.TEZ_SMB_NUMBER_WAVES), null);
         secondLevelGroupingDone = true;
       }
       bucketToGroupedSplitMap.putAll(key, groupedSplit.values());
@@ -297,7 +297,7 @@ public void onRootVertexInitialized(String inputName, InputDescriptor inputDescr
       InputSplit[] inputSplitArray =
           (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0]));
       Multimap<Integer, InputSplit> groupedSplit =
-          grouper.generateGroupedSplits(jobConf, conf, inputSplitArray, waves,
+          grouper.generateGroupedSplits(jobConf, inputSplitArray, waves,
              availableSlots, inputName, false);
       bucketToGroupedSplitMap.putAll(key, groupedSplit.values());
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index 5fb6052..d3007ad 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -23,7 +23,6 @@
 import java.util.Comparator;
 import java.util.List;
 
-import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -56,6 +55,7 @@
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 import org.apache.tez.runtime.api.events.InputInitializerEvent;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 
@@ -139,8 +139,9 @@ public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOE
       LOG.info("Number of input splits: " + splits.length + ". " + availableSlots
           + " available slots, " + waves + " waves. Input format is: "
          + realInputFormatName);
-      Multimap<Integer, InputSplit> groupedSplits =
-          splitGrouper.generateGroupedSplits(jobConf, conf, splits, waves, availableSlots);
+      Multimap<Integer, InputSplit> groupedSplits = splitGrouper.generateGroupedSplits(jobConf,
+          splits, waves, availableSlots);
+
       // And finally return them in a flat array
       InputSplit[] flatSplits = groupedSplits.values().toArray(new InputSplit[0]);
       LOG.info("Number of grouped splits: " + flatSplits.length);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
index c12da37..e819b27 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
@@ -23,7 +23,6 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -40,8 +39,10 @@
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.split.SplitSizeEstimator;
 import org.apache.hadoop.mapred.split.TezGroupedSplit;
 import org.apache.hadoop.mapred.split.TezMapredSplitsGrouper;
 import org.apache.tez.dag.api.TaskLocationHint;
@@ -72,7 +73,8 @@
    * available slots with tasks
    */
  public Multimap<Integer, InputSplit> group(Configuration conf,
-      Multimap<Integer, InputSplit> bucketSplitMultimap, int availableSlots, float waves)
+      Multimap<Integer, InputSplit> bucketSplitMultimap, int availableSlots, float waves,
+      Map<Integer, SplitSizeEstimator> splitSizeEstimatorMap)
       throws IOException {
 
     // figure out how many tasks we want for each bucket
@@ -86,14 +88,16 @@
     // use the tez grouper to combine splits once per bucket
     for (int bucketId : bucketSplitMultimap.keySet()) {
       Collection<InputSplit> inputSplitCollection = bucketSplitMultimap.get(bucketId);
-
+      SplitSizeEstimator splitSizeEstimator =
+          splitSizeEstimatorMap == null ? null : splitSizeEstimatorMap.get(bucketId);
       InputSplit[] rawSplits = inputSplitCollection.toArray(new InputSplit[0]);
       InputSplit[] groupedSplits =
           tezGrouper.getGroupedSplits(conf, rawSplits, bucketTaskMap.get(bucketId),
-              HiveInputFormat.class.getName());
+              HiveInputFormat.class.getName(), splitSizeEstimator);
 
       LOG.info("Original split size is " + rawSplits.length + " grouped split size is "
-          + groupedSplits.length + ", for bucket: " + bucketId);
+          + groupedSplits.length + ", for bucket: " + bucketId + " SplitSizeEstimator: "
+          + (splitSizeEstimator == null ? "none" : splitSizeEstimator.getClass().getSimpleName()));
 
       for (InputSplit inSplit : groupedSplits) {
         bucketGroupedSplitMultimap.put(bucketId, inSplit);
@@ -153,46 +157,63 @@
 
   /** Generate groups of splits, separated by schema evolution boundaries */
   public Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
-      Configuration conf,
-      InputSplit[] splits,
-      float waves, int availableSlots)
+      InputSplit[] splits,
+      float waves, int availableSlots)
       throws Exception {
-    return generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, null, true);
+    return generateGroupedSplits(jobConf, splits, waves, availableSlots, null, true);
   }
 
   /** Generate groups of splits, separated by schema evolution boundaries */
   public Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
-      Configuration conf,
-      InputSplit[] splits,
-      float waves, int availableSlots,
-      String inputName,
-      boolean groupAcrossFiles) throws
-      Exception {
+      InputSplit[] splits,
+      float waves, int availableSlots,
+      String inputName,
+      boolean groupAcrossFiles) throws Exception {
 
     MapWork work = populateMapWork(jobConf, inputName);
     Multimap<Integer, InputSplit> bucketSplitMultiMap =
         ArrayListMultimap.<Integer, InputSplit> create();
-
+    Map<Integer, SplitSizeEstimator> splitSizeEstimatorMap = new HashMap<>();
     int i = 0;
     InputSplit prevSplit = null;
+    Class<? extends InputFormat> inputFormatClass = null;
     for (InputSplit s : splits) {
       // this is the bit where we make sure we don't group across partition
       // schema boundaries
       if (schemaEvolved(s, prevSplit, groupAcrossFiles, work)) {
         ++i;
         prevSplit = s;
+        inputFormatClass = getInputFormatClassFromSplit(s, work);
+        InputFormat inputFormat = HiveInputFormat.getInputFormatFromCache(inputFormatClass, jobConf);
+        if (inputFormat instanceof SplitSizeEstimator) {
+          LOG.info(inputFormat.getClass().getSimpleName() + " implements SplitSizeEstimator");
+          splitSizeEstimatorMap.put(i, (SplitSizeEstimator) inputFormat);
+        } else {
+          LOG.info(inputFormat.getClass().getSimpleName() + " does not implement SplitSizeEstimator");
+          splitSizeEstimatorMap.put(i, null);
+        }
       }
+
       bucketSplitMultiMap.put(i, s);
     }
 
     LOG.info("# Src groups for split generation: " + (i + 1));
 
     // group them into the chunks we want
     Multimap<Integer, InputSplit> groupedSplits =
-        this.group(jobConf, bucketSplitMultiMap, availableSlots, waves);
+        this.group(jobConf, bucketSplitMultiMap, availableSlots, waves, splitSizeEstimatorMap);
 
     return groupedSplits;
   }
 
+  private Class<? extends InputFormat> getInputFormatClassFromSplit(InputSplit s, MapWork work)
+      throws IOException {
+    Path path = ((FileSplit) s).getPath();
+    PartitionDesc pd =
+        HiveFileFormatUtils.getPartitionDescFromPathRecursively(work.getPathToPartitionInfo(),
+            path, cache);
+    return pd.getInputFileFormatClass();
+  }
+
   /**
    * get the size estimates for each bucket in tasks. This is used to make sure
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/ColumnarSplit.java ql/src/java/org/apache/hadoop/hive/ql/io/ColumnarSplit.java
new file mode 100644
index 0000000..8fd10e5
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/ColumnarSplit.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io;
+
+/**
+ * A split that implements the ColumnarSplit interface reports the estimated size of the columnar
+ * projections that will be read from it. The split grouper uses this information to group based
+ * on the data that will actually be read, instead of the complete split length.
+ */
+public interface ColumnarSplit {
+
+  /**
+   * Returns the estimated size of the column projections that will be read from this split.
+   *
+   * @return the estimated column projection size, in bytes
+   */
+  long getColumnarProjectionSize();
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 2548106..7d136e9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -50,7 +50,9 @@
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.ColumnarSplit;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
 import org.apache.hadoop.hive.ql.io.LlapWrappableInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
@@ -74,6 +76,7 @@
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.split.SplitSizeEstimator;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.cache.Cache;
@@ -105,9 +108,10 @@
  */
 public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
   InputFormatChecker, VectorizedInputFormatInterface, LlapWrappableInputFormatInterface,
-    AcidInputFormat<NullWritable, OrcStruct>, CombineHiveInputFormat.AvoidSplitCombination {
+    AcidInputFormat<NullWritable, OrcStruct>, CombineHiveInputFormat.AvoidSplitCombination,
+    SplitSizeEstimator {
 
-  static enum SplitStrategyKind{
+  enum SplitStrategyKind {
     HYBRID,
     BI,
     ETL
@@ -1053,7 +1057,7 @@ private boolean isStripeSatisfyPredicate(StripeStatistics stripeStatistics,
     if (isDebugEnabled) {
       for (OrcSplit split : splits) {
         LOG.debug(split + " projected_columns_uncompressed_size: "
-            + split.getProjectedColumnsUncompressedSize());
+            + split.getColumnarProjectionSize());
       }
     }
     return splits;
@@ -1066,6 +1070,29 @@ private static void cancelFutures(List<Future<?>> futures) {
   }
 
   @Override
+  public long getEstimatedSize(InputSplit inputSplit) throws IOException {
+    long colProjSize = inputSplit.getLength();
+
+    if (inputSplit instanceof ColumnarSplit) {
+      colProjSize = ((ColumnarSplit) inputSplit).getColumnarProjectionSize();
+      if (isDebugEnabled) {
+        LOG.debug("Estimated column projection size: " + colProjSize);
+      }
+      return colProjSize;
+    } else if (inputSplit instanceof HiveInputFormat.HiveInputSplit) {
+      InputSplit innerInputSplit = ((HiveInputFormat.HiveInputSplit) inputSplit).getInputSplit();
+      if (innerInputSplit instanceof ColumnarSplit) {
+        colProjSize = ((ColumnarSplit) innerInputSplit).getColumnarProjectionSize();
+      }
+      if (isDebugEnabled) {
+        LOG.debug("Estimated column projection size: " + colProjSize);
+      }
+    }
+
+    return colProjSize;
+  }
+
+  @Override
   public InputSplit[] getSplits(JobConf job,
                                 int numSplits) throws IOException {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index 1263346..836c113 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -28,6 +28,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.ColumnarSplit;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapred.FileSplit;
@@ -38,7 +39,7 @@
 * OrcFileSplit. Holds file meta info
 *
 */
-public class OrcSplit extends FileSplit {
+public class OrcSplit extends FileSplit implements ColumnarSplit {
   private static final Log LOG = LogFactory.getLog(OrcSplit.class);
 
   private FileMetaInfo fileMetaInfo;
@@ -55,7 +56,7 @@
   static final int ORIGINAL_FLAG = 2;
   static final int FOOTER_FLAG = 1;
 
-  protected OrcSplit(){
+  protected OrcSplit() {
     //The FileSplit() constructor in hadoop 0.20 and 1.x is package private so can't use it.
     //This constructor is used to create the object and then call readFields()
     // so just pass nulls to this super constructor.
@@ -74,7 +75,7 @@ public OrcSplit(Path path, Long fileId, long offset, long length, String[] hosts
     this.isOriginal = isOriginal;
     this.hasBase = hasBase;
     this.deltas.addAll(deltas);
-    this.projColsUncompressedSize = projectedDataSize;
+    this.projColsUncompressedSize = projectedDataSize <= 0 ? length : projectedDataSize;
   }
 
   @Override
@@ -173,8 +174,8 @@ public Long getFileId() {
     return fileId;
   }
 
-  public long getProjectedColumnsUncompressedSize() {
+  @Override
+  public long getColumnarProjectionSize() {
     return projColsUncompressedSize;
   }
-
 }
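
Not part of the patch itself: a minimal standalone Java sketch of the estimation fallback that OrcInputFormat.getEstimatedSize() implements above, for reviewers tracing how the grouper-side SplitSizeEstimator hook and the new ColumnarSplit interface fit together. Split and ColumnarAware below are simplified stand-ins for org.apache.hadoop.mapred.InputSplit and ColumnarSplit, not the real Hive/Tez classes.

// Stand-in for org.apache.hadoop.mapred.InputSplit: only the length matters here.
interface Split {
  long getLength();
}

// Stand-in for ColumnarSplit: a split that can report its projected column size.
interface ColumnarAware {
  long getColumnarProjectionSize();
}

final class ProjectionSizeEstimator {
  // Mirrors the fallback in getEstimatedSize(): prefer the projected column size
  // when the split can report one, otherwise fall back to the full split length.
  long estimate(Split split) {
    if (split instanceof ColumnarAware) {
      return ((ColumnarAware) split).getColumnarProjectionSize();
    }
    return split.getLength();
  }
}

The grouper then sums these per-split estimates instead of raw lengths when deciding how many splits to pack into each grouped task, which is why OrcSplit's projColsUncompressedSize falls back to the full length when the projected size is unknown or non-positive.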