Index: src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java    (revision 1245259)
+++ src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java    (working copy)
@@ -21,22 +21,48 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedList;
+import java.util.Map;
+import java.util.LinkedHashMap;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+import org.apache.hcatalog.common.ErrorType;
 import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
 
-public abstract class HCatBaseInputFormat extends InputFormat {
+public abstract class HCatBaseInputFormat
+  extends InputFormat {
 
   /**
    * get the schema for the HCatRecord data returned by HCatInputFormat.
@@ -44,8 +70,15 @@
    * @param context the jobContext
    * @throws IllegalArgumentException
    */
-  public static HCatSchema getOutputSchema(JobContext context) throws Exception {
-    String os = context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
+  private Class inputFileFormatClass;
+
+  /** Jobproperties as required by Hive */
+  private Map jobProperties;
+
+  public static HCatSchema getOutputSchema(JobContext context)
+      throws IOException {
+    String os = context.getConfiguration().get(
+        HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
     if (os == null) {
       return getTableSchema(context);
     } else {
@@ -58,10 +91,19 @@
    * @param job the job object
    * @param hcatSchema the schema to use as the consolidated schema
    */
-  public static void setOutputSchema(Job job,HCatSchema hcatSchema) throws Exception {
-    job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA, HCatUtil.serialize(hcatSchema));
+  public static void setOutputSchema(Job job,HCatSchema hcatSchema)
+      throws IOException {
+    job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA,
+        HCatUtil.serialize(hcatSchema));
   }
 
+  private static
+  org.apache.hadoop.mapred.InputFormat
+  getMapRedInputFormat (JobConf job, Class inputFormatClass) throws IOException {
+    return (
+        org.apache.hadoop.mapred.InputFormat)
+        ReflectionUtils.newInstance(inputFormatClass, job);
+  }
 
   /**
    * Logically split the set of input files for the job. Returns the
@@ -84,6 +126,34 @@
       throw new IOException(e);
     }
 
+    StorageDescriptor tableSD;
+    HiveMetaStoreClient client = null;
+    JobConf jobConf;
+    try {
+      // To be able to call the storage handlers of hive,
+      // we need to convert the jobContext into a jobConf
+      // 0.18 jobConf (Hive) vs 0.20+ jobContext (HCat)
+      // begin conversion..
+      Configuration conf = jobContext.getConfiguration();
+      jobConf = new JobConf(conf);
+      // ..end of conversion
+
+      client = HCatUtil.createHiveClient(null, conf);
+      Table table = client.getTable(inputJobInfo.getDatabaseName(),
+          inputJobInfo.getTableName());
+      tableSD = table.getSd();
+    } catch (Exception e) {
+      if (e instanceof HCatException) {
+        throw (HCatException)e;
+      } else {
+        throw new HCatException(ErrorType.ERROR_SET_OUTPUT, e);
+      }
+    } finally {
+      if (client != null) {
+        client.close();
+      }
+    }
+
     List splits = new ArrayList();
     List partitionInfoList = inputJobInfo.getPartitions();
     if(partitionInfoList == null ) {
@@ -91,38 +161,66 @@
       return splits;
     }
 
+    Path[] dirs = FileInputFormat.getInputPaths(jobConf);
+    HCatStorageHandler storageHandler;
     //For each matching partition, call getSplits on the underlying InputFormat
     for(PartInfo partitionInfo : partitionInfoList) {
       Job localJob = new Job(jobContext.getConfiguration());
-      HCatInputStorageDriver storageDriver;
       try {
-        storageDriver = getInputDriverInstance(partitionInfo.getInputStorageDriverClass());
+        storageHandler = HCatUtil.getStorageHandler(
+            jobContext.getConfiguration(),
+            partitionInfo.getInputStorageHandlerClass(),
+            tableSD.getSerdeInfo().getSerializationLib(),
+            tableSD.getInputFormat(),
+            tableSD.getOutputFormat());
+        if (storageHandler != null) {
+          Map jobProperties = new LinkedHashMap();
+          storageHandler.configureInputJobProperties(
+              partitionInfo.getTableDesc(), jobProperties);
+          if (!jobProperties.isEmpty()) {
+            partitionInfo.getTableDesc().setJobProperties(jobProperties);
+          }
+        }
       } catch (Exception e) {
-        throw new IOException(e);
+        if (e instanceof HCatException) {
+          throw (HCatException)e;
+        } else {
+          throw new HCatException(ErrorType.ERROR_SET_OUTPUT, e);
+        }
       }
+      setInputPath(jobContext, partitionInfo.getLocation());
+
       HCatSchema allCols = new HCatSchema(new LinkedList());
-      for(HCatFieldSchema field: inputJobInfo.getTableInfo().getDataColumns().getFields())
+      for(HCatFieldSchema field:
+          inputJobInfo.getTableInfo().getDataColumns().getFields())
         allCols.append(field);
-      for(HCatFieldSchema field: inputJobInfo.getTableInfo().getPartitionColumns().getFields())
+      for(HCatFieldSchema field:
+          inputJobInfo.getTableInfo().getPartitionColumns().getFields())
         allCols.append(field);
 
       //Pass all required information to the storage driver
-      initStorageDriver(storageDriver, localJob, partitionInfo, allCols);
+      Utilities.copyTableJobPropertiesToConf(partitionInfo.getTableDesc(),
+          jobConf);
 
       //Get the input format for the storage driver
-      InputFormat inputFormat =
-        storageDriver.getInputFormat(partitionInfo.getInputStorageDriverProperties());
+      Class inputFormatClass = storageHandler.getInputFormatClass();
+      org.apache.hadoop.mapred.InputFormat inputFormat =
+          getMapRedInputFormat(jobConf, inputFormatClass);
 
       //Call getSplit on the storage drivers InputFormat, create an
       //HCatSplit for each underlying split
-      List baseSplits = inputFormat.getSplits(localJob);
+      //NumSplits is 0 for our purposes
+      org.apache.hadoop.mapred.InputSplit[] baseSplits =
+          inputFormat.getSplits(jobConf, 0);
 
-      for(InputSplit split : baseSplits) {
+      for(org.apache.hadoop.mapred.InputSplit split : baseSplits) {
         splits.add(new HCatSplit(
             partitionInfo,
             split,
-            allCols));
+            allCols,
+            tableSD));
       }
     }
 
@@ -141,36 +239,91 @@
    * @throws IOException or InterruptedException
    */
   @Override
-  public RecordReader createRecordReader(InputSplit split,
+  public RecordReader
+  createRecordReader(InputSplit split,
       TaskAttemptContext taskContext) throws IOException, InterruptedException {
 
     HCatSplit hcatSplit = (HCatSplit) split;
     PartInfo partitionInfo = hcatSplit.getPartitionInfo();
+    JobContext jobContext = (JobContext)taskContext;
+    InputJobInfo inputJobInfo;
 
-    //If running through a Pig job, the InputJobInfo will not be available in the
-    //backend process context (since HCatLoader works on a copy of the JobContext and does
+    inputJobInfo = getJobInfo(jobContext);
+
+    StorageDescriptor tableSD;
+    HiveMetaStoreClient client = null;
+    JobConf jobConf;
+    try {
+      // To be able to call the storage handlers of hive,
+      // we need to convert the jobContext into a jobConf
+      // 0.18 jobConf (Hive) vs 0.20+ jobContext (HCat)
+      // begin conversion..
+      Configuration conf = jobContext.getConfiguration();
+      jobConf = new JobConf(conf);
+      // ..end of conversion
+
+      client = HCatUtil.createHiveClient(null, conf);
+      Table table = client.getTable(inputJobInfo.getDatabaseName(),
+          inputJobInfo.getTableName());
+      tableSD = table.getSd();
+    } catch (Exception e) {
+      if (e instanceof HCatException) {
+        throw (HCatException)e;
+      } else {
+        throw new HCatException(ErrorType.ERROR_SET_OUTPUT, e);
+      }
+    } finally {
+      if (client != null) {
+        client.close();
+      }
+    }
+
+    //If running through a Pig job,
+    //the InputJobInfo will not be available in the
+    //backend process context (since HCatLoader works
+    //on a copy of the JobContext and does
     //not call HCatInputFormat.setInput in the backend process).
     //So this function should NOT attempt to read the InputJobInfo.
-
-    HCatInputStorageDriver storageDriver;
+    HCatStorageHandler storageHandler;
     try {
-      storageDriver = getInputDriverInstance(partitionInfo.getInputStorageDriverClass());
+      storageHandler = HCatUtil.getStorageHandler(
+          jobContext.getConfiguration(),
+          partitionInfo.getInputStorageHandlerClass(),
+          tableSD.getSerdeInfo().getSerializationLib(),
+          tableSD.getInputFormat(),
+          tableSD.getOutputFormat());
+      if (storageHandler != null) {
+        Map jobProperties = new LinkedHashMap();
+        storageHandler.configureInputJobProperties(
+            partitionInfo.getTableDesc(), jobProperties);
+        if (!jobProperties.isEmpty()) {
+          partitionInfo.getTableDesc().setJobProperties(jobProperties);
+        }
+      }
     } catch (Exception e) {
-      throw new IOException(e);
+      if (e instanceof HCatException) {
+        throw (HCatException)e;
+      } else {
+        throw new HCatException(ErrorType.ERROR_SET_OUTPUT, e);
+      }
     }
 
-    //Pass all required information to the storage driver
-    initStorageDriver(storageDriver, taskContext, partitionInfo, hcatSplit.getTableSchema());
+    setInputPath(jobContext, partitionInfo.getLocation());
 
-    //Get the input format for the storage driver
-    InputFormat inputFormat =
-      storageDriver.getInputFormat(partitionInfo.getInputStorageDriverProperties());
+    Class inputFormatClass = storageHandler.getInputFormatClass();
+    org.apache.hadoop.mapred.InputFormat inputFormat =
+        getMapRedInputFormat(jobConf, inputFormatClass);
+    Utilities.copyTableJobPropertiesToConf(partitionInfo.getTableDesc(),
+        jobConf);
+
     //Create the underlying input formats record record and an HCat wrapper
-    RecordReader recordReader =
-      inputFormat.createRecordReader(hcatSplit.getBaseSplit(), taskContext);
+    //need to pass reporter to the underlying getRecordReader
+    org.apache.hadoop.mapred.RecordReader recordReader =
+        inputFormat.getRecordReader(hcatSplit.getBaseSplit(), jobConf, null);
 
-    return new HCatRecordReader(storageDriver,recordReader);
+    return new HCatRecordReader(storageHandler,recordReader);
   }
 
   /**
@@ -179,14 +332,18 @@
    * has been called for a JobContext.
    * @param context the context
    * @return the table schema
-   * @throws Exception if HCatInputFromat.setInput has not been called for the current context
+   * @throws IOException if HCatInputFormat.setInput has not been called
+   *         for the current context
    */
-  public static HCatSchema getTableSchema(JobContext context) throws Exception {
+  public static HCatSchema getTableSchema(JobContext context)
+      throws IOException {
     InputJobInfo inputJobInfo = getJobInfo(context);
     HCatSchema allCols = new HCatSchema(new LinkedList());
-    for(HCatFieldSchema field: inputJobInfo.getTableInfo().getDataColumns().getFields())
+    for(HCatFieldSchema field:
+        inputJobInfo.getTableInfo().getDataColumns().getFields())
      allCols.append(field);
-    for(HCatFieldSchema field: inputJobInfo.getTableInfo().getPartitionColumns().getFields())
+    for(HCatFieldSchema field:
+        inputJobInfo.getTableInfo().getPartitionColumns().getFields())
      allCols.append(field);
     return allCols;
   }
@@ -197,73 +354,76 @@
    * exception since that means HCatInputFormat.setInput has not been called.
   * @param jobContext the job context
   * @return the InputJobInfo object
-   * @throws Exception the exception
+   * @throws IOException the exception
   */
-  private static InputJobInfo getJobInfo(JobContext jobContext) throws Exception {
-    String jobString = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO);
+  private static InputJobInfo getJobInfo(JobContext jobContext)
+      throws IOException {
+    String jobString = jobContext.getConfiguration().get(
+        HCatConstants.HCAT_KEY_JOB_INFO);
     if( jobString == null ) {
-      throw new Exception("job information not found in JobContext. HCatInputFormat.setInput() not called?");
+      throw new IOException("job information not found in JobContext."
+          + " HCatInputFormat.setInput() not called?");
     }
 
     return (InputJobInfo) HCatUtil.deserialize(jobString);
   }
 
+  private void setInputPath(JobContext jobContext, String location)
+      throws IOException{
 
-  /**
-   * Initializes the storage driver instance. Passes on the required
-   * schema information, path info and arguments for the supported
-   * features to the storage driver.
-   * @param storageDriver the storage driver
-   * @param context the job context
-   * @param partitionInfo the partition info
-   * @param tableSchema the table level schema
-   * @throws IOException Signals that an I/O exception has occurred.
-   */
-  private void initStorageDriver(HCatInputStorageDriver storageDriver,
-      JobContext context, PartInfo partitionInfo,
-      HCatSchema tableSchema) throws IOException {
+    // ideally we should just call FileInputFormat.setInputPaths() here - but
+    // that won't work since FileInputFormat.setInputPaths() needs
+    // a Job object instead of a JobContext which we are handed here
 
-    storageDriver.setInputPath(context, partitionInfo.getLocation());
+    int length = location.length();
+    int curlyOpen = 0;
+    int pathStart = 0;
+    boolean globPattern = false;
+    List pathStrings = new ArrayList();
 
-    if( partitionInfo.getPartitionSchema() != null ) {
-      storageDriver.setOriginalSchema(context, partitionInfo.getPartitionSchema());
+    for (int i=0; i<length; i++) {
+      char ch = location.charAt(i);
+      switch(ch) {
+      case '{' : {
+        curlyOpen++;
+        if (!globPattern) {
+          globPattern = true;
+        }
+        break;
+      }
+      case '}' : {
+        curlyOpen--;
+        if (curlyOpen == 0 && globPattern) {
+          globPattern = false;
+        }
+        break;
+      }
+      case ',' : {
+        if (!globPattern) {
+          pathStrings.add(location.substring(pathStart, i));
+          pathStart = i + 1;
+        }
+        break;
+      }
+      }
+    }
+    pathStrings.add(location.substring(pathStart, length));
+
+    Path[] paths = StringUtils.stringToPath(pathStrings.toArray(new String[0]));
+    Configuration conf = jobContext.getConfiguration();
+
+    FileSystem fs = FileSystem.get(conf);
+    Path path = paths[0].makeQualified(fs);
+    StringBuilder str = new StringBuilder(StringUtils.escapeString(path.toString()));
+    for(int i = 1; i < paths.length; i++) {
+      str.append(StringUtils.COMMA_STR);
+      path = paths[i].makeQualified(fs);
+      str.append(StringUtils.escapeString(path.toString()));
+    }
+
+    conf.set("mapred.input.dir", str.toString());
+  }
+}
-    }
-
-    storageDriver.setPartitionValues(context, partitionInfo.getPartitionValues());
-
-    //Set the output schema: use the schema given by the user if set, otherwise the table schema
-    HCatSchema outputSchema = null;
-    String outputSchemaString = context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
-    if( outputSchemaString != null ) {
-      outputSchema = (HCatSchema) HCatUtil.deserialize(outputSchemaString);
-    } else {
-      outputSchema = tableSchema;
-    }
-    storageDriver.setOutputSchema(context, outputSchema);
-
-    storageDriver.initialize(context, partitionInfo.getInputStorageDriverProperties());
-  }
-
-  private static HCatInputStorageDriver getInputDriverInstance(
-      String inputStorageDriverClass) throws Exception {
-    try {
-      Class<? extends HCatInputStorageDriver> driverClass =
-        (Class<? extends HCatInputStorageDriver>)
-        Class.forName(inputStorageDriverClass);
-      return driverClass.newInstance();
-    } catch(Exception e) {
-      throw new Exception("error creating storage driver " +
-          inputStorageDriverClass, e);
-    }
-  }
-}
Index: src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java    (revision 1245259)
+++ src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java    (working copy)
@@ -93,7 +93,7 @@
     try {
       handleDuplicatePublish(context, jobInfo,
-          HCatOutputFormat.createHiveClient(null,context.getConfiguration()),
+          HCatUtil.createHiveClient(null,context.getConfiguration()),
          jobInfo.getTableInfo().getTable());
     } catch (MetaException e) {
       throw new IOException(e);
     }
Index: src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java    (revision 1245259)
+++ src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java    (working copy)
@@ -27,6 +27,7 @@
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
 
 /**
  * Part of the DefaultOutput*Container classes
@@ -88,7 +89,7 @@
        //Cancel HCat and JobTracker tokens
        try {
-          HiveMetaStoreClient client = HCatOutputFormat.createHiveClient(null, context.getConfiguration());
+          HiveMetaStoreClient client = HCatUtil.createHiveClient(null, context.getConfiguration());
          String tokenStrForm = client.getTokenStrForm();
          if(tokenStrForm != null && context.getConfiguration().get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
            client.cancelDelegationToken(tokenStrForm);
Index: src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java    (revision 1245259)
+++ src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java    (working copy)
@@ -18,71 +18,126 @@
 package org.apache.hcatalog.mapreduce;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.DefaultHCatRecord;
 import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.LazyHCatRecord;
 
-/** The HCat wrapper for the underlying RecordReader, this ensures that the initialize on
- * the underlying record reader is done with the underlying split, not with HCatSplit.
+/** The HCat wrapper for the underlying RecordReader,
+ * this ensures that the initialize on
+ * the underlying record reader is done with the underlying split,
+ * not with HCatSplit.
 */
-class HCatRecordReader extends RecordReader
-    implements org.apache.hadoop.mapred.RecordReader {
+class HCatRecordReader extends RecordReader {
 
  Log LOG = LogFactory.getLog(HCatRecordReader.class);
  int lineCount = 0;
 
+  WritableComparable currentKey;
+  Writable currentValue;
+
  /** The underlying record reader to delegate to. */
-  private final RecordReader baseRecordReader;
+  //org.apache.hadoop.mapred.
+  private final org.apache.hadoop.mapred.RecordReader
+      baseRecordReader;
 
  /** The storage driver used */
-  private final HCatInputStorageDriver storageDriver;
+  private final HCatStorageHandler storageHandler;
 
+  private ObjectInspector objectInspector;
+
  /**
   * Instantiates a new hcat record reader.
   * @param baseRecordReader the base record reader
   */
-  public HCatRecordReader(HCatInputStorageDriver storageDriver, RecordReader baseRecordReader) {
-    this.baseRecordReader = baseRecordReader;
-    this.storageDriver = storageDriver;
+  public HCatRecordReader(HCatStorageHandler storageHandler,
+      org.apache.hadoop.mapred.RecordReader baseRecordReader) {
+    this.baseRecordReader = baseRecordReader;
+    this.storageHandler = storageHandler;
  }
-
+
  /* (non-Javadoc)
-   * @see org.apache.hadoop.mapreduce.RecordReader#initialize(org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext)
+   * @see org.apache.hadoop.mapreduce.RecordReader#initialize(
+   * org.apache.hadoop.mapreduce.InputSplit,
+   * org.apache.hadoop.mapreduce.TaskAttemptContext)
   */
  @Override
-  public void initialize(InputSplit split, TaskAttemptContext taskContext)
+  public void initialize(org.apache.hadoop.mapreduce.InputSplit split,
+      TaskAttemptContext taskContext)
  throws IOException, InterruptedException {
-    InputSplit baseSplit = split;
+  }
 
+  // the mapred version of the initialize function
+  public void initialize(org.apache.hadoop.mapred.InputSplit split,
+      TaskAttemptContext taskContext)
+      throws IOException, InterruptedException {
+
+    org.apache.hadoop.mapred.InputSplit baseSplit;
+    if( split instanceof HCatSplit ) {
      baseSplit = ((HCatSplit) split).getBaseSplit();
+    } else {
+      throw new IOException("Not a HCatSplit");
    }
 
-    baseRecordReader.initialize(baseSplit, taskContext);
+    Properties properties = new Properties();
+    for (Map.Entry param :
+        ((HCatSplit)split).getStorageDescriptor()
+            .getParameters().entrySet()) {
+      properties.setProperty(param.getKey(), param.getValue());
+    }
+
+    try {
+      objectInspector = HCatUtil.getObjectInspector(
+          storageHandler.getSerDeClass().getName(),
+          storageHandler.getConf(),
+          properties);
+    } catch (Exception e) {
+      throw new IOException("Unable to create objectInspector "
+          + "for serde class" + storageHandler.getSerDeClass().getName()
+          + e);
+    }
  }
 
  /* (non-Javadoc)
   * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentKey()
   */
  @Override
-  public WritableComparable getCurrentKey() throws IOException, InterruptedException {
-    return baseRecordReader.getCurrentKey();
+  public WritableComparable getCurrentKey()
+      throws IOException, InterruptedException {
+    return currentKey;
  }
 
  /* (non-Javadoc)
   * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentValue()
   */
  @Override
-  public HCatRecord getCurrentValue() throws IOException, InterruptedException {
-    HCatRecord r = storageDriver.convertToHCatRecord(baseRecordReader.getCurrentKey(),baseRecordReader.getCurrentValue());
-    return r;
+  public HCatRecord getCurrentValue()
+      throws IOException, InterruptedException {
+    HCatRecord r;
+    try {
+      r = new LazyHCatRecord(currentValue,
+          objectInspector);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+    return r;
  }
 
  /* (non-Javadoc)
@@ -95,9 +150,6 @@
    } catch (IOException e) {
      LOG.warn(e.getMessage());
      LOG.warn(e.getStackTrace());
-    } catch (InterruptedException e) {
-      LOG.warn(e.getMessage());
-      LOG.warn(e.getStackTrace());
    }
    return 0.0f; // errored
  }
@@ -108,7 +160,10 @@
  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    lineCount++;
-    return baseRecordReader.nextKeyValue();
+    currentKey = baseRecordReader.createKey();
+    currentValue = baseRecordReader.createValue();
+    return baseRecordReader.next(currentKey,
+        currentValue);
  }
 
  /* (non-Javadoc)
@@ -119,45 +174,4 @@
    baseRecordReader.close();
  }
 
-  @Override
-  public Object createKey() {
-    WritableComparable o = null;
-    try {
-      o = getCurrentKey();
-    } catch (IOException e) {
-      LOG.warn(e.getMessage());
-      LOG.warn(e.getStackTrace());
-    } catch (InterruptedException e) {
-      LOG.warn(e.getMessage());
-      LOG.warn(e.getStackTrace());
-    }
-    return o;
-  }
-
-  @Override
-  public Object createValue() {
-    return new DefaultHCatRecord();
-  }
-
-  @Override
-  public long getPos() throws IOException {
-    return lineCount;
-  }
-
-  @Override
-  public boolean next(Object key, Object value) throws IOException {
-    try {
-      if (!nextKeyValue()){
-        return false;
-      }
-
-      ((HCatRecord)value).copy(getCurrentValue());
-
-      return true;
-    } catch (InterruptedException e) {
-      LOG.warn(e.getMessage());
-      LOG.warn(e.getStackTrace());
-    }
-    return false;
-  }
 }
Index: src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java    (revision 1245259)
+++ src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java    (working copy)
@@ -226,7 +226,8 @@
  }
 
  static void cancelDelegationTokens(JobContext context, OutputJobInfo outputJobInfo) throws Exception {
-    HiveMetaStoreClient client = HCatOutputFormat.createHiveClient(null, context.getConfiguration());
+    HiveMetaStoreClient client = HCatUtil.createHiveClient(
+        null, context.getConfiguration());
    // cancel the deleg. tokens that were acquired for this job now that
    // we are done - we should cancel if the tokens were acquired by
    // HCatOutputFormat and not if they were supplied by Oozie. In the latter
Index: src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java    (revision 1245259)
+++ src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java    (working copy)
@@ -73,7 +73,7 @@
      try {
        Configuration conf = job.getConfiguration();
-        client = createHiveClient(null, conf);
+        client = HCatUtil.createHiveClient(null, conf);
        Table table = client.getTable(outputJobInfo.getDatabaseName(), outputJobInfo.getTableName());
 
        if (table.getPartitionKeysSize() == 0 ){
@@ -141,7 +141,9 @@
        //Serialize the output info into the configuration
        outputJobInfo.setTableInfo(HCatTableInfo.valueOf(table));
        outputJobInfo.setOutputSchema(tableSchema);
+        harRequested = HCatUtil.getharRequested();
        outputJobInfo.setHarRequested(harRequested);
+        maxDynamicPartitions = HCatUtil.getmaxDynamicPartitions();
        outputJobInfo.setMaximumDynamicPartitions(maxDynamicPartitions);
 
        HCatUtil.configureOutputStorageHandler(storageHandler,job,outputJobInfo);
@@ -222,76 +224,5 @@
        return getOutputFormat(context).getOutputCommitter(context);
    }
 
-    //TODO remove url component, everything should be encapsulated in HiveConf
-    static HiveMetaStoreClient createHiveClient(String url, Configuration conf) throws IOException, MetaException {
-      HiveConf hiveConf = getHiveConf(url, conf);
-//      HCatUtil.logHiveConf(LOG, hiveConf);
-      try {
-        return new HiveMetaStoreClient(hiveConf);
-      } catch (MetaException e) {
-        LOG.error("Error connecting to the metastore (conf follows): "+e.getMessage(), e);
-        HCatUtil.logHiveConf(LOG, hiveConf);
-        throw e;
-      }
-    }
-
-
-    static HiveConf getHiveConf(String url, Configuration conf) throws IOException {
-      HiveConf hiveConf = new HiveConf(HCatOutputFormat.class);
-
-      if( url != null ) {
-        //User specified a thrift url
-
-        hiveConf.set("hive.metastore.local", "false");
-        hiveConf.set(ConfVars.METASTOREURIS.varname, url);
-
-        String kerberosPrincipal = conf.get(HCatConstants.HCAT_METASTORE_PRINCIPAL);
-        if (kerberosPrincipal == null){
-          kerberosPrincipal = conf.get(ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname);
-        }
-        if (kerberosPrincipal != null){
-          hiveConf.setBoolean(ConfVars.METASTORE_USE_THRIFT_SASL.varname, true);
-          hiveConf.set(ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, kerberosPrincipal);
-        }
-      } else {
-        //Thrift url is null, copy the hive conf into the job conf and restore it
-        //in the backend context
-
-        if( conf.get(HCatConstants.HCAT_KEY_HIVE_CONF) == null ) {
-          conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(hiveConf.getAllProperties()));
-        } else {
-          //Copy configuration properties into the hive conf
-          Properties properties = (Properties) HCatUtil.deserialize(conf.get(HCatConstants.HCAT_KEY_HIVE_CONF));
-
-          for(Map.Entry prop : properties.entrySet() ) {
-            if( prop.getValue() instanceof String ) {
-              hiveConf.set((String) prop.getKey(), (String) prop.getValue());
-            } else if( prop.getValue() instanceof Integer ) {
-              hiveConf.setInt((String) prop.getKey(), (Integer) prop.getValue());
-            } else if( prop.getValue() instanceof Boolean ) {
-              hiveConf.setBoolean((String) prop.getKey(), (Boolean) prop.getValue());
-            } else if( prop.getValue() instanceof Long ) {
-              hiveConf.setLong((String) prop.getKey(), (Long) prop.getValue());
-            } else if( prop.getValue() instanceof Float ) {
-              hiveConf.setFloat((String) prop.getKey(), (Float) prop.getValue());
-            }
-          }
-        }
-
-      }
-
-      if(conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
-        hiveConf.set("hive.metastore.token.signature", conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE));
-      }
-
-      // figure out what the maximum number of partitions allowed is, so we can pass it on to our outputinfo
-      if (HCatConstants.HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED){
-        maxDynamicPartitions = hiveConf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS);
-      }else{
-        maxDynamicPartitions = -1; // disables bounds checking for maximum number of dynamic partitions
-      }
-      harRequested = hiveConf.getBoolVar(HiveConf.ConfVars.HIVEARCHIVEENABLED);
-      return hiveConf;
-    }
-
+
 }
Index: src/java/org/apache/hcatalog/mapreduce/PartInfo.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/PartInfo.java    (revision 1245259)
+++ src/java/org/apache/hcatalog/mapreduce/PartInfo.java    (working copy)
@@ -21,6 +21,9 @@
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+
 import org.apache.hcatalog.data.schema.HCatSchema;
 
 /** The Class used to serialize the partition information read from the metadata server that maps to a partition */
@@ -44,6 +47,9 @@
  /** The map of partition key names and their values. */
  private Map partitionValues;
 
+  /** Table Description for hive use */
+  private TableDesc tableDesc;
+
  /**
   * Instantiates a new hcat partition info.
   * @param partitionSchema the partition schema
@@ -71,7 +77,7 @@
   * Gets the value of input storage driver class name.
   * @return the input storage driver class name
   */
-  public String getInputStorageDriverClass() {
+  public String getInputStorageHandlerClass() {
    return inputStorageDriverClass;
  }
 
@@ -80,7 +86,7 @@
  /**
   * Gets the value of hcatProperties.
   * @return the hcatProperties
   */
-  public Properties getInputStorageDriverProperties() {
+  public Properties getInputStorageHandlerProperties() {
    return hcatProperties;
  }
 
@@ -107,4 +113,12 @@
  public Map getPartitionValues() {
    return partitionValues;
  }
+
+  public void setTableDesc(TableDesc tblDesc) {
+    this.tableDesc = tblDesc;
+  }
+
+  public TableDesc getTableDesc() {
+    return tableDesc;
+  }
 }
Index: src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java    (revision 1245259)
+++ src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java    (working copy)
@@ -161,11 +161,13 @@
        OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
        try {
-          HiveMetaStoreClient client = HCatOutputFormat.createHiveClient(null, jobContext.getConfiguration());
+          HiveMetaStoreClient client = HCatUtil.createHiveClient(null,
+              jobContext.getConfiguration());
          // cancel the deleg. tokens that were acquired for this job now that
          // we are done - we should cancel if the tokens were acquired by
-          // HCatOutputFormat and not if they were supplied by Oozie. In the latter
-          // case the HCAT_KEY_TOKEN_SIGNATURE property in the conf will not be set
+          // HCatOutputFormat and not if they were supplied by Oozie.
+          // In the latter case the HCAT_KEY_TOKEN_SIGNATURE property in
+          // the conf will not be set
          String tokenStrForm = client.getTokenStrForm();
          if(tokenStrForm != null && jobContext.getConfiguration().get
              (HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
@@ -280,7 +282,7 @@
    List partitionsAdded = new ArrayList();
 
    try {
-      client = HCatOutputFormat.createHiveClient(null, conf);
+      client = HCatUtil.createHiveClient(null, conf);
 
      StorerInfo storer = InitializeInput.extractStorerInfo(table.getSd(),table.getParameters());
Index: src/java/org/apache/hcatalog/mapreduce/HCatSplit.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/HCatSplit.java    (revision 1245259)
+++ src/java/org/apache/hcatalog/mapreduce/HCatSplit.java    (working copy)
@@ -22,16 +22,21 @@
 import java.io.IOException;
 import java.lang.reflect.Constructor;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.InputSplit;
+
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.schema.HCatSchema;
 
 /** The HCatSplit wrapper around the InputSplit returned by the underlying InputFormat */
-public class HCatSplit extends InputSplit implements Writable,org.apache.hadoop.mapred.InputSplit {
+public class HCatSplit extends InputSplit
+    implements Writable,org.apache.hadoop.mapred.InputSplit {
 
  Log LOG = LogFactory.getLog(HCatSplit.class);
 
@@ -40,9 +45,14 @@
  /** The split returned by the underlying InputFormat split. */
  private InputSplit baseSplit;
+  private org.apache.hadoop.mapred.InputSplit baseMapRedSplit;
 
  /** The schema for the HCatTable */
  private HCatSchema tableSchema;
+
+  private HiveConf hiveConf;
+  private StorageDescriptor storageDescriptor;
+
  /**
   * Instantiates a new hcat split.
   */
@@ -56,10 +66,14 @@
   * @param baseSplit the base split
   * @param tableSchema the table level schema
   */
-  public HCatSplit(PartInfo partitionInfo, InputSplit baseSplit, HCatSchema tableSchema) {
-    this.partitionInfo = partitionInfo;
-    this.baseSplit = baseSplit;
-    this.tableSchema = tableSchema;
+  public HCatSplit(PartInfo partitionInfo,
+      org.apache.hadoop.mapred.InputSplit baseSplit, HCatSchema tableSchema,
+      StorageDescriptor storageDescriptor)
+  {
+    this.partitionInfo = partitionInfo;
+    this.baseMapRedSplit = baseSplit;
+    this.tableSchema = tableSchema;
+    this.storageDescriptor = storageDescriptor;
  }
 
  /**
@@ -74,8 +88,8 @@
   * Gets the underlying InputSplit.
   * @return the baseSplit
   */
-  public InputSplit getBaseSplit() {
-    return baseSplit;
+  public org.apache.hadoop.mapred.InputSplit getBaseSplit() {
+    return baseMapRedSplit;
  }
 
  /**
@@ -180,4 +194,8 @@
    String tableSchemaString = HCatUtil.serialize(tableSchema);
    WritableUtils.writeString(output, tableSchemaString);
  }
+
+  StorageDescriptor getStorageDescriptor() {
+    return storageDescriptor;
+  }
 }
Index: src/java/org/apache/hcatalog/common/HCatUtil.java
===================================================================
--- src/java/org/apache/hcatalog/common/HCatUtil.java    (revision 1245259)
+++ src/java/org/apache/hcatalog/common/HCatUtil.java    (working copy)
@@ -39,6 +39,7 @@
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -78,6 +79,9 @@
 //  static final private Log LOG = LogFactory.getLog(HCatUtil.class);
 
+  private static int maxDynamicPartitions;
+  private static boolean harRequested;
+
  public static boolean checkJobContextIfRunningFromBackend(JobContext j) {
    if (j.getConfiguration().get("mapred.task.id", "").equals("")) {
      return false;
    }
@@ -539,14 +543,16 @@
    return hrsd.getObjectInspector();
  }
 
-  public static void configureOutputStorageHandler(HCatStorageHandler storageHandler,
-                                                   JobContext context,
-                                                   OutputJobInfo outputJobInfo) {
-    //TODO replace IgnoreKeyTextOutputFormat with a HiveOutputFormatWrapper in StorageHandler
+  public static void
+  configureOutputStorageHandler(HCatStorageHandler storageHandler,
+                                JobContext context,
+                                OutputJobInfo outputJobInfo) {
+    //TODO replace IgnoreKeyTextOutputFormat with a
+    //HiveOutputFormatWrapper in StorageHandler
    TableDesc tableDesc = new TableDesc(storageHandler.getSerDeClass(),
-        storageHandler.getInputFormatClass(),
-        IgnoreKeyTextOutputFormat.class,
-        outputJobInfo.getTableInfo().getStorerInfo().getProperties());
+        storageHandler.getInputFormatClass(),
+        IgnoreKeyTextOutputFormat.class,
+        outputJobInfo.getTableInfo().getStorerInfo().getProperties());
    if(tableDesc.getJobProperties() == null)
      tableDesc.setJobProperties(new HashMap());
    for (Map.Entry el: context.getConfiguration()) {
@@ -555,15 +561,19 @@
    Map jobProperties = new HashMap();
    try {
-      tableDesc.getJobProperties().put(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
+      tableDesc.getJobProperties().put(
+          HCatConstants.HCAT_KEY_OUTPUT_INFO,
+          HCatUtil.serialize(outputJobInfo));
 
-      storageHandler.configureOutputJobProperties(tableDesc,jobProperties);
+      storageHandler.configureOutputJobProperties(tableDesc,
+                                                  jobProperties);
 
      for(Map.Entry el: jobProperties.entrySet()) {
        context.getConfiguration().set(el.getKey(),el.getValue());
      }
    } catch (IOException e) {
-      throw new IllegalStateException("Failed to configure StorageHandler",e);
+      throw new IllegalStateException(
+          "Failed to configure StorageHandler",e);
    }
  }
 
@@ -579,4 +589,103 @@
    }
  }
 
+  //TODO remove url component, everything should be encapsulated in HiveConf
+  public static HiveMetaStoreClient createHiveClient(String url,
+      Configuration conf) throws IOException, MetaException {
+    HiveConf hiveConf = getHiveConf(url, conf);
+//    HCatUtil.logHiveConf(LOG, hiveConf);
+    try {
+      return new HiveMetaStoreClient(hiveConf);
+    } catch (MetaException e) {
+//      LOG.error("Error connecting to the metastore (conf follows): "
+//          + e.getMessage(), e);
+//      HCatUtil.logHiveConf(LOG, hiveConf);
+      throw e;
+    }
+  }
+
+
+  static HiveConf getHiveConf(String url, Configuration conf)
+      throws IOException {
+    HiveConf hiveConf = new HiveConf();
+
+    if( url != null ) {
+      //User specified a thrift url
+
+      hiveConf.set("hive.metastore.local", "false");
+      hiveConf.set(ConfVars.METASTOREURIS.varname, url);
+
+      String kerberosPrincipal = conf.get(
+          HCatConstants.HCAT_METASTORE_PRINCIPAL);
+      if (kerberosPrincipal == null){
+        kerberosPrincipal = conf.get(
+            ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname);
+      }
+      if (kerberosPrincipal != null){
+        hiveConf.setBoolean(
+            ConfVars.METASTORE_USE_THRIFT_SASL.varname, true);
+        hiveConf.set(
+            ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
+            kerberosPrincipal);
+      }
+    } else {
+      //Thrift url is null, copy the hive conf into
+      //the job conf and restore it
+      //in the backend context
+
+      if( conf.get(HCatConstants.HCAT_KEY_HIVE_CONF) == null ) {
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
+            HCatUtil.serialize(hiveConf.getAllProperties()));
+      } else {
+        //Copy configuration properties into the hive conf
+        Properties properties = (Properties) HCatUtil.deserialize(
+            conf.get(HCatConstants.HCAT_KEY_HIVE_CONF));
+
+        for(Map.Entry prop : properties.entrySet() ) {
+          if( prop.getValue() instanceof String ) {
+            hiveConf.set((String) prop.getKey(), (String) prop.getValue());
+          } else if( prop.getValue() instanceof Integer ) {
+            hiveConf.setInt((String) prop.getKey(),
+                (Integer) prop.getValue());
+          } else if( prop.getValue() instanceof Boolean ) {
+            hiveConf.setBoolean((String) prop.getKey(),
+                (Boolean) prop.getValue());
+          } else if( prop.getValue() instanceof Long ) {
+            hiveConf.setLong((String) prop.getKey(), (Long) prop.getValue());
+          } else if( prop.getValue() instanceof Float ) {
+            hiveConf.setFloat((String) prop.getKey(),
+                (Float) prop.getValue());
+          }
+        }
+      }
+
+    }
+
+    if(conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+      hiveConf.set("hive.metastore.token.signature",
+          conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE));
+    }
+
+    // figure out what the maximum number of partitions allowed is,
+    // so we can pass it on to our outputinfo
+    if (HCatConstants.HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED){
+      maxDynamicPartitions = hiveConf.getIntVar(
+          HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS);
+    }else{
+      // disable bounds checking for maximum number of dynamic partitions
+      maxDynamicPartitions = -1;
+    }
+    harRequested = hiveConf.getBoolVar(HiveConf.ConfVars.HIVEARCHIVEENABLED);
+    return hiveConf;
+  }
+
+  public static int getmaxDynamicPartitions()
+  {
+    return maxDynamicPartitions;
+  }
+
+  public static boolean getharRequested()
+  {
+    return harRequested;
+  }
 }
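
Editor's note (illustrative, not part of the patch): the revised HCatRecordReader bridges the old mapred RecordReader API to the new mapreduce API by materializing key/value holders with createKey()/createValue() and advancing with next() inside nextKeyValue(). The following is a minimal standalone sketch of that bridging pattern only; the class name MapredReaderBridge and the LongWritable/Text key/value types are hypothetical stand-ins and do not appear in the patch.

// Sketch of the mapred-to-mapreduce record reader bridge used by the patch.
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class MapredReaderBridge extends RecordReader<LongWritable, Text> {

  // Old-API reader being wrapped; it is constructed against its own mapred split.
  private final org.apache.hadoop.mapred.RecordReader<LongWritable, Text> baseReader;
  private LongWritable currentKey;
  private Text currentValue;

  public MapredReaderBridge(
      org.apache.hadoop.mapred.RecordReader<LongWritable, Text> baseReader) {
    this.baseReader = baseReader;
  }

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context) {
    // Nothing to do: the mapred reader was already initialized by its InputFormat.
  }

  @Override
  public boolean nextKeyValue() throws IOException {
    // Old API: the caller supplies reusable key/value holders and calls next().
    currentKey = baseReader.createKey();
    currentValue = baseReader.createValue();
    return baseReader.next(currentKey, currentValue);
  }

  @Override
  public LongWritable getCurrentKey() {
    return currentKey;
  }

  @Override
  public Text getCurrentValue() {
    return currentValue;
  }

  @Override
  public float getProgress() throws IOException {
    return baseReader.getProgress();
  }

  @Override
  public void close() throws IOException {
    baseReader.close();
  }
}

In the patch itself the same pattern is applied with the HCatSplit supplying the underlying mapred split and getCurrentValue() wrapping the raw Writable in a LazyHCatRecord via an ObjectInspector obtained from the storage handler's SerDe.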