diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
index 3ec6a80..297ce44 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
@@ -32,6 +32,7 @@
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+import com.google.common.collect.LinkedListMultimap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -109,7 +110,6 @@ public int compare(InputSplit inp1, InputSplit inp2) {
 
   VertexManagerPluginContext context;
   private InputConfigureVertexTasksEvent configureVertexTaskEvent;
-  private List<InputDataInformationEvent> dataInformationEvents;
   private int numBuckets = -1;
   private Configuration conf = null;
   private final SplitGrouper grouper = new SplitGrouper();
@@ -223,8 +223,6 @@ public void onRootVertexInitialized(String inputName, InputDescriptor inputDescr
       configureVertexTaskEvent = cEvent;
       LOG.info("Configure task for input name: " + inputName + " num tasks: "
           + configureVertexTaskEvent.getNumTasks());
-      dataInformationEvents =
-          Lists.newArrayListWithCapacity(configureVertexTaskEvent.getNumTasks());
     }
     if (event instanceof InputUpdatePayloadEvent) {
       // this event can never occur. If it does, fail.
@@ -232,7 +230,6 @@ public void onRootVertexInitialized(String inputName, InputDescriptor inputDescr
     } else if (event instanceof InputDataInformationEvent) {
       dataInformationEventSeen = true;
       InputDataInformationEvent diEvent = (InputDataInformationEvent) event;
-      dataInformationEvents.add(diEvent);
       FileSplit fileSplit;
       try {
         fileSplit = getFileSplitFromEvent(diEvent);
@@ -341,16 +338,26 @@ private void processAllSideEvents(String inputName,
         + " multi mr inputs. " + bucketToTaskMap);
 
     Integer[] numSplitsForTask = new Integer[taskCount];
+
+    Multimap<Integer, ByteBuffer> bucketToSerializedSplitMap = LinkedListMultimap.create();
+
+    // Create the list of serialized splits for each bucket.
     for (Entry<Integer, Collection<InputSplit>> entry : bucketToGroupedSplitMap.asMap().entrySet()) {
+      for (InputSplit split : entry.getValue()) {
+        MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(split);
+        ByteBuffer bs = serializedSplit.toByteString().asReadOnlyByteBuffer();
+        bucketToSerializedSplitMap.put(entry.getKey(), bs);
+      }
+    }
+
+    for (Entry<Integer, Collection<ByteBuffer>> entry : bucketToSerializedSplitMap.asMap().entrySet()) {
       Collection<Integer> destTasks = bucketToTaskMap.get(entry.getKey());
       for (Integer task : destTasks) {
         int count = 0;
-        for (InputSplit split : entry.getValue()) {
+        for (ByteBuffer buf : entry.getValue()) {
           count++;
-          MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(split);
           InputDataInformationEvent diEvent =
-              InputDataInformationEvent.createWithSerializedPayload(task, serializedSplit
-                  .toByteString().asReadOnlyByteBuffer());
+              InputDataInformationEvent.createWithSerializedPayload(count, buf);
           diEvent.setTargetIndex(task);
           taskEvents.add(diEvent);
         }
@@ -472,6 +479,7 @@ private void processAllEvents(String inputName,
     context.setVertexParallelism(taskCount, VertexLocationHint.create(grouper
         .createTaskLocationHints(finalSplits.toArray(new InputSplit[finalSplits.size()]))), emMap,
         rootInputSpecUpdate);
+    finalSplits.clear();
   }
 
   UserPayload getBytePayload(Multimap<Integer, Integer> routingTable) throws IOException {
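
Note for reviewers: the core change above is to stop re-serializing each InputSplit once per
destination task. Splits are serialized exactly once per bucket into read-only ByteBuffers,
and those cached buffers are then fanned out to every task the bucket routes to. Below is a
minimal, self-contained sketch of that serialize-once, fan-out pattern under stated
assumptions: only Guava's LinkedListMultimap/Multimap are real APIs here; FakeSplit,
serialize(), and the sample paths are hypothetical stand-ins for InputSplit and
MRInputHelpers.createSplitProto(split).toByteString(), not Hive/Tez code.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map.Entry;

import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Multimap;

public class SerializeOncePerBucket {

  /** Hypothetical stand-in for an InputSplit. */
  static class FakeSplit {
    final String path;
    FakeSplit(String path) { this.path = path; }
  }

  /** Hypothetical stand-in for proto serialization; runs exactly once per split. */
  static ByteBuffer serialize(FakeSplit split) {
    return ByteBuffer.wrap(split.path.getBytes(StandardCharsets.UTF_8)).asReadOnlyBuffer();
  }

  public static void main(String[] args) {
    // Bucket -> splits, and bucket -> destination tasks (bucket 0 feeds three tasks).
    Multimap<Integer, FakeSplit> bucketToSplits = LinkedListMultimap.create();
    bucketToSplits.put(0, new FakeSplit("/warehouse/t/part-0"));
    bucketToSplits.put(0, new FakeSplit("/warehouse/t/part-1"));
    bucketToSplits.put(1, new FakeSplit("/warehouse/t/part-2"));

    Multimap<Integer, Integer> bucketToTasks = LinkedListMultimap.create();
    bucketToTasks.putAll(0, Arrays.asList(0, 1, 2));
    bucketToTasks.put(1, 3);

    // Pass 1: serialize every split exactly once, keyed by bucket.
    Multimap<Integer, ByteBuffer> bucketToSerialized = LinkedListMultimap.create();
    for (Entry<Integer, Collection<FakeSplit>> e : bucketToSplits.asMap().entrySet()) {
      for (FakeSplit s : e.getValue()) {
        bucketToSerialized.put(e.getKey(), serialize(s));
      }
    }

    // Pass 2: reuse the cached read-only buffers for every destination task.
    // Before the patch, serialization sat inside this loop and therefore ran
    // once per (split, task) pair instead of once per split.
    for (Entry<Integer, Collection<ByteBuffer>> e : bucketToSerialized.asMap().entrySet()) {
      for (Integer task : bucketToTasks.get(e.getKey())) {
        for (ByteBuffer buf : e.getValue()) {
          System.out.printf("task %d <- payload of %d bytes%n", task, buf.remaining());
        }
      }
    }
  }
}

The same reasoning explains the smaller edits: the dataInformationEvents list only cached
events that were already processed inline, so dropping it (and clearing finalSplits after
setVertexParallelism) releases memory that would otherwise be held for the life of the vertex
manager.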