Index: src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java	(revision 1293737)
+++ src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java	(working copy)
@@ -36,10 +36,13 @@
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.SerDe;
 
+import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.DefaultHCatRecord;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.LazyHCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
 
 /** The HCat wrapper for the underlying RecordReader,
  * this ensures that the initialize on
@@ -64,6 +67,9 @@
 
   private Map<String,Object> partCols;
 
+  private HCatSchema outputSchema = null;
+  private HCatSchema tableSchema = null;
+
   /**
    * Instantiates a new hcat record reader.
    * @param baseRecordReader the base record reader
@@ -89,6 +95,10 @@
       TaskAttemptContext taskContext) throws IOException, InterruptedException {
     org.apache.hadoop.mapred.InputSplit baseSplit;
 
+    // Pull the output schema out of the TaskAttemptContext
+    outputSchema = (HCatSchema) HCatUtil.deserialize(
+        taskContext.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA));
+
     if( split instanceof HCatSplit ) {
       baseSplit = ((HCatSplit) split).getBaseSplit();
@@ -96,6 +106,10 @@
       throw new IOException("Not a HCatSplit");
     }
 
+    // Pull the table schema out of the split info
+    // TODO This should be passed in the TaskAttemptContext instead
+    tableSchema = ((HCatSplit) split).getTableSchema();
+
     Properties properties = new Properties();
     for (Map.Entry<String, String> param : ((HCatSplit) split).getPartitionInfo()
@@ -122,14 +136,32 @@
     HCatRecord r;
 
     try {
-      r = new DefaultHCatRecord((new LazyHCatRecord(
+      /*
+      return new DefaultHCatRecord((new LazyHCatRecord(
           serde.deserialize(currentValue),
           serde.getObjectInspector(), partCols)).getAll());
+      */
+      r = new LazyHCatRecord(serde.deserialize(currentValue),
+          serde.getObjectInspector(), partCols);
+      if (outputSchema == null) {
+        // There's no projection being done; return the full record
+        return new DefaultHCatRecord(r.getAll());
+      } else {
+        // For each field in the outputSchema, do the mapping
+        DefaultHCatRecord dr = new DefaultHCatRecord(outputSchema.size());
+        for (int i = 0; i < outputSchema.size(); i++) {
+          // Figure out the field to read
+          HCatFieldSchema ofs = outputSchema.get(i);
+          dr.set(i, r.get(ofs.getName(), tableSchema));
+        }
+        return dr;
+      }
+
     } catch (Exception e) {
       throw new IOException("Failed to create HCatRecord " + e);
    }
-    return r;
  }
 
  /* (non-Javadoc)
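
For context: the projection branch added above only fires when a driver has serialized an HCatSchema into the job configuration under HCatConstants.HCAT_KEY_OUTPUT_SCHEMA. Below is a minimal driver-side sketch of how that would happen, assuming the HCatInputFormat.setInput / getTableSchema / setOutputSchema entry points (setOutputSchema is what stores the serialized schema this patch reads back). The database, table, and column names are illustrative only, and the exact InputJobInfo.create signature varies across HCatalog versions.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hcatalog.data.schema.HCatFieldSchema;
    import org.apache.hcatalog.data.schema.HCatSchema;
    import org.apache.hcatalog.mapreduce.HCatInputFormat;
    import org.apache.hcatalog.mapreduce.InputJobInfo;

    public class ProjectionExample {
      // Hypothetical driver-side setup: read only two columns of the table.
      public static void configure(Job job) throws Exception {
        HCatInputFormat.setInput(job,
            InputJobInfo.create("default", "mytable", null));

        // Full table schema, as stored in the metastore.
        HCatSchema tableSchema = HCatInputFormat.getTableSchema(job);

        // Build a projected schema containing only the columns we need;
        // "userid" and "viewtime" are illustrative column names.
        List<HCatFieldSchema> projected = new ArrayList<HCatFieldSchema>();
        projected.add(tableSchema.get("userid"));
        projected.add(tableSchema.get("viewtime"));

        // setOutputSchema serializes the projected schema into the job
        // configuration (under HCatConstants.HCAT_KEY_OUTPUT_SCHEMA),
        // which is what HCatRecordReader.initialize() deserializes above.
        HCatInputFormat.setOutputSchema(job, new HCatSchema(projected));
      }
    }

Note that if setOutputSchema is never called, the configuration key is absent and (assuming HCatUtil.deserialize returns null for a missing value) the reader falls into the outputSchema == null branch and returns the full record, so existing jobs keep their current behavior.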