diff --git hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java
index 4e788d4..1ff4b4a 100644
--- hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java
+++ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java
@@ -27,7 +27,7 @@ import java.util.Properties;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.security.Credentials;
diff --git hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java
index 696081f..f727be4 100644
--- hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java
+++ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hcatalog.common.HCatConstants;
@@ -142,7 +142,7 @@ public class PigHCatUtil {
     HiveMetaStoreClient client = null;
     try {
       client = createHiveMetaClient(hcatServerUri, hcatServerPrincipal, PigHCatUtil.class);
-      table = client.getTable(dbName, tableName);
+      table = HCatUtil.getTable(client, dbName, tableName);
     } catch (NoSuchObjectException nsoe){
       throw new PigException("Table not found : " + nsoe.getMessage(), PIG_EXCEPTION_CODE); // prettier error messages to frontend
     } catch (Exception e) {
diff --git src/java/org/apache/hcatalog/common/HCatUtil.java src/java/org/apache/hcatalog/common/HCatUtil.java
index 5b744fe..0b5b624 100644
--- src/java/org/apache/hcatalog/common/HCatUtil.java
+++ src/java/org/apache/hcatalog/common/HCatUtil.java
@@ -39,9 +39,12 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -155,15 +158,12 @@ public class HCatUtil {
     }
   }
 
-  public static HCatSchema extractSchemaFromStorageDescriptor(
-      StorageDescriptor sd) throws HCatException {
-    if (sd == null) {
-      throw new HCatException(
-          "Cannot construct partition info from an empty storage descriptor.");
-    }
-    HCatSchema schema = new HCatSchema(HCatUtil.getHCatFieldSchemaList(sd
-        .getCols()));
-    return schema;
+  public static HCatSchema extractSchema(Table table) throws HCatException {
+    return new HCatSchema(HCatUtil.getHCatFieldSchemaList(table.getCols()));
+  }
+
+  public static HCatSchema extractSchema(Partition partition) throws HCatException {
+    return new HCatSchema(HCatUtil.getHCatFieldSchemaList(partition.getCols()));
   }
 
   public static List<FieldSchema> getFieldSchemaList(
@@ -179,14 +179,13 @@ public class HCatUtil {
     }
   }
 
-  public static Table getTable(HiveMetaStoreClient client, String dbName,
-      String tableName) throws Exception {
-    return client.getTable(dbName, tableName);
+  public static Table getTable(HiveMetaStoreClient client, String dbName, String tableName)
+      throws NoSuchObjectException, TException, MetaException {
+    return new Table(client.getTable(dbName, tableName));
   }
 
   public static HCatSchema getTableSchemaWithPtnCols(Table table) throws IOException {
-    HCatSchema tableSchema = new HCatSchema(HCatUtil.getHCatFieldSchemaList(
-        new org.apache.hadoop.hive.ql.metadata.Table(table).getCols()));
+    HCatSchema tableSchema = new HCatSchema(HCatUtil.getHCatFieldSchemaList(table.getCols()));
 
     if (table.getPartitionKeys().size() != 0) {
@@ -207,8 +206,7 @@
    * @return HCatSchema instance which contains the partition columns
    * @throws IOException
    */
-  public static HCatSchema getPartitionColumns(Table table)
-      throws IOException {
+  public static HCatSchema getPartitionColumns(Table table) throws IOException {
     HCatSchema cols = new HCatSchema(new LinkedList<HCatFieldSchema>());
     if (table.getPartitionKeys().size() != 0) {
       for (FieldSchema fs : table.getPartitionKeys()) {
@@ -236,7 +234,7 @@
       partitionKeyMap.put(field.getName().toLowerCase(), field);
     }
 
-    List<FieldSchema> tableCols = table.getSd().getCols();
+    List<FieldSchema> tableCols = table.getCols();
     List<FieldSchema> newFields = new ArrayList<FieldSchema>();
 
     for (int i = 0; i < partitionSchema.getFields().size(); i++) {
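Note: a rough sketch of how the new HCatUtil helpers are meant to be used together, based only on the signatures introduced above; the HiveConf construction and the "default"/"mytable" names are illustrative placeholders, not part of the patch.

    HiveConf hiveConf = new HiveConf(HCatInputFormat.class);
    HiveMetaStoreClient client = HCatUtil.createHiveClient(hiveConf);
    // getTable() now returns the richer ql.metadata.Table wrapper rather than the raw Thrift object.
    org.apache.hadoop.hive.ql.metadata.Table table = HCatUtil.getTable(client, "default", "mytable");
    HCatSchema dataColumns = HCatUtil.extractSchema(table);            // data columns only
    HCatSchema allColumns = HCatUtil.getTableSchemaWithPtnCols(table); // data + partition columns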
diff --git src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
index a6b7d96..b5bcb47 100644
--- src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
+++ src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.mapred.HCatMapRedUtil;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -183,8 +183,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
       OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
       if (dynamicPartitioningUsed){
         src = new Path(getPartitionRootLocation(
-          jobInfo.getLocation().toString(),jobInfo.getTableInfo().getTable().getPartitionKeysSize()
-          ));
+            jobInfo.getLocation(), jobInfo.getTableInfo().getTable().getPartitionKeysSize()));
       }else{
         src = new Path(jobInfo.getLocation());
       }
@@ -238,8 +237,8 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
       OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
       Configuration conf = context.getConfiguration();
-      Table table = jobInfo.getTableInfo().getTable();
-      Path tblPath = new Path(table.getSd().getLocation());
+      Table table = new Table(jobInfo.getTableInfo().getTable());
+      Path tblPath = new Path(table.getTTable().getSd().getLocation());
       FileSystem fs = tblPath.getFileSystem(conf);
 
       if( table.getPartitionKeys().size() == 0 ) {
@@ -275,7 +274,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
           HiveConf hiveConf = HCatUtil.getHiveConf(conf);
           client = HCatUtil.createHiveClient(hiveConf);
-          StorerInfo storer = InternalUtil.extractStorerInfo(table.getSd(),table.getParameters());
+          StorerInfo storer = InternalUtil.extractStorerInfo(table.getTTable().getSd(),table.getParameters());
 
           updateTableSchema(client, table, jobInfo.getOutputSchema());
@@ -421,7 +420,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
       Table table, FileSystem fs, String grpName, FsPermission perms) throws IOException {
 
-    StorageDescriptor tblSD = table.getSd();
+    StorageDescriptor tblSD = table.getTTable().getSd();
 
     Partition partition = new Partition();
     partition.setDbName(table.getDbName());
@@ -495,7 +494,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
   private String getFinalDynamicPartitionDestination(Table table, Map<String, String> partKVs) {
     // file:///tmp/hcat_junit_warehouse/employee/_DYN0.7770480401313761/emp_country=IN/emp_state=KA ->
     // file:///tmp/hcat_junit_warehouse/employee/emp_country=IN/emp_state=KA
-    Path partPath = new Path(table.getSd().getLocation());
+    Path partPath = new Path(table.getTTable().getSd().getLocation());
     for(FieldSchema partKey : table.getPartitionKeys()){
       partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs);
     }
@@ -536,12 +535,12 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
     List<FieldSchema> newColumns = HCatUtil.validatePartitionSchema(table, partitionSchema);
 
     if( newColumns.size() != 0 ) {
-      List<FieldSchema> tableColumns = new ArrayList<FieldSchema>(table.getSd().getCols());
+      List<FieldSchema> tableColumns = new ArrayList<FieldSchema>(table.getTTable().getSd().getCols());
       tableColumns.addAll(newColumns);
 
       //Update table schema to add the newly added columns
-      table.getSd().setCols(tableColumns);
-      client.alter_table(table.getDbName(), table.getTableName(), table);
+      table.getTTable().getSd().setCols(tableColumns);
+      client.alter_table(table.getDbName(), table.getTableName(), table.getTTable());
     }
   }
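Note: the pattern applied throughout this file is to wrap the Thrift table once and drop down to getTTable() only where Thrift-level pieces such as the StorageDescriptor are required. A condensed sketch, assuming an OutputJobInfo named jobInfo is in scope:

    org.apache.hadoop.hive.metastore.api.Table thriftTable = jobInfo.getTableInfo().getTable();
    Table table = new Table(thriftTable);                             // org.apache.hadoop.hive.ql.metadata.Table
    Path tblPath = new Path(table.getTTable().getSd().getLocation()); // Thrift-level storage descriptor
    List<FieldSchema> cols = table.getCols();                         // convenience accessor on the wrapper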
diff --git src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java
index 3a3fca3..978afb4 100644
--- src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java
+++ src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
@@ -120,7 +120,7 @@ class FileOutputFormatContainer extends OutputFormatContainer {
       handleDuplicatePublish(context,
           jobInfo,
           client,
-          jobInfo.getTableInfo().getTable());
+          new Table(jobInfo.getTableInfo().getTable()));
     } catch (MetaException e) {
       throw new IOException(e);
     } catch (TException e) {
@@ -190,7 +190,7 @@ class FileOutputFormatContainer extends OutputFormatContainer {
           table, outputInfo.getPartitionValues());
 
       // non-partitioned table
-      Path tablePath = new Path(table.getSd().getLocation());
+      Path tablePath = new Path(table.getTTable().getSd().getLocation());
       FileSystem fs = tablePath.getFileSystem(context.getConfiguration());
 
       if ( fs.exists(tablePath) ) {
diff --git src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
index e76690f..a51cca1 100644
--- src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
+++ src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
@@ -215,8 +215,10 @@ public abstract class HCatBaseOutputFormat extends OutputFormat
diff --git src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
--- src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
+++ src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
       List<String> indexList = client.listIndexNames(outputJobInfo.getDatabaseName(), outputJobInfo.getTableName(), Short.MAX_VALUE);
@@ -83,7 +84,7 @@ public class HCatOutputFormat extends HCatBaseOutputFormat {
           throw new HCatException(ErrorType.ERROR_NOT_SUPPORTED, "Store into a table with an automatic index from Pig/Mapreduce is not supported");
         }
       }
-      StorageDescriptor sd = table.getSd();
+      StorageDescriptor sd = table.getTTable().getSd();
 
       if (sd.isCompressed()) {
         throw new HCatException(ErrorType.ERROR_NOT_SUPPORTED, "Store into a compressed partition from Pig/Mapreduce is not supported");
@@ -97,7 +98,7 @@ public class HCatOutputFormat extends HCatBaseOutputFormat {
         throw new HCatException(ErrorType.ERROR_NOT_SUPPORTED, "Store into a partition with sorted column definition from Pig/Mapreduce is not supported");
       }
 
-      if (table.getPartitionKeysSize() == 0 ){
+      if (table.getTTable().getPartitionKeysSize() == 0 ){
        if ((outputJobInfo.getPartitionValues() != null) && (!outputJobInfo.getPartitionValues().isEmpty())){
          // attempt made to save partition values in non-partitioned table - throw error.
          throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES,
@@ -117,7 +118,7 @@
        }
 
        if ((outputJobInfo.getPartitionValues() == null)
-          || (outputJobInfo.getPartitionValues().size() < table.getPartitionKeysSize())){
+          || (outputJobInfo.getPartitionValues().size() < table.getTTable().getPartitionKeysSize())){
          // dynamic partition usecase - partition values were null, or not all were specified
          // need to figure out which keys are not specified.
          List<String> dynamicPartitioningKeys = new ArrayList<String>();
@@ -128,7 +129,7 @@
          }
        }
 
-        if (valueMap.size() + dynamicPartitioningKeys.size() != table.getPartitionKeysSize()){
+        if (valueMap.size() + dynamicPartitioningKeys.size() != table.getTTable().getPartitionKeysSize()){
          // If this isn't equal, then bogus key values have been inserted, error out.
          throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES,"Invalid partition keys specified");
        }
@@ -148,9 +149,9 @@
        outputJobInfo.setPartitionValues(valueMap);
      }
 
-      StorageDescriptor tblSD = table.getSd();
-      HCatSchema tableSchema = HCatUtil.extractSchemaFromStorageDescriptor(tblSD);
-      StorerInfo storerInfo = InternalUtil.extractStorerInfo(tblSD,table.getParameters());
+      HCatSchema tableSchema = HCatUtil.extractSchema(table);
+      StorerInfo storerInfo =
+          InternalUtil.extractStorerInfo(table.getTTable().getSd(), table.getParameters());
 
      List<String> partitionCols = new ArrayList<String>();
      for(FieldSchema schema : table.getPartitionKeys()) {
@@ -160,7 +161,7 @@
      HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(job.getConfiguration(), storerInfo);
 
      //Serialize the output info into the configuration
-      outputJobInfo.setTableInfo(HCatTableInfo.valueOf(table));
+      outputJobInfo.setTableInfo(HCatTableInfo.valueOf(table.getTTable()));
      outputJobInfo.setOutputSchema(tableSchema);
      harRequested = getHarRequested(hiveConf);
      outputJobInfo.setHarRequested(harRequested);
@@ -169,7 +170,7 @@
 
      HCatUtil.configureOutputStorageHandler(storageHandler,job,outputJobInfo);
 
-      Path tblPath = new Path(table.getSd().getLocation());
+      Path tblPath = new Path(table.getTTable().getSd().getLocation());
 
      /* Set the umask in conf such that files/dirs get created with table-dir
       * permissions. Following three assumptions are made:
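Note: the partition-key bookkeeping in the hunks above is easier to see outside diff form. A small worked example of the rule being enforced, with invented values: for a table partitioned by (emp_country, emp_state) where the caller supplies only emp_country, emp_state becomes a dynamic partition key, and every partition key must be accounted for one way or the other.

    Map<String, String> valueMap = new HashMap<String, String>();
    valueMap.put("emp_country", "IN");                               // statically specified key
    List<String> dynamicPartitioningKeys = new ArrayList<String>();
    dynamicPartitioningKeys.add("emp_state");                        // key left for dynamic partitioning
    if (valueMap.size() + dynamicPartitioningKeys.size() != table.getTTable().getPartitionKeysSize()) {
      // a supplied key that is not actually a partition key of the table would trip this check
      throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES, "Invalid partition keys specified");
    }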
diff --git src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
index 99bebc5..542a9f0 100644
--- src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
+++ src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
@@ -20,7 +20,7 @@ package org.apache.hcatalog.mapreduce;
 import java.io.IOException;
 import java.util.Map;
 
-import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -54,7 +54,7 @@ class HCatRecordReader extends RecordReader {
   /** The storage handler used */
   private final HCatStorageHandler storageHandler;
 
-  private SerDe serde;
+  private Deserializer deserializer;
 
   private Map valuesNotInDataCols;
@@ -82,7 +82,7 @@ class HCatRecordReader extends RecordReader {
     HCatSplit hcatSplit = InternalUtil.castToHCatSplit(split);
 
     baseRecordReader = createBaseRecordReader(hcatSplit, storageHandler, taskContext);
-    serde = createSerDe(hcatSplit, storageHandler, taskContext);
+    createDeserializer(hcatSplit, storageHandler, taskContext);
 
     // Pull the output schema out of the TaskAttemptContext
     outputSchema = (HCatSchema) HCatUtil.deserialize(
@@ -108,22 +108,20 @@
         InternalUtil.createReporter(taskContext));
   }
 
-  private SerDe createSerDe(HCatSplit hcatSplit, HCatStorageHandler storageHandler,
+  private void createDeserializer(HCatSplit hcatSplit, HCatStorageHandler storageHandler,
      TaskAttemptContext taskContext) throws IOException {
 
-    SerDe serde = ReflectionUtils.newInstance(storageHandler.getSerDeClass(),
+    deserializer = ReflectionUtils.newInstance(storageHandler.getSerDeClass(),
        taskContext.getConfiguration());
 
    try {
-      InternalUtil.initializeInputSerDe(serde, storageHandler.getConf(),
+      InternalUtil.initializeDeserializer(deserializer, storageHandler.getConf(),
         hcatSplit.getPartitionInfo().getTableInfo(),
         hcatSplit.getPartitionInfo().getPartitionSchema());
    } catch (SerDeException e) {
-      throw new IOException("Failed initializing SerDe "
+      throw new IOException("Failed initializing deserializer "
         + storageHandler.getSerDeClass().getName(), e);
    }
-
-    return serde;
  }
 
  /* (non-Javadoc)
@@ -139,18 +137,14 @@
   * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentValue()
   */
  @Override
-  public HCatRecord getCurrentValue()
-      throws IOException, InterruptedException {
-    HCatRecord r;
-
+  public HCatRecord getCurrentValue() throws IOException, InterruptedException {
    try {
-
-      r = new LazyHCatRecord(serde.deserialize(currentValue),serde.getObjectInspector());
+      HCatRecord r = new LazyHCatRecord(deserializer.deserialize(currentValue),
+          deserializer.getObjectInspector());
      DefaultHCatRecord dr = new DefaultHCatRecord(outputSchema.size());
      int i = 0;
      for (String fieldName : outputSchema.getFieldNames()){
-        Integer dataPosn = null;
-        if ((dataPosn = dataSchema.getPosition(fieldName)) != null){
+        if (dataSchema.getPosition(fieldName) != null){
          dr.set(i, r.get(fieldName,dataSchema));
        } else {
          dr.set(i, valuesNotInDataCols.get(fieldName));
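Note: the reader only needs the read half of the SerDe contract, which is why the field is now typed as Deserializer; since Hive's SerDe interface extends both Deserializer and Serializer, whatever SerDe class the storage handler reports still satisfies it. A condensed sketch of the resulting read path, using only calls that appear in the hunks above:

    Deserializer deserializer = ReflectionUtils.newInstance(storageHandler.getSerDeClass(),
        taskContext.getConfiguration());
    InternalUtil.initializeDeserializer(deserializer, storageHandler.getConf(),
        hcatSplit.getPartitionInfo().getTableInfo(),
        hcatSplit.getPartitionInfo().getPartitionSchema());
    HCatRecord lazyRecord = new LazyHCatRecord(deserializer.deserialize(currentValue),
        deserializer.getObjectInspector());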
diff --git src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java
index 16423fa..40fb32b 100644
--- src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java
+++ src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java
@@ -118,7 +118,7 @@ public class HCatTableInfo implements Serializable {
   }
 
   public String getTableLocation() {
-    return table.getSd().getLocation();
+    return table.getSd().getLocation();
   }
 
   /**
@@ -137,14 +137,16 @@ public class HCatTableInfo implements Serializable {
   * @throws IOException
   */
  static HCatTableInfo valueOf(Table table) throws IOException {
-    HCatSchema dataColumns =
-        HCatUtil.extractSchemaFromStorageDescriptor(table.getSd());
-    StorerInfo storerInfo =
-        InternalUtil.extractStorerInfo(table.getSd(), table.getParameters());
-    HCatSchema partitionColumns = HCatUtil.getPartitionColumns(table);
+    org.apache.hadoop.hive.ql.metadata.Table mTable =
+        new org.apache.hadoop.hive.ql.metadata.Table(table);
+    HCatSchema schema = HCatUtil.extractSchema(mTable);
+
+    StorerInfo storerInfo = InternalUtil.extractStorerInfo(table.getSd(), table.getParameters());
+
+    HCatSchema partitionColumns = HCatUtil.getPartitionColumns(mTable);
     return new HCatTableInfo(table.getDbName(), table.getTableName(),
-        dataColumns,
+        schema,
        partitionColumns,
        storerInfo,
        table);
diff --git src/java/org/apache/hcatalog/mapreduce/InitializeInput.java src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
index 5dcd898..db626bc 100644
--- src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
+++ src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hcatalog.common.ErrorType;
 import org.apache.hcatalog.common.HCatConstants;
@@ -94,12 +94,13 @@
        hiveConf = new HiveConf(HCatInputFormat.class);
      }
      client = HCatUtil.createHiveClient(hiveConf);
-      Table table = client.getTable(inputJobInfo.getDatabaseName(),
-          inputJobInfo.getTableName());
+
+      Table table =
+          HCatUtil.getTable(client, inputJobInfo.getDatabaseName(), inputJobInfo.getTableName());
 
      List<PartInfo> partInfoList = new ArrayList<PartInfo>();
 
-      inputJobInfo.setTableInfo(HCatTableInfo.valueOf(table));
+      inputJobInfo.setTableInfo(HCatTableInfo.valueOf(table.getTTable()));
      if( table.getPartitionKeys().size() != 0 ) {
        //Partitioned table
        List<Partition> parts = client.listPartitionsByFilter(inputJobInfo.getDatabaseName(),
@@ -115,18 +116,20 @@
 
        // populate partition info
        for (Partition ptn : parts){
-          PartInfo partInfo = extractPartInfo(ptn.getSd(),ptn.getParameters(),
-              job.getConfiguration(),
-              inputJobInfo);
+          org.apache.hadoop.hive.ql.metadata.Partition mPartition =
+              new org.apache.hadoop.hive.ql.metadata.Partition(table, ptn);
+          HCatSchema schema = HCatUtil.extractSchema(mPartition);
+          PartInfo partInfo = extractPartInfo(schema, mPartition.getTPartition().getSd(),
+              ptn.getParameters(), job.getConfiguration(), inputJobInfo);
          partInfo.setPartitionValues(createPtnKeyValueMap(table, ptn));
          partInfoList.add(partInfo);
        }
 
      }else{
        //Non partitioned table
-        PartInfo partInfo = extractPartInfo(table.getSd(),table.getParameters(),
-            job.getConfiguration(),
-            inputJobInfo);
+        HCatSchema schema = HCatUtil.extractSchema(table);
+        PartInfo partInfo = extractPartInfo(schema, table.getTTable().getSd(),
+            table.getParameters(), job.getConfiguration(), inputJobInfo);
        partInfo.setPartitionValues(new HashMap<String,String>());
        partInfoList.add(partInfo);
      }
@@ -160,10 +163,10 @@
    return ptnKeyValues;
  }
 
-  static PartInfo extractPartInfo(StorageDescriptor sd,
-      Map<String,String> parameters, Configuration conf,
-      InputJobInfo inputJobInfo) throws IOException{
-    HCatSchema schema = HCatUtil.extractSchemaFromStorageDescriptor(sd);
+  private static PartInfo extractPartInfo(HCatSchema schema, StorageDescriptor sd,
+      Map<String,String> parameters, Configuration conf, InputJobInfo inputJobInfo)
+      throws IOException {
+
    StorerInfo storerInfo = InternalUtil.extractStorerInfo(sd,parameters);
 
    Properties hcatProperties = new Properties();
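Note: on the read side, the per-partition schema now comes from the ql-level Partition wrapper instead of the raw StorageDescriptor, so a partition whose column list differs from the table's is described by its own columns. A condensed sketch of the loop body above (table, parts and the other locals are as in the method):

    for (org.apache.hadoop.hive.metastore.api.Partition ptn : parts) {
      org.apache.hadoop.hive.ql.metadata.Partition mPartition =
          new org.apache.hadoop.hive.ql.metadata.Partition(table, ptn);
      HCatSchema schema = HCatUtil.extractSchema(mPartition);  // built from mPartition.getCols()
      // schema plus the Thrift storage descriptor then feed extractPartInfo(...)
    }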
diff --git src/java/org/apache/hcatalog/mapreduce/InternalUtil.java src/java/org/apache/hcatalog/mapreduce/InternalUtil.java
index fa78a61..86b1fa9 100644
--- src/java/org/apache/hcatalog/mapreduce/InternalUtil.java
+++ src/java/org/apache/hcatalog/mapreduce/InternalUtil.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
@@ -132,23 +133,19 @@
 
  //TODO this has to find a better home, it's also hardcoded as default in hive would be nice
  // if the default was decided by the serde
-  static void initializeOutputSerDe(SerDe serDe, Configuration conf,
-      OutputJobInfo jobInfo)
-      throws SerDeException {
-    initializeSerDe(serDe, conf, jobInfo.getTableInfo(),
-        jobInfo.getOutputSchema());
+  static void initializeOutputSerDe(SerDe serDe, Configuration conf, OutputJobInfo jobInfo)
+      throws SerDeException {
+    serDe.initialize(conf, getSerdeProperties(jobInfo.getTableInfo(), jobInfo.getOutputSchema()));
  }
 
-  static void initializeInputSerDe(SerDe serDe, Configuration conf,
-      HCatTableInfo info, HCatSchema s)
-      throws SerDeException {
-    initializeSerDe(serDe, conf, info, s);
+  static void initializeDeserializer(Deserializer deserializer, Configuration conf,
+      HCatTableInfo info, HCatSchema schema) throws SerDeException {
+    deserializer.initialize(conf, getSerdeProperties(info, schema));
  }
 
-  static void initializeSerDe(SerDe serDe, Configuration conf,
-      HCatTableInfo info, HCatSchema s)
-      throws SerDeException {
-    Properties props = new Properties();
+  private static Properties getSerdeProperties(HCatTableInfo info, HCatSchema s)
+      throws SerDeException {
+    Properties props = new Properties();
    List<FieldSchema> fields = HCatUtil.getFieldSchemaList(s.getFields());
    props.setProperty(org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS,
        MetaStoreUtils.getColumnNamesFromFieldSchema(fields));
@@ -162,7 +159,7 @@ class InternalUtil {
    //add props from params set in table schema
    props.putAll(info.getStorerInfo().getProperties());
 
-    serDe.initialize(conf,props);
+    return props;
  }
 
  static Reporter createReporter(TaskAttemptContext context) {
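Note: getSerdeProperties() is essentially the old initializeSerDe() minus the initialize call: it builds the column-name/column-type properties for the given HCatSchema plus the storer-level properties from the table, and each caller decides what to initialize with them. A small illustration; the two-column schema is invented, and the resulting property values are only a rough indication.

    HCatSchema schema = new HCatSchema(Lists.newArrayList(
        new HCatFieldSchema("name", HCatFieldSchema.Type.STRING, null),
        new HCatFieldSchema("age", HCatFieldSchema.Type.INT, null)));
    // getSerdeProperties(info, schema) would then yield roughly columns=name,age plus the
    // matching column-types entry, and the caller runs deserializer.initialize(conf, props).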
diff --git src/test/org/apache/hcatalog/common/TestHCatUtil.java src/test/org/apache/hcatalog/common/TestHCatUtil.java
index 7e658a7..b07b2a5 100644
--- src/test/org/apache/hcatalog/common/TestHCatUtil.java
+++ src/test/org/apache/hcatalog/common/TestHCatUtil.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.serde.Constants;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
@@ -120,9 +120,11 @@
        "location", "org.apache.hadoop.mapred.TextInputFormat",
        "org.apache.hadoop.mapred.TextOutputFormat", false, -1, new SerDeInfo(),
        new ArrayList<String>(), new ArrayList<Order>(), new HashMap<String, String>());
-    Table table = new Table("test_tblname", "test_dbname", "test_owner", 0, 0, 0,
-        sd, new ArrayList<FieldSchema>(), new HashMap<String, String>(),
-        "viewOriginalText", "viewExpandedText", TableType.EXTERNAL_TABLE.name());
+    org.apache.hadoop.hive.metastore.api.Table apiTable =
+        new org.apache.hadoop.hive.metastore.api.Table("test_tblname", "test_dbname", "test_owner",
+            0, 0, 0, sd, new ArrayList<FieldSchema>(), new HashMap<String, String>(),
+            "viewOriginalText", "viewExpandedText", TableType.EXTERNAL_TABLE.name());
+    Table table = new Table(apiTable);
 
    List<HCatFieldSchema> expectedHCatSchema =
        Lists.newArrayList(new HCatFieldSchema("username", HCatFieldSchema.Type.STRING, null));
@@ -133,7 +135,7 @@
 
    // Add a partition key & ensure its reflected in the schema.
    List<FieldSchema> partitionKeys = Lists.newArrayList(new FieldSchema("dt", Constants.STRING_TYPE_NAME, null));
-    table.setPartitionKeys(partitionKeys);
+    table.getTTable().setPartitionKeys(partitionKeys);
    expectedHCatSchema.add(new HCatFieldSchema("dt", HCatFieldSchema.Type.STRING, null));
    Assert.assertEquals(new HCatSchema(expectedHCatSchema),
        HCatUtil.getTableSchemaWithPtnCols(table));
@@ -163,9 +165,11 @@
        false, -1, serDeInfo, new ArrayList<String>(), new ArrayList<Order>(),
        new HashMap<String, String>());
 
-    Table table = new Table("test_tblname", "test_dbname", "test_owner", 0, 0, 0,
-        sd, new ArrayList<FieldSchema>(), new HashMap<String, String>(),
-        "viewOriginalText", "viewExpandedText", TableType.EXTERNAL_TABLE.name());
+    org.apache.hadoop.hive.metastore.api.Table apiTable =
+        new org.apache.hadoop.hive.metastore.api.Table("test_tblname", "test_dbname", "test_owner",
+            0, 0, 0, sd, new ArrayList<FieldSchema>(), new HashMap<String, String>(),
+            "viewOriginalText", "viewExpandedText", TableType.EXTERNAL_TABLE.name());
+    Table table = new Table(apiTable);
 
    List<HCatFieldSchema> expectedHCatSchema = Lists.newArrayList(
        new HCatFieldSchema("myint", HCatFieldSchema.Type.INT, null),