diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
index 3ad76c5..2671137 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
@@ -35,7 +35,6 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
@@ -78,28 +77,17 @@ public class CustomPartitionVertex implements VertexManagerPlugin {
 
   private static final Log LOG = LogFactory.getLog(CustomPartitionVertex.class.getName());
 
-  public static final String GROUP_SPLITS = "hive.enable.custom.grouped.splits";
-
   VertexManagerPluginContext context;
-  private Multimap<Integer, Integer> bucketToTaskMap = HashMultimap.create();
-  private Multimap<Integer, InputSplit> bucketToInitialSplitMap =
-      ArrayListMultimap.create();
-
   private RootInputConfigureVertexTasksEvent configureVertexTaskEvent;
   private List<RootInputDataInformationEvent> dataInformationEvents;
-  private Map<String, List<FileSplit>> pathFileSplitsMap = new TreeMap<String, List<FileSplit>>();
   private int numBuckets = -1;
   private Configuration conf = null;
   private boolean rootVertexInitialized = false;
-  Multimap<Integer, InputSplit> bucketToGroupedSplitMap;
+  private SplitGrouper grouper = new SplitGrouper();
 
-  private Map<Integer, Integer> bucketToNumTaskMap = new HashMap<Integer, Integer>();
-
-  public CustomPartitionVertex() {
-  }
+  public CustomPartitionVertex() {}
 
   @Override
   public void initialize(VertexManagerPluginContext context) {
@@ -154,20 +142,22 @@ public void onRootVertexInitialized(String inputName, InputDescriptor inputDescr
        * TezGroupedSplits.
        */
-      if (conf.getBoolean(GROUP_SPLITS, true)) {
-        // Changing the InputFormat - so that the correct one is initialized in MRInput.
-        this.conf.set("mapred.input.format.class", TezGroupedSplitsInputFormat.class.getName());
-        MRInputUserPayloadProto updatedPayload = MRInputUserPayloadProto
-            .newBuilder(protoPayload)
-            .setConfigurationBytes(MRHelpers.createByteStringFromConf(conf))
-            .build();
-        inputDescriptor.setUserPayload(updatedPayload.toByteArray());
-      }
+      // This assumes that Grouping will always be used.
+      // Changing the InputFormat - so that the correct one is initialized in MRInput.
+      this.conf.set("mapred.input.format.class", TezGroupedSplitsInputFormat.class.getName());
+      MRInputUserPayloadProto updatedPayload = MRInputUserPayloadProto
+          .newBuilder(protoPayload)
+          .setConfigurationBytes(MRHelpers.createByteStringFromConf(conf))
+          .build();
+      inputDescriptor.setUserPayload(updatedPayload.toByteArray());
     } catch (IOException e) {
       e.printStackTrace();
       throw new RuntimeException(e);
     }
-    boolean dataInformationEventSeen = false;
+
+    boolean dataInformationEventSeen = false;
+    Map<String, List<FileSplit>> pathFileSplitsMap = new TreeMap<String, List<FileSplit>>();
+
     for (Event event : events) {
       if (event instanceof RootInputConfigureVertexTasksEvent) {
         // No tasks should have been started yet. Checked by initial state check.
@@ -204,17 +194,30 @@ public void onRootVertexInitialized(String inputName, InputDescriptor inputDescr
       }
     }
 
-    setBucketNumForPath(pathFileSplitsMap);
+    Multimap<Integer, InputSplit> bucketToInitialSplitMap =
+        getBucketSplitMapForPath(pathFileSplitsMap);
+
     try {
-      groupSplits();
-      processAllEvents(inputName);
+      int totalResource = context.getTotalAVailableResource().getMemory();
+      int taskResource = context.getVertexTaskResource().getMemory();
+      float waves = conf.getFloat(
+          TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
+          TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT);
+
+      int availableSlots = (int) (totalResource / taskResource);
+
+      Multimap<Integer, InputSplit> bucketToGroupedSplitMap
+          = grouper.group(conf, bucketToInitialSplitMap, availableSlots, waves);
+
+      processAllEvents(inputName, bucketToGroupedSplitMap);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
   }
 
-  private void processAllEvents(String inputName) throws IOException {
+  private void processAllEvents(String inputName,
+      Multimap<Integer, InputSplit> bucketToGroupedSplitMap) throws IOException {
+    Multimap<Integer, Integer> bucketToTaskMap = HashMultimap.create();
     List<InputSplit> finalSplits = Lists.newLinkedList();
     int taskCount = 0;
     for (Entry<Integer, Collection<InputSplit>> entry : bucketToGroupedSplitMap.asMap().entrySet()) {
@@ -261,7 +264,7 @@ private void processAllEvents(String inputName) throws IOException {
     // Replace the Edge Managers
     context.setVertexParallelism(
         taskCount,
-        new VertexLocationHint(createTaskLocationHintsFromSplits(finalSplits
+        new VertexLocationHint(grouper.createTaskLocationHints(finalSplits
             .toArray(new InputSplit[finalSplits.size()]))), emMap);
 
     // Set the actual events for the tasks.
@@ -301,9 +304,15 @@ private FileSplit getFileSplitFromEvent(RootInputDataInformationEvent event)
   /*
    * This method generates the map of bucket to file splits.
   */
-  private void setBucketNumForPath(Map<String, List<FileSplit>> pathFileSplitsMap) {
+  private Multimap<Integer, InputSplit> getBucketSplitMapForPath(
+      Map<String, List<FileSplit>> pathFileSplitsMap) {
+
     int bucketNum = 0;
     int fsCount = 0;
+
+    Multimap<Integer, InputSplit> bucketToInitialSplitMap =
+        ArrayListMultimap.create();
+
     for (Map.Entry<String, List<FileSplit>> entry : pathFileSplitsMap.entrySet()) {
       int bucketId = bucketNum % numBuckets;
       for (FileSplit fsplit : entry.getValue()) {
@@ -315,92 +324,7 @@ private void setBucketNumForPath(Map<String, List<FileSplit>> pathFileSplitsMap) {
     LOG.info("Total number of splits counted: " + fsCount + " and total files encountered: "
         + pathFileSplitsMap.size());
-  }
-
-  private void groupSplits () throws IOException {
-    bucketToGroupedSplitMap =
-        ArrayListMultimap.create(bucketToInitialSplitMap);
-    if (conf.getBoolean(GROUP_SPLITS, true)) {
-      estimateBucketSizes();
-      Map<Integer, Collection<InputSplit>> bucketSplitMap = bucketToInitialSplitMap.asMap();
-      for (int bucketId : bucketSplitMap.keySet()) {
-        Collection<InputSplit> inputSplitCollection = bucketSplitMap.get(bucketId);
-        TezMapredSplitsGrouper grouper = new TezMapredSplitsGrouper();
-
-        InputSplit[] groupedSplits = grouper.getGroupedSplits(conf,
-            inputSplitCollection.toArray(new InputSplit[0]), bucketToNumTaskMap.get(bucketId),
-            HiveInputFormat.class.getName());
-        LOG.info("Original split size is " +
-            inputSplitCollection.toArray(new InputSplit[0]).length +
-            " grouped split size is " + groupedSplits.length);
-        bucketToGroupedSplitMap.removeAll(bucketId);
-        for (InputSplit inSplit : groupedSplits) {
-          bucketToGroupedSplitMap.put(bucketId, inSplit);
-        }
-      }
-    }
-  }
-
-  private void estimateBucketSizes() {
-    Map<Integer, Long> bucketSizeMap = new HashMap<Integer, Long>();
-    Map<Integer, Collection<InputSplit>> bucketSplitMap = bucketToInitialSplitMap.asMap();
-    long totalSize = 0;
-    for (int bucketId : bucketSplitMap.keySet()) {
-      Long size = 0L;
-      Collection<InputSplit> inputSplitCollection =
-          bucketSplitMap.get(bucketId);
-      Iterator<InputSplit> iter = inputSplitCollection.iterator();
-      while (iter.hasNext()) {
-        FileSplit fsplit = (FileSplit) iter.next();
-        size += fsplit.getLength();
-        totalSize += fsplit.getLength();
-      }
-      bucketSizeMap.put(bucketId, size);
-    }
-
-    int totalResource = context.getTotalAVailableResource().getMemory();
-    int taskResource = context.getVertexTaskResource().getMemory();
-    float waves = conf.getFloat(
-        TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
-        TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT);
-
-    int numTasks = (int) ((totalResource * waves) / taskResource);
-    LOG.info("Total resource: " + totalResource + " Task Resource: " + taskResource
-        + " waves: " + waves + " total size of splits: " + totalSize
-        + " total number of tasks: " + numTasks);
-
-    for (int bucketId : bucketSizeMap.keySet()) {
-      int numEstimatedTasks = 0;
-      if (totalSize != 0) {
-        numEstimatedTasks = (int) (numTasks * bucketSizeMap.get(bucketId) / totalSize);
-      }
-      LOG.info("Estimated number of tasks: " + numEstimatedTasks + " for bucket " + bucketId);
-      if (numEstimatedTasks == 0) {
-        numEstimatedTasks = 1;
-      }
-      bucketToNumTaskMap.put(bucketId, numEstimatedTasks);
-    }
-  }
-
-  private static List<TaskLocationHint> createTaskLocationHintsFromSplits(
-      org.apache.hadoop.mapred.InputSplit[] oldFormatSplits) {
-    Iterable<TaskLocationHint> iterable = Iterables.transform(Arrays.asList(oldFormatSplits),
-        new Function<org.apache.hadoop.mapred.InputSplit, TaskLocationHint>() {
-          @Override
-          public TaskLocationHint apply(org.apache.hadoop.mapred.InputSplit input) {
-            try {
-              if (input.getLocations() != null) {
-                return new TaskLocationHint(new HashSet<String>(Arrays.asList(input.getLocations())),
-                    null);
-              } else {
-                LOG.info("NULL Location: returning an empty location hint");
-                return new TaskLocationHint(null, null);
-              }
-            } catch (IOException e) {
-              throw new RuntimeException(e);
-            }
-          }
-        });
-
-    return Lists.newArrayList(iterable);
+
+    return bucketToInitialSplitMap;
   }
 }
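
The parallelism math above is the heart of the change: headroom (total AM resource divided by per-task resource) times a wave factor gives the task budget that SplitGrouper divides among buckets. A minimal sketch with hypothetical numbers; the class name and values are illustrative, and 1.7 is only the commonly cited default for TEZ_AM_GROUPING_SPLIT_WAVES, not something this patch sets:

public class WaveMathSketch {
  public static void main(String[] args) {
    int totalResource = 102400; // cluster headroom in MB, e.g. 25 containers x 4GB (hypothetical)
    int taskResource = 4096;    // per-task container size in MB (hypothetical)
    float waves = 1.7f;         // assumed TEZ_AM_GROUPING_SPLIT_WAVES default

    int availableSlots = totalResource / taskResource; // 25 concurrent tasks
    int desiredTasks = (int) (availableSlots * waves); // ~42 tasks, spread across buckets
    System.out.println(availableSlots + " slots, " + desiredTasks + " desired tasks");
  }
}
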
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index f578534..f27bde8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -23,7 +23,6 @@
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -51,14 +50,12 @@
 import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
 import org.apache.hadoop.hive.ql.exec.tez.tools.TezMergedLogicalInput;
 import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
-import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
@@ -97,10 +94,9 @@
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.VertexGroup;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
-import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
+import org.apache.tez.dag.api.VertexGroup;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
@@ -415,7 +411,7 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
     Vertex map = null;
 
     // use tez to combine splits
-    boolean useTezGroupedSplits = true;
+    boolean useTezGroupedSplits = false;
 
     int numTasks = -1;
     Class amSplitGeneratorClass = null;
@@ -431,44 +427,8 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
         }
       }
     }
-
-    // we cannot currently allow grouping of splits where each split is a different input format
-    // or has different deserializers similar to the checks in CombineHiveInputFormat. We do not
-    // need the check for the opList because we will not process different opLists at this time.
-    // Long term fix would be to have a custom input format
-    // logic that groups only the splits that share the same input format
-    Class<?> previousInputFormatClass = null;
-    Class<?> previousDeserializerClass = null;
-    for (String path : mapWork.getPathToPartitionInfo().keySet()) {
-      PartitionDesc pd = mapWork.getPathToPartitionInfo().get(path);
-      Class<?> currentDeserializerClass = pd.getDeserializer(conf).getClass();
-      Class<?> currentInputFormatClass = pd.getInputFileFormatClass();
-      if (previousInputFormatClass == null) {
-        previousInputFormatClass = currentInputFormatClass;
-      }
-      if (previousDeserializerClass == null) {
-        previousDeserializerClass = currentDeserializerClass;
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Current input format class = " + currentInputFormatClass
-            + ", previous input format class = " + previousInputFormatClass
-            + ", verifying " + " current deserializer class = " + currentDeserializerClass
-            + " previous deserializer class = " + previousDeserializerClass);
-      }
-      if ((currentInputFormatClass != previousInputFormatClass) ||
-          (currentDeserializerClass != previousDeserializerClass)) {
-        useTezGroupedSplits = false;
-        break;
-      }
-    }
 
     if (vertexHasCustomInput) {
-      // if it is the case of different input formats for different partitions, we cannot group
-      // in the custom vertex for now. Long term, this can be improved to group the buckets that
-      // share the same input format.
-      if (useTezGroupedSplits == false) {
-        conf.setBoolean(CustomPartitionVertex.GROUP_SPLITS, false);
-      } else {
-        conf.setBoolean(CustomPartitionVertex.GROUP_SPLITS, true);
-      }
+      useTezGroupedSplits = false;
       // grouping happens in execution phase. Setting the class to TezGroupedSplitsInputFormat
       // here would cause premature grouping which would be incorrect.
       inputFormatClass = HiveInputFormat.class;
@@ -476,23 +436,19 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
       // mapreduce.tez.input.initializer.serialize.event.payload should be set to false when using
       // this plug-in to avoid getting a serialized event at run-time.
conf.setBoolean("mapreduce.tez.input.initializer.serialize.event.payload", false); - } else if (useTezGroupedSplits) { + } else { // we'll set up tez to combine spits for us iff the input format // is HiveInputFormat if (inputFormatClass == HiveInputFormat.class) { + useTezGroupedSplits = true; conf.setClass("mapred.input.format.class", TezGroupedSplitsInputFormat.class, InputFormat.class); - } else { - conf.setClass("mapred.input.format.class", CombineHiveInputFormat.class, InputFormat.class); - useTezGroupedSplits = false; } - } else { - conf.setClass("mapred.input.format.class", CombineHiveInputFormat.class, InputFormat.class); } if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)) { // if we're generating the splits in the AM, we just need to set // the correct plugin. - amSplitGeneratorClass = MRInputAMSplitGenerator.class; + amSplitGeneratorClass = HiveSplitGenerator.class; } else { // client side split generation means we have to compute them now inputSplitInfo = MRHelpers.generateInputSplits(conf, diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java new file mode 100644 index 0000000..3f76c95 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
new file mode 100644
index 0000000..3f76c95
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.mapred.split.TezGroupedSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezRootInputInitializer;
+import org.apache.tez.runtime.api.TezRootInputInitializerContext;
+import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
+import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
+
+import com.google.common.collect.Lists;
+
+/**
+ * This class is used to generate splits inside the AM on the cluster. It
+ * optionally groups splits based on available headroom and makes sure that
+ * splits from different partitions are grouped only if they share the same
+ * schema, format and serde.
+ */
+public class HiveSplitGenerator implements TezRootInputInitializer {
+
+  private static final Log LOG = LogFactory.getLog(HiveSplitGenerator.class);
+
+  private SplitGrouper grouper = new SplitGrouper();
+
+  @Override
+  public List<Event> initialize(TezRootInputInitializerContext rootInputContext)
+      throws Exception {
+
+    MRInputUserPayloadProto userPayloadProto = MRHelpers
+        .parseMRInputPayload(rootInputContext.getUserPayload());
+
+    Configuration conf = MRHelpers.createConfFromByteString(userPayloadProto
+        .getConfigurationBytes());
+
+    boolean sendSerializedEvents = conf.getBoolean(
+        "mapreduce.tez.input.initializer.serialize.event.payload", true);
+
+    // Read all credentials into the credentials instance stored in JobConf.
+    JobConf jobConf = new JobConf(conf);
+    jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
+
+    InputSplitInfoMem inputSplitInfo = null;
+    String realInputFormatName = userPayloadProto.getInputFormatName();
+    if (realInputFormatName != null && !realInputFormatName.isEmpty()) {
+      inputSplitInfo = generateGroupedSplits(rootInputContext, jobConf, conf, realInputFormatName);
+    } else {
+      inputSplitInfo = MRHelpers.generateInputSplitsToMem(jobConf);
+    }
+
+    return createEventList(sendSerializedEvents, inputSplitInfo);
+  }
+
+  private InputSplitInfoMem generateGroupedSplits(TezRootInputInitializerContext context,
+      JobConf jobConf, Configuration conf, String realInputFormatName) throws Exception {
+
+    int totalResource = context.getTotalAvailableResource().getMemory();
+    int taskResource = context.getVertexTaskResource().getMemory();
+    float waves = conf.getFloat(
+        TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
+        TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT);
+
+    MapWork work = Utilities.getMapWork(jobConf);
+
+    InputSplit[] splits = grouper.group(work, jobConf, realInputFormatName,
+        (int) (totalResource / taskResource), waves);
+
+    List<TaskLocationHint> locationHints = grouper.createTaskLocationHints(splits);
+
+    Utilities.clearWork(jobConf);
+
+    return new InputSplitInfoMem(splits, locationHints, splits.length, null, jobConf);
+  }
+
+  private List<Event> createEventList(boolean sendSerializedEvents,
+      InputSplitInfoMem inputSplitInfo) {
+
+    List<Event> events = Lists.newArrayListWithCapacity(inputSplitInfo.getNumTasks() + 1);
+
+    RootInputConfigureVertexTasksEvent configureVertexEvent
+        = new RootInputConfigureVertexTasksEvent(
+            inputSplitInfo.getNumTasks(), inputSplitInfo.getTaskLocationHints());
+    events.add(configureVertexEvent);
+
+    if (sendSerializedEvents) {
+      MRSplitsProto splitsProto = inputSplitInfo.getSplitsProto();
+      int count = 0;
+      for (MRSplitProto mrSplit : splitsProto.getSplitsList()) {
+        RootInputDataInformationEvent diEvent = new RootInputDataInformationEvent(count++,
+            mrSplit.toByteArray());
+        events.add(diEvent);
+      }
+    } else {
+      int count = 0;
+      for (org.apache.hadoop.mapred.InputSplit split : inputSplitInfo.getOldFormatSplits()) {
+        RootInputDataInformationEvent diEvent
+            = new RootInputDataInformationEvent(count++, split);
+        events.add(diEvent);
+      }
+    }
+    return events;
+  }
+}
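
The contract of initialize(...) is that the first event sizes the vertex and the remaining events carry one split each. A sketch of that shape, assuming it sits in the same package as HiveSplitGenerator and that Tez supplies the context object; illustrative, not part of the patch:

import java.util.List;

import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TezRootInputInitializerContext;

public class EventShapeSketch {
  // events.get(0): RootInputConfigureVertexTasksEvent (task count + location hints).
  // events.get(1..n): RootInputDataInformationEvents, one per grouped split,
  // serialized as MRSplitProto bytes iff
  // mapreduce.tez.input.initializer.serialize.event.payload is true.
  static List<Event> describe(TezRootInputInitializerContext ctx) throws Exception {
    return new HiveSplitGenerator().initialize(ctx);
  }
}
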
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
new file mode 100644
index 0000000..ece7f26
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.split.TezGroupedSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.hadoop.mapred.split.TezMapredSplitsGrouper;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+
+/**
+ * SplitGrouper is used to combine splits based on headroom and locality. It
+ * also enforces restrictions around schema, file format and bucketing.
+ */
+public class SplitGrouper {
+
+  private static final Log LOG = LogFactory.getLog(SplitGrouper.class);
+
+  private final TezMapredSplitsGrouper tezGrouper = new TezMapredSplitsGrouper();
+
+  public InputSplit[] group(MapWork work, JobConf jobConf, String realInputFormatName,
+      int availableSlots, float waves) throws Exception {
+
+    LOG.info("Grouping splits for " + work.getName() + ". " + availableSlots
+        + " available slots, " + waves + " waves. Input format is: " + realInputFormatName);
Input format is: "+realInputFormatName); + + //Need to instanciate the realInputFormat + InputFormat inputFormat; + + try { + inputFormat = (InputFormat) ReflectionUtils.newInstance( + Class.forName(realInputFormatName), jobConf); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } + + // Create the ungrouped splits + InputSplit[] splits = inputFormat.getSplits(jobConf, (int) (availableSlots * waves)); + LOG.info("Number of input splits: " + splits.length); + + Multimap bucketSplitMultiMap + = ArrayListMultimap.create(); + + Class previousInputFormatClass = null; + String previousDeserializerClass = null; + Map, Map> cache + = new HashMap, Map>(); + + int i = 0; + + for (InputSplit s: splits) { + // this is the bit where we make sure we don't group across partition schema boundaries + + Path path = ((FileSplit)s).getPath(); + + PartitionDesc pd = HiveFileFormatUtils.getPartitionDescFromPathRecursively( + work.getPathToPartitionInfo(), path, cache); + + String currentDeserializerClass = pd.getDeserializerClassName(); + Class currentInputFormatClass = pd.getInputFileFormatClass(); + + if ((currentInputFormatClass != previousInputFormatClass) || + (!currentDeserializerClass.equals(previousDeserializerClass))) { + ++i; + } + + previousInputFormatClass = currentInputFormatClass; + previousDeserializerClass = currentDeserializerClass; + + if (LOG.isDebugEnabled()) { + LOG.debug("Adding split " + path + " to bucket " + i); + } + bucketSplitMultiMap.put(i, s); + } + + // group them into the chunks we want + Multimap groupedSplits = group(jobConf, bucketSplitMultiMap, + availableSlots, waves); + + // And finally return them in a flat array + InputSplit[] flatSplits = groupedSplits.values().toArray(new InputSplit[0]); + LOG.info("Number of grouped splits: " + flatSplits.length); + + return flatSplits; + } + + /** group splits for each bucket separately - while evenly filling all the + available slots with tasks */ + public Multimap group(Configuration conf, + Multimap bucketSplitMultimap, + int availableSlots, float waves) throws IOException { + + // figure out how many tasks we want for each bucket + Map bucketTaskMap = estimateBucketSizes(availableSlots, waves, + bucketSplitMultimap.asMap()); + + // allocate map bucket id to grouped splits + Multimap bucketGroupedSplitMultimap = + ArrayListMultimap.create(); + + // use the tez grouper to combine splits once per bucket + for (int bucketId : bucketSplitMultimap.keySet()) { + Collection inputSplitCollection = bucketSplitMultimap.get(bucketId); + + InputSplit[] rawSplits = inputSplitCollection.toArray(new InputSplit[0]); + InputSplit[] groupedSplits = tezGrouper.getGroupedSplits(conf, + rawSplits, bucketTaskMap.get(bucketId), + HiveInputFormat.class.getName()); + + LOG.info("Original split size is " + + rawSplits.length + " grouped split size is " + groupedSplits.length + + ", for bucket: " + bucketId); + + for (InputSplit inSplit : groupedSplits) { + bucketGroupedSplitMultimap.put(bucketId, inSplit); + } + } + + return bucketGroupedSplitMultimap; + } + + /** get the size estimates for each bucket in tasks. 
+
+  /**
+   * Get the size estimates for each bucket in tasks. This is used to make
+   * sure we allocate the headroom evenly.
+   */
+  private Map<Integer, Integer> estimateBucketSizes(int availableSlots, float waves,
+      Map<Integer, Collection<InputSplit>> bucketSplitMap) {
+
+    // mapping of bucket id to size of all splits in bucket in bytes
+    Map<Integer, Long> bucketSizeMap = new HashMap<Integer, Long>();
+
+    // mapping of bucket id to number of required tasks to run
+    Map<Integer, Integer> bucketTaskMap = new HashMap<Integer, Integer>();
+
+    // compute the total size per bucket
+    long totalSize = 0;
+    for (int bucketId : bucketSplitMap.keySet()) {
+      long size = 0;
+      for (InputSplit s : bucketSplitMap.get(bucketId)) {
+        FileSplit fsplit = (FileSplit) s;
+        size += fsplit.getLength();
+        totalSize += fsplit.getLength();
+      }
+      bucketSizeMap.put(bucketId, size);
+    }
+
+    // compute the number of tasks
+    for (int bucketId : bucketSizeMap.keySet()) {
+      int numEstimatedTasks = 0;
+      if (totalSize != 0) {
+        // availableSlots * waves => desired slots to fill
+        // sizePerBucket / totalSize => weight for a particular bucket. weights add up to 1.
+        numEstimatedTasks = (int) (availableSlots * waves *
+            bucketSizeMap.get(bucketId) / totalSize);
+      }
+
+      LOG.info("Estimated number of tasks: " + numEstimatedTasks + " for bucket " + bucketId);
+      if (numEstimatedTasks == 0) {
+        numEstimatedTasks = 1;
+      }
+      bucketTaskMap.put(bucketId, numEstimatedTasks);
+    }
+
+    return bucketTaskMap;
+  }
+
+  public List<TaskLocationHint> createTaskLocationHints(InputSplit[] splits) throws IOException {
+
+    List<TaskLocationHint> locationHints = Lists.newArrayListWithCapacity(splits.length);
+
+    for (InputSplit split : splits) {
+      String rack = (split instanceof TezGroupedSplit) ? ((TezGroupedSplit) split).getRack() : null;
+      if (rack == null) {
+        if (split.getLocations() != null) {
+          locationHints.add(new TaskLocationHint(
+              new HashSet<String>(Arrays.asList(split.getLocations())), null));
+        } else {
+          locationHints.add(new TaskLocationHint(null, null));
+        }
+      } else {
+        locationHints.add(new TaskLocationHint(null, Collections.singleton(rack)));
+      }
+    }
+
+    return locationHints;
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
index 08a957a..1149bda 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
@@ -118,16 +118,23 @@ public void setPartSpec(final LinkedHashMap<String, String> partSpec) {
     return inputFileFormatClass;
   }
 
-  /**
-   * Return a deserializer object corresponding to the partitionDesc.
-   */
-  public Deserializer getDeserializer(Configuration conf) throws Exception {
+  public String getDeserializerClassName() {
     Properties schema = getProperties();
     String clazzName = schema.getProperty(serdeConstants.SERIALIZATION_LIB);
     if (clazzName == null) {
       throw new IllegalStateException("Property " + serdeConstants.SERIALIZATION_LIB
          + " cannot be null");
     }
+
+    return clazzName;
+  }
+
+  /**
+   * Return a deserializer object corresponding to the partitionDesc.
+   */
+  public Deserializer getDeserializer(Configuration conf) throws Exception {
+    Properties schema = getProperties();
+    String clazzName = getDeserializerClassName();
     Deserializer deserializer = ReflectionUtils.newInstance(conf.getClassByName(clazzName)
         .asSubclass(Deserializer.class), conf);
     SerDeUtils.initializeSerDe(deserializer, conf, getTableDesc().getProperties(), schema);
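
To close, the proportional estimate in SplitGrouper.estimateBucketSizes is easiest to verify with concrete numbers. A self-contained sketch with hypothetical bucket sizes (25 slots and 1.7 waves give a budget of 42.5 task slots; the figures are illustrative, not from the patch):

public class BucketEstimateSketch {
  public static void main(String[] args) {
    int availableSlots = 25;
    float waves = 1.7f;
    long[] bucketBytes = {800L << 20, 150L << 20, 50L << 20}; // 800MB, 150MB, 50MB

    long totalSize = 0;
    for (long bytes : bucketBytes) {
      totalSize += bytes;
    }

    // Buckets receive tasks in proportion to their share of the bytes,
    // with a floor of one task so every bucket is scheduled.
    for (int bucketId = 0; bucketId < bucketBytes.length; bucketId++) {
      int numEstimatedTasks =
          (int) (availableSlots * waves * bucketBytes[bucketId] / totalSize);
      if (numEstimatedTasks == 0) {
        numEstimatedTasks = 1;
      }
      // Prints: bucket 0 -> 34 tasks, bucket 1 -> 6 tasks, bucket 2 -> 2 tasks.
      System.out.println("bucket " + bucketId + " -> " + numEstimatedTasks + " tasks");
    }
  }
}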