diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index e0d838c..e1b8321 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.tez.tools.InputMerger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
@@ -58,6 +59,7 @@
  * Just pump the records through the query plan.
  */
 public class ReduceRecordProcessor extends RecordProcessor{
+  private static final String REDUCE_PLAN_KEY = "__REDUCE_PLAN__";
 
   public static final Log l4j = LogFactory.getLog(ReduceRecordProcessor.class);
 
@@ -169,20 +171,53 @@ void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> in
 
   @Override
   void run() throws IOException{
+    List<ShuffledMergedInput> shuffleInputs = getShuffleInputs(inputs);
+    KeyValuesReader kvsReader;
+
+    if(shuffleInputs.size() == 1){
+      //no merging of inputs required
+      kvsReader = shuffleInputs.get(0).getReader();
+    }else {
+      //get a sort merged input
+      kvsReader = new InputMerger(shuffleInputs);
+    }
 
-    //TODO - changes this for joins
-    ShuffledMergedInput in = (ShuffledMergedInput)inputs.values().iterator().next();
-    KeyValuesReader reader = in.getReader();
-
     //process records until done
-    while(reader.next()){
-      Object key = reader.getCurrentKey();
-      Iterable<Object> values = reader.getCurrentValues();
+    while(kvsReader.next()){
+      Object key = kvsReader.getCurrentKey();
+      Iterable<Object> values = kvsReader.getCurrentValues();
       boolean needMore = processKeyValues(key, values);
       if(!needMore){
         break;
       }
     }
+
+  }
+
+  /**
+   * Get the inputs that should be streamed through the reduce plan.
+   * These are the inputs named in the reduce work's tag-to-input mapping.
+   *
+   * @param inputs the inputs of this task, keyed by the names used in that mapping
+   * @return the shuffle inputs that have a tag, in the iteration order of the
+   *         tag-to-input mapping
+   * @throws IllegalStateException if a tagged input is missing from inputs
+   */
+  private List<ShuffledMergedInput> getShuffleInputs(Map<String, LogicalInput> inputs) {
+    //the reduce plan inputs have tags, add all inputs that have tags
+    Map<Integer, String> tag2input = redWork.getTagToInput();
+    List<ShuffledMergedInput> shuffleInputs =
+        new ArrayList<ShuffledMergedInput>(tag2input.size());
+    for(Map.Entry<Integer, String> tagAndName : tag2input.entrySet()){
+      LogicalInput input = inputs.get(tagAndName.getValue());
+      if(input == null){
+        //fail here with a usable message instead of a bare NPE later in run()
+        throw new IllegalStateException("No input named " + tagAndName.getValue()
+            + " found for tag " + tagAndName.getKey());
+      }
+      shuffleInputs.add((ShuffledMergedInput)input);
+    }
+    return shuffleInputs;
   }
 
   /**
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java
new file mode 100644
index 0000000..efc7f36
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez.tools;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.tez.ReduceRecordProcessor;
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.input.ShuffledMergedInput;
+
+/**
+ * A KeyValuesReader implementation that returns a sorted stream of key-values
+ * by doing a sorted merge of the key-values in ShuffledMergedInputs.
+ * Tags are in the last byte of the key, so no special handling for tags is required.
+ * Uses a priority queue to pick the KeyValuesReader of the input that is next in
+ * sort order.
+ * {@link ReduceRecordProcessor} creates one when it has more than one shuffle input.
+ */
+public class InputMerger implements KeyValuesReader {
+
+  public static final Log l4j = LogFactory.getLog(InputMerger.class);
+  private final PriorityQueue<KeyValuesReader> pQueue;
+  //reader positioned on the current key-values; null before the first next()
+  //and after the merged stream is exhausted
+  private KeyValuesReader nextKVReader = null;
+
+  /**
+   * Advances the reader of every input once and queues those that have key-values.
+   * @param shuffleInputs the inputs to merge; may be empty
+   * @throws IOException if a reader cannot be obtained or advanced
+   */
+  public InputMerger(List<ShuffledMergedInput> shuffleInputs) throws IOException {
+    //get KeyValuesReaders from the ShuffledMergedInput and add them to priority queue
+    //PriorityQueue rejects an initial capacity below 1, so round an empty list up
+    int initialCapacity = Math.max(1, shuffleInputs.size());
+    pQueue = new PriorityQueue<KeyValuesReader>(initialCapacity, new KVReaderComparator());
+    for(ShuffledMergedInput input : shuffleInputs){
+      addToQueue(input.getReader());
+    }
+  }
+
+  /**
+   * Add KeyValuesReader to queue if it has more key-values
+   * @param kvsReadr reader to advance; it is queued only if next() returns true
+   * @throws IOException if advancing the reader fails
+   */
+  private void addToQueue(KeyValuesReader kvsReadr) throws IOException{
+    if(kvsReadr.next()){
+      pQueue.add(kvsReadr);
+    }
+  }
+
+  /**
+   * @return true if there are more key-values and advances to next key-values
+   * @throws IOException if advancing the previously returned reader fails
+   */
+  @Override
+  public boolean next() throws IOException {
+    //add the previous nextKVReader back to queue
+    if(nextKVReader != null){
+      addToQueue(nextKVReader);
+    }
+
+    //get the new nextKVReader with lowest key
+    nextKVReader = pQueue.poll();
+    return nextKVReader != null;
+  }
+
+  /**
+   * @return current key; only valid after next() has returned true
+   * @throws IOException if the underlying reader fails
+   */
+  @Override
+  public Object getCurrentKey() throws IOException {
+    return nextKVReader.getCurrentKey();
+  }
+
+  /**
+   * @return values of the current key; only valid after next() has returned true
+   * @throws IOException if the underlying reader fails
+   */
+  @Override
+  public Iterable<Object> getCurrentValues() throws IOException {
+    return nextKVReader.getCurrentValues();
+  }
+
+  /**
+   * Comparator that compares KeyValuesReader on their current key
+   */
+  class KVReaderComparator implements Comparator<KeyValuesReader> {
+
+    @Override
+    public int compare(KeyValuesReader kvReadr1, KeyValuesReader kvReadr2) {
+      try {
+        BinaryComparable key1 = (BinaryComparable) kvReadr1.getCurrentKey();
+        BinaryComparable key2 = (BinaryComparable) kvReadr2.getCurrentKey();
+        return key1.compareTo(key2);
+      } catch (IOException e) {
+        l4j.error("Caught exception while reading shuffle input", e);
+        //die!
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}