Index: hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatSplit.java
===================================================================
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatSplit.java (revision 634c62ba8e6fbfbe664ef083a283701f6c737ace)
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatSplit.java (revision )
@@ -24,7 +24,6 @@
 import java.lang.reflect.Constructor;
 
 import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -44,11 +43,6 @@
   /** The split returned by the underlying InputFormat split. */
   private org.apache.hadoop.mapred.InputSplit baseMapRedSplit;
 
-  /** The schema for the HCatTable */
-  private HCatSchema tableSchema;
-
-  private HiveConf hiveConf;
-
   /**
    * Instantiates a new hcat split.
    */
@@ -60,16 +54,13 @@
    *
    * @param partitionInfo the partition info
    * @param baseMapRedSplit the base mapred split
-   * @param tableSchema the table level schema
    */
   public HCatSplit(PartInfo partitionInfo,
-                   org.apache.hadoop.mapred.InputSplit baseMapRedSplit,
-                   HCatSchema tableSchema) {
+                   org.apache.hadoop.mapred.InputSplit baseMapRedSplit) {
 
     this.partitionInfo = partitionInfo;
     // dataSchema can be obtained from partitionInfo.getPartitionSchema()
     this.baseMapRedSplit = baseMapRedSplit;
-    this.tableSchema = tableSchema;
   }
 
   /**
@@ -101,7 +92,8 @@
    * @return the table schema
    */
   public HCatSchema getTableSchema() {
-    return this.tableSchema;
+    assert this.partitionInfo.getTableInfo() != null : "TableInfo should have been set at this point.";
+    return this.partitionInfo.getTableInfo().getAllColumns();
   }
 
   /* (non-Javadoc)
@@ -159,9 +151,6 @@
     } catch (Exception e) {
       throw new IOException("Exception from " + baseSplitClassName, e);
     }
-
-    String tableSchemaString = WritableUtils.readString(input);
-    tableSchema = (HCatSchema) HCatUtil.deserialize(tableSchemaString);
   }
 
   /* (non-Javadoc)
@@ -178,10 +167,6 @@
 
     Writable baseSplitWritable = (Writable) baseMapRedSplit;
     //write baseSplit into output
     baseSplitWritable.write(output);
-
-    //write the table schema into output
-    String tableSchemaString = HCatUtil.serialize(tableSchema);
-    WritableUtils.writeString(output, tableSchemaString);
   }
 }
Index: hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
===================================================================
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java (revision 634c62ba8e6fbfbe664ef083a283701f6c737ace)
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java (revision )
@@ -182,5 +182,10 @@
     ObjectInputStream partInfoReader =
       new ObjectInputStream(new InflaterInputStream(ois));
     partitions = (List<PartInfo>)partInfoReader.readObject();
+    for (PartInfo partInfo : partitions) {
+      if (partInfo.getTableInfo() == null) {
+        partInfo.setTableInfo(this.tableInfo);
+      }
+    }
   }
 }
Index: hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatTableInfo.java
===================================================================
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatTableInfo.java (revision 634c62ba8e6fbfbe664ef083a283701f6c737ace)
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatTableInfo.java (revision )
@@ -21,10 +21,13 @@
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.LinkedList;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hive.hcatalog.data.schema.HCatSchema;
 
 /**
@@ -45,6 +48,7 @@
   /** The table schema. */
   private final HCatSchema dataColumns;
   private final HCatSchema partitionColumns;
+  private final HCatSchema allColumns;
 
   /** The table being written to */
   private final Table table;
@@ -79,6 +83,10 @@
     this.table = table;
     this.storerInfo = storerInfo;
     this.partitionColumns = partitionColumns;
+    LinkedList<HCatFieldSchema> allColumns = Lists.newLinkedList();
+    allColumns.addAll(dataColumns.getFields());
+    allColumns.addAll(partitionColumns.getFields());
+    this.allColumns = new HCatSchema(allColumns);
   }
 
   /**
@@ -109,6 +117,13 @@
    */
   public HCatSchema getPartitionColumns() {
     return partitionColumns;
+  }
+
+  /**
+   * @return All columns for the table (including data and partition columns).
+   */
+  public HCatSchema getAllColumns() {
+    return allColumns;
   }
 
   /**
Index: hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
===================================================================
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java (revision 634c62ba8e6fbfbe664ef083a283701f6c737ace)
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java (revision )
@@ -158,9 +158,7 @@
         inputFormat.getSplits(jobConf, desiredNumSplits);
 
       for (org.apache.hadoop.mapred.InputSplit split : baseSplits) {
-        splits.add(new HCatSplit(
-          partitionInfo,
-          split, allCols));
+        splits.add(new HCatSplit(partitionInfo, split));
       }
     }
 
@@ -185,6 +183,12 @@
 
     HCatSplit hcatSplit = InternalUtil.castToHCatSplit(split);
     PartInfo partitionInfo = hcatSplit.getPartitionInfo();
+    // Ensure PartInfo's TableInfo is initialized.
+    if (partitionInfo.getTableInfo() == null) {
+      partitionInfo.setTableInfo(((InputJobInfo) HCatUtil.deserialize(
+          taskContext.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO)
+      )).getTableInfo());
+    }
     JobContext jobContext = taskContext;
     Configuration conf = jobContext.getConfiguration();
 
Index: hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/PartInfo.java
===================================================================
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/PartInfo.java (revision 634c62ba8e6fbfbe664ef083a283701f6c737ace)
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/PartInfo.java (revision )
@@ -18,6 +18,8 @@
  */
 package org.apache.hive.hcatalog.mapreduce;
 
+import java.io.IOException;
+import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.Map;
 import java.util.Properties;
@@ -31,14 +33,14 @@
   /** The serialization version */
   private static final long serialVersionUID = 1L;
 
-  /** The partition schema. */
-  private final HCatSchema partitionSchema;
+  /** The partition data-schema. */
+  private HCatSchema partitionSchema;
 
   /** The information about which input storage handler to use */
-  private final String storageHandlerClassName;
-  private final String inputFormatClassName;
-  private final String outputFormatClassName;
-  private final String serdeClassName;
+  private String storageHandlerClassName;
+  private String inputFormatClassName;
+  private String outputFormatClassName;
+  private String serdeClassName;
 
   /** HCat-specific properties set at the partition */
   private final Properties hcatProperties;
@@ -52,8 +54,11 @@
   /** Job properties associated with this parition */
   Map<String,String> jobProperties;
 
-  /** the table info associated with this partition */
-  HCatTableInfo tableInfo;
+  /**
+   * The table info associated with this partition.
+   * Not serialized per PartInfo instance. Constant, per table.
+   */
+  transient HCatTableInfo tableInfo;
 
   /**
    * Instantiates a new hcat partition info.
@@ -161,5 +166,64 @@
    */
   public HCatTableInfo getTableInfo() {
     return tableInfo;
+  }
+
+  void setTableInfo(HCatTableInfo thatTableInfo) {
+    this.tableInfo = thatTableInfo;
+
+    if (partitionSchema == null) {
+      partitionSchema = tableInfo.getDataColumns();
+    }
+
+    if (storageHandlerClassName == null) {
+      storageHandlerClassName = tableInfo.getStorerInfo().getStorageHandlerClass();
+    }
+
+    if (inputFormatClassName == null) {
+      inputFormatClassName = tableInfo.getStorerInfo().getIfClass();
+    }
+
+    if (outputFormatClassName == null) {
+      outputFormatClassName = tableInfo.getStorerInfo().getOfClass();
+    }
+
+    if (serdeClassName == null) {
+      serdeClassName = tableInfo.getStorerInfo().getSerdeClass();
+    }
+  }
+
+  /**
+   * Serialization method. Suppresses serialization of redundant information that's already
+   * available from TableInfo.
+   */
+  private void writeObject(ObjectOutputStream oos)
+      throws IOException {
+    // Suppress commonality with TableInfo.
+
+    assert tableInfo != null : "TableInfo can't be null at this point.";
+
+    if (partitionSchema != null && partitionSchema.equals(tableInfo.getDataColumns())) {
+      partitionSchema = null;
+    }
+
+    if (storageHandlerClassName != null &&
+        storageHandlerClassName.equals(tableInfo.getStorerInfo().getStorageHandlerClass())) {
+      storageHandlerClassName = null;
+    }
+
+    if (inputFormatClassName != null &&
+        inputFormatClassName.equals(tableInfo.getStorerInfo().getIfClass())) {
+      inputFormatClassName = null;
+    }
+
+    if (outputFormatClassName != null && outputFormatClassName.equals(tableInfo.getStorerInfo().getOfClass())) {
+      outputFormatClassName = null;
+    }
+
+    if (serdeClassName != null && serdeClassName.equals(tableInfo.getStorerInfo().getSerdeClass())) {
+      serdeClassName = null;
+    }
+
+    oos.defaultWriteObject();
+  }
 }
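Taken together, the patch stops serializing a full HCatSchema with every HCatSplit and instead rebuilds per-partition state from the single HCatTableInfo that InputJobInfo already carries: PartInfo marks tableInfo transient, its writeObject() nulls out any field that merely duplicates table-level metadata, and InputJobInfo.readObject() re-attaches the shared HCatTableInfo so setTableInfo() can repopulate the suppressed fields. The standalone sketch below illustrates the same suppress-on-write / restore-on-read idiom outside HCatalog; the SharedDefaults, Record, and DedupDemo classes and the org.example.TextInputFormat name are hypothetical, invented only for this example.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

/** Job-wide constants, serialized once per job rather than once per split (plays the role of HCatTableInfo). */
class SharedDefaults implements Serializable {
  private static final long serialVersionUID = 1L;
  final String inputFormatClassName;
  SharedDefaults(String inputFormatClassName) {
    this.inputFormatClassName = inputFormatClassName;
  }
}

/** Per-split record that suppresses fields matching the shared defaults (plays the role of PartInfo). */
class Record implements Serializable {
  private static final long serialVersionUID = 1L;

  private String inputFormatClassName;

  // Never written to the stream; the container re-attaches it after deserialization,
  // the way InputJobInfo.readObject() calls PartInfo.setTableInfo() in the patch.
  transient SharedDefaults defaults;

  Record(String inputFormatClassName, SharedDefaults defaults) {
    this.inputFormatClassName = inputFormatClassName;
    this.defaults = defaults;
  }

  /** Mirrors PartInfo.setTableInfo(): restores suppressed fields from the shared copy. */
  void setDefaults(SharedDefaults defaults) {
    this.defaults = defaults;
    if (inputFormatClassName == null) {
      inputFormatClassName = defaults.inputFormatClassName;
    }
  }

  /** Mirrors PartInfo.writeObject(): nulls out anything the shared copy already carries. */
  private void writeObject(ObjectOutputStream oos) throws IOException {
    if (Objects.equals(inputFormatClassName, defaults.inputFormatClassName)) {
      inputFormatClassName = null; // redundant; recoverable via setDefaults()
    }
    oos.defaultWriteObject();
  }

  String getInputFormatClassName() {
    return inputFormatClassName;
  }
}

public class DedupDemo {
  public static void main(String[] args) throws IOException, ClassNotFoundException {
    SharedDefaults shared = new SharedDefaults("org.example.TextInputFormat");
    Record record = new Record("org.example.TextInputFormat", shared);

    // Round-trip: the duplicate string is dropped on write...
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(buf)) {
      oos.writeObject(record);
    }
    Record copy;
    try (ObjectInputStream ois =
        new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray()))) {
      copy = (Record) ois.readObject();
    }

    // ...and restored from the shared defaults on read.
    copy.setDefaults(shared);
    System.out.println(copy.getInputFormatClassName()); // org.example.TextInputFormat
  }
}

As in the patch, writeObject() here mutates the live object: the per-instance fields are treated as a cache over the shared copy, valid again only after setDefaults() (like setTableInfo()) repopulates them.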