diff --git src/java/org/apache/hcatalog/common/HCatUtil.java src/java/org/apache/hcatalog/common/HCatUtil.java index 3492d06..53fb712 100644 --- src/java/org/apache/hcatalog/common/HCatUtil.java +++ src/java/org/apache/hcatalog/common/HCatUtil.java @@ -26,6 +26,7 @@ import java.io.ObjectOutputStream; import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -183,6 +184,22 @@ public class HCatUtil { } /** + * return the partition columns from a table instance + * @param table the instance to extract partition columns from + * @return HCatSchema instance which contains the partition columns + * @throws IOException + */ + public static HCatSchema getPartitionColumns(Table table) throws IOException{ + HCatSchema cols = new HCatSchema(new LinkedList()); + if( table.getPartitionKeys().size() != 0 ) { + for (FieldSchema fs : table.getPartitionKeys()){ + cols.append(HCatSchemaUtils.getHCatFieldSchema(fs)); + } + } + return cols; + } + + /** * Validate partition schema, checks if the column types match between the partition * and the existing table schema. Returns the list of columns present in the partition * but not in the table. diff --git src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java index 011bd1b..16c52d2 100644 --- src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java +++ src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java @@ -20,6 +20,7 @@ package org.apache.hcatalog.mapreduce; import java.io.IOException; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import org.apache.hadoop.io.WritableComparable; @@ -32,6 +33,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hcatalog.common.HCatConstants; import org.apache.hcatalog.common.HCatUtil; import org.apache.hcatalog.data.HCatRecord; +import org.apache.hcatalog.data.schema.HCatFieldSchema; import org.apache.hcatalog.data.schema.HCatSchema; public abstract class HCatBaseInputFormat extends InputFormat { @@ -75,15 +77,15 @@ public abstract class HCatBaseInputFormat extends InputFormat splits = new ArrayList(); - List partitionInfoList = jobInfo.getPartitions(); + List partitionInfoList = inputJobInfo.getPartitions(); if(partitionInfoList == null ) { //No partitions match the specified partition filter return splits; @@ -99,8 +101,14 @@ public abstract class HCatBaseInputFormat extends InputFormat()); + for(HCatFieldSchema field: inputJobInfo.getTableInfo().getDataColumns().getFields()) + allCols.append(field); + for(HCatFieldSchema field: inputJobInfo.getTableInfo().getPartitionColumns().getFields()) + allCols.append(field); + //Pass all required information to the storage driver - initStorageDriver(storageDriver, localJob, partitionInfo, jobInfo.getTableSchema()); + initStorageDriver(storageDriver, localJob, partitionInfo, allCols); //Get the input format for the storage driver InputFormat inputFormat = @@ -114,7 +122,7 @@ public abstract class HCatBaseInputFormat extends InputFormat()); + for(HCatFieldSchema field: inputJobInfo.getTableInfo().getDataColumns().getFields()) + allCols.append(field); + for(HCatFieldSchema field: inputJobInfo.getTableInfo().getPartitionColumns().getFields()) + allCols.append(field); + return allCols; } /** - * Gets the JobInfo object by reading the Configuration and deserializing - * the string. 
If JobInfo is not present in the configuration, throws an + * Gets the InputJobInfo object by reading the Configuration and deserializing + * the string. If InputJobInfo is not present in the configuration, throws an * exception since that means HCatInputFormat.setInput has not been called. * @param jobContext the job context - * @return the JobInfo object + * @return the InputJobInfo object * @throws Exception the exception */ - private static JobInfo getJobInfo(JobContext jobContext) throws Exception { + private static InputJobInfo getJobInfo(JobContext jobContext) throws Exception { String jobString = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO); if( jobString == null ) { throw new Exception("job information not found in JobContext. HCatInputFormat.setInput() not called?"); } - return (JobInfo) HCatUtil.deserialize(jobString); + return (InputJobInfo) HCatUtil.deserialize(jobString); } diff --git src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java index 5b4de72..d289714 100644 --- src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java +++ src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java @@ -52,7 +52,7 @@ public abstract class HCatBaseOutputFormat extends OutputFormat driverClass = (Class) - Class.forName(jobInfo.getStorerInfo().getOutputSDClass()); + Class.forName(jobInfo.getTableInfo().getStorerInfo().getOutputSDClass()); HCatOutputStorageDriver driver = driverClass.newInstance(); - Map partitionValues = jobInfo.getTableInfo().getPartitionValues(); + Map partitionValues = jobInfo.getPartitionValues(); String location = jobInfo.getLocation(); if (dynamicPartVals != null){ // dynamic part vals specified - List dynamicPartKeys = jobInfo.getTableInfo().getDynamicPartitioningKeys(); + List dynamicPartKeys = jobInfo.getDynamicPartitioningKeys(); if (dynamicPartVals.size() != dynamicPartKeys.size()){ throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES, "Unable to instantiate dynamic partitioning storage driver, mismatch between" @@ -145,7 +145,7 @@ public abstract class HCatBaseOutputFormat extends OutputFormat partitionCols = new ArrayList(); for(FieldSchema schema : table.getPartitionKeys()) { @@ -164,7 +164,7 @@ public abstract class HCatBaseOutputFormat extends OutputFormat fullPartSpec) throws IOException { - List dynamicPartKeys = jobInfo.getTableInfo().getDynamicPartitioningKeys(); + List dynamicPartKeys = jobInfo.getDynamicPartitioningKeys(); if ((dynamicPartKeys == null)||(dynamicPartKeys.isEmpty())){ return getOutputDriverInstance(context,jobInfo,(List)null); }else{ @@ -227,8 +227,8 @@ public abstract class HCatBaseOutputFormat extends OutputFormat> tp = EximUtil .readMetaData(fs, metadataPath); org.apache.hadoop.hive.metastore.api.Table table = tp.getKey(); - HCatTableInfo inputInfo = HCatTableInfo.getInputTableInfo(null, - null, table.getDbName(), table.getTableName()); + InputJobInfo inputInfo = InputJobInfo.create(table.getDbName(), table.getTableName(),null,null,null); List partCols = table.getPartitionKeys(); List partInfoList = null; if (partCols.size() > 0) { @@ -98,11 +97,11 @@ public class HCatEximInputFormat extends HCatBaseInputFormat { PartInfo partInfo = new PartInfo(schema, inputStorageDriverClass, location + "/data", hcatProperties); partInfoList.add(partInfo); } - JobInfo hcatJobInfo = new JobInfo(inputInfo, - HCatUtil.getTableSchemaWithPtnCols(table), partInfoList); + inputInfo.setPartitions(partInfoList); + 
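// A minimal read-side sketch of the new InputJobInfo / HCatInputFormat.setInput API that this
// patch introduces (InputJobInfo.create is first wired up in the hunk above). This is illustrative
// only: the job name, database/table names, partition filter and metastore URI are placeholders,
// and the null principal assumes a metastore without kerberos security.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hcatalog.mapreduce.InputJobInfo;

public class InputJobInfoUsageSketch {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "hcat-read-sketch");
    job.setInputFormatClass(HCatInputFormat.class);

    // InputJobInfo.create(db, table, filter, serverUri, serverKerberosPrincipal)
    // replaces HCatTableInfo.getInputTableInfo(serverUri, principal, db, table, filter).
    InputJobInfo inputJobInfo = InputJobInfo.create(
        "default",                       // database name (null falls back to the default database)
        "demo_table",                    // table name (placeholder)
        "ds = \"20111001\"",             // partition filter; null reads all partitions
        "thrift://metastore-host:9083",  // metastore thrift URI (placeholder)
        null);                           // kerberos principal of the metastore, if secured

    // setInput() contacts the metastore, resolves the matching partitions, and serializes
    // the InputJobInfo (table info plus partition list) into the job configuration.
    HCatInputFormat.setInput(job, inputJobInfo);
  }
}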
inputInfo.setTableInfo(HCatTableInfo.valueOf(table)); job.getConfiguration().set( HCatConstants.HCAT_KEY_JOB_INFO, - HCatUtil.serialize(hcatJobInfo)); + HCatUtil.serialize(inputInfo)); List rv = new ArrayList(2); rv.add(HCatSchemaUtils.getHCatSchema(table.getSd().getCols())); rv.add(HCatSchemaUtils.getHCatSchema(partCols)); diff --git src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java index 53431cc..4e5c4d1 100644 --- src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java +++ src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java @@ -65,7 +65,7 @@ public class HCatEximOutputCommitter extends HCatBaseOutputCommitter { Configuration conf = jobContext.getConfiguration(); FileSystem fs; try { - fs = FileSystem.get(new URI(jobInfo.getTable().getSd().getLocation()), conf); + fs = FileSystem.get(new URI(jobInfo.getTableInfo().getTable().getSd().getLocation()), conf); } catch (URISyntaxException e) { throw new IOException(e); } @@ -75,7 +75,7 @@ public class HCatEximOutputCommitter extends HCatBaseOutputCommitter { private static void doCleanup(OutputJobInfo jobInfo, FileSystem fs) throws IOException, HCatException { try { - Table ttable = jobInfo.getTable(); + Table ttable = jobInfo.getTableInfo().getTable(); org.apache.hadoop.hive.ql.metadata.Table table = new org.apache.hadoop.hive.ql.metadata.Table( ttable); StorageDescriptor tblSD = ttable.getSd(); @@ -96,7 +96,7 @@ public class HCatEximOutputCommitter extends HCatBaseOutputCommitter { } } if (!table.getPartitionKeys().isEmpty()) { - Map partitionValues = jobInfo.getTableInfo().getPartitionValues(); + Map partitionValues = jobInfo.getPartitionValues(); org.apache.hadoop.hive.ql.metadata.Partition partition = new org.apache.hadoop.hive.ql.metadata.Partition(table, partitionValues, diff --git src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java index 0f9104a..47ddae3 100644 --- src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java +++ src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java @@ -128,8 +128,7 @@ public class HCatEximOutputFormat extends HCatBaseOutputFormat { } } StorerInfo storerInfo = new StorerInfo(isdname, osdname, new Properties()); - HCatTableInfo outputInfo = HCatTableInfo.getOutputTableInfo(null, null, dbname, tablename, - partSpec); + OutputJobInfo outputJobInfo = OutputJobInfo.create(dbname,tablename,partSpec,null,null); org.apache.hadoop.hive.ql.metadata.Table tbl = new org.apache.hadoop.hive.ql.metadata.Table(dbname, tablename); Table table = tbl.getTTable(); @@ -146,16 +145,17 @@ public class HCatEximOutputFormat extends HCatBaseOutputFormat { StorageDescriptor sd = table.getSd(); sd.setLocation(location); String dataLocation = location + "/" + partname; - OutputJobInfo jobInfo = new OutputJobInfo(outputInfo, - columnSchema, columnSchema, storerInfo, dataLocation, table); - setPartDetails(jobInfo, columnSchema, partSpec); - sd.setCols(HCatUtil.getFieldSchemaList(jobInfo.getOutputSchema().getFields())); + outputJobInfo.setTableInfo(new HCatTableInfo(dbname,tablename,columnSchema,null,storerInfo,table)); + outputJobInfo.setOutputSchema(columnSchema); + outputJobInfo.setLocation(dataLocation); + setPartDetails(outputJobInfo, columnSchema, partSpec); + sd.setCols(HCatUtil.getFieldSchemaList(outputJobInfo.getOutputSchema().getFields())); sd.setInputFormat(ifname); sd.setOutputFormat(ofname); SerDeInfo serdeInfo = 
sd.getSerdeInfo(); serdeInfo.setSerializationLib(serializationLib); Configuration conf = job.getConfiguration(); - conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(jobInfo)); + conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo)); } catch (IOException e) { throw new HCatException(ErrorType.ERROR_SET_OUTPUT, e); } catch (MetaException e) { diff --git src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java index 86299a9..a8f277f 100644 --- src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java +++ src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java @@ -31,13 +31,13 @@ public class HCatInputFormat extends HCatBaseInputFormat { * the information in the conf object. The inputInfo object is updated with * information needed in the client context * @param job the job object - * @param inputInfo the table input info + * @param inputJobInfo the input info for table to read * @throws IOException the exception in communicating with the metadata server */ public static void setInput(Job job, - HCatTableInfo inputInfo) throws IOException { + InputJobInfo inputJobInfo) throws IOException { try { - InitializeInput.setInput(job, inputInfo); + InitializeInput.setInput(job, inputJobInfo); } catch (Exception e) { throw new IOException(e); } diff --git src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java index d7e812f..84a712b 100644 --- src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java +++ src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java @@ -80,7 +80,7 @@ public class HCatOutputCommitter extends OutputCommitter { public HCatOutputCommitter(JobContext context, OutputCommitter baseCommitter) throws IOException { OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context); - dynamicPartitioningUsed = jobInfo.getTableInfo().isDynamicPartitioningUsed(); + dynamicPartitioningUsed = jobInfo.isDynamicPartitioningUsed(); if (!dynamicPartitioningUsed){ this.baseCommitter = baseCommitter; this.partitionsDiscovered = true; @@ -161,7 +161,7 @@ public class HCatOutputCommitter extends OutputCommitter { try { HiveMetaStoreClient client = HCatOutputFormat.createHiveClient( - jobInfo.getTableInfo().getServerUri(), jobContext.getConfiguration()); + jobInfo.getServerUri(), jobContext.getConfiguration()); // cancel the deleg. tokens that were acquired for this job now that // we are done - we should cancel if the tokens were acquired by // HCatOutputFormat and not if they were supplied by Oozie. 
In the latter @@ -189,7 +189,7 @@ public class HCatOutputCommitter extends OutputCommitter { Path src; if (dynamicPartitioningUsed){ src = new Path(getPartitionRootLocation( - jobInfo.getLocation().toString(),jobInfo.getTable().getPartitionKeysSize() + jobInfo.getLocation().toString(),jobInfo.getTableInfo().getTable().getPartitionKeysSize() )); }else{ src = new Path(jobInfo.getLocation()); @@ -244,7 +244,7 @@ public class HCatOutputCommitter extends OutputCommitter { OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context); Configuration conf = context.getConfiguration(); - Table table = jobInfo.getTable(); + Table table = jobInfo.getTableInfo().getTable(); Path tblPath = new Path(table.getSd().getLocation()); FileSystem fs = tblPath.getFileSystem(conf); @@ -283,7 +283,7 @@ public class HCatOutputCommitter extends OutputCommitter { List partitionsAdded = new ArrayList(); try { - client = HCatOutputFormat.createHiveClient(tableInfo.getServerUri(), conf); + client = HCatOutputFormat.createHiveClient(jobInfo.getServerUri(), conf); StorerInfo storer = InitializeInput.extractStorerInfo(table.getSd(),table.getParameters()); @@ -298,7 +298,7 @@ public class HCatOutputCommitter extends OutputCommitter { partitionsToAdd.add( constructPartition( context, - tblPath.toString(), tableInfo.getPartitionValues() + tblPath.toString(), jobInfo.getPartitionValues() ,jobInfo.getOutputSchema(), getStorerParameterMap(storer) ,table, fs ,grpName,perms)); @@ -316,19 +316,19 @@ public class HCatOutputCommitter extends OutputCommitter { //Publish the new partition(s) if (dynamicPartitioningUsed && harProcessor.isEnabled() && (!partitionsToAdd.isEmpty())){ - + Path src = new Path(ptnRootLocation); // check here for each dir we're copying out, to see if it already exists, error out if so moveTaskOutputs(fs, src, src, tblPath,true); - + moveTaskOutputs(fs, src, src, tblPath,false); fs.delete(src, true); - - + + // for (Partition partition : partitionsToAdd){ // partitionsAdded.add(client.add_partition(partition)); -// // currently following add_partition instead of add_partitions because latter isn't +// // currently following add_partition instead of add_partitions because latter isn't // // all-or-nothing and we want to be able to roll back partitions we added if need be. // } diff --git src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java index fd43ccc..3fa55c8 100644 --- src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java +++ src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java @@ -94,41 +94,41 @@ public class HCatOutputFormat extends HCatBaseOutputFormat { * Set the info about the output to write for the Job. This queries the metadata server * to find the StorageDriver to use for the table. Throws error if partition is already published. 
* @param job the job object - * @param outputInfo the table output info + * @param outputJobInfo the table output info * @throws IOException the exception in communicating with the metadata server */ @SuppressWarnings("unchecked") - public static void setOutput(Job job, HCatTableInfo outputInfo) throws IOException { + public static void setOutput(Job job, OutputJobInfo outputJobInfo) throws IOException { HiveMetaStoreClient client = null; try { Configuration conf = job.getConfiguration(); - client = createHiveClient(outputInfo.getServerUri(), conf); - Table table = client.getTable(outputInfo.getDatabaseName(), outputInfo.getTableName()); + client = createHiveClient(outputJobInfo.getServerUri(), conf); + Table table = client.getTable(outputJobInfo.getDatabaseName(), outputJobInfo.getTableName()); if (table.getPartitionKeysSize() == 0 ){ - if ((outputInfo.getPartitionValues() != null) && (!outputInfo.getPartitionValues().isEmpty())){ + if ((outputJobInfo.getPartitionValues() != null) && (!outputJobInfo.getPartitionValues().isEmpty())){ // attempt made to save partition values in non-partitioned table - throw error. throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES, "Partition values specified for non-partitioned table"); } // non-partitioned table - outputInfo.setPartitionValues(new HashMap()); + outputJobInfo.setPartitionValues(new HashMap()); } else { // partitioned table, we expect partition values // convert user specified map to have lower case key names Map valueMap = new HashMap(); - if (outputInfo.getPartitionValues() != null){ - for(Map.Entry entry : outputInfo.getPartitionValues().entrySet()) { + if (outputJobInfo.getPartitionValues() != null){ + for(Map.Entry entry : outputJobInfo.getPartitionValues().entrySet()) { valueMap.put(entry.getKey().toLowerCase(), entry.getValue()); } } if ( - (outputInfo.getPartitionValues() == null) - || (outputInfo.getPartitionValues().size() < table.getPartitionKeysSize()) + (outputJobInfo.getPartitionValues() == null) + || (outputJobInfo.getPartitionValues().size() < table.getPartitionKeysSize()) ){ // dynamic partition usecase - partition values were null, or not all were specified // need to figure out which keys are not specified. 
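For context on the setOutput(...) signature change in the hunk above, here is a minimal write-side sketch of the new OutputJobInfo-based API. It is a sketch only: the job name, database/table names, partition values and metastore URI are illustrative placeholders rather than values from this patch, and the null principal assumes an unsecured metastore.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.OutputJobInfo;

public class OutputJobInfoUsageSketch {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "hcat-write-sketch");
    job.setOutputFormatClass(HCatOutputFormat.class);

    // Static partition to publish to; null or an empty map means an unpartitioned table,
    // and leaving some partition keys unspecified exercises the dynamic partitioning
    // path handled in the hunk above.
    Map<String, String> partitionValues = new HashMap<String, String>();
    partitionValues.put("ds", "20111001"); // hypothetical partition key and value

    // OutputJobInfo.create(db, table, partitionValues, serverUri, serverKerberosPrincipal)
    // replaces HCatTableInfo.getOutputTableInfo(serverUri, principal, db, table, partitionValues).
    OutputJobInfo outputJobInfo = OutputJobInfo.create(
        "default",                       // database name (null falls back to the default database)
        "demo_table",                    // table name (placeholder)
        partitionValues,
        "thrift://metastore-host:9083",  // metastore thrift URI (placeholder)
        null);                           // kerberos principal of the metastore, if secured

    // setOutput() resolves the table from the metastore, fills in the table info, output
    // schema and location on the OutputJobInfo, and serializes it into the job configuration.
    // A call to HCatOutputFormat.setSchema(job, schemaOfRecordsBeingWritten) would follow,
    // unchanged by this patch.
    HCatOutputFormat.setOutput(job, outputJobInfo);
  }
}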
@@ -145,7 +145,7 @@ public class HCatOutputFormat extends HCatBaseOutputFormat { throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES,"Invalid partition keys specified"); } - outputInfo.setDynamicPartitioningKeys(dynamicPartitioningKeys); + outputJobInfo.setDynamicPartitioningKeys(dynamicPartitioningKeys); String dynHash; if ((dynHash = conf.get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID)) == null){ dynHash = String.valueOf(Math.random()); @@ -157,11 +157,11 @@ public class HCatOutputFormat extends HCatBaseOutputFormat { } - outputInfo.setPartitionValues(valueMap); + outputJobInfo.setPartitionValues(valueMap); } //Handle duplicate publish - handleDuplicatePublish(job, outputInfo, client, table); + handleDuplicatePublish(job, outputJobInfo, client, table); StorageDescriptor tblSD = table.getSd(); HCatSchema tableSchema = HCatUtil.extractSchemaFromStorageDescriptor(tblSD); @@ -179,14 +179,15 @@ public class HCatOutputFormat extends HCatBaseOutputFormat { String tblLocation = tblSD.getLocation(); String location = driver.getOutputLocation(job, tblLocation, partitionCols, - outputInfo.getPartitionValues(),conf.get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID)); + outputJobInfo.getPartitionValues(),conf.get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID)); //Serialize the output info into the configuration - OutputJobInfo jobInfo = new OutputJobInfo(outputInfo, - tableSchema, tableSchema, storerInfo, location, table); - jobInfo.setHarRequested(harRequested); - jobInfo.setMaximumDynamicPartitions(maxDynamicPartitions); - conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(jobInfo)); + outputJobInfo.setTableInfo(HCatTableInfo.valueOf(table)); + outputJobInfo.setOutputSchema(tableSchema); + outputJobInfo.setLocation(location); + outputJobInfo.setHarRequested(harRequested); + outputJobInfo.setMaximumDynamicPartitions(maxDynamicPartitions); + conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo)); Path tblPath = new Path(tblLocation); @@ -233,7 +234,7 @@ public class HCatOutputFormat extends HCatBaseOutputFormat { // will correctly pick the right tokens which the committer will use and // cancel. - String tokenSignature = getTokenSignature(outputInfo); + String tokenSignature = getTokenSignature(outputJobInfo); if(tokenMap.get(tokenSignature) == null) { // get delegation tokens from hcat server and store them into the "job" // These will be used in the HCatOutputCommitter to publish partitions to @@ -283,17 +284,17 @@ public class HCatOutputFormat extends HCatBaseOutputFormat { // a signature string to associate with a HCatTableInfo - essentially // a concatenation of dbname, tablename and partition keyvalues. 
- private static String getTokenSignature(HCatTableInfo outputInfo) { + private static String getTokenSignature(OutputJobInfo outputJobInfo) { StringBuilder result = new StringBuilder(""); - String dbName = outputInfo.getDatabaseName(); + String dbName = outputJobInfo.getDatabaseName(); if(dbName != null) { result.append(dbName); } - String tableName = outputInfo.getTableName(); + String tableName = outputJobInfo.getTableName(); if(tableName != null) { result.append("+" + tableName); } - Map partValues = outputInfo.getPartitionValues(); + Map partValues = outputJobInfo.getPartitionValues(); if(partValues != null) { for(Entry entry: partValues.entrySet()) { result.append("+" + entry.getKey() + "=" + entry.getValue()); @@ -314,7 +315,7 @@ public class HCatOutputFormat extends HCatBaseOutputFormat { * @throws MetaException * @throws TException */ - private static void handleDuplicatePublish(Job job, HCatTableInfo outputInfo, + private static void handleDuplicatePublish(Job job, OutputJobInfo outputInfo, HiveMetaStoreClient client, Table table) throws IOException, MetaException, TException { /* @@ -366,7 +367,7 @@ public class HCatOutputFormat extends HCatBaseOutputFormat { public static void setSchema(final Job job, final HCatSchema schema) throws IOException { OutputJobInfo jobInfo = getJobInfo(job); - Map partMap = jobInfo.getTableInfo().getPartitionValues(); + Map partMap = jobInfo.getPartitionValues(); setPartDetails(jobInfo, schema, partMap); job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(jobInfo)); } @@ -473,7 +474,7 @@ public class HCatOutputFormat extends HCatBaseOutputFormat { OutputJobInfo info = HCatBaseOutputFormat.getJobInfo(context); // Path workFile = osd.getWorkFilePath(context,info.getLocation()); Path workFile = osd.getWorkFilePath(context,context.getConfiguration().get("mapred.output.dir")); - Path tblPath = new Path(info.getTable().getSd().getLocation()); + Path tblPath = new Path(info.getTableInfo().getTable().getSd().getLocation()); FileSystem fs = tblPath.getFileSystem(context.getConfiguration()); FileStatus tblPathStat = fs.getFileStatus(tblPath); diff --git src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java index 573f627..e71b9af 100644 --- src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java +++ src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java @@ -62,7 +62,7 @@ public class HCatRecordWriter extends RecordWriter, HCatRe // If partition columns occur in data, we want to remove them. 
partColsToDel = jobInfo.getPosOfPartCols(); - dynamicPartitioningUsed = jobInfo.getTableInfo().isDynamicPartitioningUsed(); + dynamicPartitioningUsed = jobInfo.isDynamicPartitioningUsed(); dynamicPartCols = jobInfo.getPosOfDynPartCols(); maxDynamicPartitions = jobInfo.getMaxDynamicPartitions(); diff --git src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java index 14db575..e65f1d0 100644 --- src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java +++ src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java @@ -18,11 +18,13 @@ package org.apache.hcatalog.mapreduce; +import java.io.IOException; import java.io.Serializable; -import java.util.List; -import java.util.Map; import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.data.schema.HCatSchema; /** * @@ -35,136 +37,55 @@ public class HCatTableInfo implements Serializable { private static final long serialVersionUID = 1L; - public enum TableInfoType { - INPUT_INFO, - OUTPUT_INFO - }; - - private final TableInfoType tableInfoType; - - /** The Metadata server uri */ - private final String serverUri; - - /** If the hcat server is configured to work with hadoop security, this - * variable will hold the principal name of the server - this will be used - * in the authentication to the hcat server using kerberos - */ - private final String serverKerberosPrincipal; - /** The db and table names */ - private final String dbName; + private final String databaseName; private final String tableName; - /** The partition filter */ - private String filter; - - /** The partition predicates to filter on, an arbitrary AND/OR filter, if used to input from*/ - private final String partitionPredicates; - - /** The information about the partitions matching the specified query */ - private JobInfo jobInfo; + /** The table schema. */ + private final HCatSchema dataColumns; + private final HCatSchema partitionColumns; - /** The partition values to publish to, if used for output*/ - private Map partitionValues; + /** The table being written to */ + private final Table table; - /** List of keys for which values were not specified at write setup time, to be infered at write time */ - private List dynamicPartitioningKeys; - - - /** - * Initializes a new HCatTableInfo instance to be used with {@link HCatInputFormat} - * for reading data from a table. - * @param serverUri the Metadata server uri - * @param serverKerberosPrincipal If the hcat server is configured to - * work with hadoop security, the kerberos principal name of the server - else null - * The principal name should be of the form: - * /_HOST@ like "hcat/_HOST@myrealm.com" - * The special string _HOST will be replaced automatically with the correct host name - * @param dbName the db name - * @param tableName the table name - */ - public static HCatTableInfo getInputTableInfo(String serverUri, - String serverKerberosPrincipal, - String dbName, - String tableName) { - return new HCatTableInfo(serverUri, serverKerberosPrincipal, dbName, tableName, (String) null); - } + /** The storer info */ + private StorerInfo storerInfo; /** * Initializes a new HCatTableInfo instance to be used with {@link HCatInputFormat} * for reading data from a table. 
- * @param serverUri the Metadata server uri - * @param serverKerberosPrincipal If the hcat server is configured to * work with hadoop security, the kerberos principal name of the server - else null * The principal name should be of the form: * /_HOST@ like "hcat/_HOST@myrealm.com" * The special string _HOST will be replaced automatically with the correct host name - * @param dbName the db name + * @param databaseName the db name * @param tableName the table name - * @param filter the partition filter - */ - public static HCatTableInfo getInputTableInfo(String serverUri, String serverKerberosPrincipal, String dbName, - String tableName, String filter) { - return new HCatTableInfo(serverUri, serverKerberosPrincipal, dbName, tableName, filter); - } - - private HCatTableInfo(String serverUri, String serverKerberosPrincipal, - String dbName, String tableName, String filter) { - this.serverUri = serverUri; - this.serverKerberosPrincipal = serverKerberosPrincipal; - this.dbName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName; - this.tableName = tableName; - this.partitionPredicates = null; - this.partitionValues = null; - this.tableInfoType = TableInfoType.INPUT_INFO; - this.filter = filter; - } - /** - * Initializes a new HCatTableInfo instance to be used with {@link HCatOutputFormat} - * for writing data from a table. - * @param serverUri the Metadata server uri - * @param serverKerberosPrincipal If the hcat server is configured to - * work with hadoop security, the kerberos principal name of the server - else null - * The principal name should be of the form: - * /_HOST@ like "hcat/_HOST@myrealm.com" - * The special string _HOST will be replaced automatically with the correct host name - * @param dbName the db name - * @param tableName the table name - * @param partitionValues The partition values to publish to, can be null or empty Map to - * indicate write to a unpartitioned table. For partitioned tables, this map should - * contain keys for all partition columns with corresponding values. - */ - public static HCatTableInfo getOutputTableInfo(String serverUri, - String serverKerberosPrincipal, String dbName, String tableName, Map partitionValues){ - return new HCatTableInfo(serverUri, serverKerberosPrincipal, dbName, - tableName, partitionValues); - } - - private HCatTableInfo(String serverUri, String serverKerberosPrincipal, - String dbName, String tableName, Map partitionValues){ - this.serverUri = serverUri; - this.serverKerberosPrincipal = serverKerberosPrincipal; - this.dbName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName; + * @param dataColumns schema of columns which contain data + * @param partitionColumns schema of partition columns + * @param storerInfo information about storage descriptor + * @param table hive metastore table class + */ + HCatTableInfo( + String databaseName, + String tableName, + HCatSchema dataColumns, + HCatSchema partitionColumns, + StorerInfo storerInfo, + Table table) { + this.databaseName = (databaseName == null) ? 
MetaStoreUtils.DEFAULT_DATABASE_NAME : databaseName; this.tableName = tableName; - this.partitionPredicates = null; - this.partitionValues = partitionValues; - this.tableInfoType = TableInfoType.OUTPUT_INFO; + this.dataColumns = dataColumns; + this.table = table; + this.storerInfo = storerInfo; + this.partitionColumns = partitionColumns; } /** - * Gets the value of serverUri - * @return the serverUri - */ - public String getServerUri() { - return serverUri; - } - - /** - * Gets the value of dbName - * @return the dbName + * Gets the value of databaseName + * @return the databaseName */ public String getDatabaseName() { - return dbName; + return databaseName; } /** @@ -176,97 +97,80 @@ public class HCatTableInfo implements Serializable { } /** - * Gets the value of partitionPredicates - * @return the partitionPredicates + * @return return schema of data columns as defined in meta store */ - public String getPartitionPredicates() { - return partitionPredicates; + public HCatSchema getDataColumns() { + return dataColumns; } /** - * Gets the value of partitionValues - * @return the partitionValues + * @return schema of partition columns */ - public Map getPartitionValues() { - return partitionValues; + public HCatSchema getPartitionColumns() { + return partitionColumns; } /** - * Gets the value of job info - * @return the job info + * @return the storerInfo */ - public JobInfo getJobInfo() { - return jobInfo; + public StorerInfo getStorerInfo() { + return storerInfo; } /** - * Sets the value of jobInfo - * @param jobInfo the jobInfo to set + * minimize dependency on hive classes so this is package private + * this should eventually no longer be used + * @return hive metastore representation of table */ - public void setJobInfo(JobInfo jobInfo) { - this.jobInfo = jobInfo; - } - - public TableInfoType getTableType(){ - return this.tableInfoType; + Table getTable() { + return table; } /** - * Sets the value of partitionValues - * @param partitionValues the partition values to set - */ - void setPartitionValues(Map partitionValues) { - this.partitionValues = partitionValues; + * create an HCatTableInfo instance from the supplied Hive Table instance + * @param table to create an instance from + * @return HCatTableInfo + * @throws IOException + */ + static HCatTableInfo valueOf(Table table) throws IOException { + HCatSchema dataColumns = HCatUtil.extractSchemaFromStorageDescriptor(table.getSd()); + StorerInfo storerInfo = InitializeInput.extractStorerInfo(table.getSd(), table.getParameters()); + HCatSchema partitionColumns = HCatUtil.getPartitionColumns(table); + return new HCatTableInfo(table.getDbName(), + table.getTableName(), + dataColumns, + partitionColumns, + storerInfo, + table); } - /** - * Gets the value of partition filter - * @return the filter string - */ - public String getFilter() { - return filter; - } + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; - /** - * @return the serverKerberosPrincipal - */ - public String getServerKerberosPrincipal() { - return serverKerberosPrincipal; - } + HCatTableInfo tableInfo = (HCatTableInfo) o; - /** - * Returns whether or not Dynamic Partitioning is used - * @return whether or not dynamic partitioning is currently enabled and used - */ - public boolean isDynamicPartitioningUsed() { - return !((dynamicPartitioningKeys == null) || (dynamicPartitioningKeys.isEmpty())); - } + if (dataColumns != null ? 
!dataColumns.equals(tableInfo.dataColumns) : tableInfo.dataColumns != null) return false; + if (databaseName != null ? !databaseName.equals(tableInfo.databaseName) : tableInfo.databaseName != null) return false; + if (partitionColumns != null ? !partitionColumns.equals(tableInfo.partitionColumns) : tableInfo.partitionColumns != null) + return false; + if (storerInfo != null ? !storerInfo.equals(tableInfo.storerInfo) : tableInfo.storerInfo != null) return false; + if (table != null ? !table.equals(tableInfo.table) : tableInfo.table != null) return false; + if (tableName != null ? !tableName.equals(tableInfo.tableName) : tableInfo.tableName != null) return false; - /** - * Sets the list of dynamic partitioning keys used for outputting without specifying all the keys - * @param dynamicPartitioningKeys - */ - public void setDynamicPartitioningKeys(List dynamicPartitioningKeys) { - this.dynamicPartitioningKeys = dynamicPartitioningKeys; - } - - public List getDynamicPartitioningKeys(){ - return this.dynamicPartitioningKeys; + return true; } @Override public int hashCode() { - int result = 17; - result = 31*result + (serverUri == null ? 0 : serverUri.hashCode()); - result = 31*result + (serverKerberosPrincipal == null ? 0 : serverKerberosPrincipal.hashCode()); - result = 31*result + (dbName == null? 0 : dbName.hashCode()); - result = 31*result + tableName.hashCode(); - result = 31*result + (filter == null? 0 : filter.hashCode()); - result = 31*result + (partitionPredicates == null ? 0 : partitionPredicates.hashCode()); - result = 31*result + tableInfoType.ordinal(); - result = 31*result + (partitionValues == null ? 0 : partitionValues.hashCode()); - result = 31*result + (dynamicPartitioningKeys == null ? 0 : dynamicPartitioningKeys.hashCode()); + int result = databaseName != null ? databaseName.hashCode() : 0; + result = 31 * result + (tableName != null ? tableName.hashCode() : 0); + result = 31 * result + (dataColumns != null ? dataColumns.hashCode() : 0); + result = 31 * result + (partitionColumns != null ? partitionColumns.hashCode() : 0); + result = 31 * result + (table != null ? table.hashCode() : 0); + result = 31 * result + (storerInfo != null ? 
storerInfo.hashCode() : 0); return result; } diff --git src/java/org/apache/hcatalog/mapreduce/InitializeInput.java src/java/org/apache/hcatalog/mapreduce/InitializeInput.java index 6db5118..4e2bac8 100644 --- src/java/org/apache/hcatalog/mapreduce/InitializeInput.java +++ src/java/org/apache/hcatalog/mapreduce/InitializeInput.java @@ -50,15 +50,15 @@ public class InitializeInput { static final String HCAT_KEY_PREFIX = "hcat."; private static final HiveConf hiveConf = new HiveConf(HCatInputFormat.class); - private static HiveMetaStoreClient createHiveMetaClient(Configuration conf, HCatTableInfo inputInfo) throws Exception { - if (inputInfo.getServerUri() != null){ + private static HiveMetaStoreClient createHiveMetaClient(Configuration conf, InputJobInfo inputJobInfo) throws Exception { + if (inputJobInfo.getServerUri() != null){ hiveConf.setBoolean(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, true); hiveConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, - inputInfo.getServerKerberosPrincipal()); + inputJobInfo.getServerKerberosPrincipal()); hiveConf.set("hive.metastore.local", "false"); - hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, inputInfo.getServerUri()); + hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, inputJobInfo.getServerUri()); } return new HiveMetaStoreClient(hiveConf,null); @@ -68,28 +68,29 @@ public class InitializeInput { * Set the input to use for the Job. This queries the metadata server with the specified partition predicates, * gets the matching partitions, puts the information in the configuration object. * @param job the job object - * @param inputInfo the hcat table input info + * @param inputJobInfo information on the Input to read * @throws Exception */ - public static void setInput(Job job, HCatTableInfo inputInfo) throws Exception { + public static void setInput(Job job, InputJobInfo inputJobInfo) throws Exception { - //* Create and initialize an JobInfo object - //* Serialize the JobInfo and save in the Job's Configuration object + //* Create and initialize an InputJobInfo object + //* Serialize the InputJobInfo and save in the Job's Configuration object HiveMetaStoreClient client = null; try { - client = createHiveMetaClient(job.getConfiguration(),inputInfo); - Table table = client.getTable(inputInfo.getDatabaseName(), inputInfo.getTableName()); - HCatSchema tableSchema = HCatUtil.getTableSchemaWithPtnCols(table); + client = createHiveMetaClient(job.getConfiguration(),inputJobInfo); + Table table = client.getTable(inputJobInfo.getDatabaseName(), + inputJobInfo.getTableName()); List partInfoList = new ArrayList(); if( table.getPartitionKeys().size() != 0 ) { //Partitioned table - List parts = client.listPartitionsByFilter( - inputInfo.getDatabaseName(), inputInfo.getTableName(), - inputInfo.getFilter(), (short) -1); + List parts = client.listPartitionsByFilter(inputJobInfo.getDatabaseName(), + inputJobInfo.getTableName(), + inputJobInfo.getFilter(), + (short) -1); // Default to 100,000 partitions if hive.metastore.maxpartition is not defined int maxPart = hiveConf.getInt("hcat.metastore.maxpartitions", 100000); @@ -110,13 +111,12 @@ public class InitializeInput { partInfo.setPartitionValues(new HashMap()); partInfoList.add(partInfo); } - - JobInfo hcatJobInfo = new JobInfo(inputInfo, tableSchema, partInfoList); - inputInfo.setJobInfo(hcatJobInfo); + inputJobInfo.setPartitions(partInfoList); + inputJobInfo.setTableInfo(HCatTableInfo.valueOf(table)); job.getConfiguration().set( HCatConstants.HCAT_KEY_JOB_INFO, - 
HCatUtil.serialize(hcatJobInfo) + HCatUtil.serialize(inputJobInfo) ); } finally { if (client != null ) { diff --git src/java/org/apache/hcatalog/mapreduce/InputJobInfo.java src/java/org/apache/hcatalog/mapreduce/InputJobInfo.java new file mode 100644 index 0000000..b372b7e --- /dev/null +++ src/java/org/apache/hcatalog/mapreduce/InputJobInfo.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.mapreduce; + +import org.apache.hadoop.hive.metastore.MetaStoreUtils; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** The class used to serialize and store the information read from the metadata server */ +public class InputJobInfo implements Serializable{ + + /** The serialization version */ + private static final long serialVersionUID = 1L; + + /** The db and table names. */ + private final String databaseName; + private final String tableName; + + /** meta information of the table to be read from */ + private HCatTableInfo tableInfo; + + /** The Metadata server uri */ + private final String serverUri; + + /** If the hcat server is configured to work with hadoop security, this + * variable will hold the principal name of the server - this will be used + * in the authentication to the hcat server using kerberos + */ + private final String serverKerberosPrincipal; + + /** The partition filter */ + private String filter; + + /** The list of partitions matching the filter. */ + private List partitions; + + /** implementation specific job properties */ + private Properties properties; + + /** + * Initializes a new InputJobInfo + * for reading data from a table. + * @param databaseName the db name + * @param tableName the table name + * @param filter the partition filter + * @param serverUri the Metadata server uri + * @param serverKerberosPrincipal If the hcat server is configured to + * work with hadoop security, the kerberos principal name of the server - else null + * The principal name should be of the form: + * /_HOST@ like "hcat/_HOST@myrealm.com" + * The special string _HOST will be replaced automatically with the correct host name + */ + public static InputJobInfo create(String databaseName, + String tableName, + String filter, + String serverUri, + String serverKerberosPrincipal) { + return new InputJobInfo(databaseName,tableName,filter,serverUri,serverKerberosPrincipal); + } + + private InputJobInfo(String databaseName, + String tableName, + String filter, + String serverUri, + String serverKerberosPrincipal) { + this.databaseName = (databaseName == null) ? 
MetaStoreUtils.DEFAULT_DATABASE_NAME : databaseName; + this.tableName = tableName; + this.serverUri = serverUri; + this.serverKerberosPrincipal = serverKerberosPrincipal; + this.filter = filter; + this.properties = new Properties(); + } + + /** + * Gets the value of databaseName + * @return the databaseName + */ + public String getDatabaseName() { + return databaseName; + } + + /** + * Gets the value of tableName + * @return the tableName + */ + public String getTableName() { + return tableName; + } + + /** + * Gets the table's meta information + * @return the HCatTableInfo + */ + public HCatTableInfo getTableInfo() { + return tableInfo; + } + + /** + * set the tablInfo instance + * this should be the same instance + * determined by this object's DatabaseName and TableName + * @param tableInfo + */ + void setTableInfo(HCatTableInfo tableInfo) { + this.tableInfo = tableInfo; + } + + /** + * @return the serverKerberosPrincipal + */ + public String getServerKerberosPrincipal() { + return serverKerberosPrincipal; + } + + /** + * Gets the value of serverUri + * @return the serverUri + */ + public String getServerUri() { + return serverUri; + } + + /** + * Gets the value of partition filter + * @return the filter string + */ + public String getFilter() { + return filter; + } + + /** + * @return partition info + */ + public List getPartitions() { + return partitions; + } + + /** + * @return partition info list + */ + void setPartitions(List partitions) { + this.partitions = partitions; + } + + /** + * Set/Get Property information to be passed down to *StorageDriver implementation + * put implementation specific storage driver configurations here + * @return + */ + public Properties getProperties() { + return properties; + } + +} diff --git src/java/org/apache/hcatalog/mapreduce/JobInfo.java src/java/org/apache/hcatalog/mapreduce/JobInfo.java deleted file mode 100644 index 800d5c3..0000000 --- src/java/org/apache/hcatalog/mapreduce/JobInfo.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hcatalog.mapreduce; - -import java.io.Serializable; -import java.util.List; - -import org.apache.hcatalog.data.schema.HCatSchema; - -/** The class used to serialize and store the information read from the metadata server */ -public class JobInfo implements Serializable{ - - /** The serialization version */ - private static final long serialVersionUID = 1L; - - /** The db and table names. */ - private final String dbName; - private final String tableName; - - /** The table schema. */ - private final HCatSchema tableSchema; - - /** The list of partitions matching the filter. */ - private final List partitions; - - /** - * Instantiates a new hcat job info. 
- * @param hcatTableInfo - * @param tableSchema the table schema - * @param partitions the partitions - */ - public JobInfo(HCatTableInfo hcatTableInfo, HCatSchema tableSchema, - List partitions) { - this.tableName = hcatTableInfo.getTableName(); - this.dbName = hcatTableInfo.getDatabaseName(); - this.tableSchema = tableSchema; - this.partitions = partitions; - } - - /** - * Gets the value of dbName - * @return the dbName - */ - public String getDatabaseName() { - return tableName; - } - - /** - * Gets the value of tableName - * @return the tableName - */ - public String getTableName() { - return tableName; - } - - /** - * Gets the value of tableSchema - * @return the tableSchema - */ - public HCatSchema getTableSchema() { - return tableSchema; - } - - /** - * Gets the value of partitions - * @return the partitions - */ - public List getPartitions() { - return partitions; - } - -} diff --git src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java index f3aac0f..9830190 100644 --- src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java +++ src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java @@ -22,75 +22,126 @@ import java.io.Serializable; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.Map; +import java.util.Properties; -import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hcatalog.data.schema.HCatSchema; /** The class used to serialize and store the output related information */ -class OutputJobInfo implements Serializable { +public class OutputJobInfo implements Serializable { - /** The serialization version. */ - private static final long serialVersionUID = 1L; + /** The db and table names. */ + private final String databaseName; + private final String tableName; - /** The table info provided by user. */ - private final HCatTableInfo tableInfo; + /** The serialization version. */ + private static final long serialVersionUID = 1L; - /** The output schema. This is given to us by user. This wont contain any - * partition columns ,even if user has specified them. - * */ - private HCatSchema outputSchema; + /** The table info provided by user. */ + private HCatTableInfo tableInfo; - /** This is table schema, retrieved from metastore. */ - private final HCatSchema tableSchema; + /** The output schema. This is given to us by user. This wont contain any + * partition columns ,even if user has specified them. 
+ * */ + private HCatSchema outputSchema; - /** The storer info */ - private final StorerInfo storerInfo; + /** The location of the partition being written */ + private String location; - /** The location of the partition being written */ - private final String location; + /** The partition values to publish to, if used for output*/ + private Map partitionValues; - /** The table being written to */ - private final Table table; + /** The Metadata server uri */ + private final String serverUri; - /** This is a list of partition columns which will be deleted from data, if - * data contains partition columns.*/ + /** If the hcat server is configured to work with hadoop security, this + * variable will hold the principal name of the server - this will be used + * in the authentication to the hcat server using kerberos + */ + private final String serverKerberosPrincipal; - private List posOfPartCols; - private List posOfDynPartCols; + private List posOfPartCols; + private List posOfDynPartCols; - private int maxDynamicPartitions; + private Properties properties; - private boolean harRequested; + private int maxDynamicPartitions; - /** - * @return the posOfPartCols - */ - protected List getPosOfPartCols() { - return posOfPartCols; - } + /** List of keys for which values were not specified at write setup time, to be infered at write time */ + private List dynamicPartitioningKeys; - /** - * @return the posOfDynPartCols - */ - protected List getPosOfDynPartCols() { - return posOfDynPartCols; - } + private boolean harRequested; - /** - * @param posOfPartCols the posOfPartCols to set - */ - protected void setPosOfPartCols(List posOfPartCols) { - // sorting the list in the descending order so that deletes happen back-to-front - Collections.sort(posOfPartCols, new Comparator () { - @Override - public int compare(Integer earlier, Integer later) { - return (earlier > later) ? -1 : ((earlier == later) ? 0 : 1); - } - }); - this.posOfPartCols = posOfPartCols; - } + /** + * Initializes a new OutputJobInfo instance + * for writing data from a table. + * @param databaseName the db name + * @param tableName the table name + * @param partitionValues The partition values to publish to, can be null or empty Map to + * @param serverUri the Metadata server uri + * @param serverKerberosPrincipal If the hcat server is configured to + * work with hadoop security, the kerberos principal name of the server - else null + * The principal name should be of the form: + * /_HOST@ like "hcat/_HOST@myrealm.com" + * The special string _HOST will be replaced automatically with the correct host name + * indicate write to a unpartitioned table. For partitioned tables, this map should + * contain keys for all partition columns with corresponding values. + */ + public static OutputJobInfo create(String databaseName, + String tableName, + Map partitionValues, + String serverUri, + String serverKerberosPrincipal) { + return new OutputJobInfo(databaseName, + tableName, + partitionValues, + serverUri, + serverKerberosPrincipal); + } - /** + private OutputJobInfo(String databaseName, + String tableName, + Map partitionValues, + String serverUri, + String serverKerberosPrincipal) { + this.databaseName = (databaseName == null) ? 
MetaStoreUtils.DEFAULT_DATABASE_NAME : databaseName; + this.tableName = tableName; + this.serverUri = serverUri; + this.serverKerberosPrincipal = serverKerberosPrincipal; + this.partitionValues = partitionValues; + this.properties = new Properties(); + } + + /** + * @return the posOfPartCols + */ + protected List getPosOfPartCols() { + return posOfPartCols; + } + + /** + * @return the posOfDynPartCols + */ + protected List getPosOfDynPartCols() { + return posOfDynPartCols; + } + + /** + * @param posOfPartCols the posOfPartCols to set + */ + protected void setPosOfPartCols(List posOfPartCols) { + // sorting the list in the descending order so that deletes happen back-to-front + Collections.sort(posOfPartCols, new Comparator () { + @Override + public int compare(Integer earlier, Integer later) { + return (earlier > later) ? -1 : ((earlier == later) ? 0 : 1); + } + }); + this.posOfPartCols = posOfPartCols; + } + + /** * @param posOfDynPartCols the posOfDynPartCols to set */ protected void setPosOfDynPartCols(List posOfDynPartCols) { @@ -98,97 +149,153 @@ class OutputJobInfo implements Serializable { this.posOfDynPartCols = posOfDynPartCols; } - public OutputJobInfo(HCatTableInfo tableInfo, HCatSchema outputSchema, HCatSchema tableSchema, - StorerInfo storerInfo, String location, Table table) { - super(); - this.tableInfo = tableInfo; - this.outputSchema = outputSchema; - this.tableSchema = tableSchema; - this.storerInfo = storerInfo; - this.location = location; - this.table = table; - } + /** + * @return the tableInfo + */ + public HCatTableInfo getTableInfo() { + return tableInfo; + } - /** - * @return the tableInfo - */ - public HCatTableInfo getTableInfo() { - return tableInfo; - } + /** + * @return the outputSchema + */ + public HCatSchema getOutputSchema() { + return outputSchema; + } - /** - * @return the outputSchema - */ - public HCatSchema getOutputSchema() { - return outputSchema; - } + /** + * @param schema the outputSchema to set + */ + public void setOutputSchema(HCatSchema schema) { + this.outputSchema = schema; + } - /** - * @param schema the outputSchema to set - */ - public void setOutputSchema(HCatSchema schema) { - this.outputSchema = schema; - } + /** + * @return the location + */ + public String getLocation() { + return location; + } - /** - * @return the tableSchema - */ - public HCatSchema getTableSchema() { - return tableSchema; - } + /** + * @param location location to write to + */ + void setLocation(String location) { + this.location = location; + } + /** + * Sets the value of partitionValues + * @param partitionValues the partition values to set + */ + void setPartitionValues(Map partitionValues) { + this.partitionValues = partitionValues; + } - /** - * @return the storerInfo - */ - public StorerInfo getStorerInfo() { - return storerInfo; - } + /** + * Gets the value of partitionValues + * @return the partitionValues + */ + public Map getPartitionValues() { + return partitionValues; + } - /** - * @return the location - */ - public String getLocation() { - return location; - } + /** + * @return metastore thrift server URI + */ + public String getServerUri() { + return serverUri; + } - /** - * Gets the value of table - * @return the table - */ - public Table getTable() { - return table; - } + /** + * @return the serverKerberosPrincipal + */ + public String getServerKerberosPrincipal() { + return serverKerberosPrincipal; + } - /** - * Set maximum number of allowable dynamic partitions - * @param maxDynamicPartitions - */ - public void setMaximumDynamicPartitions(int 
maxDynamicPartitions){ - this.maxDynamicPartitions = maxDynamicPartitions; - } - - /** - * Returns maximum number of allowable dynamic partitions - * @return maximum number of allowable dynamic partitions - */ - public int getMaxDynamicPartitions() { - return this.maxDynamicPartitions; - } + /** + * set the tablInfo instance + * this should be the same instance + * determined by this object's DatabaseName and TableName + * @param tableInfo + */ + void setTableInfo(HCatTableInfo tableInfo) { + this.tableInfo = tableInfo; + } - /** - * Sets whether or not hadoop archiving has been requested for this job - * @param harRequested - */ - public void setHarRequested(boolean harRequested){ - this.harRequested = harRequested; - } - - /** - * Returns whether or not hadoop archiving has been requested for this job - * @return whether or not hadoop archiving has been requested for this job - */ - public boolean getHarRequested() { - return this.harRequested; - } + /** + * @return database name of table to write to + */ + public String getDatabaseName() { + return databaseName; + } + + /** + * @return name of table to write to + */ + public String getTableName() { + return tableName; + } + + /** + * Set/Get Property information to be passed down to *StorageDriver implementation + * put implementation specific storage driver configurations here + * @return + */ + public Properties getProperties() { + return properties; + } + + /** + * Set maximum number of allowable dynamic partitions + * @param maxDynamicPartitions + */ + public void setMaximumDynamicPartitions(int maxDynamicPartitions){ + this.maxDynamicPartitions = maxDynamicPartitions; + } + + /** + * Returns maximum number of allowable dynamic partitions + * @return maximum number of allowable dynamic partitions + */ + public int getMaxDynamicPartitions() { + return this.maxDynamicPartitions; + } + + /** + * Sets whether or not hadoop archiving has been requested for this job + * @param harRequested + */ + public void setHarRequested(boolean harRequested){ + this.harRequested = harRequested; + } + + /** + * Returns whether or not hadoop archiving has been requested for this job + * @return whether or not hadoop archiving has been requested for this job + */ + public boolean getHarRequested() { + return this.harRequested; + } + + /** + * Returns whether or not Dynamic Partitioning is used + * @return whether or not dynamic partitioning is currently enabled and used + */ + public boolean isDynamicPartitioningUsed() { + return !((dynamicPartitioningKeys == null) || (dynamicPartitioningKeys.isEmpty())); + } + + /** + * Sets the list of dynamic partitioning keys used for outputting without specifying all the keys + * @param dynamicPartitioningKeys + */ + public void setDynamicPartitioningKeys(List dynamicPartitioningKeys) { + this.dynamicPartitioningKeys = dynamicPartitioningKeys; + } + + public List getDynamicPartitioningKeys(){ + return this.dynamicPartitioningKeys; + } } diff --git src/java/org/apache/hcatalog/pig/HCatLoader.java src/java/org/apache/hcatalog/pig/HCatLoader.java index 8abcd92..cce938b 100644 --- src/java/org/apache/hcatalog/pig/HCatLoader.java +++ src/java/org/apache/hcatalog/pig/HCatLoader.java @@ -31,7 +31,7 @@ import org.apache.hcatalog.common.HCatUtil; import org.apache.hcatalog.data.Pair; import org.apache.hcatalog.data.schema.HCatSchema; import org.apache.hcatalog.mapreduce.HCatInputFormat; -import org.apache.hcatalog.mapreduce.HCatTableInfo; +import org.apache.hcatalog.mapreduce.InputJobInfo; import org.apache.pig.Expression; 
 import org.apache.pig.Expression.BinaryExpression;
 import org.apache.pig.LoadFunc;
@@ -82,14 +82,12 @@ public class HCatLoader extends HCatBaseLoader {
     // in the hadoop front end mapred.task.id property will not be set in
     // the Configuration
     if (!HCatUtil.checkJobContextIfRunningFromBackend(job)){
-
-      HCatInputFormat.setInput(job, HCatTableInfo.getInputTableInfo(
-          hcatServerUri!=null ? hcatServerUri :
-          (hcatServerUri = PigHCatUtil.getHCatServerUri(job)),
-          PigHCatUtil.getHCatServerPrincipal(job),
-          dbName,
-          tableName,
-          getPartitionFilterString()));
+      HCatInputFormat.setInput(job,
+          InputJobInfo.create(dbName,
+              tableName,
+              getPartitionFilterString(),
+              hcatServerUri != null ? hcatServerUri : (hcatServerUri = PigHCatUtil.getHCatServerUri(job)),
+              PigHCatUtil.getHCatServerPrincipal(job)));
     }

     // Need to also push projections by calling setOutputSchema on
diff --git src/java/org/apache/hcatalog/pig/HCatStorer.java src/java/org/apache/hcatalog/pig/HCatStorer.java
index e81537c..90d14c0 100644
--- src/java/org/apache/hcatalog/pig/HCatStorer.java
+++ src/java/org/apache/hcatalog/pig/HCatStorer.java
@@ -30,7 +30,7 @@ import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.mapreduce.HCatOutputCommitter;
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
-import org.apache.hcatalog.mapreduce.HCatTableInfo;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -72,13 +72,20 @@ public class HCatStorer extends HCatBaseStorer {
     Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign});
     String[] userStr = location.split("\\.");
-    HCatTableInfo tblInfo;
+    OutputJobInfo outputJobInfo;
+
     if(userStr.length == 2) {
-      tblInfo = HCatTableInfo.getOutputTableInfo(PigHCatUtil.getHCatServerUri(job),
-          PigHCatUtil.getHCatServerPrincipal(job), userStr[0],userStr[1],partitions);
+      outputJobInfo = OutputJobInfo.create(userStr[0],
+          userStr[1],
+          partitions,
+          PigHCatUtil.getHCatServerUri(job),
+          PigHCatUtil.getHCatServerPrincipal(job));
     } else {
-      tblInfo = HCatTableInfo.getOutputTableInfo(PigHCatUtil.getHCatServerUri(job),
-          PigHCatUtil.getHCatServerPrincipal(job), null,userStr[0],partitions);
+      outputJobInfo = OutputJobInfo.create(null,
+          userStr[0],
+          partitions,
+          PigHCatUtil.getHCatServerUri(job),
+          PigHCatUtil.getHCatServerPrincipal(job));
     }
@@ -94,7 +101,7 @@ public class HCatStorer extends HCatBaseStorer {
       throw new FrontendException("Schema for data cannot be determined.", PigHCatUtil.PIG_EXCEPTION_CODE);
     }
     try{
-      HCatOutputFormat.setOutput(job, tblInfo);
+      HCatOutputFormat.setOutput(job, outputJobInfo);
     } catch(HCatException he) {
       // pass the message to the user - essentially something about the table
       // information passed to HCatOutputFormat was not right
diff --git src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
index 19f88f3..2dee111 100644
--- src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
+++ src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
@@ -263,8 +263,8 @@ public abstract class HCatMapReduceTest extends TestCase {
     job.setOutputFormatClass(HCatOutputFormat.class);

-    HCatTableInfo outputInfo = HCatTableInfo.getOutputTableInfo(thriftUri, null, dbName, tableName, partitionValues);
-    HCatOutputFormat.setOutput(job, outputInfo);
+    OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, tableName, partitionValues, thriftUri, null);
+    HCatOutputFormat.setOutput(job, outputJobInfo);

     job.setMapOutputKeyClass(BytesWritable.class);
     job.setMapOutputValueClass(DefaultHCatRecord.class);
@@ -300,9 +300,8 @@ public abstract class HCatMapReduceTest extends TestCase {
     job.setInputFormatClass(HCatInputFormat.class);
     job.setOutputFormatClass(TextOutputFormat.class);

-    HCatTableInfo inputInfo = HCatTableInfo.getInputTableInfo(
-        thriftUri, null, dbName, tableName, filter);
-    HCatInputFormat.setInput(job, inputInfo);
+    InputJobInfo inputJobInfo = InputJobInfo.create(dbName, tableName, filter, thriftUri, null);
+    HCatInputFormat.setInput(job, inputJobInfo);

     job.setMapOutputKeyClass(BytesWritable.class);
     job.setMapOutputValueClass(Text.class);
@@ -333,8 +332,8 @@ public abstract class HCatMapReduceTest extends TestCase {
     job.setInputFormatClass(HCatInputFormat.class);
     job.setOutputFormatClass(TextOutputFormat.class);

-    HCatTableInfo inputInfo = HCatTableInfo.getInputTableInfo(thriftUri, null, dbName, tableName);
-    HCatInputFormat.setInput(job, inputInfo);
+    InputJobInfo inputJobInfo = InputJobInfo.create(dbName, tableName, null, thriftUri, null);
+    HCatInputFormat.setInput(job, inputJobInfo);

     return HCatInputFormat.getTableSchema(job);
   }
diff --git src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java
index dcdcb40..506ed4e 100644
--- src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java
+++ src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java
@@ -142,20 +142,19 @@ public class TestHCatOutputFormat extends TestCase {
     Map<String, String> partitionValues = new HashMap<String, String>();
     partitionValues.put("colname", "p1");

     //null server url means local mode
-    HCatTableInfo info = HCatTableInfo.getOutputTableInfo(null, null, dbName, tblName, partitionValues);
+    OutputJobInfo info = OutputJobInfo.create(dbName, tblName, partitionValues, null, null);
     HCatOutputFormat.setOutput(job, info);

     OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(job);

     assertNotNull(jobInfo.getTableInfo());
-    assertEquals(1, jobInfo.getTableInfo().getPartitionValues().size());
-    assertEquals("p1", jobInfo.getTableInfo().getPartitionValues().get("colname"));
-    assertEquals(1, jobInfo.getTableSchema().getFields().size());
-    assertEquals("colname", jobInfo.getTableSchema().getFields().get(0).getName());
+    assertEquals(1, jobInfo.getPartitionValues().size());
+    assertEquals("p1", jobInfo.getPartitionValues().get("colname"));
+    assertEquals(1, jobInfo.getTableInfo().getDataColumns().getFields().size());
+    assertEquals("colname", jobInfo.getTableInfo().getDataColumns().getFields().get(0).getName());

-    StorerInfo storer = jobInfo.getStorerInfo();
+    StorerInfo storer = jobInfo.getTableInfo().getStorerInfo();
     assertEquals(RCFileOutputDriver.class.getName(), storer.getOutputSDClass());
-
     publishTest(job);
   }