diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
index 33807f5..55b97dd 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
@@ -30,6 +30,10 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
@@ -43,6 +47,7 @@
 import org.apache.hadoop.util.StringUtils;
 
 import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.hive.hcatalog.common.HCatException;
 import org.apache.hive.hcatalog.common.HCatUtil;
 import org.apache.hive.hcatalog.data.HCatRecord;
 import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
@@ -195,7 +200,7 @@ public static void setOutputSchema(Job job, HCatSchema hcatSchema)
     Map<String, String> jobProperties = partitionInfo.getJobProperties();
     HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
 
-    Map<String, String> valuesNotInDataCols = getColValsNotInDataColumns(
+    Map<String, Object> valuesNotInDataCols = getColValsNotInDataColumns(
       getOutputSchema(conf), partitionInfo
     );
 
@@ -206,17 +211,30 @@ public static void setOutputSchema(Job job, HCatSchema hcatSchema)
   /**
    * gets values for fields requested by output schema which will not be in the data
    */
-  private static Map<String, String> getColValsNotInDataColumns(HCatSchema outputSchema,
-    PartInfo partInfo) {
+  private static Map<String, Object> getColValsNotInDataColumns(HCatSchema outputSchema,
+    PartInfo partInfo) throws HCatException {
     HCatSchema dataSchema = partInfo.getPartitionSchema();
-    Map<String, String> vals = new HashMap<String, String>();
+    Map<String, Object> vals = new HashMap<String, Object>();
     for (String fieldName : outputSchema.getFieldNames()) {
       if (dataSchema.getPosition(fieldName) == null) {
         // this entry of output is not present in the output schema
         // so, we first check the table schema to see if it is a part col
-        if (partInfo.getPartitionValues().containsKey(fieldName)) {
-          vals.put(fieldName, partInfo.getPartitionValues().get(fieldName));
+        if (partInfo.getPartitionValues().containsKey(fieldName)) {
+
+          // First, get the appropriate field schema for this field
+          HCatFieldSchema fschema = outputSchema.get(fieldName);
+
+          // For a partition key type, this will be a primitive typeinfo.
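For context, the hunk above reduces to a small, self-contained conversion: a partition value arrives from the metastore as a string and is converted to the type declared by the output schema. The following stand-alone sketch uses only the serde2 calls the patch itself imports; the class name PartitionValueConversionSketch and the sample value "501" are illustrative, not part of the patch:

    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

    public class PartitionValueConversionSketch {
      public static void main(String[] args) {
        // The output schema declares part0 as an int, so build the
        // standard Java object inspector for the int type info.
        ObjectInspector intOI = TypeInfoUtils
            .getStandardJavaObjectInspectorFromTypeInfo(TypeInfoFactory.intTypeInfo);

        // Same converter chain as getColValsNotInDataColumns():
        // from the Java string OI to the target OI.
        Object typed = ObjectInspectorConverters
            .getConverter(PrimitiveObjectInspectorFactory.javaStringObjectInspector, intOI)
            .convert("501");

        // Prints "java.lang.Integer: 501" - the value is now typed, not a String.
        System.out.println(typed.getClass().getName() + ": " + typed);
      }
    }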
+          // Obtain relevant object inspector for this typeinfo
+          ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(fschema.getTypeInfo());
+
+          // get appropriate object from the string representation of the value in partInfo.getPartitionValues()
+          // Essentially, partition values are represented as strings, but we want the actual object type associated
+          Object objVal = ObjectInspectorConverters
+            .getConverter(PrimitiveObjectInspectorFactory.javaStringObjectInspector, oi)
+            .convert(partInfo.getPartitionValues().get(fieldName));
+
+          vals.put(fieldName, objVal);
         } else {
           vals.put(fieldName, null);
         }
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatRecordReader.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatRecordReader.java
index 3ee6157..7aecb0f 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatRecordReader.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatRecordReader.java
@@ -63,7 +63,7 @@
 
   private Deserializer deserializer;
 
-  private Map<String, String> valuesNotInDataCols;
+  private Map<String, Object> valuesNotInDataCols;
 
   private HCatSchema outputSchema = null;
   private HCatSchema dataSchema = null;
@@ -72,7 +72,7 @@
    * Instantiates a new hcat record reader.
    */
   public HCatRecordReader(HiveStorageHandler storageHandler,
-              Map<String, String> valuesNotInDataCols) {
+              Map<String, Object> valuesNotInDataCols) {
     this.storageHandler = storageHandler;
     this.valuesNotInDataCols = valuesNotInDataCols;
   }
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java
index ee57f3f..c98d947 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java
@@ -381,6 +381,7 @@ Job runMRCreate(Map<String, String> partitionValues, List<HCatFieldSchema> parti
     readRecords.clear();
 
     Configuration conf = new Configuration();
+    conf.set(HiveConf.ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname,"true");
     Job job = new Job(conf, "hcat mapreduce read test");
     job.setJarByClass(this.getClass());
     job.setMapperClass(HCatMapReduceTest.MapRead.class);
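Side note on the test harness change above: the read jobs now filter on an int partition key (for example part0 = "501"), and those comparisons are pushed down to the metastore, which requires the METASTORE_INTEGER_JDO_PUSHDOWN flag. A minimal sketch of that read-job setup, assuming the same Hadoop/Hive test classpath (the class and method names here are invented for illustration):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.mapreduce.Job;

    public class ReadJobSetupSketch {
      static Job newReadJob() throws Exception {
        Configuration conf = new Configuration();
        // Allow the metastore to push integral partition-filter comparisons
        // down into its JDO query instead of rejecting them for non-string keys.
        conf.set(HiveConf.ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname, "true");
        return new Job(conf, "hcat mapreduce read test");
      }
    }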
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitioned.java
index a386415..d31f29c 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitioned.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitioned.java
@@ -35,7 +35,6 @@
 import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hive.hcatalog.data.schema.HCatSchema;
 import org.apache.hive.hcatalog.data.schema.HCatSchemaUtils;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import static junit.framework.Assert.assertEquals;
@@ -73,7 +72,7 @@ public TestHCatPartitioned(String formatName, String serdeClass, String inputFor
     List<FieldSchema> fields = new ArrayList<FieldSchema>();
     //Defining partition names in unsorted order
     fields.add(new FieldSchema("PaRT1", serdeConstants.STRING_TYPE_NAME, ""));
-    fields.add(new FieldSchema("part0", serdeConstants.STRING_TYPE_NAME, ""));
+    fields.add(new FieldSchema("part0", serdeConstants.INT_TYPE_NAME, ""));
     return fields;
   }
@@ -85,19 +84,18 @@ public TestHCatPartitioned(String formatName, String serdeClass, String inputFor
     return fields;
   }
 
-
   @Test
   public void testHCatPartitionedTable() throws Exception {
 
     Map<String, String> partitionMap = new HashMap<String, String>();
     partitionMap.put("part1", "p1value1");
-    partitionMap.put("part0", "p0value1");
+    partitionMap.put("part0", "501");
 
     runMRCreate(partitionMap, partitionColumns, writeRecords, 10, true);
 
     partitionMap.clear();
     partitionMap.put("PART1", "p1value2");
-    partitionMap.put("PART0", "p0value2");
+    partitionMap.put("PART0", "502");
 
     runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
@@ -121,7 +119,7 @@ public void testHCatPartitionedTable() throws Exception {
     exc = null;
     partitionMap.clear();
     partitionMap.put("px1", "p1value2");
-    partitionMap.put("px0", "p0value2");
+    partitionMap.put("px0", "502");
 
     try {
       Job j = runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
@@ -136,7 +134,7 @@ public void testHCatPartitionedTable() throws Exception {
     //Test for publish with missing partition key values
     exc = null;
     partitionMap.clear();
-    partitionMap.put("px", "p1value2");
+    partitionMap.put("px", "512");
 
     try {
       runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
@@ -171,17 +169,17 @@ public void testHCatPartitionedTable() throws Exception {
 
     //Read with partition filter
     runMRRead(10, "part1 = \"p1value1\"");
-    runMRRead(10, "part0 = \"p0value1\"");
+    runMRRead(10, "part0 = \"501\"");
     if (isTableImmutable()){
       runMRRead(20, "part1 = \"p1value2\"");
       runMRRead(30, "part1 = \"p1value1\" or part1 = \"p1value2\"");
-      runMRRead(20, "part0 = \"p0value2\"");
-      runMRRead(30, "part0 = \"p0value1\" or part0 = \"p0value2\"");
+      runMRRead(20, "part0 = \"502\"");
+      runMRRead(30, "part0 = \"501\" or part0 = \"502\"");
     } else {
       runMRRead(40, "part1 = \"p1value2\"");
       runMRRead(50, "part1 = \"p1value1\" or part1 = \"p1value2\"");
-      runMRRead(40, "part0 = \"p0value2\"");
-      runMRRead(50, "part0 = \"p0value1\" or part0 = \"p0value2\"");
+      runMRRead(40, "part0 = \"502\"");
+      runMRRead(50, "part0 = \"501\" or part0 = \"502\"");
     }
 
     tableSchemaTest();
@@ -214,7 +212,7 @@ private void tableSchemaTest() throws Exception {
 
     Map<String, String> partitionMap = new HashMap<String, String>();
     partitionMap.put("part1", "p1value5");
-    partitionMap.put("part0", "p0value5");
+    partitionMap.put("part0", "505");
 
     runMRCreate(partitionMap, partitionColumns, writeRecords, 10, true);
@@ -231,7 +229,7 @@ private void tableSchemaTest() throws Exception {
     //Test that changing column data type fails
     partitionMap.clear();
     partitionMap.put("part1", "p1value6");
-    partitionMap.put("part0", "p0value6");
+    partitionMap.put("part0", "506");
 
     partitionColumns = new ArrayList<HCatFieldSchema>();
     partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", serdeConstants.INT_TYPE_NAME, "")));
@@ -276,7 +274,7 @@ private void tableSchemaTest() throws Exception {
 
     List<HCatRecord> records = runMRRead(20, "part1 = \"p1value6\"");
     assertEquals(20, records.size());
-    records = runMRRead(20, "part0 = \"p0value6\"");
+    records = runMRRead(20, "part0 = \"506\"");
     assertEquals(20, records.size());
     Integer i = 0;
     for (HCatRecord rec : records) {
@@ -285,7 +283,7 @@ private void tableSchemaTest() throws Exception {
       assertEquals(rec.get(1), "c2value" + i);
       assertEquals(rec.get(2), "c3value" + i);
       assertEquals(rec.get(3), "p1value6");
-      assertEquals(rec.get(4), "p0value6");
+      assertEquals(rec.get(4), 506);
       i++;
     }
   }
@@ -317,7 +315,7 @@ private void columnOrderChangeTest() throws Exception {
 
     Map<String, String> partitionMap = new HashMap<String, String>();
     partitionMap.put("part1", "p1value8");
-    partitionMap.put("part0", "p0value8");
+    partitionMap.put("part0", "508");
 
     Exception exc = null;
     try {
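Net effect, as exercised by tableSchemaTest() above: a partition key declared as int comes back from the reader as a typed value rather than a string. A hedged sketch of that post-patch check, mirroring the assertEquals(rec.get(4), 506) assertion in the diff (the class and method names are invented; column position 4 is where part0 lands in the output schema used by the test):

    import org.apache.hive.hcatalog.data.HCatRecord;

    public class TypedPartitionValueCheck {
      // Before this patch, rec.get(4) returned the String "506"; with the
      // converter in HCatBaseInputFormat it returns a java.lang.Integer.
      static void verify(HCatRecord rec) {
        Object part0 = rec.get(4);
        if (!(part0 instanceof Integer) || (Integer) part0 != 506) {
          throw new IllegalStateException("expected typed partition value 506, got " + part0);
        }
      }
    }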