Index: shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java =================================================================== --- shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (revision 951194) +++ shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (working copy) @@ -19,6 +19,8 @@ import java.io.IOException; import java.lang.reflect.Constructor; +import java.io.DataOutput; +import java.io.DataInput; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -43,6 +45,7 @@ import org.apache.hadoop.mapred.TaskAttemptContext; import org.apache.hadoop.mapred.JobContext; import org.apache.hadoop.mapred.lib.NullOutputFormat; +import org.apache.hadoop.io.WritableComparable; /** * Implemention of shims against Hadoop 0.20.0. @@ -138,6 +141,35 @@ } } + public static class CombineHiveKey implements WritableComparable { + Object key; + + public CombineHiveKey(Object key) { + this.key = key; + } + + public Object getKey() { + return key; + } + + public void setKey(Object key) { + this.key = key; + } + + public void write(DataOutput out) throws IOException { + throw new IOException("Method not supported"); + } + + public void readFields(DataInput in) throws IOException { + throw new IOException("Method not supported"); + } + + public int compareTo(Object w) { + assert false; + return 0; + } + } + /* This class should be replaced with org.apache.hadoop.mapred.lib.CombineFileRecordReader class, once * https://issues.apache.org/jira/browse/MAPREDUCE-955 is fixed. This code should be removed - it is a copy * of org.apache.hadoop.mapred.lib.CombineFileRecordReader @@ -164,8 +196,8 @@ public boolean next(K key, V value) throws IOException { - while ((curReader == null) || !curReader.next(key, value)) { - if (!initNextRecordReader()) { + while ((curReader == null) || !curReader.next((K)((CombineHiveKey)key).getKey(), value)) { + if (!initNextRecordReader(key, value)) { return false; } } @@ -173,7 +205,8 @@ } public K createKey() { - return curReader.createKey(); + K newKey = curReader.createKey(); + return (K)(new CombineHiveKey(newKey)); } public V createValue() { @@ -224,13 +257,13 @@ throw new RuntimeException(rrClass.getName() + " does not have valid constructor", e); } - initNextRecordReader(); + initNextRecordReader(null, null); } /** * Get the record reader for the next chunk in this CombineFileSplit. */ - protected boolean initNextRecordReader() throws IOException { + protected boolean initNextRecordReader(K key, V value) throws IOException { if (curReader != null) { curReader.close(); @@ -250,6 +283,12 @@ curReader = rrConstructor.newInstance(new Object[] {split, jc, reporter, Integer.valueOf(idx)}); + // change the key if need be + if (key != null) { + K newKey = curReader.createKey(); + ((CombineHiveKey)key).setKey(newKey); + } + // setup some helper config variables. jc.set("map.input.file", split.getPath(idx).toString()); jc.setLong("map.input.start", split.getOffset(idx));