Index: pig/HCatStorer.java =================================================================== --- pig/HCatStorer.java (revision 1241662) +++ pig/HCatStorer.java (working copy) @@ -80,15 +80,11 @@ if(userStr.length == 2) { outputJobInfo = OutputJobInfo.create(userStr[0], userStr[1], - partitions, - PigHCatUtil.getHCatServerUri(job), - PigHCatUtil.getHCatServerPrincipal(job)); + partitions); } else { outputJobInfo = OutputJobInfo.create(null, userStr[0], - partitions, - PigHCatUtil.getHCatServerUri(job), - PigHCatUtil.getHCatServerPrincipal(job)); + partitions); } Index: mapreduce/HCatRecordReader.java =================================================================== --- mapreduce/HCatRecordReader.java (revision 1241662) +++ mapreduce/HCatRecordReader.java (working copy) @@ -22,50 +22,74 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.Writable; +import org.apache.hcatalog.data.DefaultHCatRecord; import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hcatalog.data.DefaultHCatRecord; + import org.apache.hcatalog.data.HCatRecord; +import org.apache.hcatalog.data.LazyHCatRecord; /** The HCat wrapper for the underlying RecordReader, this ensures that the initialize on * the underlying record reader is done with the underlying split, not with HCatSplit. */ -class HCatRecordReader extends RecordReader - implements org.apache.hadoop.mapred.RecordReader { +class HCatRecordReader extends RecordReader { Log LOG = LogFactory.getLog(HCatRecordReader.class); int lineCount = 0; /** The underlying record reader to delegate to. */ - private final RecordReader baseRecordReader; + //org.apache.hadoop.mapred. + private final RecordReader + baseRecordReader; /** The storage driver used */ - private final HCatInputStorageDriver storageDriver; + private final HCatStorageHandler storageHandler; /** * Instantiates a new hcat record reader. 
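The HCatRecordReader hunks around this point replace the storage-driver reader with a thin wrapper over the old-API (org.apache.hadoop.mapred) reader obtained from the Hive storage handler; several methods are stubbed with FIXMEs while that bridge is worked out. A minimal sketch of the intended old-to-new API bridge, with illustrative class and field names (not the patch's final code):

import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Exposes an old-API (mapred) RecordReader through the new (mapreduce) RecordReader contract.
class MapredReaderBridge extends RecordReader<WritableComparable, Writable> {

    private final org.apache.hadoop.mapred.RecordReader<WritableComparable, Writable> base;
    private WritableComparable currentKey;
    private Writable currentValue;

    MapredReaderBridge(org.apache.hadoop.mapred.RecordReader<WritableComparable, Writable> base) {
        this.base = base;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) {
        // Nothing to do: a mapred reader is fully initialized by InputFormat.getRecordReader().
    }

    @Override
    public boolean nextKeyValue() throws IOException {
        if (currentKey == null) {
            currentKey = base.createKey();      // reuse the same key/value holders, mapred-style
            currentValue = base.createValue();
        }
        return base.next(currentKey, currentValue);
    }

    @Override
    public WritableComparable getCurrentKey() { return currentKey; }

    @Override
    public Writable getCurrentValue() { return currentValue; }

    @Override
    public float getProgress() throws IOException { return base.getProgress(); }

    @Override
    public void close() throws IOException { base.close(); }
}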
* @param baseRecordReader the base record reader */ - public HCatRecordReader(HCatInputStorageDriver storageDriver, RecordReader baseRecordReader) { - this.baseRecordReader = baseRecordReader; - this.storageDriver = storageDriver; + public HCatRecordReader(HCatStorageHandler storageHandler, + org.apache.hadoop.mapred.RecordReader baseRecordReader) { + //this.baseRecordReader = baseRecordReader; + this.baseRecordReader = null; + this.storageHandler = storageHandler; } + + //FIXME + //needs integration here + //Function not needed + @Override + public void initialize(org.apache.hadoop.mapreduce.InputSplit split, + TaskAttemptContext taskContext) + throws IOException, InterruptedException { + } /* (non-Javadoc) * @see org.apache.hadoop.mapreduce.RecordReader#initialize(org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext) */ - @Override - public void initialize(InputSplit split, TaskAttemptContext taskContext) +// @Override + public void initialize(org.apache.hadoop.mapred.InputSplit split, + TaskAttemptContext taskContext) throws IOException, InterruptedException { - InputSplit baseSplit = split; + org.apache.hadoop.mapred.InputSplit baseSplit = split; if( split instanceof HCatSplit ) { baseSplit = ((HCatSplit) split).getBaseSplit(); } - baseRecordReader.initialize(baseSplit, taskContext); + //FIXME + //needs Integration here + /* + JobConf jobConf = new JobConf( + ((JobContext)taskContext).getConfiguration()); + baseRecordReader.initialize(baseSplit, jobConf); + */ } /* (non-Javadoc) @@ -80,9 +104,18 @@ * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentValue() */ @Override - public HCatRecord getCurrentValue() throws IOException, InterruptedException { - HCatRecord r = storageDriver.convertToHCatRecord(baseRecordReader.getCurrentKey(),baseRecordReader.getCurrentValue()); + public HCatRecord getCurrentValue() + throws IOException, InterruptedException { + /* FIXME + HCatRecord r = new LazyHCatRecord(baseRecordReader.getCurrentValue(), + HCatUtils.getObjectInspector( + storageHandler.getSerDeClass(), + storageHandler.getConf(), + null)); return r; + */ + + return null; } /* (non-Javadoc) @@ -119,7 +152,8 @@ baseRecordReader.close(); } - @Override + //FIXME + //@Override public Object createKey() { WritableComparable o = null; try { @@ -134,17 +168,20 @@ return o; } - @Override + //FIXME + //@Override public Object createValue() { return new DefaultHCatRecord(); } - @Override + //FIXME + //@Override public long getPos() throws IOException { return lineCount; } - @Override + //FIXME + //@Override public boolean next(Object key, Object value) throws IOException { try { if (!nextKeyValue()){ Index: mapreduce/FileOutputStorageDriver.java =================================================================== --- mapreduce/FileOutputStorageDriver.java (revision 1241662) +++ mapreduce/FileOutputStorageDriver.java (working copy) @@ -120,7 +120,8 @@ @Override OutputFormatContainer getOutputFormatContainer(OutputFormat outputFormat) { - return new FileOutputFormatContainer(outputFormat); + //broken + return new FileOutputFormatContainer(null); } } Index: mapreduce/HCatOutputStorageDriver.java =================================================================== --- mapreduce/HCatOutputStorageDriver.java (revision 1241662) +++ mapreduce/HCatOutputStorageDriver.java (working copy) @@ -174,8 +174,10 @@ * @param outputFormat format the returned container will contain * @return */ + + //TODO broken this entire class will disappear anyway OutputFormatContainer 
getOutputFormatContainer(OutputFormat outputFormat) { - return new DefaultOutputFormatContainer(outputFormat); + return new DefaultOutputFormatContainer(null); } } Index: mapreduce/HCatTableInfo.java =================================================================== --- mapreduce/HCatTableInfo.java (revision 1241662) +++ mapreduce/HCatTableInfo.java (working copy) @@ -117,6 +117,10 @@ return storerInfo; } + public String getTableLocation() { + return table.getSd().getLocation(); + } + /** * minimize dependency on hive classes so this is package private * this should eventually no longer be used @@ -134,7 +138,7 @@ */ static HCatTableInfo valueOf(Table table) throws IOException { HCatSchema dataColumns = HCatUtil.extractSchemaFromStorageDescriptor(table.getSd()); - StorerInfo storerInfo = InitializeInput.extractStorerInfo(table.getSd(), table.getParameters()); + StorerInfo storerInfo = InternalUtil.extractStorerInfo(table.getSd(), table.getParameters()); HCatSchema partitionColumns = HCatUtil.getPartitionColumns(table); return new HCatTableInfo(table.getDbName(), table.getTableName(), Index: mapreduce/HCatBaseInputFormat.java =================================================================== --- mapreduce/HCatBaseInputFormat.java (revision 1241662) +++ mapreduce/HCatBaseInputFormat.java (working copy) @@ -21,22 +21,48 @@ import java.io.IOException; import java.util.ArrayList; import java.util.LinkedList; +import java.util.Map; +import java.util.LinkedHashMap; import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler; +import org.apache.hadoop.hive.ql.metadata.HiveException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; + +import org.apache.hcatalog.common.ErrorType; import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.common.HCatException; import org.apache.hcatalog.common.HCatUtil; import org.apache.hcatalog.data.HCatRecord; import org.apache.hcatalog.data.schema.HCatFieldSchema; import org.apache.hcatalog.data.schema.HCatSchema; -public abstract class HCatBaseInputFormat extends InputFormat { +public abstract class HCatBaseInputFormat + extends InputFormat { /** * get the schema for the HCatRecord data returned by HCatInputFormat. 
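The getOutputSchema/setOutputSchema hunks just below keep the existing mechanism of round-tripping the requested projection schema through the job configuration. Roughly, and relying only on the HCatUtil serialize/deserialize helpers the patch already uses (the class name here is illustrative):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.schema.HCatSchema;

// Sketch of how the projection schema travels with the job: serialized into one
// configuration key on the client, deserialized again wherever the tasks run.
final class OutputSchemaConfig {

    static void store(Job job, HCatSchema schema) throws IOException {
        job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA,
            HCatUtil.serialize(schema));
    }

    static HCatSchema load(Configuration conf) throws IOException {
        String serialized = conf.get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
        return (serialized == null) ? null : (HCatSchema) HCatUtil.deserialize(serialized);
    }
}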
@@ -44,8 +70,15 @@ * @param context the jobContext * @throws IllegalArgumentException */ - public static HCatSchema getOutputSchema(JobContext context) throws Exception { - String os = context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA); + private Class inputFileFormatClass; + + /** Jobproperties as required by Hive */ + private Map jobProperties; + + public static HCatSchema getOutputSchema(JobContext context) + throws Exception { + String os = context.getConfiguration().get( + HCatConstants.HCAT_KEY_OUTPUT_SCHEMA); if (os == null) { return getTableSchema(context); } else { @@ -58,10 +91,19 @@ * @param job the job object * @param hcatSchema the schema to use as the consolidated schema */ - public static void setOutputSchema(Job job,HCatSchema hcatSchema) throws Exception { - job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA, HCatUtil.serialize(hcatSchema)); + public static void setOutputSchema(Job job,HCatSchema hcatSchema) + throws Exception { + job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA, + HCatUtil.serialize(hcatSchema)); } + public static + org.apache.hadoop.mapred.InputFormat + getInputFormat (JobConf job, Class inputFormatClass) throws IOException { + return ( + org.apache.hadoop.mapred.InputFormat) + ReflectionUtils.newInstance(inputFormatClass, job); + } /** * Logically split the set of input files for the job. Returns the @@ -84,6 +126,34 @@ throw new IOException(e); } + StorageDescriptor tableSD; + HiveMetaStoreClient client = null; + JobConf jobConf; + try { + // To be able to call the storage handlers of hive, + // we need to convert the jobContext into a jobConf + // 0.18 jobConf (Hive) vs 0.20+ jobContext (HCat) + // begin conversion.. + Configuration conf = jobContext.getConfiguration(); + jobConf = new JobConf(conf); + // ..end of conversion + + client = HCatUtil.createHiveClient(null, conf, HCatBaseInputFormat.class); + Table table = client.getTable(inputJobInfo.getDatabaseName(), + inputJobInfo.getTableName()); + tableSD = table.getSd(); + } catch (Exception e) { + if (e instanceof HCatException) { + throw (HCatException)e; + } else { + throw new HCatException(ErrorType.ERROR_SET_OUTPUT, e); + } + } finally { + if (client != null) { + client.close(); + } + } + List splits = new ArrayList(); List partitionInfoList = inputJobInfo.getPartitions(); if(partitionInfoList == null ) { @@ -91,34 +161,61 @@ return splits; } + Path[] dirs = FileInputFormat.getInputPaths(jobConf); + HCatStorageHandler storageHandler; //For each matching partition, call getSplits on the underlying InputFormat for(PartInfo partitionInfo : partitionInfoList) { Job localJob = new Job(jobContext.getConfiguration()); - HCatInputStorageDriver storageDriver; try { - storageDriver = getInputDriverInstance(partitionInfo.getInputStorageDriverClass()); + storageHandler = HCatUtil.getStorageHandler( + jobContext.getConfiguration(), + partitionInfo.getInputStorageHandlerClass(), + tableSD.getSerdeInfo().getSerializationLib(), + tableSD.getInputFormat(), + tableSD.getOutputFormat()); + if (storageHandler != null) { + Map jobProperties = new LinkedHashMap(); + storageHandler.configureInputJobProperties( + partitionInfo.getTableDesc(), jobProperties); + if (!jobProperties.isEmpty()) { + partitionInfo.getTableDesc().setJobProperties(jobProperties); + } + } } catch (Exception e) { - throw new IOException(e); + if (e instanceof HCatException) { + throw (HCatException)e; + } else { + throw new HCatException(ErrorType.ERROR_SET_OUTPUT, e); + } } + 
setInputPath(jobContext, partitionInfo.getLocation()); + HCatSchema allCols = new HCatSchema(new LinkedList()); - for(HCatFieldSchema field: inputJobInfo.getTableInfo().getDataColumns().getFields()) + for(HCatFieldSchema field: + inputJobInfo.getTableInfo().getDataColumns().getFields()) allCols.append(field); - for(HCatFieldSchema field: inputJobInfo.getTableInfo().getPartitionColumns().getFields()) + for(HCatFieldSchema field: + inputJobInfo.getTableInfo().getPartitionColumns().getFields()) allCols.append(field); //Pass all required information to the storage driver - initStorageDriver(storageDriver, localJob, partitionInfo, allCols); + Utilities.copyTableJobPropertiesToConf(partitionInfo.getTableDesc(), + jobConf); //Get the input format for the storage driver - InputFormat inputFormat = - storageDriver.getInputFormat(partitionInfo.getInputStorageDriverProperties()); + Class inputFormatClass = storageHandler.getInputFormatClass(); + org.apache.hadoop.mapred.InputFormat inputFormat = + getInputFormat(jobConf, inputFormatClass); //Call getSplit on the storage drivers InputFormat, create an //HCatSplit for each underlying split - List baseSplits = inputFormat.getSplits(localJob); + //NumSplits is 0 for our purposes + org.apache.hadoop.mapred.InputSplit[] baseSplits = + inputFormat.getSplits(jobConf, 0); - for(InputSplit split : baseSplits) { + for(org.apache.hadoop.mapred.InputSplit split : baseSplits) { splits.add(new HCatSplit( partitionInfo, split, @@ -141,36 +238,95 @@ * @throws IOException or InterruptedException */ @Override - public RecordReader createRecordReader(InputSplit split, + public RecordReader + createRecordReader(InputSplit split, TaskAttemptContext taskContext) throws IOException, InterruptedException { HCatSplit hcatSplit = (HCatSplit) split; PartInfo partitionInfo = hcatSplit.getPartitionInfo(); + JobContext jobContext = (JobContext)taskContext; + InputJobInfo inputJobInfo; - //If running through a Pig job, the InputJobInfo will not be available in the - //backend process context (since HCatLoader works on a copy of the JobContext and does + try { + inputJobInfo = getJobInfo(jobContext); + } catch (Exception e) { + throw new IOException(e); + } + + StorageDescriptor tableSD; + HiveMetaStoreClient client = null; + JobConf jobConf; + try { + // To be able to call the storage handlers of hive, + // we need to convert the jobContext into a jobConf + // 0.18 jobConf (Hive) vs 0.20+ jobContext (HCat) + // begin conversion.. + Configuration conf = jobContext.getConfiguration(); + jobConf = new JobConf(conf); + // ..end of conversion + + client = HCatUtil.createHiveClient(null, conf, HCatBaseInputFormat.class); + Table table = client.getTable(inputJobInfo.getDatabaseName(), + inputJobInfo.getTableName()); + tableSD = table.getSd(); + } catch (Exception e) { + if (e instanceof HCatException) { + throw (HCatException)e; + } else { + throw new HCatException(ErrorType.ERROR_SET_OUTPUT, e); + } + } finally { + if (client != null) { + client.close(); + } + } + + //If running through a Pig job, + //the InputJobInfo will not be available in the + //backend process context (since HCatLoader works + //on a copy of the JobContext and does //not call HCatInputFormat.setInput in the backend process). //So this function should NOT attempt to read the InputJobInfo. 
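Because Hive storage handlers only speak the old (mapred) API, the getSplits() rewrite above first re-wraps the new-API JobContext configuration as a JobConf and then asks the handler's InputFormat class for splits directly. A condensed sketch of that bridge (helper name illustrative):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;

// Instantiates the handler-supplied old-API InputFormat and asks it for splits,
// bridging from a mapreduce Configuration to the JobConf the mapred API expects.
final class MapredSplitHelper {

    static org.apache.hadoop.mapred.InputSplit[] getBaseSplits(
            Configuration conf,
            Class<? extends org.apache.hadoop.mapred.InputFormat> inputFormatClass)
            throws IOException {
        JobConf jobConf = new JobConf(conf);                        // mapreduce -> mapred bridge
        org.apache.hadoop.mapred.InputFormat inputFormat =
            ReflectionUtils.newInstance(inputFormatClass, jobConf); // also runs configure() for JobConfigurable formats
        return inputFormat.getSplits(jobConf, 0);                   // 0: let the format pick its own split count
    }
}

Each returned mapred InputSplit is then wrapped in an HCatSplit together with its PartInfo, as the hunk above shows.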
- - HCatInputStorageDriver storageDriver; + HCatStorageHandler storageHandler; try { - storageDriver = getInputDriverInstance(partitionInfo.getInputStorageDriverClass()); + storageHandler = HCatUtil.getStorageHandler( + jobContext.getConfiguration(), + partitionInfo.getInputStorageHandlerClass(), + tableSD.getSerdeInfo().getSerializationLib(), + tableSD.getInputFormat(), + tableSD.getOutputFormat()); + if (storageHandler != null) { + Map jobProperties = new LinkedHashMap(); + storageHandler.configureInputJobProperties( + partitionInfo.getTableDesc(), jobProperties); + if (!jobProperties.isEmpty()) { + partitionInfo.getTableDesc().setJobProperties(jobProperties); + } + } } catch (Exception e) { - throw new IOException(e); + if (e instanceof HCatException) { + throw (HCatException)e; + } else { + throw new HCatException(ErrorType.ERROR_SET_OUTPUT, e); + } } - //Pass all required information to the storage driver - initStorageDriver(storageDriver, taskContext, partitionInfo, hcatSplit.getTableSchema()); + setInputPath(jobContext, partitionInfo.getLocation()); - //Get the input format for the storage driver - InputFormat inputFormat = - storageDriver.getInputFormat(partitionInfo.getInputStorageDriverProperties()); + Class inputFormatClass = storageHandler.getInputFormatClass(); + org.apache.hadoop.mapred.InputFormat inputFormat = + getInputFormat(jobConf, inputFormatClass); + Utilities.copyTableJobPropertiesToConf(partitionInfo.getTableDesc(), + jobConf); + //Create the underlying input formats record record and an HCat wrapper - RecordReader recordReader = - inputFormat.createRecordReader(hcatSplit.getBaseSplit(), taskContext); + //need to pass reporter to the underlying getRecordReader + org.apache.hadoop.mapred.RecordReader recordReader = + inputFormat.getRecordReader(hcatSplit.getBaseSplit(), jobConf, null); - return new HCatRecordReader(storageDriver,recordReader); + return new HCatRecordReader(storageHandler,recordReader); } /** @@ -208,62 +364,61 @@ return (InputJobInfo) HCatUtil.deserialize(jobString); } + public void setInputPath(JobContext jobContext, String location) throws IOException{ - /** - * Initializes the storage driver instance. Passes on the required - * schema information, path info and arguments for the supported - * features to the storage driver. - * @param storageDriver the storage driver - * @param context the job context - * @param partitionInfo the partition info - * @param tableSchema the table level schema - * @throws IOException Signals that an I/O exception has occurred. 
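createRecordReader() above fetches the base reader through the old-API getRecordReader(split, conf, reporter) and currently passes a null Reporter, with a comment noting that a real reporter still needs to be threaded through. When none is available, Reporter.NULL is the usual stand-in. A sketch with illustrative names:

import java.io.IOException;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

// Opens the underlying mapred record reader for an already-unwrapped base split.
final class BaseReaderFactory {

    static RecordReader openBaseReader(InputFormat inputFormat, InputSplit baseSplit, JobConf jobConf)
            throws IOException {
        // Reporter.NULL swallows progress and counter updates; a real task reporter can
        // replace it once one is plumbed through from the task context.
        return inputFormat.getRecordReader(baseSplit, jobConf, Reporter.NULL);
    }
}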
- */ - private void initStorageDriver(HCatInputStorageDriver storageDriver, - JobContext context, PartInfo partitionInfo, - HCatSchema tableSchema) throws IOException { + // ideally we should just call FileInputFormat.setInputPaths() here - but + // that won't work since FileInputFormat.setInputPaths() needs + // a Job object instead of a JobContext which we are handed here - storageDriver.setInputPath(context, partitionInfo.getLocation()); + int length = location.length(); + int curlyOpen = 0; + int pathStart = 0; + boolean globPattern = false; + List pathStrings = new ArrayList(); - if( partitionInfo.getPartitionSchema() != null ) { - storageDriver.setOriginalSchema(context, partitionInfo.getPartitionSchema()); + for (int i=0; i driverClass = - (Class) - Class.forName(inputStorageDriverClass); - return driverClass.newInstance(); - } catch(Exception e) { - throw new Exception("error creating storage driver " + - inputStorageDriverClass, e); - } - } - } Index: mapreduce/HCatBaseOutputFormat.java =================================================================== --- mapreduce/HCatBaseOutputFormat.java (revision 1241662) +++ mapreduce/HCatBaseOutputFormat.java (working copy) @@ -23,14 +23,14 @@ import java.util.List; import java.util.Map; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hcatalog.common.ErrorType; import org.apache.hcatalog.common.HCatConstants; import org.apache.hcatalog.common.HCatException; @@ -62,7 +62,7 @@ @Override public void checkOutputSpecs(JobContext context ) throws IOException, InterruptedException { - getOutputFormat(context).checkOutputSpecs(context); + getOutputFormat(context).checkOutputSpecs(context); } /** @@ -73,8 +73,10 @@ */ protected OutputFormat, HCatRecord> getOutputFormat(JobContext context) throws IOException { OutputJobInfo jobInfo = getJobInfo(context); - HCatOutputStorageDriver driver = getOutputDriverInstance(context, jobInfo); - return driver.getOutputFormatContainer(driver.getOutputFormat()); + HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo().getStorerInfo()); + //why do we need this? + configureOutputStorageHandler(context); + return storageHandler.getOutputFormatContainer(ReflectionUtils.newInstance(storageHandler.getOutputFormatClass(),context.getConfiguration())); } /** @@ -97,31 +99,27 @@ /** * Gets the output storage driver instance. 
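The new setInputPath() above re-implements FileInputFormat.setInputPaths() against a JobContext: the partition location string is split on commas, except commas inside {...} glob groups, and the resulting paths become the job's input directories. A simplified sketch of the splitting step (escape handling omitted, class name illustrative):

import java.util.ArrayList;
import java.util.List;

// Splits a comma-separated location list while keeping {...} glob groups intact,
// e.g. "/a/{x,y}/p,/b/q" -> ["/a/{x,y}/p", "/b/q"].
final class InputPathSplitter {

    static List<String> split(String location) {
        List<String> paths = new ArrayList<String>();
        int curlyOpen = 0;   // depth of nested { } groups
        int pathStart = 0;   // start index of the path currently being scanned
        for (int i = 0; i < location.length(); i++) {
            char ch = location.charAt(i);
            if (ch == '{') {
                curlyOpen++;
            } else if (ch == '}') {
                curlyOpen--;
            } else if (ch == ',' && curlyOpen == 0) {
                paths.add(location.substring(pathStart, i));
                pathStart = i + 1;
            }
        }
        paths.add(location.substring(pathStart));
        return paths;
    }
}

Each entry is then handed to the job configuration as an input directory, presumably via the mapred.input.dir property, which is what FileInputFormat.setInputPaths() would have done with a Job object.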
* @param jobContext the job context - * @param jobInfo the output job info * @return the output driver instance * @throws IOException */ @SuppressWarnings("unchecked") - static HCatOutputStorageDriver getOutputDriverInstance( - JobContext jobContext, OutputJobInfo jobInfo) throws IOException { - return getOutputDriverInstance(jobContext,jobInfo,(List)null); + static void configureOutputStorageHandler( + JobContext jobContext) throws IOException { + configureOutputStorageHandler(jobContext,(List)null); } /** * Gets the output storage driver instance, with allowing specification of missing dynamic partvals * @param jobContext the job context - * @param jobInfo the output job info * @return the output driver instance * @throws IOException */ @SuppressWarnings("unchecked") - static HCatOutputStorageDriver getOutputDriverInstance( - JobContext jobContext, OutputJobInfo jobInfo, List dynamicPartVals) throws IOException { + static void configureOutputStorageHandler( + JobContext jobContext, List dynamicPartVals) throws IOException { try { - Class driverClass = - (Class) - Class.forName(jobInfo.getTableInfo().getStorerInfo().getOutputSDClass()); - HCatOutputStorageDriver driver = driverClass.newInstance(); + OutputJobInfo jobInfo = (OutputJobInfo)HCatUtil.deserialize(jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO)); + HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(jobContext.getConfiguration(),jobInfo.getTableInfo().getStorerInfo()); Map partitionValues = jobInfo.getPartitionValues(); String location = jobInfo.getLocation(); @@ -139,28 +137,17 @@ partitionValues.put(dynamicPartKeys.get(i), dynamicPartVals.get(i)); } - // re-home location, now that we know the rest of the partvals - Table table = jobInfo.getTableInfo().getTable(); - - List partitionCols = new ArrayList(); - for(FieldSchema schema : table.getPartitionKeys()) { - partitionCols.add(schema.getName()); - } - - location = driver.getOutputLocation(jobContext, - table.getSd().getLocation() , partitionCols, - partitionValues,jobContext.getConfiguration().get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID)); +// // re-home location, now that we know the rest of the partvals +// Table table = jobInfo.getTableInfo().getTable(); +// +// List partitionCols = new ArrayList(); +// for(FieldSchema schema : table.getPartitionKeys()) { +// partitionCols.add(schema.getName()); +// } + jobInfo.setPartitionValues(partitionValues); } - //Initialize the storage driver - driver.setSchema(jobContext, jobInfo.getOutputSchema()); - driver.setPartitionValues(jobContext, partitionValues); - driver.setOutputPath(jobContext, location); - - driver.initialize(jobContext, jobInfo.getTableInfo().getStorerInfo().getProperties()); - -// HCatUtil.logMap(LOG,"Setting outputPath ["+location+"] for ",partitionValues); - return driver; + HCatUtil.configureOutputStorageHandler(storageHandler,jobContext,jobInfo); } catch(Exception e) { if (e instanceof HCatException){ throw (HCatException)e; @@ -179,18 +166,18 @@ * @throws IOException */ - protected static HCatOutputStorageDriver getOutputDriverInstance( + protected static void configureOutputStorageHandler( JobContext context, OutputJobInfo jobInfo, Map fullPartSpec) throws IOException { List dynamicPartKeys = jobInfo.getDynamicPartitioningKeys(); if ((dynamicPartKeys == null)||(dynamicPartKeys.isEmpty())){ - return getOutputDriverInstance(context,jobInfo,(List)null); + configureOutputStorageHandler(context, (List) null); }else{ List dynKeyVals = new ArrayList(); for (String dynamicPartKey : 
dynamicPartKeys){ dynKeyVals.add(fullPartSpec.get(dynamicPartKey)); } - return getOutputDriverInstance(context,jobInfo,dynKeyVals); + configureOutputStorageHandler(context, dynKeyVals); } } @@ -239,7 +226,7 @@ } static void cancelDelegationTokens(JobContext context, OutputJobInfo outputJobInfo) throws Exception { - HiveMetaStoreClient client = HCatOutputFormat.createHiveClient(outputJobInfo.getServerUri(), context.getConfiguration()); + HiveMetaStoreClient client = HCatOutputFormat.createHiveClient(null, context.getConfiguration()); // cancel the deleg. tokens that were acquired for this job now that // we are done - we should cancel if the tokens were acquired by // HCatOutputFormat and not if they were supplied by Oozie. In the latter Index: mapreduce/OutputJobInfo.java =================================================================== --- mapreduce/OutputJobInfo.java (revision 1241662) +++ mapreduce/OutputJobInfo.java (working copy) @@ -52,15 +52,6 @@ /** The partition values to publish to, if used for output*/ private Map partitionValues; - /** The Metadata server uri */ - private final String serverUri; - - /** If the hcat server is configured to work with hadoop security, this - * variable will hold the principal name of the server - this will be used - * in the authentication to the hcat server using kerberos - */ - private final String serverKerberosPrincipal; - private List posOfPartCols; private List posOfDynPartCols; @@ -79,8 +70,6 @@ * @param databaseName the db name * @param tableName the table name * @param partitionValues The partition values to publish to, can be null or empty Map to - * @param serverUri the Metadata server uri - * @param serverKerberosPrincipal If the hcat server is configured to * work with hadoop security, the kerberos principal name of the server - else null * The principal name should be of the form: * /_HOST@ like "hcat/_HOST@myrealm.com" @@ -90,25 +79,17 @@ */ public static OutputJobInfo create(String databaseName, String tableName, - Map partitionValues, - String serverUri, - String serverKerberosPrincipal) { + Map partitionValues) { return new OutputJobInfo(databaseName, tableName, - partitionValues, - serverUri, - serverKerberosPrincipal); + partitionValues); } private OutputJobInfo(String databaseName, String tableName, - Map partitionValues, - String serverUri, - String serverKerberosPrincipal) { + Map partitionValues) { this.databaseName = (databaseName == null) ? 
MetaStoreUtils.DEFAULT_DATABASE_NAME : databaseName; this.tableName = tableName; - this.serverUri = serverUri; - this.serverKerberosPrincipal = serverKerberosPrincipal; this.partitionValues = partitionValues; this.properties = new Properties(); } @@ -180,7 +161,7 @@ /** * @param location location to write to */ - void setLocation(String location) { + public void setLocation(String location) { this.location = location; } /** @@ -200,20 +181,6 @@ } /** - * @return metastore thrift server URI - */ - public String getServerUri() { - return serverUri; - } - - /** - * @return the serverKerberosPrincipal - */ - public String getServerKerberosPrincipal() { - return serverKerberosPrincipal; - } - - /** * set the tablInfo instance * this should be the same instance * determined by this object's DatabaseName and TableName Index: mapreduce/HCatInputFormat.java =================================================================== --- mapreduce/HCatInputFormat.java (revision 1241662) +++ mapreduce/HCatInputFormat.java (working copy) @@ -20,8 +20,13 @@ import java.io.IOException; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hcatalog.data.HCatRecord; + /** The InputFormat to use to read data from HCat */ public class HCatInputFormat extends HCatBaseInputFormat { @@ -42,6 +47,5 @@ throw new IOException(e); } } - - + } Index: mapreduce/HCatOutputFormat.java =================================================================== --- mapreduce/HCatOutputFormat.java (revision 1241662) +++ mapreduce/HCatOutputFormat.java (working copy) @@ -73,7 +73,7 @@ try { Configuration conf = job.getConfiguration(); - client = createHiveClient(outputJobInfo.getServerUri(), conf); + client = createHiveClient(null, conf); Table table = client.getTable(outputJobInfo.getDatabaseName(), outputJobInfo.getTableName()); if (table.getPartitionKeysSize() == 0 ){ @@ -95,10 +95,8 @@ } } - if ( - (outputJobInfo.getPartitionValues() == null) - || (outputJobInfo.getPartitionValues().size() < table.getPartitionKeysSize()) - ){ + if ((outputJobInfo.getPartitionValues() == null) + || (outputJobInfo.getPartitionValues().size() < table.getPartitionKeysSize())){ // dynamic partition usecase - partition values were null, or not all were specified // need to figure out which keys are not specified. 
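With serverUri and serverKerberosPrincipal removed from OutputJobInfo, callers now only name the database, table, and static partition values; the metastore connection details are read from the job's Hive configuration (hive.metastore.uris and the matching kerberos principal property) when createHiveClient() runs. An illustrative caller after the change (database, table, and partition names here are made up):

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.OutputJobInfo;

public class WriteToHCatExample {

    // Configures a job to write one static partition of an HCatalog table.
    public static void configureOutput(Job job) throws Exception {
        Map<String, String> partitionValues = new HashMap<String, String>();
        partitionValues.put("ds", "20120207");   // hypothetical partition key/value

        OutputJobInfo info = OutputJobInfo.create("default", "mytable", partitionValues);
        HCatOutputFormat.setOutput(job, info);
        job.setOutputFormatClass(HCatOutputFormat.class);
    }
}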
List dynamicPartitioningKeys = new ArrayList(); @@ -131,32 +129,25 @@ StorageDescriptor tblSD = table.getSd(); HCatSchema tableSchema = HCatUtil.extractSchemaFromStorageDescriptor(tblSD); - StorerInfo storerInfo = InitializeInput.extractStorerInfo(tblSD,table.getParameters()); + StorerInfo storerInfo = InternalUtil.extractStorerInfo(tblSD,table.getParameters()); List partitionCols = new ArrayList(); for(FieldSchema schema : table.getPartitionKeys()) { partitionCols.add(schema.getName()); } - Class driverClass = - (Class) Class.forName(storerInfo.getOutputSDClass()); - HCatOutputStorageDriver driver = driverClass.newInstance(); + HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(job.getConfiguration(), storerInfo); - String tblLocation = tblSD.getLocation(); - String location = driver.getOutputLocation(job, - tblLocation, partitionCols, - outputJobInfo.getPartitionValues(),conf.get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID)); - //Serialize the output info into the configuration outputJobInfo.setTableInfo(HCatTableInfo.valueOf(table)); outputJobInfo.setOutputSchema(tableSchema); - outputJobInfo.setLocation(location); outputJobInfo.setHarRequested(harRequested); outputJobInfo.setMaximumDynamicPartitions(maxDynamicPartitions); - conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo)); - Path tblPath = new Path(tblLocation); + HCatUtil.configureOutputStorageHandler(storageHandler,job,outputJobInfo); + Path tblPath = new Path(table.getSd().getLocation()); + /* Set the umask in conf such that files/dirs get created with table-dir * permissions. Following three assumptions are made: * 1. Actual files/dirs creation is done by RecordWriter of underlying @@ -231,6 +222,7 @@ return getOutputFormat(context).getOutputCommitter(context); } + //TODO remove url component, everything should be encapsulated in HiveConf static HiveMetaStoreClient createHiveClient(String url, Configuration conf) throws IOException, MetaException { HiveConf hiveConf = getHiveConf(url, conf); // HCatUtil.logHiveConf(LOG, hiveConf); Index: mapreduce/OutputFormatContainer.java =================================================================== --- mapreduce/OutputFormatContainer.java (revision 1241662) +++ mapreduce/OutputFormatContainer.java (working copy) @@ -33,19 +33,19 @@ * such as partitioning isn't supported. */ abstract class OutputFormatContainer extends OutputFormat, HCatRecord> { - private OutputFormat, ? super Writable> of; + private org.apache.hadoop.mapred.OutputFormat, ? super Writable> of; /** * @param of OutputFormat this instance will contain */ - public OutputFormatContainer(OutputFormat,? super Writable> of) { + public OutputFormatContainer(org.apache.hadoop.mapred.OutputFormat,? 
super Writable> of) { this.of = of; } /** * @return underlying OutputFormat */ - public OutputFormat getBaseOutputFormat() { + public org.apache.hadoop.mapred.OutputFormat getBaseOutputFormat() { return of; } Index: mapreduce/FileOutputFormatContainer.java =================================================================== --- mapreduce/FileOutputFormatContainer.java (revision 1241662) +++ mapreduce/FileOutputFormatContainer.java (working copy) @@ -29,13 +29,15 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hcatalog.common.ErrorType; import org.apache.hcatalog.common.HCatException; +import org.apache.hcatalog.common.HCatUtil; import org.apache.hcatalog.data.HCatRecord; import org.apache.thrift.TException; @@ -49,7 +51,6 @@ * This implementation supports the following HCatalog features: partitioning, dynamic partitioning, Hadoop Archiving, etc. */ class FileOutputFormatContainer extends OutputFormatContainer { - private OutputFormat, ? super Writable> of; private static final PathFilter hiddenFileFilter = new PathFilter(){ public boolean accept(Path p){ @@ -61,19 +62,29 @@ /** * @param of base OutputFormat to contain */ - public FileOutputFormatContainer(OutputFormat, ? super Writable> of) { + public FileOutputFormatContainer(org.apache.hadoop.mapred.OutputFormat, ? super Writable> of) { super(of); - this.of = of; } @Override public RecordWriter, HCatRecord> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { + //this needs to be manually set, under normal circumstances MR Task does this + setWorkOutputPath(context); + // When Dynamic partitioning is used, the RecordWriter instance initialized here isn't used. Can use null. // (That's because records can't be written until the values of the dynamic partitions are deduced. // By that time, a new local instance of RecordWriter, with the correct output-path, will be constructed.) - return new FileRecordWriterContainer(HCatOutputFormat.getJobInfo(context) - .isDynamicPartitioningUsed()? null : of.getRecordWriter(context), - context); + RecordWriter, HCatRecord> rw = + new FileRecordWriterContainer( + HCatBaseOutputFormat.getJobInfo(context).isDynamicPartitioningUsed()? 
+ null: + getBaseOutputFormat() + .getRecordWriter(null, + new JobConf(context.getConfiguration()), + context.getTaskAttemptID().toString(), + InternalUtil.createReporter(context)), + context); + return rw; } @Override @@ -82,7 +93,7 @@ try { handleDuplicatePublish(context, jobInfo, - HCatOutputFormat.createHiveClient(jobInfo.getServerUri(),context.getConfiguration()), + HCatOutputFormat.createHiveClient(null,context.getConfiguration()), jobInfo.getTableInfo().getTable()); } catch (MetaException e) { throw new IOException(e); @@ -91,12 +102,23 @@ } catch (NoSuchObjectException e) { throw new IOException(e); } - of.checkOutputSpecs(context); + + if(!jobInfo.isDynamicPartitioningUsed()) { + JobConf jobConf = new JobConf(context.getConfiguration()); + getBaseOutputFormat().checkOutputSpecs(null, jobConf); + //checkoutputspecs might've set some properties we need to have context reflect that + HCatUtil.copyConf(jobConf,context.getConfiguration()); + } } @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { - return new FileOutputCommitterContainer(context,of.getOutputCommitter(context)); + //this needs to be manually set, under normal circumstances MR Task does this + setWorkOutputPath(context); + return new FileOutputCommitterContainer(context, + HCatBaseOutputFormat.getJobInfo(context).isDynamicPartitioningUsed()? + null: + new JobConf(context.getConfiguration()).getOutputCommitter()); } /** @@ -186,4 +208,13 @@ return values; } + + static void setWorkOutputPath(TaskAttemptContext context) throws IOException { + String outputPath = context.getConfiguration().get("mapred.output.dir"); + //we need to do this to get the task path and set it for mapred implementation + //since it can't be done automatically because of mapreduce->mapred abstraction + if(outputPath != null) + context.getConfiguration().set("mapred.work.output.dir", + new FileOutputCommitter(new Path(outputPath), context).getWorkPath().toString()); + } } Index: mapreduce/FileRecordWriterContainer.java =================================================================== --- mapreduce/FileRecordWriterContainer.java (revision 1241662) +++ mapreduce/FileRecordWriterContainer.java (working copy) @@ -18,14 +18,24 @@ package org.apache.hcatalog.mapreduce; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.HCatMapRedUtil; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hcatalog.common.ErrorType; import org.apache.hcatalog.common.HCatException; +import org.apache.hcatalog.common.HCatUtil; import org.apache.hcatalog.data.HCatRecord; import java.io.IOException; @@ -40,15 +50,19 @@ */ class FileRecordWriterContainer extends RecordWriterContainer { - private final HCatOutputStorageDriver storageDriver; + private final HCatStorageHandler storageHandler; + private final SerDe 
serDe; + private final ObjectInspector objectInspector; private boolean dynamicPartitioningUsed = false; // static final private Log LOG = LogFactory.getLog(FileRecordWriterContainer.class); - private final Map, ? super Writable>> baseDynamicWriters; - private final Map baseDynamicStorageDrivers; - private final Map baseDynamicCommitters; + private final Map, ? super Writable>> baseDynamicWriters; + private final Map baseDynamicSerDe; + private final Map baseDynamicCommitters; + private final Map dynamicContexts; + private final Map dynamicObjectInspectors; private final List partColsToDel; @@ -64,12 +78,21 @@ * @throws IOException * @throws InterruptedException */ - public FileRecordWriterContainer(RecordWriter, ? super Writable> baseWriter, + public FileRecordWriterContainer(org.apache.hadoop.mapred.RecordWriter, ? super Writable> baseWriter, TaskAttemptContext context) throws IOException, InterruptedException { super(context,baseWriter); this.context = context; jobInfo = HCatOutputFormat.getJobInfo(context); + storageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo().getStorerInfo()); + serDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(),context.getConfiguration()); + objectInspector = InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema()); + try { + InternalUtil.inializeSerDe(serDe, context.getConfiguration(), jobInfo); + } catch (SerDeException e) { + throw new IOException("Failed to inialize SerDe",e); + } + // If partition columns occur in data, we want to remove them. partColsToDel = jobInfo.getPosOfPartCols(); dynamicPartitioningUsed = jobInfo.isDynamicPartitioningUsed(); @@ -83,54 +106,60 @@ if (!dynamicPartitioningUsed) { - storageDriver = HCatOutputFormat.getOutputDriverInstance(context,jobInfo); - this.baseDynamicStorageDrivers = null; + this.baseDynamicSerDe = null; this.baseDynamicWriters = null; this.baseDynamicCommitters = null; - prepareForStorageDriverOutput(context); + this.dynamicContexts = null; + this.dynamicObjectInspectors = null; } else { - storageDriver = null; - this.baseDynamicStorageDrivers = new HashMap(); - this.baseDynamicWriters = new HashMap, ? super Writable>>(); - this.baseDynamicCommitters = new HashMap(); + this.baseDynamicSerDe = new HashMap(); + this.baseDynamicWriters = new HashMap, ? super Writable>>(); + this.baseDynamicCommitters = new HashMap(); + this.dynamicContexts = new HashMap(); + this.dynamicObjectInspectors = new HashMap(); } } /** * @return the storageDriver */ - public HCatOutputStorageDriver getStorageDriver() { - return storageDriver; + public HCatStorageHandler getStorageHandler() { + return storageHandler; } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { + Reporter reporter = InternalUtil.createReporter(context); if (dynamicPartitioningUsed){ - for (RecordWriter, ? super Writable> bwriter : baseDynamicWriters.values()){ - bwriter.close(context); + for (org.apache.hadoop.mapred.RecordWriter, ? 
super Writable> bwriter : baseDynamicWriters.values()){ + //We are in RecordWriter.close() make sense that the context would be TaskInputOutput + bwriter.close(reporter); } - for(Map.Entryentry : baseDynamicCommitters.entrySet()) { -// for (HCatOutputStorageDriver osd : baseDynamicStorageDrivers.values()){ + for(Map.Entryentry : baseDynamicCommitters.entrySet()) { + org.apache.hadoop.mapred.TaskAttemptContext currContext = dynamicContexts.get(entry.getKey()); OutputCommitter baseOutputCommitter = entry.getValue(); - if (baseOutputCommitter.needsTaskCommit(context)){ - baseOutputCommitter.commitTask(context); + if (baseOutputCommitter.needsTaskCommit(currContext)){ + baseOutputCommitter.commitTask(currContext); } } } else { - getBaseRecordWriter().close(context); + getBaseRecordWriter().close(reporter); } } @Override public void write(WritableComparable key, HCatRecord value) throws IOException, InterruptedException { - RecordWriter localWriter; - HCatOutputStorageDriver localDriver; -// HCatUtil.logList(LOG, "HCatRecord to write", value.getAll()); + org.apache.hadoop.mapred.RecordWriter localWriter; + org.apache.hadoop.mapred.TaskAttemptContext localContext; + ObjectInspector localObjectInspector; + SerDe localSerDe; + OutputJobInfo localJobInfo = null; + if (dynamicPartitioningUsed){ // calculate which writer to use from the remaining values - this needs to be done before we delete cols List dynamicPartValues = new ArrayList(); @@ -138,11 +167,9 @@ dynamicPartValues.add(value.get(colToAppend).toString()); } - int dynHashCode = dynamicPartValues.hashCode(); - if (!baseDynamicWriters.containsKey(dynHashCode)){ -// LOG.info("Creating new storage driver["+baseDynamicStorageDrivers.size() -// +"/"+maxDynamicPartitions+ "] for "+dynamicPartValues.toString()); - if ((maxDynamicPartitions != -1) && (baseDynamicStorageDrivers.size() > maxDynamicPartitions)){ + String dynKey = dynamicPartValues.toString(); + if (!baseDynamicWriters.containsKey(dynKey)){ + if ((maxDynamicPartitions != -1) && (baseDynamicWriters.size() > maxDynamicPartitions)){ throw new HCatException(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS, "Number of dynamic partitions being created " + "exceeds configured max allowable partitions[" @@ -151,54 +178,82 @@ + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname + "] if needed."); } -// HCatUtil.logList(LOG, "dynamicpartvals", dynamicPartValues); -// HCatUtil.logList(LOG, "dynamicpartCols", dynamicPartCols); - HCatOutputStorageDriver localOsd = createDynamicStorageDriver(dynamicPartValues); + org.apache.hadoop.mapred.TaskAttemptContext currTaskContext = HCatMapRedUtil.createTaskAttemptContext(context); + configureDynamicStorageHandler(currTaskContext, dynamicPartValues); + localJobInfo= HCatBaseOutputFormat.getJobInfo(currTaskContext); - RecordWriter baseRecordWriter = localOsd.getOutputFormat().getRecordWriter(context); - OutputCommitter baseOutputCommitter = localOsd.getOutputFormat().getOutputCommitter(context); - baseOutputCommitter.setupJob(context); - baseOutputCommitter.setupTask(context); - prepareForStorageDriverOutput(localOsd,context); - baseDynamicWriters.put(dynHashCode, baseRecordWriter); - baseDynamicStorageDrivers.put(dynHashCode,localOsd); - baseDynamicCommitters.put(dynHashCode,baseOutputCommitter); + //setup serDe + SerDe currSerDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), currTaskContext.getJobConf()); + try { + InternalUtil.inializeSerDe(currSerDe, currTaskContext.getConfiguration(), localJobInfo); + } catch (SerDeException e) { + throw new 
IOException("Failed to initialize SerDe",e); + } + + //create base OutputFormat + org.apache.hadoop.mapred.OutputFormat baseOF = + ReflectionUtils.newInstance(storageHandler.getOutputFormatClass(), currTaskContext.getJobConf()); + //check outputSpecs + baseOF.checkOutputSpecs(null,currTaskContext.getJobConf()); + //get Output Committer + org.apache.hadoop.mapred.OutputCommitter baseOutputCommitter = currTaskContext.getJobConf().getOutputCommitter(); + //create currJobContext the latest so it gets all the config changes + org.apache.hadoop.mapred.JobContext currJobContext = HCatMapRedUtil.createJobContext(currTaskContext); + //setupJob() + baseOutputCommitter.setupJob(currJobContext); + //recreate to refresh jobConf of currTask context + currTaskContext = + HCatMapRedUtil.createTaskAttemptContext(currJobContext.getJobConf(), + currTaskContext.getTaskAttemptID(), + currTaskContext.getProgressible()); + //set temp location + currTaskContext.getConfiguration().set("mapred.work.output.dir", + new FileOutputCommitter(new Path(localJobInfo.getLocation()),currTaskContext).getWorkPath().toString()); + //setupTask() + baseOutputCommitter.setupTask(currTaskContext); + + org.apache.hadoop.mapred.RecordWriter baseRecordWriter = + baseOF.getRecordWriter(null, + currTaskContext.getJobConf(), + FileOutputFormat.getUniqueFile(currTaskContext, "part", ""), + InternalUtil.createReporter(currTaskContext)); + + baseDynamicWriters.put(dynKey, baseRecordWriter); + baseDynamicSerDe.put(dynKey,currSerDe); + baseDynamicCommitters.put(dynKey,baseOutputCommitter); + dynamicContexts.put(dynKey,currTaskContext); + dynamicObjectInspectors.put(dynKey,InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema())); } - - localWriter = baseDynamicWriters.get(dynHashCode); - localDriver = baseDynamicStorageDrivers.get(dynHashCode); - }else{ + localJobInfo = HCatOutputFormat.getJobInfo(dynamicContexts.get(dynKey)); + localWriter = baseDynamicWriters.get(dynKey); + localSerDe = baseDynamicSerDe.get(dynKey); + localContext = dynamicContexts.get(dynKey); + localObjectInspector = dynamicObjectInspectors.get(dynKey); + } + else{ + localJobInfo = HCatBaseOutputFormat.getJobInfo(context); localWriter = getBaseRecordWriter(); - localDriver = storageDriver; + localSerDe = serDe; + localContext = HCatMapRedUtil.createTaskAttemptContext(context); + localObjectInspector = objectInspector; } for(Integer colToDel : partColsToDel){ value.remove(colToDel); } + //The key given by user is ignored - WritableComparable generatedKey = localDriver.generateKey(value); - Writable convertedValue = localDriver.convertValue(value); - localWriter.write(generatedKey, convertedValue); - } - - protected HCatOutputStorageDriver createDynamicStorageDriver(List dynamicPartVals) throws IOException { - HCatOutputStorageDriver localOsd = HCatOutputFormat.getOutputDriverInstance(context,jobInfo,dynamicPartVals); - return localOsd; - } - - public void prepareForStorageDriverOutput(TaskAttemptContext context) throws IOException { - // Set permissions and group on freshly created files. 
- if (!dynamicPartitioningUsed){ - HCatOutputStorageDriver localOsd = this.getStorageDriver(); - prepareForStorageDriverOutput(localOsd,context); + try { + localWriter.write(null, localSerDe.serialize(value.getAll(), localObjectInspector)); + } catch (SerDeException e) { + throw new IOException("Failed to serialize object",e); } } - private void prepareForStorageDriverOutput(HCatOutputStorageDriver localOsd, - TaskAttemptContext context) throws IOException { - FileOutputStorageDriver.prepareOutputLocation(localOsd, context); + protected void configureDynamicStorageHandler(JobContext context, List dynamicPartVals) throws IOException { + HCatOutputFormat.configureOutputStorageHandler(context, dynamicPartVals); } } Index: mapreduce/RecordWriterContainer.java =================================================================== --- mapreduce/RecordWriterContainer.java (revision 1241662) +++ mapreduce/RecordWriterContainer.java (working copy) @@ -31,21 +31,21 @@ */ abstract class RecordWriterContainer extends RecordWriter, HCatRecord> { - private final RecordWriter, ? super Writable> baseRecordWriter; + private final org.apache.hadoop.mapred.RecordWriter, ? super Writable> baseRecordWriter; /** * @param context current JobContext * @param baseRecordWriter RecordWriter that this instance will contain */ public RecordWriterContainer(TaskAttemptContext context, - RecordWriter, ? super Writable> baseRecordWriter) { + org.apache.hadoop.mapred.RecordWriter, ? super Writable> baseRecordWriter) { this.baseRecordWriter = baseRecordWriter; } /** * @return underlying RecordWriter */ - public RecordWriter getBaseRecordWriter() { + public org.apache.hadoop.mapred.RecordWriter getBaseRecordWriter() { return baseRecordWriter; } Index: mapreduce/DefaultOutputFormatContainer.java =================================================================== --- mapreduce/DefaultOutputFormatContainer.java (revision 1241662) +++ mapreduce/DefaultOutputFormatContainer.java (working copy) @@ -21,11 +21,12 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hcatalog.common.HCatUtil; import org.apache.hcatalog.data.HCatRecord; import java.io.IOException; @@ -37,7 +38,7 @@ */ class DefaultOutputFormatContainer extends OutputFormatContainer { - public DefaultOutputFormatContainer(OutputFormat, Writable> of) { + public DefaultOutputFormatContainer(org.apache.hadoop.mapred.OutputFormat, Writable> of) { super(of); } @@ -52,7 +53,8 @@ @Override public RecordWriter, HCatRecord> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { - return new DefaultRecordWriterContainer(context, getBaseOutputFormat().getRecordWriter(context)); + return new DefaultRecordWriterContainer(context, + getBaseOutputFormat().getRecordWriter(null, new JobConf(context.getConfiguration()),null, InternalUtil.createReporter(context))); } @@ -67,8 +69,7 @@ @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { - OutputFormat outputFormat = getBaseOutputFormat(); - return new DefaultOutputCommitterContainer(context, getBaseOutputFormat().getOutputCommitter(context)); + return new DefaultOutputCommitterContainer(context, 
new JobConf(context.getConfiguration()).getOutputCommitter()); } /** @@ -78,8 +79,10 @@ */ @Override public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { - OutputFormat, ? super Writable> outputFormat = getBaseOutputFormat(); - outputFormat.checkOutputSpecs(context); + org.apache.hadoop.mapred.OutputFormat, ? super Writable> outputFormat = getBaseOutputFormat(); + JobConf jobConf = new JobConf(context.getConfiguration()); + outputFormat.checkOutputSpecs(null,jobConf); + HCatUtil.copyConf(jobConf, context.getConfiguration()); } } Index: mapreduce/InitializeInput.java =================================================================== --- mapreduce/InitializeInput.java (revision 1241662) +++ mapreduce/InitializeInput.java (working copy) @@ -27,14 +27,30 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; +import org.apache.hadoop.hive.ql.metadata.HiveUtils; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.hive.serde2.Deserializer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; + import org.apache.hcatalog.common.ErrorType; import org.apache.hcatalog.common.HCatConstants; import org.apache.hcatalog.common.HCatException; @@ -55,12 +71,104 @@ static final String HCAT_KEY_PREFIX = "hcat."; private static HiveConf hiveConf; - private static HiveMetaStoreClient createHiveMetaClient(Configuration conf, InputJobInfo inputJobInfo) throws Exception { + private static HiveMetaStoreClient createHiveMetaClient(Configuration conf, + InputJobInfo inputJobInfo) throws Exception { hiveConf = getHiveConf(inputJobInfo, conf); return new HiveMetaStoreClient(hiveConf, null); } + public static Deserializer getDeserializer(Table table) { + try { + return MetaStoreUtils.getDeserializer(hiveConf, table); + } catch (MetaException e) { + throw new RuntimeException(e); + //FIXME + /* needs review + } catch (HiveException e) { + throw new RuntimeException(e); + */ + } + } + + public static String getProperty(Table table, String name) { + return table.getParameters().get(name); + } + + public static Properties getSchema(Table table) { + return MetaStoreUtils.getSchema(table); + } + + public static HiveStorageHandler getStorageHandler(Table table) { + HiveStorageHandler storageHandler; + try { + storageHandler = HiveUtils.getStorageHandler(hiveConf, + InitializeInput.getProperty(table, + org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_STORAGE)); + } 
catch (Exception e) { + throw new RuntimeException(e); + } + return storageHandler; + } + + + public static Class + getInputFormatClass(Table table) { + Class inputFormatClass; + try { + String className = table.getSd().getInputFormat(); + if (className == null) { + if (getStorageHandler(table) == null) { + return null; + } + inputFormatClass = getStorageHandler(table).getInputFormatClass(); + } else { + inputFormatClass = + (Class) + Class.forName(className, true, JavaUtils.getClassLoader()); + } + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + + return inputFormatClass; + } + + public static Class + getOutputFormatClass (Table table) { + Class outputFormatClass; + + try { + String className = table.getSd().getOutputFormat(); + Class c; + if (className == null) { + if (getStorageHandler(table) == null) { + return null; + } + c = getStorageHandler(table).getOutputFormatClass(); + } else { + c = Class.forName(className, true, + JavaUtils.getClassLoader()); + } + if (!HiveOutputFormat.class.isAssignableFrom(c)) { + outputFormatClass = HiveFileFormatUtils.getOutputFormatSubstitute(c); + } else { + outputFormatClass = (Class)c; + } + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + return outputFormatClass; + } + + public static TableDesc getTableDesc(Table table) { + return new TableDesc( + InitializeInput.getDeserializer(table).getClass(), + InitializeInput.getInputFormatClass(table), + InitializeInput.getOutputFormatClass(table), + InitializeInput.getSchema(table)); + } + /** * Set the input to use for the Job. This queries the metadata server with the specified partition predicates, * gets the matching partitions, puts the information in the configuration object. @@ -112,6 +220,7 @@ for (Partition ptn : parts){ PartInfo partInfo = extractPartInfo(ptn.getSd(),ptn.getParameters()); partInfo.setPartitionValues(createPtnKeyValueMap(table,ptn)); + partInfo.setTableDesc(InitializeInput.getTableDesc(table)); partInfoList.add(partInfo); } @@ -119,6 +228,7 @@ //Non partitioned table PartInfo partInfo = extractPartInfo(table.getSd(),table.getParameters()); partInfo.setPartitionValues(new HashMap()); + partInfo.setTableDesc(InitializeInput.getTableDesc(table)); partInfoList.add(partInfo); } inputJobInfo.setPartitions(partInfoList); Index: mapreduce/PartInfo.java =================================================================== --- mapreduce/PartInfo.java (revision 1241662) +++ mapreduce/PartInfo.java (working copy) @@ -21,6 +21,9 @@ import java.util.Map; import java.util.Properties; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.plan.TableDesc; + import org.apache.hcatalog.data.schema.HCatSchema; /** The Class used to serialize the partition information read from the metadata server that maps to a partition */ @@ -44,6 +47,9 @@ /** The map of partition key names and their values. */ private Map partitionValues; + /** Table Description for hive use */ + private TableDesc tableDesc; + /** * Instantiates a new hcat partition info. * @param partitionSchema the partition schema @@ -71,7 +77,7 @@ * Gets the value of input storage driver class name. * @return the input storage driver class name */ - public String getInputStorageDriverClass() { + public String getInputStorageHandlerClass() { return inputStorageDriverClass; } @@ -80,7 +86,7 @@ * Gets the value of hcatProperties. 
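The getInputFormatClass()/getOutputFormatClass() helpers added to InitializeInput above follow Hive's own resolution order: trust the class names recorded in the table's StorageDescriptor first, and fall back to the table's HiveStorageHandler only when no explicit format is stored. The input-format half, condensed (class name illustrative):

import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;

final class InputFormatResolver {

    // Prefers the input format named in the StorageDescriptor; falls back to the storage handler.
    static Class<?> resolve(Table table, HiveStorageHandler handler) throws ClassNotFoundException {
        String className = table.getSd().getInputFormat();
        if (className == null) {
            return (handler == null) ? null : handler.getInputFormatClass();
        }
        return Class.forName(className, true, JavaUtils.getClassLoader());
    }
}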
* @return the hcatProperties */ - public Properties getInputStorageDriverProperties() { + public Properties getInputStorageHandlerProperties() { return hcatProperties; } @@ -107,4 +113,12 @@ public Map getPartitionValues() { return partitionValues; } + + public void setTableDesc(TableDesc tblDesc) { + this.tableDesc = tblDesc; + } + + public TableDesc getTableDesc() { + return tableDesc; + } } Index: mapreduce/DefaultRecordWriterContainer.java =================================================================== --- mapreduce/DefaultRecordWriterContainer.java (revision 1241662) +++ mapreduce/DefaultRecordWriterContainer.java (working copy) @@ -20,10 +20,15 @@ import java.io.IOException; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hcatalog.common.HCatUtil; import org.apache.hcatalog.data.HCatRecord; /** @@ -32,9 +37,10 @@ */ class DefaultRecordWriterContainer extends RecordWriterContainer { - private final HCatOutputStorageDriver storageDriver; - private final RecordWriter, ? super Writable> baseRecordWriter; + private final HCatStorageHandler storageHandler; + private final SerDe serDe; private final OutputJobInfo jobInfo; + private final ObjectInspector hcatRecordOI; /** * @param context current JobContext @@ -43,29 +49,34 @@ * @throws InterruptedException */ public DefaultRecordWriterContainer(TaskAttemptContext context, - RecordWriter, ? super Writable> baseRecordWriter) throws IOException, InterruptedException { + org.apache.hadoop.mapred.RecordWriter, ? super Writable> baseRecordWriter) throws IOException, InterruptedException { super(context,baseRecordWriter); jobInfo = HCatOutputFormat.getJobInfo(context); - this.storageDriver = HCatOutputFormat.getOutputDriverInstance(context, jobInfo); - this.baseRecordWriter = baseRecordWriter; + storageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo().getStorerInfo()); + HCatOutputFormat.configureOutputStorageHandler(context); + serDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(),context.getConfiguration()); + hcatRecordOI = InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema()); + try { + InternalUtil.inializeSerDe(serDe, context.getConfiguration(), jobInfo); + } catch (SerDeException e) { + throw new IOException("Failed to initialize SerDe",e); + } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { - baseRecordWriter.close(context); + getBaseRecordWriter().close(InternalUtil.createReporter(context)); } @Override public void write(WritableComparable key, HCatRecord value) throws IOException, InterruptedException { - WritableComparable generatedKey = storageDriver.generateKey(value); - Writable convertedValue = storageDriver.convertValue(value); - baseRecordWriter.write(generatedKey, convertedValue); + try { + getBaseRecordWriter().write(null, serDe.serialize(value, hcatRecordOI)); + } catch (SerDeException e) { + throw new IOException("Failed to serialize object",e); + } } - @Override - public RecordWriter, ? 
super Writable> getBaseRecordWriter() { - return baseRecordWriter; - } } Index: mapreduce/FileOutputCommitterContainer.java =================================================================== --- mapreduce/FileOutputCommitterContainer.java (revision 1241662) +++ mapreduce/FileOutputCommitterContainer.java (working copy) @@ -33,11 +33,11 @@ import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.mapred.HCatMapRedUtil; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobStatus.State; -import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.security.AccessControlException; import org.apache.hcatalog.common.ErrorType; import org.apache.hcatalog.common.HCatConstants; @@ -47,7 +47,6 @@ import org.apache.hcatalog.data.schema.HCatSchema; import org.apache.hcatalog.data.schema.HCatSchemaUtils; import org.apache.hcatalog.har.HarOutputCommitterPostProcessor; -import org.apache.hcatalog.shims.HCatHadoopShims; import org.apache.thrift.TException; import java.io.IOException; @@ -69,7 +68,8 @@ private boolean partitionsDiscovered; private Map> partitionsDiscoveredByPath; - private Map storageDriversDiscoveredByPath; + private Map contextDiscoveredByPath; + private final HCatStorageHandler cachedStorageHandler; HarOutputCommitterPostProcessor harProcessor = new HarOutputCommitterPostProcessor(); @@ -83,34 +83,39 @@ * @throws IOException */ public FileOutputCommitterContainer(JobContext context, - OutputCommitter baseCommitter) throws IOException { + org.apache.hadoop.mapred.OutputCommitter baseCommitter) throws IOException { super(context, baseCommitter); jobInfo = HCatOutputFormat.getJobInfo(context); dynamicPartitioningUsed = jobInfo.isDynamicPartitioningUsed(); this.partitionsDiscovered = !dynamicPartitioningUsed; + cachedStorageHandler = HCatUtil.getStorageHandler(context.getConfiguration(),jobInfo.getTableInfo().getStorerInfo()); } @Override public void abortTask(TaskAttemptContext context) throws IOException { if (!dynamicPartitioningUsed){ - getBaseOutputCommitter().abortTask(context); + getBaseOutputCommitter().abortTask(HCatMapRedUtil.createTaskAttemptContext(context)); } } @Override public void commitTask(TaskAttemptContext context) throws IOException { if (!dynamicPartitioningUsed){ - getBaseOutputCommitter().commitTask(context); - }else{ - // called explicitly through FileRecordWriterContainer.close() if dynamic + OutputJobInfo outputJobInfo = HCatOutputFormat.getJobInfo(context); + //TODO fix this hack, something wrong with pig + //running multiple storers in a single job, the real output dir got overwritten or something + //the location in OutputJobInfo is still correct so we'll use that + //TestHCatStorer.testMultiPartColsInData() used to fail without this + context.getConfiguration().set("mapred.output.dir",outputJobInfo.getLocation()); + getBaseOutputCommitter().commitTask(HCatMapRedUtil.createTaskAttemptContext(context)); } } @Override public boolean needsTaskCommit(TaskAttemptContext context) throws IOException { if (!dynamicPartitioningUsed){ - return getBaseOutputCommitter().needsTaskCommit(context); + return getBaseOutputCommitter().needsTaskCommit(HCatMapRedUtil.createTaskAttemptContext(context)); }else{ // called explicitly through 
FileRecordWriterContainer.close() if dynamic - return false by default
             return false;
@@ -120,7 +125,7 @@
     @Override
     public void setupJob(JobContext context) throws IOException {
         if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
-            getBaseOutputCommitter().setupJob(context);
+            getBaseOutputCommitter().setupJob(HCatMapRedUtil.createJobContext(context));
         }
         // in dynamic usecase, called through FileRecordWriterContainer
     }
@@ -128,28 +133,25 @@
     @Override
     public void setupTask(TaskAttemptContext context) throws IOException {
         if (!dynamicPartitioningUsed){
-            getBaseOutputCommitter().setupTask(context);
-        }else{
-            // called explicitly through FileRecordWriterContainer.write() if dynamic
+            getBaseOutputCommitter().setupTask(HCatMapRedUtil.createTaskAttemptContext(context));
         }
     }

     @Override
     public void abortJob(JobContext jobContext, State state) throws IOException {
+        org.apache.hadoop.mapred.JobContext
+            mappedJobContext = HCatMapRedUtil.createJobContext(jobContext);
         if (dynamicPartitioningUsed){
             discoverPartitions(jobContext);
         }
         if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
-            getBaseOutputCommitter().abortJob(jobContext, state);
+            getBaseOutputCommitter().abortJob(mappedJobContext, state);
         } else if (dynamicPartitioningUsed){
-            for(HCatOutputStorageDriver baseOsd : storageDriversDiscoveredByPath.values()){
+            for(JobContext currContext : contextDiscoveredByPath.values()){
                 try {
-                    baseOsd.abortOutputCommitterJob(
-                        HCatHadoopShims.Instance.get().createTaskAttemptContext(
-                            jobContext.getConfiguration(), TaskAttemptID.forName(ptnRootLocation)
-                        ),state);
+                    new JobConf(currContext.getConfiguration()).getOutputCommitter().abortJob(currContext, state);
                 } catch (Exception e) {
                     throw new IOException(e);
                 }
@@ -159,8 +161,7 @@
         OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
         try {
-            HiveMetaStoreClient client = HCatOutputFormat.createHiveClient(
-                jobInfo.getServerUri(), jobContext.getConfiguration());
+            HiveMetaStoreClient client = HCatOutputFormat.createHiveClient(null, jobContext.getConfiguration());
             // cancel the deleg. tokens that were acquired for this job now that
             // we are done - we should cancel if the tokens were acquired by
             // HCatOutputFormat and not if they were supplied by Oozie. In the latter
@@ -215,7 +216,7 @@
             discoverPartitions(jobContext);
         }
         if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
-            getBaseOutputCommitter().commitJob(jobContext);
+            getBaseOutputCommitter().commitJob(HCatMapRedUtil.createJobContext(jobContext));
         }
         // create _SUCCESS FILE if so requested.
OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext); @@ -237,6 +238,7 @@ @Override public void cleanupJob(JobContext context) throws IOException { + if (dynamicPartitioningUsed){ discoverPartitions(context); } @@ -251,15 +253,13 @@ if( table.getPartitionKeys().size() == 0 ) { //non partitioned table if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) { - getBaseOutputCommitter().cleanupJob(context); + getBaseOutputCommitter().cleanupJob(HCatMapRedUtil.createJobContext(context)); } else if (dynamicPartitioningUsed){ - for(HCatOutputStorageDriver baseOsd : storageDriversDiscoveredByPath.values()){ + for(JobContext currContext : contextDiscoveredByPath.values()){ try { - baseOsd.cleanupOutputCommitterJob( - HCatHadoopShims.Instance.get().createTaskAttemptContext( - context.getConfiguration(), TaskAttemptID.forName(ptnRootLocation) - )); + JobConf jobConf = new JobConf(currContext.getConfiguration()); + jobConf.getOutputCommitter().cleanupJob(currContext); } catch (Exception e) { throw new IOException(e); } @@ -280,7 +280,7 @@ List partitionsAdded = new ArrayList(); try { - client = HCatOutputFormat.createHiveClient(jobInfo.getServerUri(), conf); + client = HCatOutputFormat.createHiveClient(null, conf); StorerInfo storer = InitializeInput.extractStorerInfo(table.getSd(),table.getParameters()); @@ -361,7 +361,7 @@ } if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) { - getBaseOutputCommitter().cleanupJob(context); + getBaseOutputCommitter().cleanupJob(HCatMapRedUtil.createJobContext(context)); } //Cancel HCat and JobTracker tokens @@ -627,7 +627,6 @@ private void discoverPartitions(JobContext context) throws IOException { if (!partitionsDiscovered){ // LOG.info("discover ptns called"); - OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context); harProcessor.setEnabled(jobInfo.getHarRequested()); @@ -639,17 +638,15 @@ FileSystem fs = loadPath.getFileSystem(context.getConfiguration()); // construct a path pattern (e.g., /*/*) to find all dynamically generated paths - String dynPathSpec = loadPath.toUri().getPath(); dynPathSpec = dynPathSpec.replaceAll("__HIVE_DEFAULT_PARTITION__", "*"); - // TODO : replace this with a param pull from HiveConf // LOG.info("Searching for "+dynPathSpec); - Path pathPattern = new Path(loadPath, dynPathSpec); + Path pathPattern = new Path(dynPathSpec); FileStatus[] status = fs.globStatus(pathPattern); partitionsDiscoveredByPath = new LinkedHashMap>(); - storageDriversDiscoveredByPath = new LinkedHashMap(); + contextDiscoveredByPath = new LinkedHashMap(); if (status.length == 0) { @@ -672,8 +669,9 @@ LinkedHashMap fullPartSpec = new LinkedHashMap(); Warehouse.makeSpecFromName(fullPartSpec, st.getPath()); partitionsDiscoveredByPath.put(st.getPath().toString(),fullPartSpec); - storageDriversDiscoveredByPath.put(st.getPath().toString(), - HCatOutputFormat.getOutputDriverInstance(context, jobInfo, fullPartSpec)); + JobContext currContext = new JobContext(context.getConfiguration(),context.getJobID()); + HCatOutputFormat.configureOutputStorageHandler(context, jobInfo, fullPartSpec); + contextDiscoveredByPath.put(st.getPath().toString(),currContext); } } Index: mapreduce/OutputCommitterContainer.java =================================================================== --- mapreduce/OutputCommitterContainer.java (revision 1241662) +++ mapreduce/OutputCommitterContainer.java (working copy) @@ -26,13 +26,13 @@ * See {@link OutputFormatContainer} for more information about containers. 
*/ abstract class OutputCommitterContainer extends OutputCommitter { - private final OutputCommitter committer; + private final org.apache.hadoop.mapred.OutputCommitter committer; /** * @param context current JobContext * @param committer OutputCommitter that this instance will contain */ - public OutputCommitterContainer(JobContext context, OutputCommitter committer) { + public OutputCommitterContainer(JobContext context, org.apache.hadoop.mapred.OutputCommitter committer) { this.committer = committer; } Index: mapreduce/StorerInfo.java =================================================================== --- mapreduce/StorerInfo.java (revision 1241662) +++ mapreduce/StorerInfo.java (working copy) @@ -25,23 +25,34 @@ /** The serialization version */ private static final long serialVersionUID = 1L; + //TODO remove this /** The name of the input storage driver class */ private String inputSDClass; + //TODO remove this /** The name of the output storage driver class */ private String outputSDClass; /** The properties for the storage driver */ private Properties properties; + private String ofClass; + private String ifClass; + + private String serdeClass; + + private String storageHandlerClass; + + + //TODO remove this /** * Initialize the storage driver * @param inputSDClass * @param outputSDClass * @param properties */ - StorerInfo(String inputSDClass, String outputSDClass, Properties properties) { + public StorerInfo(String inputSDClass, String outputSDClass, Properties properties) { super(); this.inputSDClass = inputSDClass; this.outputSDClass = outputSDClass; @@ -49,17 +60,27 @@ } /** - * @return the inputSDClass + * Initialize the storage driver + * @param inputSDClass + * @param outputSDClass + * @param properties */ - public String getInputSDClass() { - return inputSDClass; + public StorerInfo(String inputSDClass, String outputSDClass, String ifClass, String ofClass, String serdeClass, String storageHandlerClass, Properties properties) { + super(); + this.inputSDClass = inputSDClass; + this.outputSDClass = outputSDClass; + this.ifClass =ifClass; + this.ofClass = ofClass; + this.serdeClass = serdeClass; + this.storageHandlerClass = storageHandlerClass; + this.properties = properties; } /** - * @param inputSDClass the inputSDClass to set + * @return the inputSDClass */ - public void setInputSDClass(String inputSDClass) { - this.inputSDClass = inputSDClass; + public String getInputSDClass() { + return inputSDClass; } /** @@ -69,13 +90,26 @@ return outputSDClass; } - /** - * @param outputSDClass the outputSDClass to set - */ - public void setOutputSDClass(String outputSDClass) { - this.outputSDClass = outputSDClass; +public String getIfClass() { + return ifClass; } + public void setIfClass(String ifClass) { + this.ifClass = ifClass; + } + + public String getOfClass() { + return ofClass; + } + + public String getSerdeClass() { + return serdeClass; + } + + public String getStorageHandlerClass() { + return storageHandlerClass; + } + /** * @return the properties */ Index: mapreduce/HCatEximInputFormat.java =================================================================== --- mapreduce/HCatEximInputFormat.java (revision 1241662) +++ mapreduce/HCatEximInputFormat.java (working copy) @@ -33,9 +33,14 @@ import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapreduce.Job; + import org.apache.hcatalog.common.HCatConstants; import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.data.HCatRecord; import org.apache.hcatalog.data.schema.HCatSchema; import org.apache.hcatalog.data.schema.HCatSchemaUtils; @@ -137,4 +142,5 @@ } return partInfos; } + } Index: mapreduce/HCatSplit.java =================================================================== --- mapreduce/HCatSplit.java (revision 1241662) +++ mapreduce/HCatSplit.java (working copy) @@ -22,16 +22,20 @@ import java.io.IOException; import java.lang.reflect.Constructor; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapreduce.InputSplit; + import org.apache.hcatalog.common.HCatUtil; import org.apache.hcatalog.data.schema.HCatSchema; /** The HCatSplit wrapper around the InputSplit returned by the underlying InputFormat */ -public class HCatSplit extends InputSplit implements Writable,org.apache.hadoop.mapred.InputSplit { +public class HCatSplit extends InputSplit + implements Writable,org.apache.hadoop.mapred.InputSplit { Log LOG = LogFactory.getLog(HCatSplit.class); @@ -40,9 +44,13 @@ /** The split returned by the underlying InputFormat split. */ private InputSplit baseSplit; + private org.apache.hadoop.mapred.InputSplit baseMapRedSplit; /** The schema for the HCatTable */ private HCatSchema tableSchema; + + private HiveConf hiveConf; + /** * Instantiates a new hcat split. */ @@ -56,9 +64,20 @@ * @param baseSplit the base split * @param tableSchema the table level schema */ - public HCatSplit(PartInfo partitionInfo, InputSplit baseSplit, HCatSchema tableSchema) { + public HCatSplit(PartInfo partitionInfo, + org.apache.hadoop.mapred.InputSplit baseSplit, HCatSchema tableSchema) + { + this.partitionInfo = partitionInfo; + this.baseMapRedSplit = baseSplit; + this.tableSchema = tableSchema; + } + + public HCatSplit(HiveConf hiveConf, PartInfo partitionInfo, + org.apache.hadoop.mapred.InputSplit baseMapRedSplit, + HCatSchema tableSchema) { + this.hiveConf = hiveConf; this.partitionInfo = partitionInfo; - this.baseSplit = baseSplit; + this.baseMapRedSplit = baseMapRedSplit; this.tableSchema = tableSchema; } @@ -74,8 +93,8 @@ * Gets the underlying InputSplit. 
* @return the baseSplit */ - public InputSplit getBaseSplit() { - return baseSplit; + public org.apache.hadoop.mapred.InputSplit getBaseSplit() { + return baseMapRedSplit; } /** @@ -180,4 +199,8 @@ String tableSchemaString = HCatUtil.serialize(tableSchema); WritableUtils.writeString(output, tableSchemaString); } + + public HiveConf getHiveConf() { + return hiveConf; + } } Index: mapreduce/DefaultOutputCommitterContainer.java =================================================================== --- mapreduce/DefaultOutputCommitterContainer.java (revision 1241662) +++ mapreduce/DefaultOutputCommitterContainer.java (working copy) @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.mapred.HCatMapRedUtil; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -38,54 +39,56 @@ * @param baseCommitter OutputCommitter to contain * @throws IOException */ - public DefaultOutputCommitterContainer(JobContext context, OutputCommitter baseCommitter) throws IOException { + public DefaultOutputCommitterContainer(JobContext context, org.apache.hadoop.mapred.OutputCommitter baseCommitter) throws IOException { super(context,baseCommitter); } @Override public void abortTask(TaskAttemptContext context) throws IOException { - getBaseOutputCommitter().abortTask(context); + getBaseOutputCommitter().abortTask(HCatMapRedUtil.createTaskAttemptContext(context)); } @Override public void commitTask(TaskAttemptContext context) throws IOException { - getBaseOutputCommitter().commitTask(context); + getBaseOutputCommitter().commitTask(HCatMapRedUtil.createTaskAttemptContext(context)); } @Override public boolean needsTaskCommit(TaskAttemptContext context) throws IOException { - return getBaseOutputCommitter().needsTaskCommit(context); + return getBaseOutputCommitter().needsTaskCommit(HCatMapRedUtil.createTaskAttemptContext(context)); } @Override public void setupJob(JobContext context) throws IOException { - getBaseOutputCommitter().setupJob(context); + getBaseOutputCommitter().setupJob(HCatMapRedUtil.createJobContext(context)); } @Override public void setupTask(TaskAttemptContext context) throws IOException { - getBaseOutputCommitter().setupTask(context); + getBaseOutputCommitter().setupTask(HCatMapRedUtil.createTaskAttemptContext(context)); } @Override public void abortJob(JobContext jobContext, State state) throws IOException { - getBaseOutputCommitter().abortJob(jobContext, state); + getBaseOutputCommitter().abortJob(HCatMapRedUtil.createJobContext(jobContext), state); + cleanupJob(jobContext); } @Override public void commitJob(JobContext jobContext) throws IOException { - getBaseOutputCommitter().commitJob(jobContext); + getBaseOutputCommitter().commitJob(HCatMapRedUtil.createJobContext(jobContext)); + cleanupJob(jobContext); } @Override public void cleanupJob(JobContext context) throws IOException { - getBaseOutputCommitter().cleanupJob(context); + getBaseOutputCommitter().cleanupJob(HCatMapRedUtil.createJobContext(context)); OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context); //Cancel HCat and JobTracker tokens try { - HiveMetaStoreClient client = HCatOutputFormat.createHiveClient(jobInfo.getServerUri(), context.getConfiguration()); + HiveMetaStoreClient client = HCatOutputFormat.createHiveClient(null, context.getConfiguration()); String tokenStrForm = client.getTokenStrForm(); if(tokenStrForm != null && 
context.getConfiguration().get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) { client.cancelDelegationToken(tokenStrForm); Index: cli/SemanticAnalysis/CreateTableHook.java =================================================================== --- cli/SemanticAnalysis/CreateTableHook.java (revision 1241662) +++ cli/SemanticAnalysis/CreateTableHook.java (working copy) @@ -17,6 +17,7 @@ */ package org.apache.hcatalog.cli.SemanticAnalysis; +import java.io.IOException; import java.io.Serializable; import java.util.HashMap; import java.util.List; @@ -47,11 +48,9 @@ import org.apache.hcatalog.common.HCatConstants; import org.apache.hcatalog.common.HCatException; import org.apache.hcatalog.common.HCatUtil; -import org.apache.hcatalog.pig.drivers.LoadFuncBasedInputDriver; -import org.apache.hcatalog.pig.drivers.StoreFuncBasedOutputDriver; +import org.apache.hcatalog.mapreduce.HCatStorageHandler; import org.apache.hcatalog.rcfile.RCFileInputDriver; import org.apache.hcatalog.rcfile.RCFileOutputDriver; -import org.apache.hcatalog.storagehandler.HCatStorageHandler; final class CreateTableHook extends AbstractSemanticAnalyzerHook { @@ -220,7 +219,6 @@ } CreateTableDesc desc = ((DDLTask) rootTasks.get(rootTasks.size() - 1)) .getWork().getCreateTblDesc(); - Map tblProps = desc.getTblProps(); if (tblProps == null) { // tblProps will be null if user didnt use tblprops in his CREATE @@ -243,19 +241,19 @@ // to authorize. try { HCatStorageHandler storageHandlerInst = HCatUtil - .getStorageHandler(context.getConf(), storageHandler); + .getStorageHandler(context.getConf(), + desc.getStorageHandler(), + desc.getSerName(), + desc.getInputFormat(), + desc.getOutputFormat()); HiveAuthorizationProvider auth = storageHandlerInst .getAuthorizationProvider(); // TBD: To pass in the exact read and write privileges. 
String databaseName = context.getHive().newTable(desc.getTableName()).getDbName(); auth.authorize(context.getHive().getDatabase(databaseName), null, null); - - tblProps.put(HCatConstants.HCAT_ISD_CLASS, storageHandlerInst - .getInputStorageDriver().getName()); - tblProps.put(HCatConstants.HCAT_OSD_CLASS, storageHandlerInst - .getOutputStorageDriver().getName()); - + } catch (IOException e) { + throw new SemanticException(e); } catch (HiveException e) { throw new SemanticException(e); } Index: common/HCatUtil.java =================================================================== --- common/HCatUtil.java (revision 1241662) +++ common/HCatUtil.java (working copy) @@ -31,21 +31,26 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Properties; import java.util.Set; import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider; +import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; @@ -57,17 +62,23 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hcatalog.data.DataType; +import org.apache.hcatalog.data.HCatRecord; +import org.apache.hcatalog.data.HCatRecordSerDe; import org.apache.hcatalog.data.Pair; import org.apache.hcatalog.data.schema.HCatFieldSchema; import org.apache.hcatalog.data.schema.HCatSchema; import org.apache.hcatalog.data.schema.HCatSchemaUtils; +import org.apache.hcatalog.mapreduce.FosterStorageHandler; import org.apache.hcatalog.mapreduce.HCatOutputFormat; -import org.apache.hcatalog.storagehandler.HCatStorageHandler; +import org.apache.hcatalog.mapreduce.HCatStorageHandler; +import org.apache.hcatalog.mapreduce.OutputJobInfo; +import org.apache.hcatalog.mapreduce.StorerInfo; import org.apache.thrift.TException; public class HCatUtil { - // static final private Log LOG = LogFactory.getLog(HCatUtil.class); +// static final private Log LOG = LogFactory.getLog(HCatUtil.class); public static boolean checkJobContextIfRunningFromBackend(JobContext j) { if (j.getConfiguration().get("mapred.task.id", "").equals("")) { @@ -391,7 +402,7 @@ public static void logStackTrace(Log logger) { StackTraceElement[] stackTrace = new Exception().getStackTrace(); for (int i = 1; i < stackTrace.length; i++) { - logger.info("\t" + stackTrace[i].toString()); + logger.debug("\t" + stackTrace[i].toString()); } } 
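Reviewer note: the CreateTableHook change above now resolves its handler through the new five-argument HCatUtil.getStorageHandler(conf, storageHandler, serDe, inputFormat, outputFormat) introduced in the next hunk, with FosterStorageHandler as the fallback when no STORED BY handler is declared. The sketch below is only an illustration of that call pattern; the class name StorageHandlerResolutionSketch and the RCFile SerDe/InputFormat/OutputFormat strings are example values, not something this patch prescribes.

import org.apache.hadoop.conf.Configuration;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.mapreduce.HCatStorageHandler;

public class StorageHandlerResolutionSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // No explicit storage handler (null first class name), so per the new code path
        // the IF/OF/SerDe triple is expected to be wrapped in a FosterStorageHandler.
        HCatStorageHandler handler = HCatUtil.getStorageHandler(
            conf,
            null,                                                    // no STORED BY handler declared
            "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe",  // SerDe (illustrative)
            "org.apache.hadoop.hive.ql.io.RCFileInputFormat",        // InputFormat (illustrative)
            "org.apache.hadoop.hive.ql.io.RCFileOutputFormat");      // OutputFormat (illustrative)

        // The resolved handler exposes the SerDe that the record reader/writer containers use later.
        System.out.println(handler.getClass().getName());
        System.out.println(handler.getSerDeClass().getName());
    }
}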
@@ -408,9 +419,9 @@
     public static void logList(Log logger, String itemName, List list) {
-        logger.info(itemName + ":");
+        logger.debug(itemName + ":");
         for (Object item : list) {
-            logger.info("\t[" + item + "]");
+            logger.debug("\t[" + item + "]");
         }
     }
@@ -449,20 +460,58 @@
         logger.info("\tservice : " + t.getService());
     }

+    /**
+     * Create an instance of a storage handler defined in storerInfo. If one cannot be found
+     * then FosterStorageHandler is used to encapsulate the InputFormat, OutputFormat and SerDe.
+     * This StorageHandler assumes the other supplied storage artifacts are for a file-based storage system.
+     * @param conf job's configuration will be used to configure the Configurable StorageHandler
+     * @param storerInfo StorerInfo defining the StorageHandler and the InputFormat, OutputFormat and SerDe
+     * @return storageHandler instance
+     * @throws IOException
+     */
+    public static HCatStorageHandler getStorageHandler(Configuration conf, StorerInfo storerInfo) throws IOException {
+        return getStorageHandler(conf,
+                                 storerInfo.getStorageHandlerClass(),
+                                 storerInfo.getSerdeClass(),
+                                 storerInfo.getIfClass(),
+                                 storerInfo.getOfClass());
+    }
+
+    /**
+     * Create an instance of a storage handler. If storageHandler == null,
+     * then a surrogate StorageHandler is used to encapsulate the InputFormat, OutputFormat and SerDe.
+     * This StorageHandler assumes the other supplied storage artifacts are for a file-based storage system.
+     * @param conf job's configuration will be used to configure the Configurable StorageHandler
+     * @param storageHandler fully qualified class name of the desired StorageHandler instance
+     * @param serDe fully qualified class name of the desired SerDe instance
+     * @param inputFormat fully qualified class name of the desired InputFormat instance
+     * @param outputFormat fully qualified class name of the desired OutputFormat instance
+     * @return storageHandler instance
+     * @throws IOException
+     */
     public static HCatStorageHandler getStorageHandler(Configuration conf,
-            String className) throws HiveException {
+                                                       String storageHandler,
+                                                       String serDe,
+                                                       String inputFormat,
+                                                       String outputFormat) throws IOException {
-        if (className == null) {
-            return null;
+
+        if (storageHandler == null) {
+            try {
+                return new FosterStorageHandler(inputFormat,
+                                                outputFormat,
+                                                serDe);
+            } catch (ClassNotFoundException e) {
+                throw new IOException("Failed to load foster storage handler",e);
+            }
         }
+
         try {
             Class handlerClass = (Class) Class
-                .forName(className, true, JavaUtils.getClassLoader());
-            HCatStorageHandler storageHandler = (HCatStorageHandler) ReflectionUtils
-                .newInstance(handlerClass, conf);
-            return storageHandler;
+                .forName(storageHandler, true, JavaUtils.getClassLoader());
+            return (HCatStorageHandler)ReflectionUtils.newInstance(handlerClass, conf);
         } catch (ClassNotFoundException e) {
-            throw new HiveException("Error in loading storage handler."
+            throw new IOException("Error in loading storage handler."
                 + e.getMessage(), e);
         }
     }
@@ -478,4 +527,167 @@
                +". or . Got " + tableName);
         }
     }
+
+    public static void configureOutputStorageHandler(HCatStorageHandler storageHandler,
+            JobContext context,
+            OutputJobInfo outputJobInfo) {
+        //TODO replace IgnoreKeyTextOutputFormat with a HiveOutputFormatWrapper in StorageHandler
+        TableDesc tableDesc = new TableDesc(storageHandler.getSerDeClass(),
+            storageHandler.getInputFormatClass(),
+            IgnoreKeyTextOutputFormat.class,
+            outputJobInfo.getTableInfo().getStorerInfo().getProperties());
+        if(tableDesc.getJobProperties() == null)
+            tableDesc.setJobProperties(new HashMap());
+        for (Map.Entry el: context.getConfiguration()) {
+            tableDesc.getJobProperties().put(el.getKey(),el.getValue());
+        }
+
+        Map jobProperties = new HashMap();
+        try {
+            tableDesc.getJobProperties().put(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
+
+            storageHandler.configureOutputJobProperties(tableDesc,jobProperties);
+
+            for(Map.Entry el: jobProperties.entrySet()) {
+                context.getConfiguration().set(el.getKey(),el.getValue());
+            }
+        } catch (IOException e) {
+            throw new IllegalStateException("Failed to configure StorageHandler",e);
+        }
+    }
+
+    /**
+     * Replace the contents of dest with the contents of src
+     * @param src
+     * @param dest
+     */
+    public static void copyConf(Configuration src, Configuration dest) {
+        dest.clear();
+        for(Map.Entry el : src) {
+            dest.set(el.getKey(),el.getValue());
+        }
+    }
+
+    /**
+     * Create a HiveMetaStoreClient from the given metastore url or, when the url is null,
+     * from the hive conf carried in the job configuration (see getHiveConf below).
+     * @param url thrift uri of the metastore, or null to use the configuration-provided HiveConf
+     * @param conf job configuration used to build the HiveConf
+     * @param cls class whose defaults are used when constructing the HiveConf
+     * @return a HiveMetaStoreClient connected with the resulting HiveConf
+     * @throws IOException
+     * @throws MetaException
+     */
+    public static HiveMetaStoreClient createHiveClient(String url,
+            Configuration conf, Class cls)
+        throws IOException, MetaException {
+        HiveConf hiveConf = getHiveConf(url, conf, cls);
+        try {
+            return new HiveMetaStoreClient(hiveConf);
+        } catch (MetaException e) {
+            // LOG.error("Error connecting to the metastore (conf follows): "
+            //     + e.getMessage(), e);
+            // HCatUtil.logHiveConf(LOG, hiveConf);
+            throw e;
+        }
+    }
+
+    public static HiveConf getHiveConf (String url,
+            Configuration conf, Class cls)
+        throws IOException {
+        HiveConf hiveConf = new HiveConf(cls);
+        if (url != null) {
+            // User specified a thrift url
+            hiveConf.set("hive.metastore.local", "false");
+            hiveConf.set(ConfVars.METASTOREURIS.varname, url);
+
+            String kerberosPrincipal = conf.get(
+                HCatConstants.HCAT_METASTORE_PRINCIPAL);
+            if (kerberosPrincipal == null) {
+                kerberosPrincipal = conf.get(
+                    ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname);
+            }
+            if (kerberosPrincipal != null) {
+                hiveConf.setBoolean(ConfVars.METASTORE_USE_THRIFT_SASL.varname,
+                    true);
+                hiveConf.set(ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
+                    kerberosPrincipal);
+            }
+        } else {
+            // Thrift url is null,
+            // copy the hive conf into the job conf and restore it
+            // in the backend
+            if (conf.get(HCatConstants.HCAT_KEY_HIVE_CONF) == null) {
+                conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
+                    HCatUtil.serialize(hiveConf.getAllProperties()));
+            } else {
+                // Copy configuration properties into the hive conf
+                Properties properties = (Properties) HCatUtil.deserialize(
+                    conf.get(HCatConstants.HCAT_KEY_HIVE_CONF));
+                for (Map.Entry prop: properties.entrySet()) {
+                    if (prop.getValue() instanceof String) {
+                        hiveConf.set((String)prop.getKey(), (String)prop.getValue());
+                    } else if (prop.getValue() instanceof Integer) {
+                        hiveConf.setInt((String)prop.getKey(), (Integer)prop.getValue());
+                    } else if (prop.getValue() instanceof Boolean) {
+
hiveConf.setBoolean((String)prop.getKey(), + (Boolean)prop.getValue()); + } else if (prop.getValue() instanceof Long) { + hiveConf.setLong((String)prop.getKey(), + (Long)prop.getValue()); + } else if (prop.getValue() instanceof Float) { + hiveConf.setFloat((String)prop.getKey(), (Float)prop.getValue()); + } + } + } + } + + if (conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) { + hiveConf.set("hive.metastore.token.signature", + conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE)); + } + + return hiveConf; + } + + public static int compareRecords(HCatRecord first, HCatRecord second) { + try { + return compareRecordContents(first.getAll(), second.getAll()); + + } catch (HCatException e) { + // uh-oh - we've hit a deserialization error in all likelihood + // we're likely not going to recover from this. Alright, the best + // we can do is throw a ClassCastException (the only thing throwable + // from a compareTo, and in a sense, inability to read the object + // is what got us here. + throw new ClassCastException(e.getMessage()); + } + } + + public static int compareRecordContents(List first, List second) { + int mySz = first.size(); + int urSz = second.size(); + if(mySz != urSz) { + return mySz - urSz; + } else { + for (int i = 0; i < first.size(); i++) { + int c = DataType.compare(first.get(i), second.get(i)); + if (c != 0) { + return c; + } + } + return 0; + } + } + + public static ObjectInspector getObjectInspector(String serdeClassName, + Configuration conf, Properties tbl) throws Exception { + SerDe s = (SerDe) Class.forName(serdeClassName).newInstance(); + s.initialize(conf, tbl); + return s.getObjectInspector(); + } + + public static ObjectInspector getHCatRecordObjectInspector(HCatSchema hsch) throws Exception{ + HCatRecordSerDe hrsd = new HCatRecordSerDe(); + hrsd.initialize(hsch); + return hrsd.getObjectInspector(); + } } Index: mapred/HCatMapredOutputFormat.java =================================================================== --- mapred/HCatMapredOutputFormat.java (revision 1241662) +++ mapred/HCatMapredOutputFormat.java (working copy) @@ -139,7 +139,7 @@ OutputJobInfo outputJobInfo = OutputJobInfo.create( dbAndTableName.first, dbAndTableName.second, - ptnValues, null, null); + ptnValues); Job job = new Job(new Configuration()); // TODO : verify with thw if this needs to be shim-ed. 
There exists no current Shim Index: data/HCatRecordSerDe.java =================================================================== --- data/HCatRecordSerDe.java (revision 1241662) +++ data/HCatRecordSerDe.java (working copy) @@ -44,6 +44,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.io.Writable; import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.data.schema.HCatSchema; /** * SerDe class for serializing to and from HCatRecord @@ -108,7 +109,19 @@ cachedObjectInspector = HCatRecordObjectInspectorFactory.getHCatRecordObjectInspector(rowTypeInfo); } + + public void initialize(HCatSchema hsch) throws SerDeException { + if (LOG.isDebugEnabled()){ + LOG.debug("Initializing HCatRecordSerDe through HCatSchema" + hsch.toString()); + } + + rowTypeInfo = (StructTypeInfo) TypeInfoUtils.getTypeInfoFromTypeString(hsch.toString()); + cachedObjectInspector = HCatRecordObjectInspectorFactory.getHCatRecordObjectInspector(rowTypeInfo); + + } + + /** * The purpose of a deserialize method is to turn a data blob * which is a writable representation of the data into an @@ -156,7 +169,7 @@ * @param soi : StructObjectInspector * @return HCatRecord */ - private List serializeStruct(Object obj, StructObjectInspector soi) + private static List serializeStruct(Object obj, StructObjectInspector soi) throws SerDeException { List fields = soi.getAllStructFieldRefs(); @@ -181,7 +194,7 @@ * Return underlying Java Object from an object-representation * that is readable by a provided ObjectInspector. */ - private Object serializeField(Object field, + public static Object serializeField(Object field, ObjectInspector fieldObjectInspector) throws SerDeException { Object res = null; if (fieldObjectInspector.getCategory() == Category.PRIMITIVE){ @@ -193,7 +206,7 @@ } else if (fieldObjectInspector.getCategory() == Category.MAP){ res = serializeMap(field,(MapObjectInspector)fieldObjectInspector); } else { - throw new SerDeException(getClass().toString() + throw new SerDeException(HCatRecordSerDe.class.toString() + " does not know what to do with fields of unknown category: " + fieldObjectInspector.getCategory() + " , type: " + fieldObjectInspector.getTypeName()); } @@ -205,7 +218,7 @@ * an object-representation that is readable by a provided * MapObjectInspector */ - private Map serializeMap(Object f, MapObjectInspector moi) throws SerDeException { + private static Map serializeMap(Object f, MapObjectInspector moi) throws SerDeException { ObjectInspector koi = moi.getMapKeyObjectInspector(); ObjectInspector voi = moi.getMapValueObjectInspector(); Map m = new TreeMap(); @@ -221,7 +234,7 @@ return m; } - private List serializeList(Object f, ListObjectInspector loi) throws SerDeException { + private static List serializeList(Object f, ListObjectInspector loi) throws SerDeException { List l = loi.getList(f); ObjectInspector eloi = loi.getListElementObjectInspector(); if (eloi.getCategory() == Category.PRIMITIVE){ @@ -244,7 +257,7 @@ } throw new SerDeException("HCatSerDe map type unimplemented"); } else { - throw new SerDeException(getClass().toString() + throw new SerDeException(HCatRecordSerDe.class.toString() + " does not know what to do with fields of unknown category: " + eloi.getCategory() + " , type: " + eloi.getTypeName()); } @@ -274,4 +287,5 @@ return null; } + } Index: data/HCatRecordable.java =================================================================== --- data/HCatRecordable.java (revision 1241662) +++ data/HCatRecordable.java (working copy) @@ -20,6 
+20,7 @@ import java.util.List; import org.apache.hadoop.io.WritableComparable; +import org.apache.hcatalog.common.HCatException; /** * Interface that determines whether we can implement a HCatRecord on top of it @@ -30,21 +31,22 @@ * Gets the field at the specified index. * @param fieldNum the field number * @return the object at the specified index + * @throws HCatException */ - Object get(int fieldNum); + Object get(int fieldNum) throws HCatException; /** * Gets all the fields of the hcat record. * @return the list of fields */ - List getAll(); + List getAll() throws HCatException; /** * Sets the field at the specified index. * @param fieldNum the field number * @param value the value to set */ - void set(int fieldNum, Object value); + void set(int fieldNum, Object value) throws HCatException; /** * Gets the size of the hcat record. Index: data/DefaultHCatRecord.java =================================================================== --- data/DefaultHCatRecord.java (revision 1241662) +++ data/DefaultHCatRecord.java (working copy) @@ -25,6 +25,7 @@ import java.util.List; import org.apache.hcatalog.common.HCatException; +import org.apache.hcatalog.common.HCatUtil; import org.apache.hcatalog.data.schema.HCatSchema; public class DefaultHCatRecord extends HCatRecord { @@ -92,34 +93,6 @@ } @Override - public int compareTo(Object that) { - - if(that instanceof HCatRecord) { - HCatRecord other = (HCatRecord)that; - int mySz = this.size(); - int urSz = other.size(); - if(mySz != urSz) { - return mySz - urSz; - } else{ - for (int i = 0; i < mySz;i++) { - int c = DataType.compare(get(i), other.get(i)); - if (c != 0) { - return c; - } - } - } - return 0; - } else { - return DataType.compare(this, that); - } - } - - @Override - public boolean equals(Object other) { - return (compareTo(other) == 0); - } - - @Override public int hashCode() { int hash = 1; for (Object o : contents) { Index: data/HCatRecordObjectInspector.java =================================================================== --- data/HCatRecordObjectInspector.java (revision 1241662) +++ data/HCatRecordObjectInspector.java (working copy) @@ -19,12 +19,19 @@ import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hcatalog.common.HCatException; +import org.apache.hcatalog.common.HCatUtil; public class HCatRecordObjectInspector extends StandardStructObjectInspector { + public static final Log LOG = LogFactory + .getLog(HCatRecordObjectInspector.class.getName()); + protected HCatRecordObjectInspector(List structFieldNames, List structFieldObjectInspectors) { super(structFieldNames, structFieldObjectInspectors); @@ -39,12 +46,24 @@ int fieldID = ((MyField) fieldRef).getFieldID(); assert (fieldID >= 0 && fieldID < fields.size()); - return ((HCatRecord) data).get(fieldID); + try { + return ((HCatRecord) data).get(fieldID); + } catch (HCatException e) { + LOG.debug(e.getMessage()); + HCatUtil.logStackTrace(LOG); + return null; + } } @Override public List getStructFieldsDataAsList(Object o) { - return ((HCatRecord) o).getAll(); + try { + return ((HCatRecord) o).getAll(); + } catch (HCatException e) { + LOG.debug(e.getMessage()); + HCatUtil.logStackTrace(LOG); + return null; + } } } Index: data/HCatRecord.java 
=================================================================== --- data/HCatRecord.java (revision 1241662) +++ data/HCatRecord.java (working copy) @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.hcatalog.common.HCatException; +import org.apache.hcatalog.common.HCatUtil; import org.apache.hcatalog.data.schema.HCatSchema; /** @@ -134,4 +135,18 @@ set(fieldName,recordSchema,value); } + @Override + public int compareTo(Object that) { + if(that instanceof HCatRecord) { + return HCatUtil.compareRecords(this,(HCatRecord)that); + } else { + return DataType.compare(this, that); + } + } + + @Override + public boolean equals(Object other) { + return (compareTo(other) == 0); + } + }
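Closing note on the HCatRecord hunk just above: compareTo and equals now delegate to HCatUtil.compareRecords, which orders records by size first and then field by field via DataType.compare. The sketch below only illustrates the resulting semantics; it is not part of the patch, the class name RecordCompareSketch and the field values are made up, and it assumes the existing List-backed DefaultHCatRecord constructor.

import java.util.ArrayList;
import java.util.List;

import org.apache.hcatalog.data.DefaultHCatRecord;
import org.apache.hcatalog.data.HCatRecord;

public class RecordCompareSketch {
    public static void main(String[] args) throws Exception {
        List<Object> first = new ArrayList<Object>();
        first.add(1);
        first.add("alpha");

        List<Object> second = new ArrayList<Object>();
        second.add(1);
        second.add("beta");

        HCatRecord r1 = new DefaultHCatRecord(first);
        HCatRecord r2 = new DefaultHCatRecord(second);

        // Equal sizes, so comparison falls through to DataType.compare on each field:
        // field 0 ties (1 == 1), field 1 orders "alpha" before "beta".
        System.out.println(r1.compareTo(r2) < 0); // expected: true
        System.out.println(r1.equals(r2));        // expected: false
    }
}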