diff --git hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java index 00912ef..83e51bd 100644 --- hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java +++ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java @@ -247,10 +247,7 @@ public abstract class HCatBaseStorer extends StoreFunc implements StoreMetadata switch(type){ case BINARY: - ByteArrayRef ba = new ByteArrayRef(); - byte[] bytes = (null == pigObj) ? new byte[0] : ((DataByteArray)pigObj).get(); - ba.setData(bytes); - return ba; + return (null == pigObj) ? new byte[0] : ((DataByteArray)pigObj).get(); case STRUCT: if (pigObj == null) { diff --git hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java index 4e788d4..1ff4b4a 100644 --- hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java +++ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java @@ -27,7 +27,7 @@ import java.util.Properties; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.security.Credentials; diff --git hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java index 696081f..bdb16f1 100644 --- hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java +++ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java @@ -32,7 +32,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; -import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; import org.apache.hadoop.mapreduce.Job; import org.apache.hcatalog.common.HCatConstants; @@ -142,7 +142,7 @@ public class PigHCatUtil { HiveMetaStoreClient client = null; try { client = createHiveMetaClient(hcatServerUri, hcatServerPrincipal, PigHCatUtil.class); - table = client.getTable(dbName, tableName); + table = HCatUtil.getTable(client, dbName, tableName); } catch (NoSuchObjectException nsoe){ throw new PigException("Table not found : " + nsoe.getMessage(), PIG_EXCEPTION_CODE); // prettier error messages to frontend } catch (Exception e) { @@ -320,7 +320,7 @@ public class PigHCatUtil { Type itemType = hfs.getType(); switch (itemType){ case BINARY: - result = (o == null) ? null : new DataByteArray(((ByteArrayRef)o).getData()); + result = (o == null) ? 
null : new DataByteArray((byte[]) o); break; case STRUCT: result = transformToTuple((List)o,hfs); diff --git hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorer.java hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorer.java index 942e93d..62547ad 100644 --- hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorer.java +++ hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorer.java @@ -22,14 +22,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; -import junit.framework.TestCase; - -import org.apache.hadoop.hive.cli.CliSessionState; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.CommandNeedRetryException; -import org.apache.hadoop.hive.ql.Driver; -import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hcatalog.HcatTestUtils; +import org.apache.hcatalog.mapreduce.HCatBaseTest; import org.apache.pig.ExecType; import org.apache.pig.PigException; import org.apache.pig.PigServer; @@ -37,28 +32,12 @@ import org.apache.pig.data.DataByteArray; import org.apache.pig.data.Tuple; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.util.LogUtils; +import org.junit.Assert; +import org.junit.Test; -public class TestHCatStorer extends TestCase { - private static final String TEST_DATA_DIR = System.getProperty("user.dir") + - "/build/test/data/" + TestHCatStorer.class.getCanonicalName(); - private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse"; +public class TestHCatStorer extends HCatBaseTest { private static final String INPUT_FILE_NAME = TEST_DATA_DIR + "/input.data"; - private Driver driver; - - @Override - protected void setUp() throws Exception { - if (driver == null) { - HiveConf hiveConf = new HiveConf(this.getClass()); - hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); - hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR); - driver = new Driver(hiveConf); - SessionState.start(new CliSessionState(hiveConf)); - } - } - // public void testStoreFuncMap() throws IOException{ // // driver.run("drop table junit_unparted"); @@ -90,10 +69,11 @@ public class TestHCatStorer extends TestCase { // Iterator itr = res.iterator(); // System.out.println(itr.next()); // System.out.println(itr.next()); -// assertFalse(itr.hasNext()); +// Assert.assertFalse(itr.hasNext()); // // } + @Test public void testPartColsInData() throws IOException, CommandNeedRetryException{ driver.run("drop table junit_unparted"); @@ -118,16 +98,17 @@ public class TestHCatStorer extends TestCase { while(itr.hasNext()){ Tuple t = itr.next(); - assertEquals(2, t.size()); - assertEquals(t.get(0), i); - assertEquals(t.get(1), "1"); + Assert.assertEquals(2, t.size()); + Assert.assertEquals(t.get(0), i); + Assert.assertEquals(t.get(1), "1"); i++; } - assertFalse(itr.hasNext()); - assertEquals(11, i); + Assert.assertFalse(itr.hasNext()); + Assert.assertEquals(11, i); } + @Test public void testMultiPartColsInData() throws IOException, CommandNeedRetryException{ driver.run("drop table employee"); @@ -161,15 +142,16 @@ public class TestHCatStorer extends TestCase { driver.run("select * from employee"); ArrayList results = new ArrayList(); driver.getResults(results); - assertEquals(4, results.size()); + Assert.assertEquals(4, results.size()); 
Collections.sort(results); - assertEquals(inputData[0], results.get(0)); - assertEquals(inputData[1], results.get(1)); - assertEquals(inputData[2], results.get(2)); - assertEquals(inputData[3], results.get(3)); + Assert.assertEquals(inputData[0], results.get(0)); + Assert.assertEquals(inputData[1], results.get(1)); + Assert.assertEquals(inputData[2], results.get(2)); + Assert.assertEquals(inputData[3], results.get(3)); driver.run("drop table employee"); } + @Test public void testStoreInPartiitonedTbl() throws IOException, CommandNeedRetryException{ driver.run("drop table junit_unparted"); @@ -194,16 +176,17 @@ public class TestHCatStorer extends TestCase { while(itr.hasNext()){ Tuple t = itr.next(); - assertEquals(2, t.size()); - assertEquals(t.get(0), i); - assertEquals(t.get(1), "1"); + Assert.assertEquals(2, t.size()); + Assert.assertEquals(t.get(0), i); + Assert.assertEquals(t.get(1), "1"); i++; } - assertFalse(itr.hasNext()); - assertEquals(11, i); + Assert.assertFalse(itr.hasNext()); + Assert.assertEquals(11, i); } + @Test public void testNoAlias() throws IOException, CommandNeedRetryException{ driver.run("drop table junit_parted"); String createTable = "create table junit_parted(a int, b string) partitioned by (ds string) stored as RCFILE"; @@ -222,12 +205,12 @@ public class TestHCatStorer extends TestCase { } catch(PigException fe){ PigException pe = LogUtils.getPigException(fe); - assertTrue(pe instanceof FrontendException); - assertEquals(PigHCatUtil.PIG_EXCEPTION_CODE, pe.getErrorCode()); - assertTrue(pe.getMessage().contains("Column name for a field is not specified. Please provide the full schema as an argument to HCatStorer.")); + Assert.assertTrue(pe instanceof FrontendException); + Assert.assertEquals(PigHCatUtil.PIG_EXCEPTION_CODE, pe.getErrorCode()); + Assert.assertTrue(pe.getMessage().contains("Column name for a field is not specified. Please provide the full schema as an argument to HCatStorer.")); errCaught = true; } - assertTrue(errCaught); + Assert.assertTrue(errCaught); errCaught = false; try{ server.setBatchOn(); @@ -238,15 +221,16 @@ public class TestHCatStorer extends TestCase { } catch(PigException fe){ PigException pe = LogUtils.getPigException(fe); - assertTrue(pe instanceof FrontendException); - assertEquals(PigHCatUtil.PIG_EXCEPTION_CODE, pe.getErrorCode()); - assertTrue(pe.getMessage().contains("Column names should all be in lowercase. Invalid name found: B")); + Assert.assertTrue(pe instanceof FrontendException); + Assert.assertEquals(PigHCatUtil.PIG_EXCEPTION_CODE, pe.getErrorCode()); + Assert.assertTrue(pe.getMessage().contains("Column names should all be in lowercase. 
Invalid name found: B")); errCaught = true; } driver.run("drop table junit_parted"); - assertTrue(errCaught); + Assert.assertTrue(errCaught); } + @Test public void testStoreMultiTables() throws IOException, CommandNeedRetryException{ driver.run("drop table junit_unparted"); @@ -294,13 +278,14 @@ public class TestHCatStorer extends TestCase { Iterator itr = res.iterator(); for(int i = 0; i < LOOP_SIZE*LOOP_SIZE; i++) { - assertEquals( input[i] ,itr.next()); + Assert.assertEquals( input[i] ,itr.next()); } - assertFalse(itr.hasNext()); + Assert.assertFalse(itr.hasNext()); } + @Test public void testStoreWithNoSchema() throws IOException, CommandNeedRetryException{ driver.run("drop table junit_unparted"); @@ -332,13 +317,14 @@ public class TestHCatStorer extends TestCase { driver.run("drop table junit_unparted"); Iterator itr = res.iterator(); for(int i = 0; i < LOOP_SIZE*LOOP_SIZE; i++) { - assertEquals( input[i] ,itr.next()); + Assert.assertEquals( input[i] ,itr.next()); } - assertFalse(itr.hasNext()); + Assert.assertFalse(itr.hasNext()); } + @Test public void testStoreWithNoCtorArgs() throws IOException, CommandNeedRetryException{ driver.run("drop table junit_unparted"); @@ -370,13 +356,14 @@ public class TestHCatStorer extends TestCase { driver.run("drop table junit_unparted"); Iterator itr = res.iterator(); for(int i = 0; i < LOOP_SIZE*LOOP_SIZE; i++) { - assertEquals( input[i] ,itr.next()); + Assert.assertEquals( input[i] ,itr.next()); } - assertFalse(itr.hasNext()); + Assert.assertFalse(itr.hasNext()); } + @Test public void testEmptyStore() throws IOException, CommandNeedRetryException{ driver.run("drop table junit_unparted"); @@ -408,10 +395,11 @@ public class TestHCatStorer extends TestCase { driver.getResults(res); driver.run("drop table junit_unparted"); Iterator itr = res.iterator(); - assertFalse(itr.hasNext()); + Assert.assertFalse(itr.hasNext()); } + @Test public void testBagNStruct() throws IOException, CommandNeedRetryException{ driver.run("drop table junit_unparted"); String createTable = "create table junit_unparted(b string,a struct, arr_of_struct array, " + @@ -438,12 +426,13 @@ public class TestHCatStorer extends TestCase { driver.getResults(res); driver.run("drop table junit_unparted"); Iterator itr = res.iterator(); - assertEquals("zookeeper\t{\"a1\":2}\t[\"pig\"]\t[{\"s1\":\"pnuts\",\"s2\":\"hdfs\"}]\t[{\"s3\":\"hadoop\"},{\"s3\":\"hcat\"}]", itr.next()); - assertEquals("chubby\t{\"a1\":2}\t[\"sawzall\"]\t[{\"s1\":\"bigtable\",\"s2\":\"gfs\"}]\t[{\"s3\":\"mapreduce\"},{\"s3\":\"hcat\"}]",itr.next()); - assertFalse(itr.hasNext()); + Assert.assertEquals("zookeeper\t{\"a1\":2}\t[\"pig\"]\t[{\"s1\":\"pnuts\",\"s2\":\"hdfs\"}]\t[{\"s3\":\"hadoop\"},{\"s3\":\"hcat\"}]", itr.next()); + Assert.assertEquals("chubby\t{\"a1\":2}\t[\"sawzall\"]\t[{\"s1\":\"bigtable\",\"s2\":\"gfs\"}]\t[{\"s3\":\"mapreduce\"},{\"s3\":\"hcat\"}]",itr.next()); + Assert.assertFalse(itr.hasNext()); } + @Test public void testStoreFuncAllSimpleTypes() throws IOException, CommandNeedRetryException{ driver.run("drop table junit_unparted"); @@ -473,32 +462,25 @@ public class TestHCatStorer extends TestCase { driver.getResults(res); Iterator itr = res.iterator(); - assertEquals( "0\tNULL\tNULL\tNULL\tNULL\t\tnull" ,itr.next()); - assertEquals( "NULL\t4.2\t2.2\t4\tlets hcat\tbinary-data\tnull" ,itr.next()); - assertEquals( "3\t6.2999997\t3.3000000000000003\t6\tlets hcat\tbinary-data\tnull",itr.next()); - assertFalse(itr.hasNext()); + Assert.assertEquals( "0\tNULL\tNULL\tNULL\tNULL\t\tnull" ,itr.next()); + 
Assert.assertEquals( "NULL\t4.2\t2.2\t4\tlets hcat\tbinary-data\tnull" ,itr.next()); + Assert.assertEquals( "3\t6.2999997\t3.3000000000000003\t6\tlets hcat\tbinary-data\tnull",itr.next()); + Assert.assertFalse(itr.hasNext()); server.registerQuery("B = load 'junit_unparted' using "+HCatLoader.class.getName()+";"); Iterator iter = server.openIterator("B"); int count = 0; while(iter.hasNext()){ Tuple t = iter.next(); - assertTrue(t.get(5) instanceof DataByteArray); - assertNull(t.get(6)); + Assert.assertTrue(t.get(5) instanceof DataByteArray); + Assert.assertNull(t.get(6)); count++; } - assertEquals(3, count); + Assert.assertEquals(3, count); driver.run("drop table junit_unparted"); } - @Override - protected void tearDown() throws Exception { - super.tearDown(); - } - - - - + @Test public void testStoreFuncSimple() throws IOException, CommandNeedRetryException{ driver.run("drop table junit_unparted"); @@ -532,14 +514,14 @@ public class TestHCatStorer extends TestCase { for(int i = 1; i <= LOOP_SIZE; i++) { String si = i + ""; for(int j=1;j<=LOOP_SIZE;j++) { - assertEquals( si + "\t"+j,itr.next()); + Assert.assertEquals( si + "\t"+j,itr.next()); } } - assertFalse(itr.hasNext()); + Assert.assertFalse(itr.hasNext()); } - + @Test public void testDynamicPartitioningMultiPartColsInDataPartialSpec() throws IOException, CommandNeedRetryException{ driver.run("drop table if exists employee"); @@ -567,15 +549,16 @@ public class TestHCatStorer extends TestCase { driver.run("select * from employee"); ArrayList results = new ArrayList(); driver.getResults(results); - assertEquals(4, results.size()); + Assert.assertEquals(4, results.size()); Collections.sort(results); - assertEquals(inputData[0], results.get(0)); - assertEquals(inputData[1], results.get(1)); - assertEquals(inputData[2], results.get(2)); - assertEquals(inputData[3], results.get(3)); + Assert.assertEquals(inputData[0], results.get(0)); + Assert.assertEquals(inputData[1], results.get(1)); + Assert.assertEquals(inputData[2], results.get(2)); + Assert.assertEquals(inputData[3], results.get(3)); driver.run("drop table employee"); } + @Test public void testDynamicPartitioningMultiPartColsInDataNoSpec() throws IOException, CommandNeedRetryException{ driver.run("drop table if exists employee"); @@ -603,15 +586,16 @@ public class TestHCatStorer extends TestCase { driver.run("select * from employee"); ArrayList results = new ArrayList(); driver.getResults(results); - assertEquals(4, results.size()); + Assert.assertEquals(4, results.size()); Collections.sort(results); - assertEquals(inputData[0], results.get(0)); - assertEquals(inputData[1], results.get(1)); - assertEquals(inputData[2], results.get(2)); - assertEquals(inputData[3], results.get(3)); + Assert.assertEquals(inputData[0], results.get(0)); + Assert.assertEquals(inputData[1], results.get(1)); + Assert.assertEquals(inputData[2], results.get(2)); + Assert.assertEquals(inputData[3], results.get(3)); driver.run("drop table employee"); } + @Test public void testDynamicPartitioningMultiPartColsNoDataInDataNoSpec() throws IOException, CommandNeedRetryException{ driver.run("drop table if exists employee"); @@ -636,7 +620,7 @@ public class TestHCatStorer extends TestCase { driver.run("select * from employee"); ArrayList results = new ArrayList(); driver.getResults(results); - assertEquals(0, results.size()); + Assert.assertEquals(0, results.size()); driver.run("drop table employee"); } } diff --git src/java/org/apache/hcatalog/common/HCatUtil.java src/java/org/apache/hcatalog/common/HCatUtil.java 
index 5b744fe..0b5b624 100644 --- src/java/org/apache/hcatalog/common/HCatUtil.java +++ src/java/org/apache/hcatalog/common/HCatUtil.java @@ -39,9 +39,12 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; @@ -155,15 +158,12 @@ public class HCatUtil { } } - public static HCatSchema extractSchemaFromStorageDescriptor( - StorageDescriptor sd) throws HCatException { - if (sd == null) { - throw new HCatException( - "Cannot construct partition info from an empty storage descriptor."); - } - HCatSchema schema = new HCatSchema(HCatUtil.getHCatFieldSchemaList(sd - .getCols())); - return schema; + public static HCatSchema extractSchema(Table table) throws HCatException { + return new HCatSchema(HCatUtil.getHCatFieldSchemaList(table.getCols())); + } + + public static HCatSchema extractSchema(Partition partition) throws HCatException { + return new HCatSchema(HCatUtil.getHCatFieldSchemaList(partition.getCols())); } public static List getFieldSchemaList( @@ -179,14 +179,13 @@ public class HCatUtil { } } - public static Table getTable(HiveMetaStoreClient client, String dbName, - String tableName) throws Exception { - return client.getTable(dbName, tableName); + public static Table getTable(HiveMetaStoreClient client, String dbName, String tableName) + throws NoSuchObjectException, TException, MetaException { + return new Table(client.getTable(dbName, tableName)); } public static HCatSchema getTableSchemaWithPtnCols(Table table) throws IOException { - HCatSchema tableSchema = new HCatSchema(HCatUtil.getHCatFieldSchemaList( - new org.apache.hadoop.hive.ql.metadata.Table(table).getCols())); + HCatSchema tableSchema = new HCatSchema(HCatUtil.getHCatFieldSchemaList(table.getCols())); if (table.getPartitionKeys().size() != 0) { @@ -207,8 +206,7 @@ public class HCatUtil { * @return HCatSchema instance which contains the partition columns * @throws IOException */ - public static HCatSchema getPartitionColumns(Table table) - throws IOException { + public static HCatSchema getPartitionColumns(Table table) throws IOException { HCatSchema cols = new HCatSchema(new LinkedList()); if (table.getPartitionKeys().size() != 0) { for (FieldSchema fs : table.getPartitionKeys()) { @@ -236,7 +234,7 @@ public class HCatUtil { partitionKeyMap.put(field.getName().toLowerCase(), field); } - List tableCols = table.getSd().getCols(); + List tableCols = table.getCols(); List newFields = new ArrayList(); for (int i = 0; i < partitionSchema.getFields().size(); i++) { diff --git src/java/org/apache/hcatalog/data/LazyHCatRecord.java src/java/org/apache/hcatalog/data/LazyHCatRecord.java index 18c1292..3d2323b 100644 --- src/java/org/apache/hcatalog/data/LazyHCatRecord.java +++ src/java/org/apache/hcatalog/data/LazyHCatRecord.java @@ -30,6 +30,7 @@ import 
org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hcatalog.common.HCatException; +import org.apache.hcatalog.data.schema.HCatFieldSchema; import org.apache.hcatalog.data.schema.HCatSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +43,7 @@ public class LazyHCatRecord extends HCatRecord { public static final Logger LOG = LoggerFactory.getLogger(LazyHCatRecord.class.getName()); - private Object o; + private Object wrappedObject; private StructObjectInspector soi; @Override @@ -50,7 +51,7 @@ public class LazyHCatRecord extends HCatRecord { try { StructField fref = soi.getAllStructFieldRefs().get(fieldNum); return HCatRecordSerDe.serializeField( - soi.getStructFieldData(o, fref), + soi.getStructFieldData(wrappedObject, fref), fref.getFieldObjectInspector()); } catch (SerDeException e) { throw new IllegalStateException("SerDe Exception deserializing",e); @@ -93,10 +94,40 @@ public class LazyHCatRecord extends HCatRecord { } @Override - public Object get(String fieldName, HCatSchema recordSchema) - throws HCatException { + public Object get(String fieldName, HCatSchema recordSchema) throws HCatException { + int idx = recordSchema.getPosition(fieldName); - return get(idx); + StructField structField = soi.getAllStructFieldRefs().get(idx); + Object structFieldData = soi.getStructFieldData(wrappedObject, structField); + + Object result; + try { + result = HCatRecordSerDe.serializeField(structFieldData, + structField.getFieldObjectInspector()); + } catch (SerDeException e) { + throw new IllegalStateException("SerDe Exception deserializing", e); + } + + if (result == null) { + return result; + } + + // By default, Enum fields are handled in the Hive style: struct + // Users can optionally get the string enum value by setting a string field schema. + if (HCatFieldSchema.Type.STRING.equals(recordSchema.get(fieldName).getType()) && + Enum.class.isAssignableFrom(structFieldData.getClass())) { + return soi.getStructFieldData(wrappedObject, structField).toString(); + } + + // By default, Boolean fields are treated as Boolean. Users can convert to integer + // by setting the field schema to integer. This may be useful for pre-boolean-support + // pig users. + if (HCatFieldSchema.Type.INT.equals(recordSchema.get(fieldName).getType()) && + Boolean.class.isAssignableFrom(structFieldData.getClass())) { + return (Boolean) soi.getStructFieldData(wrappedObject, structField) ? 
1 : 0; + } + + return result; } @Override @@ -115,7 +146,7 @@ public class LazyHCatRecord extends HCatRecord { throw new UnsupportedOperationException("not allowed to run copy() on LazyHCatRecord"); } - public LazyHCatRecord(Object o, ObjectInspector oi) + public LazyHCatRecord(Object wrappedObject, ObjectInspector oi) throws Exception { if (oi.getCategory() != Category.STRUCT) { @@ -126,7 +157,7 @@ public class LazyHCatRecord extends HCatRecord { } this.soi = (StructObjectInspector)oi; - this.o = o; + this.wrappedObject = wrappedObject; } @Override diff --git src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java index a6b7d96..b5bcb47 100644 --- src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java +++ src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java @@ -32,7 +32,7 @@ import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.mapred.HCatMapRedUtil; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.JobContext; @@ -183,8 +183,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer { OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext); if (dynamicPartitioningUsed){ src = new Path(getPartitionRootLocation( - jobInfo.getLocation().toString(),jobInfo.getTableInfo().getTable().getPartitionKeysSize() - )); + jobInfo.getLocation(), jobInfo.getTableInfo().getTable().getPartitionKeysSize())); }else{ src = new Path(jobInfo.getLocation()); } @@ -238,8 +237,8 @@ class FileOutputCommitterContainer extends OutputCommitterContainer { OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context); Configuration conf = context.getConfiguration(); - Table table = jobInfo.getTableInfo().getTable(); - Path tblPath = new Path(table.getSd().getLocation()); + Table table = new Table(jobInfo.getTableInfo().getTable()); + Path tblPath = new Path(table.getTTable().getSd().getLocation()); FileSystem fs = tblPath.getFileSystem(conf); if( table.getPartitionKeys().size() == 0 ) { @@ -275,7 +274,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer { HiveConf hiveConf = HCatUtil.getHiveConf(conf); client = HCatUtil.createHiveClient(hiveConf); - StorerInfo storer = InternalUtil.extractStorerInfo(table.getSd(),table.getParameters()); + StorerInfo storer = InternalUtil.extractStorerInfo(table.getTTable().getSd(),table.getParameters()); updateTableSchema(client, table, jobInfo.getOutputSchema()); @@ -421,7 +420,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer { Table table, FileSystem fs, String grpName, FsPermission perms) throws IOException { - StorageDescriptor tblSD = table.getSd(); + StorageDescriptor tblSD = table.getTTable().getSd(); Partition partition = new Partition(); partition.setDbName(table.getDbName()); @@ -495,7 +494,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer { private String getFinalDynamicPartitionDestination(Table table, Map partKVs) { // file:///tmp/hcat_junit_warehouse/employee/_DYN0.7770480401313761/emp_country=IN/emp_state=KA -> // file:///tmp/hcat_junit_warehouse/employee/emp_country=IN/emp_state=KA - Path partPath = new 
Path(table.getSd().getLocation()); + Path partPath = new Path(table.getTTable().getSd().getLocation()); for(FieldSchema partKey : table.getPartitionKeys()){ partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs); } @@ -536,12 +535,12 @@ class FileOutputCommitterContainer extends OutputCommitterContainer { List newColumns = HCatUtil.validatePartitionSchema(table, partitionSchema); if( newColumns.size() != 0 ) { - List tableColumns = new ArrayList(table.getSd().getCols()); + List tableColumns = new ArrayList(table.getTTable().getSd().getCols()); tableColumns.addAll(newColumns); //Update table schema to add the newly added columns - table.getSd().setCols(tableColumns); - client.alter_table(table.getDbName(), table.getTableName(), table); + table.getTTable().getSd().setCols(tableColumns); + client.alter_table(table.getDbName(), table.getTableName(), table.getTTable()); } } diff --git src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java index 3a3fca3..978afb4 100644 --- src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java +++ src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java @@ -27,7 +27,7 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; -import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; @@ -120,7 +120,7 @@ class FileOutputFormatContainer extends OutputFormatContainer { handleDuplicatePublish(context, jobInfo, client, - jobInfo.getTableInfo().getTable()); + new Table(jobInfo.getTableInfo().getTable())); } catch (MetaException e) { throw new IOException(e); } catch (TException e) { @@ -190,7 +190,7 @@ class FileOutputFormatContainer extends OutputFormatContainer { table, outputInfo.getPartitionValues()); // non-partitioned table - Path tablePath = new Path(table.getSd().getLocation()); + Path tablePath = new Path(table.getTTable().getSd().getLocation()); FileSystem fs = tablePath.getFileSystem(context.getConfiguration()); if ( fs.exists(tablePath) ) { diff --git src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java index e76690f..a51cca1 100644 --- src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java +++ src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java @@ -215,8 +215,10 @@ public abstract class HCatBaseOutputFormat extends OutputFormat indexList = client.listIndexNames(outputJobInfo.getDatabaseName(), outputJobInfo.getTableName(), Short.MAX_VALUE); @@ -83,7 +84,7 @@ public class HCatOutputFormat extends HCatBaseOutputFormat { throw new HCatException(ErrorType.ERROR_NOT_SUPPORTED, "Store into a table with an automatic index from Pig/Mapreduce is not supported"); } } - StorageDescriptor sd = table.getSd(); + StorageDescriptor sd = table.getTTable().getSd(); if (sd.isCompressed()) { throw new HCatException(ErrorType.ERROR_NOT_SUPPORTED, "Store into a compressed partition from Pig/Mapreduce is not supported"); @@ -97,7 +98,7 @@ public class HCatOutputFormat extends HCatBaseOutputFormat { throw new HCatException(ErrorType.ERROR_NOT_SUPPORTED, "Store into a partition with sorted 
column definition from Pig/Mapreduce is not supported"); } - if (table.getPartitionKeysSize() == 0 ){ + if (table.getTTable().getPartitionKeysSize() == 0 ){ if ((outputJobInfo.getPartitionValues() != null) && (!outputJobInfo.getPartitionValues().isEmpty())){ // attempt made to save partition values in non-partitioned table - throw error. throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES, @@ -117,7 +118,7 @@ public class HCatOutputFormat extends HCatBaseOutputFormat { } if ((outputJobInfo.getPartitionValues() == null) - || (outputJobInfo.getPartitionValues().size() < table.getPartitionKeysSize())){ + || (outputJobInfo.getPartitionValues().size() < table.getTTable().getPartitionKeysSize())){ // dynamic partition usecase - partition values were null, or not all were specified // need to figure out which keys are not specified. List dynamicPartitioningKeys = new ArrayList(); @@ -128,7 +129,7 @@ public class HCatOutputFormat extends HCatBaseOutputFormat { } } - if (valueMap.size() + dynamicPartitioningKeys.size() != table.getPartitionKeysSize()){ + if (valueMap.size() + dynamicPartitioningKeys.size() != table.getTTable().getPartitionKeysSize()){ // If this isn't equal, then bogus key values have been inserted, error out. throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES,"Invalid partition keys specified"); } @@ -148,9 +149,9 @@ public class HCatOutputFormat extends HCatBaseOutputFormat { outputJobInfo.setPartitionValues(valueMap); } - StorageDescriptor tblSD = table.getSd(); - HCatSchema tableSchema = HCatUtil.extractSchemaFromStorageDescriptor(tblSD); - StorerInfo storerInfo = InternalUtil.extractStorerInfo(tblSD,table.getParameters()); + HCatSchema tableSchema = HCatUtil.extractSchema(table); + StorerInfo storerInfo = + InternalUtil.extractStorerInfo(table.getTTable().getSd(), table.getParameters()); List partitionCols = new ArrayList(); for(FieldSchema schema : table.getPartitionKeys()) { @@ -160,7 +161,7 @@ public class HCatOutputFormat extends HCatBaseOutputFormat { HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(job.getConfiguration(), storerInfo); //Serialize the output info into the configuration - outputJobInfo.setTableInfo(HCatTableInfo.valueOf(table)); + outputJobInfo.setTableInfo(HCatTableInfo.valueOf(table.getTTable())); outputJobInfo.setOutputSchema(tableSchema); harRequested = getHarRequested(hiveConf); outputJobInfo.setHarRequested(harRequested); @@ -169,7 +170,7 @@ public class HCatOutputFormat extends HCatBaseOutputFormat { HCatUtil.configureOutputStorageHandler(storageHandler,job,outputJobInfo); - Path tblPath = new Path(table.getSd().getLocation()); + Path tblPath = new Path(table.getTTable().getSd().getLocation()); /* Set the umask in conf such that files/dirs get created with table-dir * permissions. 
Following three assumptions are made: diff --git src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java index 99bebc5..cdd775e 100644 --- src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java +++ src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java @@ -19,8 +19,9 @@ package org.apache.hcatalog.mapreduce; import java.io.IOException; import java.util.Map; +import java.util.Properties; -import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -54,7 +55,7 @@ class HCatRecordReader extends RecordReader { /** The storage handler used */ private final HCatStorageHandler storageHandler; - private SerDe serde; + private Deserializer deserializer; private Map valuesNotInDataCols; @@ -82,7 +83,7 @@ class HCatRecordReader extends RecordReader { HCatSplit hcatSplit = InternalUtil.castToHCatSplit(split); baseRecordReader = createBaseRecordReader(hcatSplit, storageHandler, taskContext); - serde = createSerDe(hcatSplit, storageHandler, taskContext); + createDeserializer(hcatSplit, storageHandler, taskContext); // Pull the output schema out of the TaskAttemptContext outputSchema = (HCatSchema) HCatUtil.deserialize( @@ -108,22 +109,44 @@ class HCatRecordReader extends RecordReader { InternalUtil.createReporter(taskContext)); } - private SerDe createSerDe(HCatSplit hcatSplit, HCatStorageHandler storageHandler, + /** + * Create the deserializer. + * + * Hive initializes deserializers with SerDeInfo parameters. HCatalog supplements these + * with some keys expected by LazySimpleSerDe, and the Table properties. Properties are + * added in the following order (last to define a key wins): + *
+ *   1. LazySimpleSerDe properties
+ *   2. Table properties
+ *   3. SerDeInfo properties
+ */ + private void createDeserializer(HCatSplit hcatSplit, HCatStorageHandler storageHandler, TaskAttemptContext taskContext) throws IOException { - SerDe serde = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), + deserializer = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), taskContext.getConfiguration()); + Properties props = new Properties(); + try { - InternalUtil.initializeInputSerDe(serde, storageHandler.getConf(), - hcatSplit.getPartitionInfo().getTableInfo(), + InternalUtil.setLazySimpleSerDeProperties(props, hcatSplit.getPartitionInfo().tableInfo, hcatSplit.getPartitionInfo().getPartitionSchema()); } catch (SerDeException e) { - throw new IOException("Failed initializing SerDe " - + storageHandler.getSerDeClass().getName(), e); + throw new IOException("Failed setting deserializer properties.", e); + } + + for (Map.Entry e : hcatSplit.getPartitionInfo().getSerDeInfo().getParameters().entrySet()) { + props.put(e.getKey(), e.getValue()); } - return serde; + try { + deserializer.initialize(storageHandler.getConf(), props); + } catch (SerDeException e) { + throw new IOException("Failed initializing deserializer " + + storageHandler.getSerDeClass().getName() + " with parameters " + + hcatSplit.getPartitionInfo().getSerDeInfo().getParameters(), e); + } } /* (non-Javadoc) @@ -139,18 +162,14 @@ class HCatRecordReader extends RecordReader { * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentValue() */ @Override - public HCatRecord getCurrentValue() - throws IOException, InterruptedException { - HCatRecord r; - + public HCatRecord getCurrentValue() throws IOException, InterruptedException { try { - - r = new LazyHCatRecord(serde.deserialize(currentValue),serde.getObjectInspector()); + HCatRecord r = new LazyHCatRecord(deserializer.deserialize(currentValue), + deserializer.getObjectInspector()); DefaultHCatRecord dr = new DefaultHCatRecord(outputSchema.size()); int i = 0; for (String fieldName : outputSchema.getFieldNames()){ - Integer dataPosn = null; - if ((dataPosn = dataSchema.getPosition(fieldName)) != null){ + if (dataSchema.getPosition(fieldName) != null){ dr.set(i, r.get(fieldName,dataSchema)); } else { dr.set(i, valuesNotInDataCols.get(fieldName)); diff --git src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java index 16423fa..40fb32b 100644 --- src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java +++ src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java @@ -118,7 +118,7 @@ public class HCatTableInfo implements Serializable { } public String getTableLocation() { - return table.getSd().getLocation(); + return table.getSd().getLocation(); } /** @@ -137,14 +137,16 @@ public class HCatTableInfo implements Serializable { * @throws IOException */ static HCatTableInfo valueOf(Table table) throws IOException { - HCatSchema dataColumns = - HCatUtil.extractSchemaFromStorageDescriptor(table.getSd()); - StorerInfo storerInfo = - InternalUtil.extractStorerInfo(table.getSd(), table.getParameters()); - HCatSchema partitionColumns = HCatUtil.getPartitionColumns(table); + org.apache.hadoop.hive.ql.metadata.Table mTable = + new org.apache.hadoop.hive.ql.metadata.Table(table); + HCatSchema schema = HCatUtil.extractSchema(mTable); + + StorerInfo storerInfo = InternalUtil.extractStorerInfo(table.getSd(), table.getParameters()); + + HCatSchema partitionColumns = HCatUtil.getPartitionColumns(mTable); return new HCatTableInfo(table.getDbName(), table.getTableName(), - dataColumns, + schema, 
partitionColumns, storerInfo, table); diff --git src/java/org/apache/hcatalog/mapreduce/InitializeInput.java src/java/org/apache/hcatalog/mapreduce/InitializeInput.java index 5dcd898..aa013e0 100644 --- src/java/org/apache/hcatalog/mapreduce/InitializeInput.java +++ src/java/org/apache/hcatalog/mapreduce/InitializeInput.java @@ -30,7 +30,7 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.mapreduce.Job; import org.apache.hcatalog.common.ErrorType; import org.apache.hcatalog.common.HCatConstants; @@ -94,12 +94,13 @@ public class InitializeInput { hiveConf = new HiveConf(HCatInputFormat.class); } client = HCatUtil.createHiveClient(hiveConf); - Table table = client.getTable(inputJobInfo.getDatabaseName(), - inputJobInfo.getTableName()); + + Table table = + HCatUtil.getTable(client, inputJobInfo.getDatabaseName(), inputJobInfo.getTableName()); List partInfoList = new ArrayList(); - inputJobInfo.setTableInfo(HCatTableInfo.valueOf(table)); + inputJobInfo.setTableInfo(HCatTableInfo.valueOf(table.getTTable())); if( table.getPartitionKeys().size() != 0 ) { //Partitioned table List parts = client.listPartitionsByFilter(inputJobInfo.getDatabaseName(), @@ -115,18 +116,20 @@ public class InitializeInput { // populate partition info for (Partition ptn : parts){ - PartInfo partInfo = extractPartInfo(ptn.getSd(),ptn.getParameters(), - job.getConfiguration(), - inputJobInfo); + org.apache.hadoop.hive.ql.metadata.Partition mPartition = + new org.apache.hadoop.hive.ql.metadata.Partition(table, ptn); + HCatSchema schema = HCatUtil.extractSchema(mPartition); + PartInfo partInfo = extractPartInfo(schema, mPartition.getTPartition().getSd(), + ptn.getParameters(), job.getConfiguration(), inputJobInfo); partInfo.setPartitionValues(createPtnKeyValueMap(table, ptn)); partInfoList.add(partInfo); } }else{ //Non partitioned table - PartInfo partInfo = extractPartInfo(table.getSd(),table.getParameters(), - job.getConfiguration(), - inputJobInfo); + HCatSchema schema = HCatUtil.extractSchema(table); + PartInfo partInfo = extractPartInfo(schema, table.getTTable().getSd(), + table.getParameters(), job.getConfiguration(), inputJobInfo); partInfo.setPartitionValues(new HashMap()); partInfoList.add(partInfo); } @@ -160,10 +163,10 @@ public class InitializeInput { return ptnKeyValues; } - static PartInfo extractPartInfo(StorageDescriptor sd, - Map parameters, Configuration conf, - InputJobInfo inputJobInfo) throws IOException{ - HCatSchema schema = HCatUtil.extractSchemaFromStorageDescriptor(sd); + private static PartInfo extractPartInfo(HCatSchema schema, StorageDescriptor sd, + Map parameters, Configuration conf, InputJobInfo inputJobInfo) + throws IOException { + StorerInfo storerInfo = InternalUtil.extractStorerInfo(sd,parameters); Properties hcatProperties = new Properties(); @@ -181,7 +184,7 @@ public class InitializeInput { // FIXME // Bloating partinfo with inputJobInfo is not good return new PartInfo(schema, storageHandler, - sd.getLocation(), hcatProperties, + sd, hcatProperties, jobProperties, inputJobInfo.getTableInfo()); } diff --git src/java/org/apache/hcatalog/mapreduce/InternalUtil.java src/java/org/apache/hcatalog/mapreduce/InternalUtil.java index fa78a61..212c49c 100644 --- 
src/java/org/apache/hcatalog/mapreduce/InternalUtil.java +++ src/java/org/apache/hcatalog/mapreduce/InternalUtil.java @@ -132,37 +132,29 @@ class InternalUtil { //TODO this has to find a better home, it's also hardcoded as default in hive would be nice // if the default was decided by the serde - static void initializeOutputSerDe(SerDe serDe, Configuration conf, - OutputJobInfo jobInfo) - throws SerDeException { - initializeSerDe(serDe, conf, jobInfo.getTableInfo(), - jobInfo.getOutputSchema()); + static void initializeOutputSerDe(SerDe serDe, Configuration conf, OutputJobInfo jobInfo) + throws SerDeException { + serDe.initialize(conf, setLazySimpleSerDeProperties( + new Properties(), jobInfo.getTableInfo(), jobInfo.getOutputSchema())); } - static void initializeInputSerDe(SerDe serDe, Configuration conf, - HCatTableInfo info, HCatSchema s) - throws SerDeException { - initializeSerDe(serDe, conf, info, s); - } - - static void initializeSerDe(SerDe serDe, Configuration conf, - HCatTableInfo info, HCatSchema s) - throws SerDeException { - Properties props = new Properties(); + /** + * Set properties required by @{link LazySimpleSerDe}. + */ + static Properties setLazySimpleSerDeProperties(Properties props, HCatTableInfo info, + HCatSchema s) throws SerDeException { List fields = HCatUtil.getFieldSchemaList(s.getFields()); props.setProperty(org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS, MetaStoreUtils.getColumnNamesFromFieldSchema(fields)); props.setProperty(org.apache.hadoop.hive.serde.Constants.LIST_COLUMN_TYPES, MetaStoreUtils.getColumnTypesFromFieldSchema(fields)); - // setting these props to match LazySimpleSerde props.setProperty(org.apache.hadoop.hive.serde.Constants.SERIALIZATION_NULL_FORMAT, "\\N"); props.setProperty(org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "1"); //add props from params set in table schema props.putAll(info.getStorerInfo().getProperties()); - - serDe.initialize(conf,props); + return props; } static Reporter createReporter(TaskAttemptContext context) { diff --git src/java/org/apache/hcatalog/mapreduce/PartInfo.java src/java/org/apache/hcatalog/mapreduce/PartInfo.java index 235233f..4f21300 100644 --- src/java/org/apache/hcatalog/mapreduce/PartInfo.java +++ src/java/org/apache/hcatalog/mapreduce/PartInfo.java @@ -21,13 +21,14 @@ import java.io.Serializable; import java.util.Map; import java.util.Properties; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.ql.plan.TableDesc; - +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hcatalog.data.schema.HCatSchema; -import org.apache.hcatalog.mapreduce.HCatStorageHandler; -/** The Class used to serialize the partition information read from the metadata server that maps to a partition. */ +/** + * The Class used to serialize the partition information read from the metadata + * server that maps to a partition. + */ public class PartInfo implements Serializable { /** The serialization version */ @@ -40,7 +41,7 @@ public class PartInfo implements Serializable { private final String storageHandlerClassName; private final String inputFormatClassName; private final String outputFormatClassName; - private final String serdeClassName; + private final SerDeInfo serDeInfo; /** HCat-specific properties set at the partition */ private final Properties hcatProperties; @@ -61,23 +62,23 @@ public class PartInfo implements Serializable { * Instantiates a new hcat partition info. 
* @param partitionSchema the partition schema * @param storageHandler the storage handler - * @param location the location + * @param storageDescriptor the storage descriptor for this partition * @param hcatProperties hcat-specific properties at the partition * @param jobProperties the job properties * @param tableInfo the table information */ public PartInfo(HCatSchema partitionSchema, HCatStorageHandler storageHandler, - String location, Properties hcatProperties, + StorageDescriptor storageDescriptor, Properties hcatProperties, Map jobProperties, HCatTableInfo tableInfo){ this.partitionSchema = partitionSchema; - this.location = location; + this.location = storageDescriptor.getLocation(); this.hcatProperties = hcatProperties; this.jobProperties = jobProperties; this.tableInfo = tableInfo; + this.serDeInfo = storageDescriptor.getSerdeInfo(); this.storageHandlerClassName = storageHandler.getClass().getName(); this.inputFormatClassName = storageHandler.getInputFormatClass().getName(); - this.serdeClassName = storageHandler.getSerDeClass().getName(); this.outputFormatClassName = storageHandler.getOutputFormatClass().getName(); } @@ -114,7 +115,11 @@ public class PartInfo implements Serializable { * @return the serdeClassName */ public String getSerdeClassName() { - return serdeClassName; + return serDeInfo.getSerializationLib(); + } + + public SerDeInfo getSerDeInfo() { + return serDeInfo; } /** diff --git src/test/org/apache/hcatalog/common/TestHCatUtil.java src/test/org/apache/hcatalog/common/TestHCatUtil.java index 7e658a7..b07b2a5 100644 --- src/test/org/apache/hcatalog/common/TestHCatUtil.java +++ src/test/org/apache/hcatalog/common/TestHCatUtil.java @@ -32,7 +32,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.serde.Constants; import org.apache.hcatalog.data.schema.HCatFieldSchema; import org.apache.hcatalog.data.schema.HCatSchema; @@ -120,9 +120,11 @@ public class TestHCatUtil { "location", "org.apache.hadoop.mapred.TextInputFormat", "org.apache.hadoop.mapred.TextOutputFormat", false, -1, new SerDeInfo(), new ArrayList(), new ArrayList(), new HashMap()); - Table table = new Table("test_tblname", "test_dbname", "test_owner", 0, 0, 0, - sd, new ArrayList(), new HashMap(), - "viewOriginalText", "viewExpandedText", TableType.EXTERNAL_TABLE.name()); + org.apache.hadoop.hive.metastore.api.Table apiTable = + new org.apache.hadoop.hive.metastore.api.Table("test_tblname", "test_dbname", "test_owner", + 0, 0, 0, sd, new ArrayList(), new HashMap(), + "viewOriginalText", "viewExpandedText", TableType.EXTERNAL_TABLE.name()); + Table table = new Table(apiTable); List expectedHCatSchema = Lists.newArrayList(new HCatFieldSchema("username", HCatFieldSchema.Type.STRING, null)); @@ -133,7 +135,7 @@ public class TestHCatUtil { // Add a partition key & ensure its reflected in the schema. 
List partitionKeys = Lists.newArrayList(new FieldSchema("dt", Constants.STRING_TYPE_NAME, null)); - table.setPartitionKeys(partitionKeys); + table.getTTable().setPartitionKeys(partitionKeys); expectedHCatSchema.add(new HCatFieldSchema("dt", HCatFieldSchema.Type.STRING, null)); Assert.assertEquals(new HCatSchema(expectedHCatSchema), HCatUtil.getTableSchemaWithPtnCols(table)); @@ -163,9 +165,11 @@ public class TestHCatUtil { false, -1, serDeInfo, new ArrayList(), new ArrayList(), new HashMap()); - Table table = new Table("test_tblname", "test_dbname", "test_owner", 0, 0, 0, - sd, new ArrayList(), new HashMap(), - "viewOriginalText", "viewExpandedText", TableType.EXTERNAL_TABLE.name()); + org.apache.hadoop.hive.metastore.api.Table apiTable = + new org.apache.hadoop.hive.metastore.api.Table("test_tblname", "test_dbname", "test_owner", + 0, 0, 0, sd, new ArrayList(), new HashMap(), + "viewOriginalText", "viewExpandedText", TableType.EXTERNAL_TABLE.name()); + Table table = new Table(apiTable); List expectedHCatSchema = Lists.newArrayList( new HCatFieldSchema("myint", HCatFieldSchema.Type.INT, null), diff --git src/test/org/apache/hcatalog/data/TestReaderWriter.java src/test/org/apache/hcatalog/data/TestReaderWriter.java index 78cbe66..3103faf 100644 --- src/test/org/apache/hcatalog/data/TestReaderWriter.java +++ src/test/org/apache/hcatalog/data/TestReaderWriter.java @@ -32,12 +32,8 @@ import java.util.Map; import java.util.Map.Entry; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.cli.CliSessionState; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.ql.CommandNeedRetryException; -import org.apache.hadoop.hive.ql.Driver; -import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hcatalog.common.HCatException; import org.apache.hcatalog.data.transfer.DataTransferFactory; @@ -47,21 +43,19 @@ import org.apache.hcatalog.data.transfer.ReadEntity; import org.apache.hcatalog.data.transfer.ReaderContext; import org.apache.hcatalog.data.transfer.WriteEntity; import org.apache.hcatalog.data.transfer.WriterContext; +import org.apache.hcatalog.mapreduce.HCatBaseTest; import org.junit.Assert; import org.junit.Test; -public class TestReaderWriter { +public class TestReaderWriter extends HCatBaseTest { @Test public void test() throws MetaException, CommandNeedRetryException, IOException, ClassNotFoundException { - HiveConf conf = new HiveConf(getClass()); - Driver driver = new Driver(conf); - SessionState.start(new CliSessionState(conf)); driver.run("drop table mytbl"); driver.run("create table mytbl (a string, b int)"); - Iterator> itr = conf.iterator(); + Iterator> itr = hiveConf.iterator(); Map map = new HashMap(); while (itr.hasNext()) { Entry kv = itr.next(); diff --git src/test/org/apache/hcatalog/mapreduce/TestHCatHiveThriftCompatibility.java src/test/org/apache/hcatalog/mapreduce/TestHCatHiveThriftCompatibility.java new file mode 100644 index 0000000..40cd219 --- /dev/null +++ src/test/org/apache/hcatalog/mapreduce/TestHCatHiveThriftCompatibility.java @@ -0,0 +1,98 @@ +package org.apache.hcatalog.mapreduce; + +import junit.framework.Assert; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.serde2.thrift.test.IntString; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; 
+import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.transport.TIOStreamTransport; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.util.Iterator; + +public class TestHCatHiveThriftCompatibility extends HCatBaseTest { + + private boolean setUpComplete = false; + private Path intStringSeq; + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + if (setUpComplete) { + return; + } + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + TIOStreamTransport transport = new TIOStreamTransport(out); + TBinaryProtocol protocol = new TBinaryProtocol(transport); + + IntString intString = new IntString(1, "one", 1); + intString.write(protocol); + BytesWritable bytesWritable = new BytesWritable(out.toByteArray()); + + + intStringSeq = new Path(TEST_DATA_DIR + "/data/intString.seq"); + LOG.info("Creating data file: " + intStringSeq); + + SequenceFile.Writer seqFileWriter = SequenceFile.createWriter( + intStringSeq.getFileSystem(hiveConf), hiveConf, intStringSeq, + NullWritable.class, BytesWritable.class); + seqFileWriter.append(NullWritable.get(), bytesWritable); + seqFileWriter.close(); + + setUpComplete = true; + } + + /** + * Create a table with no explicit schema and ensure its correctly + * discovered from the thrift struct. + */ + @Test + public void testDynamicCols() throws Exception { + Assert.assertEquals(0, driver.run("drop table if exists test_thrift").getResponseCode()); + Assert.assertEquals(0, driver.run( + "create external table test_thrift " + + "partitioned by (year string) " + + "row format serde 'org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer' " + + "with serdeproperties ( " + + " 'serialization.class'='org.apache.hadoop.hive.serde2.thrift.test.IntString', " + + " 'serialization.format'='org.apache.thrift.protocol.TBinaryProtocol') " + + "stored as" + + " inputformat 'org.apache.hadoop.mapred.SequenceFileInputFormat'" + + " outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'") + .getResponseCode()); + Assert.assertEquals(0, + driver.run("alter table test_thrift add partition (year = '2012') location '" + + intStringSeq.getParent() + "'").getResponseCode()); + + PigServer pigServer = new PigServer(ExecType.LOCAL); + pigServer.registerQuery("A = load 'test_thrift' using org.apache.hcatalog.pig.HCatLoader();"); + + Schema expectedSchema = new Schema(); + expectedSchema.add(new Schema.FieldSchema("myint", DataType.INTEGER)); + expectedSchema.add(new Schema.FieldSchema("mystring", DataType.CHARARRAY)); + expectedSchema.add(new Schema.FieldSchema("underscore_int", DataType.INTEGER)); + expectedSchema.add(new Schema.FieldSchema("year", DataType.CHARARRAY)); + + Assert.assertEquals(expectedSchema, pigServer.dumpSchema("A")); + + Iterator iterator = pigServer.openIterator("A"); + Tuple t = iterator.next(); + Assert.assertEquals(1, t.get(0)); + Assert.assertEquals("one", t.get(1)); + Assert.assertEquals(1, t.get(2)); + Assert.assertEquals("2012", t.get(3)); + + Assert.assertFalse(iterator.hasNext()); + } +}
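
For reference, a minimal caller-side sketch (not part of the patch) of how the reworked HCatUtil API is meant to be used after this change: callers obtain an org.apache.hadoop.hive.ql.metadata.Table wrapper via HCatUtil.getTable() and derive the schema from it with HCatUtil.extractSchema(), instead of calling client.getTable() and extractSchemaFromStorageDescriptor() on the raw Thrift StorageDescriptor. The class and method names below, other than HCatUtil.getTable and HCatUtil.extractSchema, are illustrative only.

import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.schema.HCatSchema;

// Illustrative helper, assuming the patched HCatUtil; not part of this diff.
public class SchemaLookupExample {
  public static HCatSchema lookupSchema(HiveMetaStoreClient client,
                                        String dbName, String tableName) throws Exception {
    // Before this patch: api.Table t = client.getTable(dbName, tableName);
    //                    HCatUtil.extractSchemaFromStorageDescriptor(t.getSd());
    // After: the ql.metadata.Table wrapper resolves its own columns.
    Table table = HCatUtil.getTable(client, dbName, tableName);
    return HCatUtil.extractSchema(table);
  }
}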