Index: mapreduce/HCatRecordReader.java
===================================================================
--- mapreduce/HCatRecordReader.java	(revision 1241662)
+++ mapreduce/HCatRecordReader.java	(working copy)
@@ -18,15 +18,17 @@
 package org.apache.hcatalog.mapreduce;
 
 import java.io.IOException;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Writable;
+import org.apache.hcatalog.data.DefaultHCatRecord;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hcatalog.data.DefaultHCatRecord;
+
 import org.apache.hcatalog.data.HCatRecord;
 
 /** The HCat wrapper for the underlying RecordReader, this ensures that the initialize on
@@ -53,19 +55,43 @@
     this.storageDriver = storageDriver;
   }
 
+  public HCatRecordReader(HiveStorageHandler storageHandler,
+      org.apache.hadoop.mapred.RecordReader baseRecordReader) {
+    //FIXME
+    //needs integration here
+    //this.baseRecordReader = baseRecordReader;
+    //this.storageHandler = storageHandler
+    //next few lines not required
+    this.baseRecordReader = null;
+    this.storageDriver = null;
+  }
+
+  //FIXME
+  //needs integration here
+  //Function not needed
+  @Override
+  public void initialize(org.apache.hadoop.mapreduce.InputSplit split,
+      TaskAttemptContext taskContext)
+      throws IOException, InterruptedException {
+  }
+
   /* (non-Javadoc)
    * @see org.apache.hadoop.mapreduce.RecordReader#initialize(org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext)
    */
-  @Override
-  public void initialize(InputSplit split, TaskAttemptContext taskContext)
+//  @Override
+  public void initialize(org.apache.hadoop.mapred.InputSplit split,
+      TaskAttemptContext taskContext)
       throws IOException, InterruptedException {
-    InputSplit baseSplit = split;
+    org.apache.hadoop.mapred.InputSplit baseSplit = split;
 
     if( split instanceof HCatSplit ) {
       baseSplit = ((HCatSplit) split).getBaseSplit();
     }
 
-    baseRecordReader.initialize(baseSplit, taskContext);
+    //FIXME
+    //needs Integration here
+    //baseRecordReader.initialize(baseSplit, taskContext);
   }
 
   /* (non-Javadoc)
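The constructor and initialize() above are deliberately left as FIXMEs by the patch. For orientation only, here is a minimal sketch of how a new-API (mapreduce) RecordReader can delegate to the old-API (mapred) reader handed back by getRecordReader(); it is not part of the patch, the class name is made up, and where exactly the raw Writable gets turned into an HCatRecord is an open assumption (that is precisely what the FIXMEs defer).

import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/** Illustrative wrapper: drives an old-API reader from the new-API interface. */
class MapredReaderWrapper extends RecordReader<WritableComparable, Writable> {

  private final org.apache.hadoop.mapred.RecordReader<WritableComparable, Writable> baseReader;
  private WritableComparable currentKey;
  private Writable currentValue;

  MapredReaderWrapper(
      org.apache.hadoop.mapred.RecordReader<WritableComparable, Writable> baseReader) {
    this.baseReader = baseReader;
  }

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context) {
    // Nothing to do: the old API hands back an already-initialized reader
    // from InputFormat.getRecordReader(split, jobConf, reporter).
  }

  @Override
  public boolean nextKeyValue() throws IOException {
    if (currentKey == null) {
      currentKey = baseReader.createKey();
      currentValue = baseReader.createValue();
    }
    // next() refills the reusable key/value objects and reports whether a record was read
    return baseReader.next(currentKey, currentValue);
  }

  @Override
  public WritableComparable getCurrentKey() {
    return currentKey;
  }

  @Override
  public Writable getCurrentValue() {
    // An HCat-aware version would deserialize this Writable into an HCatRecord here
    // (for example through the table's SerDe); that wiring is what the FIXMEs leave open.
    return currentValue;
  }

  @Override
  public float getProgress() throws IOException {
    return baseReader.getProgress();
  }

  @Override
  public void close() throws IOException {
    baseReader.close();
  }
}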
Index: mapreduce/HCatBaseInputFormat.java
===================================================================
--- mapreduce/HCatBaseInputFormat.java	(revision 1241662)
+++ mapreduce/HCatBaseInputFormat.java	(working copy)
@@ -21,22 +21,43 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedList;
+import java.util.Map;
+import java.util.LinkedHashMap;
 import java.util.List;
 
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
 
-public abstract class HCatBaseInputFormat extends InputFormat {
+public abstract class HCatBaseInputFormat
+  extends InputFormat {
 
   /**
    * get the schema for the HCatRecord data returned by HCatInputFormat.
@@ -44,8 +65,14 @@
    * @param context the jobContext
    * @throws IllegalArgumentException
    */
+  private Class inputFileFormatClass;
+
+  /** Jobproperties as required by Hive */
+  private Map jobProperties;
+
   public static HCatSchema getOutputSchema(JobContext context) throws Exception {
-    String os = context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
+    String os = context.getConfiguration().get(
+        HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
     if (os == null) {
       return getTableSchema(context);
     } else {
@@ -58,11 +85,21 @@
    * @param job the job object
    * @param hcatSchema the schema to use as the consolidated schema
    */
-  public static void setOutputSchema(Job job,HCatSchema hcatSchema) throws Exception {
-    job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA, HCatUtil.serialize(hcatSchema));
+  public static void setOutputSchema(Job job,HCatSchema hcatSchema)
+    throws Exception {
+    job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA,
+        HCatUtil.serialize(hcatSchema));
   }
 
+  public static
+  org.apache.hadoop.mapred.InputFormat
+  getInputFormat (JobConf job, Class inputFormatClass) throws IOException {
+    return (
+      org.apache.hadoop.mapred.InputFormat)
+      ReflectionUtils.newInstance(inputFormatClass, job);
+  }
+
   /**
    * Logically split the set of input files for the job. Returns the
    * underlying InputFormat's splits
@@ -84,6 +121,14 @@
       throw new IOException(e);
     }
 
+    // To be able to call the storage handlers of hive,
+    // we need to convert the jobContext into a jobConf
+    // 0.18 jobConf (Hive) vs 0.20+ jobContext (HCat)
+    // begin conversion..
+    Configuration conf = jobContext.getConfiguration();
+    JobConf jobConf = new JobConf(conf);
+    // ..end of conversion
+
     List splits = new ArrayList();
     List partitionInfoList = inputJobInfo.getPartitions();
     if(partitionInfoList == null ) {
@@ -91,16 +136,31 @@
       return splits;
     }
 
+    Path[] dirs = FileInputFormat.getInputPaths(jobConf);
+    HiveConf hiveConf = InitializeInput.getHiveConf(inputJobInfo, conf);
+    HiveStorageHandler storageHandler;
     //For each matching partition, call getSplits on the underlying InputFormat
     for(PartInfo partitionInfo : partitionInfoList) {
       Job localJob = new Job(jobContext.getConfiguration());
-      HCatInputStorageDriver storageDriver;
       try {
-        storageDriver = getInputDriverInstance(partitionInfo.getInputStorageDriverClass());
-      } catch (Exception e) {
-        throw new IOException(e);
+        storageHandler =
+          HiveUtils.getStorageHandler(hiveConf,
+            partitionInfo.getInputStorageHandlerClass());
+        if (storageHandler != null) {
+          Map jobProperties = new LinkedHashMap();
+          storageHandler.configureTableJobProperties(
+            partitionInfo.getTableDesc(), jobProperties);
+          if (!jobProperties.isEmpty()) {
+            partitionInfo.getTableDesc().setJobProperties(jobProperties);
+          }
+        }
+      } catch (HiveException ex) {
+        throw new RuntimeException(ex);
       }
 
+      setInputPath(jobContext, partitionInfo.getLocation());
+
       HCatSchema allCols = new HCatSchema(new LinkedList());
       for(HCatFieldSchema field: inputJobInfo.getTableInfo().getDataColumns().getFields())
         allCols.append(field);
@@ -108,18 +168,23 @@
         allCols.append(field);
 
       //Pass all required information to the storage driver
-      initStorageDriver(storageDriver, localJob, partitionInfo, allCols);
+      Utilities.copyTableJobPropertiesToConf(partitionInfo.getTableDesc(),
+        jobConf);
 
       //Get the input format for the storage driver
-      InputFormat inputFormat =
-        storageDriver.getInputFormat(partitionInfo.getInputStorageDriverProperties());
+      Class inputFormatClass = storageHandler.getInputFormatClass();
+      org.apache.hadoop.mapred.InputFormat inputFormat =
+        getInputFormat(jobConf, inputFormatClass);
 
       //Call getSplit on the storage drivers InputFormat, create an
       //HCatSplit for each underlying split
-      List baseSplits = inputFormat.getSplits(localJob);
+      //NumSplits is 0 for our purposes
+      org.apache.hadoop.mapred.InputSplit[] baseSplits =
+        inputFormat.getSplits(jobConf, 0);
 
-      for(InputSplit split : baseSplits) {
+      for(org.apache.hadoop.mapred.InputSplit split : baseSplits) {
         splits.add(new HCatSplit(
+          hiveConf,
           partitionInfo,
           split,
           allCols));
@@ -141,36 +206,62 @@
    * @throws IOException or InterruptedException
    */
   @Override
-  public RecordReader createRecordReader(InputSplit split,
+  public RecordReader
+  createRecordReader(InputSplit split,
      TaskAttemptContext taskContext)
      throws IOException, InterruptedException {
 
     HCatSplit hcatSplit = (HCatSplit) split;
     PartInfo partitionInfo = hcatSplit.getPartitionInfo();
+    JobContext jobContext = (JobContext)taskContext;
 
-    //If running through a Pig job, the InputJobInfo will not be available in the
-    //backend process context (since HCatLoader works on a copy of the JobContext and does
+    //If running through a Pig job,
+    //the InputJobInfo will not be available in the
+    //backend process context (since HCatLoader works
+    //on a copy of the JobContext and does
     //not call HCatInputFormat.setInput in the backend process).
     //So this function should NOT attempt to read the InputJobInfo.
-    HCatInputStorageDriver storageDriver;
+    HiveStorageHandler storageHandler;
     try {
-      storageDriver = getInputDriverInstance(partitionInfo.getInputStorageDriverClass());
-    } catch (Exception e) {
-      throw new IOException(e);
+      storageHandler =
+        HiveUtils.getStorageHandler(hcatSplit.getHiveConf(),
+          partitionInfo.getInputStorageHandlerClass());
+      if (storageHandler != null) {
+        Map jobProperties = new LinkedHashMap();
+        storageHandler.configureTableJobProperties(
+          partitionInfo.getTableDesc(), jobProperties);
+        if (!jobProperties.isEmpty()) {
+          partitionInfo.getTableDesc().setJobProperties(jobProperties);
+        }
+      }
+    } catch (HiveException ex) {
+      throw new RuntimeException(ex);
     }
 
+    setInputPath(jobContext, partitionInfo.getLocation());
+
+    // To be able to call the storage handlers of hive,
+    // we need to convert the jobContext into a jobConf
+    // 0.18 jobConf (Hive) vs 0.20+ jobContext (HCat)
+    // begin conversion..
+    Configuration conf = jobContext.getConfiguration();
+    JobConf jobConf = new JobConf(conf);
+    // ..end of conversion
+
+    Class inputFormatClass = storageHandler.getInputFormatClass();
+    org.apache.hadoop.mapred.InputFormat inputFormat =
+      getInputFormat(jobConf, inputFormatClass);
+
     //Pass all required information to the storage driver
-    initStorageDriver(storageDriver, taskContext, partitionInfo, hcatSplit.getTableSchema());
+    Utilities.copyTableJobPropertiesToConf(partitionInfo.getTableDesc(),
+      jobConf);
 
-    //Get the input format for the storage driver
-    InputFormat inputFormat =
-      storageDriver.getInputFormat(partitionInfo.getInputStorageDriverProperties());
 
-    //Create the underlying input formats record record and an HCat wrapper
-    RecordReader recordReader =
-      inputFormat.createRecordReader(hcatSplit.getBaseSplit(), taskContext);
+    //need to pass reporter to the underlying getRecordReader
+    org.apache.hadoop.mapred.RecordReader recordReader =
+      inputFormat.getRecordReader(hcatSplit.getBaseSplit(), jobConf, null);
 
-    return new HCatRecordReader(storageDriver,recordReader);
+    return new HCatRecordReader(storageHandler,recordReader);
   }
 
   /**
@@ -208,62 +299,61 @@
     return (InputJobInfo) HCatUtil.deserialize(jobString);
   }
 
+  public void setInputPath(JobContext jobContext, String location) throws IOException{
 
-  /**
-   * Initializes the storage driver instance. Passes on the required
-   * schema information, path info and arguments for the supported
-   * features to the storage driver.
-   * @param storageDriver the storage driver
-   * @param context the job context
-   * @param partitionInfo the partition info
-   * @param tableSchema the table level schema
-   * @throws IOException Signals that an I/O exception has occurred.
-   */
-  private void initStorageDriver(HCatInputStorageDriver storageDriver,
-      JobContext context, PartInfo partitionInfo,
-      HCatSchema tableSchema) throws IOException {
+    // ideally we should just call FileInputFormat.setInputPaths() here - but
+    // that won't work since FileInputFormat.setInputPaths() needs
+    // a Job object instead of a JobContext which we are handed here
-    storageDriver.setInputPath(context, partitionInfo.getLocation());
+    int length = location.length();
+    int curlyOpen = 0;
+    int pathStart = 0;
+    boolean globPattern = false;
+    List pathStrings = new ArrayList();
-    if( partitionInfo.getPartitionSchema() != null ) {
-      storageDriver.setOriginalSchema(context, partitionInfo.getPartitionSchema());
+    for (int i=0; i
[... middle of this hunk lost in extraction (the rest of the brace-matching loop in setInputPath and the remainder of the removed initStorageDriver/getInputDriverInstance code) ...]
 driverClass =
-      (Class)
-      Class.forName(inputStorageDriverClass);
-      return driverClass.newInstance();
-    } catch(Exception e) {
-      throw new Exception("error creating storage driver " +
-        inputStorageDriverClass, e);
-    }
-  }
-  }
 }
Index: mapreduce/HCatInputFormat.java
===================================================================
--- mapreduce/HCatInputFormat.java	(revision 1241662)
+++ mapreduce/HCatInputFormat.java	(working copy)
@@ -20,8 +20,13 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hcatalog.data.HCatRecord;
+
 /** The InputFormat to use to read data from HCat */
 public class HCatInputFormat extends HCatBaseInputFormat {
 
@@ -42,6 +47,5 @@
       throw new IOException(e);
     }
   }
-
-
+
 }
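The HCatInputFormat changes above are import-only; the substantive change is the old-API bridge that HCatBaseInputFormat.getSplits() and createRecordReader() now perform. The condensed sketch below is illustrative only (the class and method names are made up for the example); it relies on the same Hadoop calls the patch uses — new JobConf(Configuration), ReflectionUtils.newInstance, InputFormat.getSplits(jobConf, 0) and getRecordReader(split, jobConf, reporter) — and uses Reporter.NULL where the patch currently passes a null reporter.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;

/** Illustrative bridge from new-API job state to an old-API InputFormat. */
class MapredBridge {

  /** Build an old-API JobConf from the Configuration carried by a JobContext/TaskAttemptContext. */
  static JobConf toJobConf(Configuration conf) {
    return new JobConf(conf);
  }

  /** Instantiate the storage handler's mapred InputFormat and ask it for splits. */
  static InputSplit[] getBaseSplits(JobConf jobConf,
      Class<? extends InputFormat> inputFormatClass) throws IOException {
    InputFormat<?, ?> inputFormat =
        ReflectionUtils.newInstance(inputFormatClass, jobConf);
    // numSplits = 0 lets the underlying format pick its own split count,
    // matching the getSplits(jobConf, 0) call in the patch.
    return inputFormat.getSplits(jobConf, 0);
  }

  /** Open an old-API reader for one split. Reporter.NULL stands in for the null
   *  reporter the patch currently passes (see the "need to pass reporter" comment). */
  static RecordReader<?, ?> openReader(JobConf jobConf,
      Class<? extends InputFormat> inputFormatClass,
      InputSplit split) throws IOException {
    InputFormat<?, ?> inputFormat =
        ReflectionUtils.newInstance(inputFormatClass, jobConf);
    return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
  }
}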
Index: mapreduce/InitializeInput.java
===================================================================
--- mapreduce/InitializeInput.java	(revision 1241662)
+++ mapreduce/InitializeInput.java	(working copy)
@@ -27,14 +27,30 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.Deserializer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
+
 import org.apache.hcatalog.common.ErrorType;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatException;
@@ -55,12 +71,104 @@
   static final String HCAT_KEY_PREFIX = "hcat.";
   private static HiveConf hiveConf;
 
-  private static HiveMetaStoreClient createHiveMetaClient(Configuration conf, InputJobInfo inputJobInfo) throws Exception {
+  private static HiveMetaStoreClient createHiveMetaClient(Configuration conf,
+      InputJobInfo inputJobInfo) throws Exception {
     hiveConf = getHiveConf(inputJobInfo, conf);
     return new HiveMetaStoreClient(hiveConf, null);
   }
 
+  public static Deserializer getDeserializer(Table table) {
+    try {
+      return MetaStoreUtils.getDeserializer(hiveConf, table);
+    } catch (MetaException e) {
+      throw new RuntimeException(e);
+    //FIXME
+    /* needs review
+    } catch (HiveException e) {
+      throw new RuntimeException(e);
+    */
+    }
+  }
+
+  public static String getProperty(Table table, String name) {
+    return table.getParameters().get(name);
+  }
+
+  public static Properties getSchema(Table table) {
+    return MetaStoreUtils.getSchema(table);
+  }
+
+  public static HiveStorageHandler getStorageHandler(Table table) {
+    HiveStorageHandler storageHandler;
+    try {
+      storageHandler = HiveUtils.getStorageHandler(hiveConf,
+        InitializeInput.getProperty(table,
+          org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_STORAGE));
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    return storageHandler;
+  }
+
+
+  public static Class
+  getInputFormatClass(Table table) {
+    Class inputFormatClass;
+    try {
+      String className = table.getSd().getInputFormat();
+      if (className == null) {
+        if (getStorageHandler(table) == null) {
+          return null;
+        }
+        inputFormatClass = getStorageHandler(table).getInputFormatClass();
+      } else {
+        inputFormatClass =
+          (Class)
+          Class.forName(className, true, JavaUtils.getClassLoader());
+      }
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+
+    return inputFormatClass;
+  }
+
+  public static Class
+  getOutputFormatClass (Table table) {
+    Class outputFormatClass;
+
+    try {
+      String className = table.getSd().getOutputFormat();
+      Class c;
+      if (className == null) {
+        if (getStorageHandler(table) == null) {
+          return null;
+        }
+        c = getStorageHandler(table).getOutputFormatClass();
+      } else {
+        c = Class.forName(className, true,
+          JavaUtils.getClassLoader());
+      }
+      if (!HiveOutputFormat.class.isAssignableFrom(c)) {
+        outputFormatClass = HiveFileFormatUtils.getOutputFormatSubstitute(c);
+      } else {
+        outputFormatClass = (Class)c;
+      }
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+    return outputFormatClass;
+  }
+
+  public static TableDesc getTableDesc(Table table) {
+    return new TableDesc(
+      InitializeInput.getDeserializer(table).getClass(),
+      InitializeInput.getInputFormatClass(table),
+      InitializeInput.getOutputFormatClass(table),
+      InitializeInput.getSchema(table));
+  }
+
   /**
    * Set the input to use for the Job. This queries the metadata server with the specified partition predicates,
    * gets the matching partitions, puts the information in the configuration object.
@@ -112,6 +220,7 @@
       for (Partition ptn : parts){
         PartInfo partInfo = extractPartInfo(ptn.getSd(),ptn.getParameters());
         partInfo.setPartitionValues(createPtnKeyValueMap(table,ptn));
+        partInfo.setTableDesc(InitializeInput.getTableDesc(table));
         partInfoList.add(partInfo);
       }
 
@@ -119,6 +228,7 @@
       //Non partitioned table
       PartInfo partInfo = extractPartInfo(table.getSd(),table.getParameters());
       partInfo.setPartitionValues(new HashMap());
+      partInfo.setTableDesc(InitializeInput.getTableDesc(table));
       partInfoList.add(partInfo);
     }
     inputJobInfo.setPartitions(partInfoList);
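InitializeInput now exposes getStorageHandler()/getTableDesc() and stamps a TableDesc onto every PartInfo. The sketch below shows how those pieces compose: build the TableDesc, let the storage handler contribute its table-level job properties, then copy them into the JobConf — the same sequence HCatBaseInputFormat performs per partition. It is illustrative only: the class name is made up, and it assumes it sits in org.apache.hcatalog.mapreduce so the patch's new helpers resolve.

package org.apache.hcatalog.mapreduce;

import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.mapred.JobConf;

/** Illustrative glue: from a metastore Table to a JobConf carrying the handler's properties. */
class TableDescGlue {

  static void applyTableProperties(Table table, JobConf jobConf) {
    // Helper added by this patch: deserializer + input/output format + schema in one descriptor.
    TableDesc tableDesc = InitializeInput.getTableDesc(table);

    // Let the storage handler contribute its table-level job properties
    // (the same configureTableJobProperties(...) step done in getSplits/createRecordReader).
    HiveStorageHandler storageHandler = InitializeInput.getStorageHandler(table);
    if (storageHandler != null) {
      Map<String, String> jobProperties = new LinkedHashMap<String, String>();
      storageHandler.configureTableJobProperties(tableDesc, jobProperties);
      if (!jobProperties.isEmpty()) {
        tableDesc.setJobProperties(jobProperties);
      }
    }

    // Finally copy them into the JobConf handed to the underlying mapred InputFormat.
    Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf);
  }
}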
Index: mapreduce/PartInfo.java
===================================================================
--- mapreduce/PartInfo.java	(revision 1241662)
+++ mapreduce/PartInfo.java	(working copy)
@@ -21,6 +21,9 @@
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+
 import org.apache.hcatalog.data.schema.HCatSchema;
 
 /** The Class used to serialize the partition information read from the metadata server that maps to a partition */
@@ -44,6 +47,9 @@
   /** The map of partition key names and their values. */
   private Map partitionValues;
 
+  /** Table Description for hive use */
+  private TableDesc tableDesc;
+
   /**
    * Instantiates a new hcat partition info.
    * @param partitionSchema the partition schema
@@ -71,7 +77,7 @@
    * Gets the value of input storage driver class name.
    * @return the input storage driver class name
    */
-  public String getInputStorageDriverClass() {
+  public String getInputStorageHandlerClass() {
     return inputStorageDriverClass;
   }
 
@@ -80,7 +86,7 @@
    * Gets the value of hcatProperties.
    * @return the hcatProperties
    */
-  public Properties getInputStorageDriverProperties() {
+  public Properties getInputStorageHandlerProperties() {
     return hcatProperties;
   }
 
@@ -107,4 +113,12 @@
   public Map getPartitionValues() {
     return partitionValues;
   }
+
+  public void setTableDesc(TableDesc tblDesc) {
+    this.tableDesc = tblDesc;
+  }
+
+  public TableDesc getTableDesc() {
+    return tableDesc;
+  }
 }
Index: mapreduce/HCatEximInputFormat.java
===================================================================
--- mapreduce/HCatEximInputFormat.java	(revision 1241662)
+++ mapreduce/HCatEximInputFormat.java	(working copy)
@@ -33,9 +33,14 @@
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
+
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.data.schema.HCatSchemaUtils;
 
@@ -137,4 +142,5 @@
     }
     return partInfos;
   }
+
 }
Index: mapreduce/HCatSplit.java
===================================================================
--- mapreduce/HCatSplit.java	(revision 1241662)
+++ mapreduce/HCatSplit.java	(working copy)
@@ -22,16 +22,20 @@
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.InputSplit;
+
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.schema.HCatSchema;
 
 /** The HCatSplit wrapper around the InputSplit returned by the underlying InputFormat */
-public class HCatSplit extends InputSplit implements Writable,org.apache.hadoop.mapred.InputSplit {
+public class HCatSplit extends InputSplit
+  implements Writable,org.apache.hadoop.mapred.InputSplit {
 
   Log LOG = LogFactory.getLog(HCatSplit.class);
 
@@ -40,9 +44,13 @@
 
   /** The split returned by the underlying InputFormat split. */
   private InputSplit baseSplit;
+  private org.apache.hadoop.mapred.InputSplit baseMapRedSplit;
 
   /** The schema for the HCatTable */
   private HCatSchema tableSchema;
+
+  private HiveConf hiveConf;
+
   /**
    * Instantiates a new hcat split.
    */
@@ -56,9 +64,20 @@
    * @param baseSplit the base split
    * @param tableSchema the table level schema
    */
-  public HCatSplit(PartInfo partitionInfo, InputSplit baseSplit, HCatSchema tableSchema) {
+  public HCatSplit(PartInfo partitionInfo,
+      InputSplit baseSplit, HCatSchema tableSchema)
+  {
+    this.partitionInfo = partitionInfo;
+    this.baseSplit = baseSplit;
+    this.tableSchema = tableSchema;
+  }
+
+  public HCatSplit(HiveConf hiveConf, PartInfo partitionInfo,
+      org.apache.hadoop.mapred.InputSplit baseMapRedSplit,
+      HCatSchema tableSchema) {
+    this.hiveConf = hiveConf;
     this.partitionInfo = partitionInfo;
-    this.baseSplit = baseSplit;
+    this.baseMapRedSplit = baseMapRedSplit;
     this.tableSchema = tableSchema;
   }
 
@@ -74,8 +93,8 @@
    * Gets the underlying InputSplit.
    * @return the baseSplit
    */
-  public InputSplit getBaseSplit() {
-    return baseSplit;
+  public org.apache.hadoop.mapred.InputSplit getBaseSplit() {
+    return baseMapRedSplit;
  }
 
   /**
@@ -180,4 +199,8 @@
     String tableSchemaString = HCatUtil.serialize(tableSchema);
     WritableUtils.writeString(output, tableSchemaString);
   }
+
+  public HiveConf getHiveConf() {
+    return hiveConf;
+  }
 }
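HCatSplit now carries an old-API split (baseMapRedSplit) alongside the mapreduce one, plus a HiveConf. Whatever split the HCatSplit holds has to survive the Writable round-trip performed by its write()/readFields(), which the existing code does with a class-name-plus-reflection pattern (hence the java.lang.reflect.Constructor import). The sketch below shows that same pattern applied to a mapred split; it is illustrative only, the class name is made up, and it is not code from the patch.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.util.ReflectionUtils;

/** Illustrative Writable round-trip for an old-API (mapred) InputSplit. */
class MapredSplitSerde {

  /** Record the concrete split class, then let the split write its own fields. */
  static void write(DataOutput out, InputSplit split) throws IOException {
    WritableUtils.writeString(out, split.getClass().getName());
    split.write(out); // org.apache.hadoop.mapred.InputSplit extends Writable
  }

  /** Re-instantiate the recorded class reflectively and read its fields back. */
  static InputSplit read(DataInput in, Configuration conf) throws IOException {
    String splitClassName = WritableUtils.readString(in);
    try {
      Class<? extends InputSplit> splitClass =
          Class.forName(splitClassName).asSubclass(InputSplit.class);
      InputSplit split = ReflectionUtils.newInstance(splitClass, conf);
      split.readFields(in);
      return split;
    } catch (ClassNotFoundException e) {
      throw new IOException("Cannot load split class " + splitClassName, e);
    }
  }
}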