diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
index 33807f5..9220de1 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
@@ -125,16 +125,6 @@ public static void setOutputSchema(Job job, HCatSchema hcatSchema)
       setInputPath(jobConf, partitionInfo.getLocation());
       Map<String, String> jobProperties = partitionInfo.getJobProperties();
 
-      HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
-      for (HCatFieldSchema field :
-        inputJobInfo.getTableInfo().getDataColumns().getFields()) {
-        allCols.append(field);
-      }
-      for (HCatFieldSchema field :
-        inputJobInfo.getTableInfo().getPartitionColumns().getFields()) {
-        allCols.append(field);
-      }
-
       HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
 
       storageHandler = HCatUtil.getStorageHandler(
@@ -158,9 +148,7 @@ public static void setOutputSchema(Job job, HCatSchema hcatSchema)
         inputFormat.getSplits(jobConf, desiredNumSplits);
 
       for (org.apache.hadoop.mapred.InputSplit split : baseSplits) {
-        splits.add(new HCatSplit(
-          partitionInfo,
-          split, allCols));
+        splits.add(new HCatSplit(partitionInfo, split));
       }
     }
 
@@ -185,6 +173,12 @@ public static void setOutputSchema(Job job, HCatSchema hcatSchema)
 
     HCatSplit hcatSplit = InternalUtil.castToHCatSplit(split);
     PartInfo partitionInfo = hcatSplit.getPartitionInfo();
+    // Ensure PartInfo's TableInfo is initialized.
+    if (partitionInfo.getTableInfo() == null) {
+      partitionInfo.setTableInfo(((InputJobInfo)HCatUtil.deserialize(
+          taskContext.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO)
+      )).getTableInfo());
+    }
     JobContext jobContext = taskContext;
     Configuration conf = jobContext.getConfiguration();
 
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatSplit.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatSplit.java
index bcedb3a..0aa498a 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatSplit.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatSplit.java
@@ -24,7 +24,6 @@
 import java.lang.reflect.Constructor;
 
 import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -44,11 +43,6 @@
   /** The split returned by the underlying InputFormat split. */
   private org.apache.hadoop.mapred.InputSplit baseMapRedSplit;
 
-  /** The schema for the HCatTable */
-  private HCatSchema tableSchema;
-
-  private HiveConf hiveConf;
-
   /**
    * Instantiates a new hcat split.
    */
@@ -60,16 +54,13 @@ public HCatSplit() {
    *
    * @param partitionInfo the partition info
    * @param baseMapRedSplit the base mapred split
-   * @param tableSchema the table level schema
    */
   public HCatSplit(PartInfo partitionInfo,
-           org.apache.hadoop.mapred.InputSplit baseMapRedSplit,
-           HCatSchema tableSchema) {
+           org.apache.hadoop.mapred.InputSplit baseMapRedSplit) {
 
     this.partitionInfo = partitionInfo;
     // dataSchema can be obtained from partitionInfo.getPartitionSchema()
     this.baseMapRedSplit = baseMapRedSplit;
-    this.tableSchema = tableSchema;
   }
 
   /**
@@ -101,7 +92,8 @@ public HCatSchema getDataSchema() {
    * @return the table schema
    */
   public HCatSchema getTableSchema() {
-    return this.tableSchema;
+    assert this.partitionInfo.getTableInfo() != null : "TableInfo should have been set at this point.";
+    return this.partitionInfo.getTableInfo().getAllColumns();
   }
 
   /* (non-Javadoc)
@@ -159,9 +151,6 @@ public void readFields(DataInput input) throws IOException {
     } catch (Exception e) {
       throw new IOException("Exception from " + baseSplitClassName, e);
     }
-
-    String tableSchemaString = WritableUtils.readString(input);
-    tableSchema = (HCatSchema) HCatUtil.deserialize(tableSchemaString);
   }
 
   /* (non-Javadoc)
@@ -178,10 +167,6 @@ public void write(DataOutput output) throws IOException {
 
     Writable baseSplitWritable = (Writable) baseMapRedSplit;
     //write baseSplit into output
     baseSplitWritable.write(output);
-
-    //write the table schema into output
-    String tableSchemaString = HCatUtil.serialize(tableSchema);
-    WritableUtils.writeString(output, tableSchemaString);
   }
 }
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatTableInfo.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatTableInfo.java
index 13faf15..14c93ab 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatTableInfo.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatTableInfo.java
@@ -21,10 +21,13 @@
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.List;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.Table;
 
 import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hive.hcatalog.data.schema.HCatSchema;
 
 /**
@@ -112,6 +115,15 @@ public HCatSchema getPartitionColumns() {
   }
 
   /**
+   * @return HCatSchema with all columns (i.e. data and partition columns).
+   */
+  public HCatSchema getAllColumns() {
+    List<HCatFieldSchema> allColumns = Lists.newArrayList(dataColumns.getFields());
+    allColumns.addAll(partitionColumns.getFields());
+    return new HCatSchema(allColumns);
+  }
+
+  /**
    * @return the storerInfo
    */
   public StorerInfo getStorerInfo() {
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
index 360e77b..1f23f3f 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
@@ -182,5 +182,10 @@ private void readObject(ObjectInputStream ois)
     ObjectInputStream partInfoReader =
       new ObjectInputStream(new InflaterInputStream(ois));
     partitions = (List<PartInfo>)partInfoReader.readObject();
+    for (PartInfo partInfo : partitions) {
+      if (partInfo.getTableInfo() == null) {
+        partInfo.setTableInfo(this.tableInfo);
+      }
+    }
   }
 }
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/PartInfo.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/PartInfo.java
index 651a9a0..fca0a92 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/PartInfo.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/PartInfo.java
@@ -18,27 +18,32 @@
  */
 package org.apache.hive.hcatalog.mapreduce;
 
+import java.io.IOException;
+import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** The Class used to serialize the partition information read from the metadata server that maps to a partition. */
 public class PartInfo implements Serializable {
+  private static Logger LOG = LoggerFactory.getLogger(PartInfo.class);
 
   /** The serialization version */
   private static final long serialVersionUID = 1L;
 
-  /** The partition schema. */
-  private final HCatSchema partitionSchema;
+  /** The partition data-schema. */
+  private HCatSchema partitionSchema;
 
   /** The information about which input storage handler to use */
-  private final String storageHandlerClassName;
-  private final String inputFormatClassName;
-  private final String outputFormatClassName;
-  private final String serdeClassName;
+  private String storageHandlerClassName;
+  private String inputFormatClassName;
+  private String outputFormatClassName;
+  private String serdeClassName;
 
   /** HCat-specific properties set at the partition */
   private final Properties hcatProperties;
@@ -52,8 +57,11 @@
   /** Job properties associated with this parition */
   Map<String, String> jobProperties;
 
-  /** the table info associated with this partition */
-  HCatTableInfo tableInfo;
+  /**
+   * The table info associated with this partition.
+   * Not serialized per PartInfo instance. Constant, per table.
+   */
+  transient HCatTableInfo tableInfo;
 
   /**
    * Instantiates a new hcat partition info.
@@ -162,4 +170,97 @@ public void setPartitionValues(Map<String, String> partitionValues) {
   public HCatTableInfo getTableInfo() {
     return tableInfo;
   }
+
+  void setTableInfo(HCatTableInfo thatTableInfo) {
+    this.tableInfo = thatTableInfo;
+
+    if (partitionSchema == null) {
+      partitionSchema = tableInfo.getDataColumns();
+    }
+
+    if (storageHandlerClassName == null) {
+      storageHandlerClassName = tableInfo.getStorerInfo().getStorageHandlerClass();
+    }
+
+    if (inputFormatClassName == null) {
+      inputFormatClassName = tableInfo.getStorerInfo().getIfClass();
+    }
+
+    if (outputFormatClassName == null) {
+      outputFormatClassName = tableInfo.getStorerInfo().getOfClass();
+    }
+
+    if (serdeClassName == null) {
+      serdeClassName = tableInfo.getStorerInfo().getSerdeClass();
+    }
+  }
+
+  /**
+   * Serialization method. Suppresses serialization of redundant information that's already
+   * available from TableInfo.
+   */
+  private void writeObject(ObjectOutputStream oos)
+      throws IOException {
+    // Suppress commonality with TableInfo.
+
+    assert tableInfo != null : "TableInfo can't be null at this point.";
+
+    if (partitionSchema != null) {
+      if (partitionSchema.equals(tableInfo.getDataColumns())) {
+        partitionSchema = null;
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Can't suppress data-schema. Partition-schema and table-schema seem to differ! "
+            + " partitionSchema: " + partitionSchema.getFields()
+            + " tableSchema: " + tableInfo.getDataColumns());
+        }
+      }
+    }
+
+    if (storageHandlerClassName != null) {
+      if (storageHandlerClassName.equals(tableInfo.getStorerInfo().getStorageHandlerClass())) {
+        storageHandlerClassName = null;
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Partition's storageHandler (" + storageHandlerClassName + ") "
+            + "differs from table's storageHandler (" + tableInfo.getStorerInfo().getStorageHandlerClass() + ").");
+        }
+      }
+    }
+
+    if (inputFormatClassName != null) {
+      if (inputFormatClassName.equals(tableInfo.getStorerInfo().getIfClass())) {
+        inputFormatClassName = null;
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Partition's InputFormat (" + inputFormatClassName + ") "
+            + "differs from table's InputFormat (" + tableInfo.getStorerInfo().getIfClass() + ").");
+        }
+      }
+    }
+
+    if (outputFormatClassName != null) {
+      if (outputFormatClassName.equals(tableInfo.getStorerInfo().getOfClass())) {
+        outputFormatClassName = null;
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Partition's OutputFormat (" + outputFormatClassName + ") "
+            + "differs from table's OutputFormat (" + tableInfo.getStorerInfo().getOfClass() + ").");
+        }
+      }
+    }
+
+    if (serdeClassName != null) {
+      if (serdeClassName.equals(tableInfo.getStorerInfo().getSerdeClass())) {
+        serdeClassName = null;
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Partition's SerDe (" + serdeClassName + ") "
+            + "differs from table's SerDe (" + tableInfo.getStorerInfo().getSerdeClass() + ").");
+        }
+      }
+    }
+
+    oos.defaultWriteObject();
+  }
 }
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java
index add9d41..f716da9 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java
@@ -25,6 +25,7 @@
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.Lists;
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
@@ -106,7 +107,7 @@ private void initTable() throws Exception {
     tbl.setDbName(dbName);
     tbl.setTableName(tblName);
     StorageDescriptor sd = new StorageDescriptor();
-    sd.setCols(fields);
+    sd.setCols(Lists.newArrayList(new FieldSchema("data_column", serdeConstants.STRING_TYPE_NAME, "")));
     tbl.setSd(sd);
 
     //sd.setLocation("hdfs://tmp");
@@ -151,7 +152,7 @@ public void testSetOutput() throws Exception {
     assertEquals(1, jobInfo.getPartitionValues().size());
     assertEquals("p1", jobInfo.getPartitionValues().get("colname"));
     assertEquals(1, jobInfo.getTableInfo().getDataColumns().getFields().size());
-    assertEquals("colname", jobInfo.getTableInfo().getDataColumns().getFields().get(0).getName());
+    assertEquals("data_column", jobInfo.getTableInfo().getDataColumns().getFields().get(0).getName());
 
     publishTest(job);
   }