Index: mapreduce/HCatSplit.java
===================================================================
--- mapreduce/HCatSplit.java	(revision 1241662)
+++ mapreduce/HCatSplit.java	(working copy)
@@ -22,16 +22,21 @@
 import java.io.IOException;
 import java.lang.reflect.Constructor;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.InputSplit;
+
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.schema.HCatSchema;
 
 /** The HCatSplit wrapper around the InputSplit returned by the underlying InputFormat */
-public class HCatSplit extends InputSplit implements Writable, org.apache.hadoop.mapred.InputSplit {
+public class HCatSplit extends InputSplit
+    implements Writable, org.apache.hadoop.mapred.InputSplit {
 
     Log LOG = LogFactory.getLog(HCatSplit.class);
 
@@ -40,9 +45,14 @@
     /** The split returned by the underlying InputFormat split. */
     private InputSplit baseSplit;
+    private org.apache.hadoop.mapred.InputSplit baseMapRedSplit;
 
     /** The schema for the HCatTable */
     private HCatSchema tableSchema;
+
+    private HiveConf hiveConf;
+    private StorageDescriptor storageDescriptor;
+
     /**
      * Instantiates a new hcat split.
      */
@@ -56,10 +66,14 @@
      * @param baseSplit the base split
      * @param tableSchema the table level schema
      */
-    public HCatSplit(PartInfo partitionInfo, InputSplit baseSplit, HCatSchema tableSchema) {
-        this.partitionInfo = partitionInfo;
-        this.baseSplit = baseSplit;
-        this.tableSchema = tableSchema;
+    public HCatSplit(PartInfo partitionInfo,
+            org.apache.hadoop.mapred.InputSplit baseSplit, HCatSchema tableSchema,
+            StorageDescriptor storageDescriptor)
+    {
+        this.partitionInfo = partitionInfo;
+        this.baseMapRedSplit = baseSplit;
+        this.tableSchema = tableSchema;
+        this.storageDescriptor = storageDescriptor;
     }
 
     /**
@@ -74,8 +88,8 @@
      * Gets the underlying InputSplit.
      * @return the baseSplit
      */
-    public InputSplit getBaseSplit() {
-        return baseSplit;
+    public org.apache.hadoop.mapred.InputSplit getBaseSplit() {
+        return baseMapRedSplit;
     }
 
     /**
@@ -180,4 +194,8 @@
         String tableSchemaString = HCatUtil.serialize(tableSchema);
         WritableUtils.writeString(output, tableSchemaString);
     }
+
+    StorageDescriptor getStorageDescriptor() {
+        return storageDescriptor;
+    }
 }
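
A minimal sketch (not part of the patch) of how the reworked HCatSplit is constructed on the read path, assuming a file-based partition whose base split is a mapred FileSplit; the path, length and class placement are placeholders chosen for illustration:

// Illustration only: placeholder values, not code from this patch.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatSplit;
import org.apache.hcatalog.mapreduce.PartInfo;

public class HCatSplitSketch {
    public static HCatSplit wrap(PartInfo partInfo, HCatSchema tableSchema,
                                 StorageDescriptor sd) {
        // The base split is now an old-API (mapred) split, matching the
        // storage-handler read path this patch switches to.
        FileSplit base = new FileSplit(
                new Path("/warehouse/mydb.db/mytable/part=1/datafile"),
                0L, 134217728L, new String[0]);
        return new HCatSplit(partInfo, base, tableSchema, sd);
    }
}
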
Index: common/HCatUtil.java
===================================================================
--- common/HCatUtil.java	(revision 1241662)
+++ common/HCatUtil.java	(working copy)
@@ -31,21 +31,26 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Properties;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
+import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
@@ -57,17 +62,23 @@
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hcatalog.data.DataType;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.HCatRecordSerDe;
 import org.apache.hcatalog.data.Pair;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.apache.hcatalog.mapreduce.FosterStorageHandler;
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
-import org.apache.hcatalog.storagehandler.HCatStorageHandler;
+import org.apache.hcatalog.mapreduce.HCatStorageHandler;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hcatalog.mapreduce.StorerInfo;
 import org.apache.thrift.TException;
 
 public class HCatUtil {
 
-    // static final private Log LOG = LogFactory.getLog(HCatUtil.class);
+//  static final private Log LOG = LogFactory.getLog(HCatUtil.class);
 
     public static boolean checkJobContextIfRunningFromBackend(JobContext j) {
         if (j.getConfiguration().get("mapred.task.id", "").equals("")) {
@@ -391,7 +402,7 @@
     public static void logStackTrace(Log logger) {
         StackTraceElement[] stackTrace = new Exception().getStackTrace();
         for (int i = 1; i < stackTrace.length; i++) {
-            logger.info("\t" + stackTrace[i].toString());
+            logger.debug("\t" + stackTrace[i].toString());
         }
     }
 
@@ -408,9 +419,9 @@
     public static void logList(Log logger, String itemName, List list) {
-        logger.info(itemName + ":");
+        logger.debug(itemName + ":");
         for (Object item : list) {
-            logger.info("\t[" + item + "]");
+            logger.debug("\t[" + item + "]");
         }
     }
 
@@ -449,20 +460,58 @@
         logger.info("\tservice : " + t.getService());
     }
 
+    /**
+     * Create an instance of the storage handler defined in storerInfo. If one is not
+     * defined, a FosterStorageHandler is used to encapsulate the InputFormat,
+     * OutputFormat and SerDe. That handler assumes the supplied storage artifacts
+     * belong to a file-based storage system.
+     * @param conf the job configuration, used to configure the StorageHandler if it is Configurable
+     * @param storerInfo StorerInfo defining the StorageHandler, InputFormat, OutputFormat and SerDe
+     * @return storageHandler instance
+     * @throws IOException
+     */
+    public static HCatStorageHandler getStorageHandler(Configuration conf, StorerInfo storerInfo) throws IOException {
+        return getStorageHandler(conf,
+                                 storerInfo.getStorageHandlerClass(),
+                                 storerInfo.getSerdeClass(),
+                                 storerInfo.getIfClass(),
+                                 storerInfo.getOfClass());
+    }
+
+    /**
+     * Create an instance of a storage handler. If storageHandler == null, a surrogate
+     * FosterStorageHandler is used to encapsulate the InputFormat, OutputFormat and SerDe.
+     * That handler assumes the supplied storage artifacts belong to a file-based storage system.
+     * @param conf the job configuration, used to configure the StorageHandler if it is Configurable
+     * @param storageHandler fully qualified class name of the desired StorageHandler instance
+     * @param serDe fully qualified class name of the desired SerDe instance
+     * @param inputFormat fully qualified class name of the desired InputFormat instance
+     * @param outputFormat fully qualified class name of the desired OutputFormat instance
+     * @return storageHandler instance
+     * @throws IOException
+     */
     public static HCatStorageHandler getStorageHandler(Configuration conf,
-            String className) throws HiveException {
+            String storageHandler,
+            String serDe,
+            String inputFormat,
+            String outputFormat) throws IOException {
 
-        if (className == null) {
-            return null;
+        if (storageHandler == null) {
+            try {
+                return new FosterStorageHandler(inputFormat,
+                                                outputFormat,
+                                                serDe);
+            } catch (ClassNotFoundException e) {
+                throw new IOException("Failed to load foster storage handler", e);
+            }
         }
+
         try {
-            Class handlerClass = (Class) Class
-                    .forName(className, true, JavaUtils.getClassLoader());
-            HCatStorageHandler storageHandler = (HCatStorageHandler) ReflectionUtils
-                    .newInstance(handlerClass, conf);
-            return storageHandler;
+            Class<? extends HCatStorageHandler> handlerClass =
+                    (Class<? extends HCatStorageHandler>) Class
+                            .forName(storageHandler, true, JavaUtils.getClassLoader());
+            return (HCatStorageHandler) ReflectionUtils.newInstance(handlerClass, conf);
         } catch (ClassNotFoundException e) {
-            throw new HiveException("Error in loading storage handler."
+            throw new IOException("Error in loading storage handler."
                     + e.getMessage(), e);
        }
     }
@@ -478,4 +527,167 @@
                     + ". or <table name>. Got " + tableName);
         }
     }
+
+    public static void configureOutputStorageHandler(HCatStorageHandler storageHandler,
+                                                     JobContext context,
+                                                     OutputJobInfo outputJobInfo) {
+        //TODO replace IgnoreKeyTextOutputFormat with a HiveOutputFormatWrapper in StorageHandler
+        TableDesc tableDesc = new TableDesc(storageHandler.getSerDeClass(),
+                                            storageHandler.getInputFormatClass(),
+                                            IgnoreKeyTextOutputFormat.class,
+                                            outputJobInfo.getTableInfo().getStorerInfo().getProperties());
+        if (tableDesc.getJobProperties() == null)
+            tableDesc.setJobProperties(new HashMap<String, String>());
+        for (Map.Entry<String, String> el : context.getConfiguration()) {
+            tableDesc.getJobProperties().put(el.getKey(), el.getValue());
+        }
+
+        Map<String, String> jobProperties = new HashMap<String, String>();
+        try {
+            tableDesc.getJobProperties().put(HCatConstants.HCAT_KEY_OUTPUT_INFO,
+                    HCatUtil.serialize(outputJobInfo));
+
+            storageHandler.configureOutputJobProperties(tableDesc, jobProperties);
+
+            for (Map.Entry<String, String> el : jobProperties.entrySet()) {
+                context.getConfiguration().set(el.getKey(), el.getValue());
+            }
+        } catch (IOException e) {
+            throw new IllegalStateException("Failed to configure StorageHandler", e);
+        }
+    }
+
+    /**
+     * Replace the contents of dest with the contents of src
+     * @param src the source configuration
+     * @param dest the destination configuration, which is cleared first
+     */
+    public static void copyConf(Configuration src, Configuration dest) {
+        dest.clear();
+        for (Map.Entry<String, String> el : src) {
+            dest.set(el.getKey(), el.getValue());
+        }
+    }
+
+    /**
+     * Create a HiveMetaStoreClient for the given metastore url and job configuration.
+     * @param url the thrift url of the metastore, or null to build the client from the conf
+     * @param conf the job configuration
+     * @param cls the class whose classloader is used when constructing the HiveConf
+     * @return a HiveMetaStoreClient instance
+     * @throws IOException
+     * @throws MetaException
+     */
+    public static HiveMetaStoreClient createHiveClient(String url,
+            Configuration conf, Class cls) throws IOException, MetaException {
+        HiveConf hiveConf = getHiveConf(url, conf, cls);
+        try {
+            return new HiveMetaStoreClient(hiveConf);
+        } catch (MetaException e) {
+            // LOG.error("Error connecting to the metastore (conf follows): "
+            //         + e.getMessage(), e);
+            // HCatUtil.logHiveConf(LOG, hiveConf);
+            throw e;
+        }
+    }
+
+    public static HiveConf getHiveConf(String url,
+            Configuration conf, Class cls) throws IOException {
+        HiveConf hiveConf = new HiveConf(cls);
+        if (url != null) {
+            // User specified a thrift url
+            hiveConf.set("hive.metastore.local", "false");
+            hiveConf.set(ConfVars.METASTOREURIS.varname, url);
+
+            String kerberosPrincipal = conf.get(
+                    HCatConstants.HCAT_METASTORE_PRINCIPAL);
+            if (kerberosPrincipal == null) {
+                kerberosPrincipal = conf.get(
+                        ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname);
+            }
+            if (kerberosPrincipal != null) {
+                hiveConf.setBoolean(ConfVars.METASTORE_USE_THRIFT_SASL.varname,
+                        true);
+                hiveConf.set(ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
+                        kerberosPrincipal);
+            }
+        } else {
+            // Thrift url is null,
+            // copy the hive conf into the job conf and restore it
+            // in the backend
+            if (conf.get(HCatConstants.HCAT_KEY_HIVE_CONF) == null) {
+                conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
+                        HCatUtil.serialize(hiveConf.getAllProperties()));
+            } else {
+                // Copy configuration properties into the hive conf
+                Properties properties = (Properties) HCatUtil.deserialize(
+                        conf.get(HCatConstants.HCAT_KEY_HIVE_CONF));
+                for (Map.Entry<Object, Object> prop : properties.entrySet()) {
+                    if (prop.getValue() instanceof String) {
+                        hiveConf.set((String) prop.getKey(), (String) prop.getValue());
+                    } else if (prop.getValue() instanceof Integer) {
+                        hiveConf.setInt((String) prop.getKey(), (Integer) prop.getValue());
+                    } else if (prop.getValue() instanceof Boolean) {
+                        hiveConf.setBoolean((String) prop.getKey(),
+                                (Boolean) prop.getValue());
+                    } else if (prop.getValue() instanceof Long) {
+                        hiveConf.setLong((String) prop.getKey(),
+                                (Long) prop.getValue());
+                    } else if (prop.getValue() instanceof Float) {
+                        hiveConf.setFloat((String) prop.getKey(), (Float) prop.getValue());
+                    }
+                }
+            }
+        }
+
+        if (conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+            hiveConf.set("hive.metastore.token.signature",
+                    conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE));
+        }
+
+        return hiveConf;
+    }
+
+    public static int compareRecords(HCatRecord first, HCatRecord second) {
+        try {
+            return compareRecordContents(first.getAll(), second.getAll());
+        } catch (HCatException e) {
+            // uh-oh - we've most likely hit a deserialization error, and we're
+            // not going to recover from it. The best we can do is throw a
+            // ClassCastException (the only thing throwable from a compareTo),
+            // and in a sense, inability to read the object is what got us here.
+            throw new ClassCastException(e.getMessage());
+        }
+    }
+
+    public static int compareRecordContents(List first, List second) {
+        int mySz = first.size();
+        int urSz = second.size();
+        if (mySz != urSz) {
+            return mySz - urSz;
+        } else {
+            for (int i = 0; i < first.size(); i++) {
+                int c = DataType.compare(first.get(i), second.get(i));
+                if (c != 0) {
+                    return c;
+                }
+            }
+            return 0;
+        }
+    }
+
+    public static ObjectInspector getObjectInspector(String serdeClassName,
+            Configuration conf, Properties tbl) throws Exception {
+        SerDe s = (SerDe) Class.forName(serdeClassName).newInstance();
+        s.initialize(conf, tbl);
+        return s.getObjectInspector();
+    }
+
+    public static ObjectInspector getHCatRecordObjectInspector(HCatSchema hsch) throws Exception {
+        HCatRecordSerDe hrsd = new HCatRecordSerDe();
+        hrsd.initialize(hsch);
+        return hrsd.getObjectInspector();
+    }
 }
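
A minimal usage sketch (not part of the patch) of the getStorageHandler() fallback added above: passing a null storage-handler class makes HCatUtil wrap plain file-format classes in a FosterStorageHandler. The SerDe/InputFormat/OutputFormat class names are ordinary Hive/Hadoop classes chosen only for illustration:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.mapreduce.HCatStorageHandler;

public class StorageHandlerLookupSketch {
    public static HCatStorageHandler resolve(Configuration conf) throws IOException {
        // storageHandler == null triggers the FosterStorageHandler fallback,
        // which assumes a file-based storage layout.
        return HCatUtil.getStorageHandler(conf,
                null,                                                          // no table-level storage handler
                "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",          // SerDe
                "org.apache.hadoop.mapred.TextInputFormat",                    // InputFormat
                "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"); // OutputFormat
    }
}
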
Index: mapreduce/HCatInputFormat.java
===================================================================
--- mapreduce/HCatInputFormat.java	(revision 1241662)
+++ mapreduce/HCatInputFormat.java	(working copy)
@@ -20,8 +20,13 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
 
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hcatalog.data.HCatRecord;
+
 /** The InputFormat to use to read data from HCat */
 public class HCatInputFormat extends HCatBaseInputFormat {
 
@@ -42,6 +47,5 @@
             throw new IOException(e);
         }
     }
-
-
+
 }

Index: mapreduce/HCatBaseInputFormat.java
===================================================================
--- mapreduce/HCatBaseInputFormat.java	(revision 1241662)
+++ mapreduce/HCatBaseInputFormat.java	(working copy)
@@ -21,22 +21,48 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedList;
+import java.util.Map;
+import java.util.LinkedHashMap;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+import org.apache.hcatalog.common.ErrorType;
 import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
 
-public abstract class HCatBaseInputFormat extends InputFormat<WritableComparable, HCatRecord> {
+public abstract class HCatBaseInputFormat
+    extends InputFormat<WritableComparable, HCatRecord> {
 
     /**
      * get the schema for the HCatRecord data returned by HCatInputFormat.
@@ -44,8 +70,15 @@
      * @param context the jobContext
      * @throws IllegalArgumentException
      */
-    public static HCatSchema getOutputSchema(JobContext context) throws Exception {
-        String os = context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
+    private Class inputFileFormatClass;
+
+    /** Job properties as required by Hive */
+    private Map<String, String> jobProperties;
+
+    public static HCatSchema getOutputSchema(JobContext context)
+            throws Exception {
+        String os = context.getConfiguration().get(
+                HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
         if (os == null) {
             return getTableSchema(context);
         } else {
@@ -58,10 +91,19 @@
      * @param job the job object
      * @param hcatSchema the schema to use as the consolidated schema
      */
-    public static void setOutputSchema(Job job, HCatSchema hcatSchema) throws Exception {
-        job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA, HCatUtil.serialize(hcatSchema));
+    public static void setOutputSchema(Job job, HCatSchema hcatSchema)
+            throws Exception {
+        job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA,
+                HCatUtil.serialize(hcatSchema));
     }
 
+    public static org.apache.hadoop.mapred.InputFormat getInputFormat(
+            JobConf job, Class inputFormatClass) throws IOException {
+        return (org.apache.hadoop.mapred.InputFormat)
+                ReflectionUtils.newInstance(inputFormatClass, job);
+    }
+
     /**
      * Logically split the set of input files for the job. Returns the
@@ -84,6 +126,34 @@
             throw new IOException(e);
         }
 
+        StorageDescriptor tableSD;
+        HiveMetaStoreClient client = null;
+        JobConf jobConf;
+        try {
+            // To be able to call the storage handlers of hive,
+            // we need to convert the jobContext into a jobConf
+            // 0.18 jobConf (Hive) vs 0.20+ jobContext (HCat)
+            // begin conversion..
+            Configuration conf = jobContext.getConfiguration();
+            jobConf = new JobConf(conf);
+            // ..end of conversion
+
+            client = HCatUtil.createHiveClient(null, conf, HCatBaseInputFormat.class);
+            Table table = client.getTable(inputJobInfo.getDatabaseName(),
+                    inputJobInfo.getTableName());
+            tableSD = table.getSd();
+        } catch (Exception e) {
+            if (e instanceof HCatException) {
+                throw (HCatException) e;
+            } else {
+                throw new HCatException(ErrorType.ERROR_SET_OUTPUT, e);
+            }
+        } finally {
+            if (client != null) {
+                client.close();
+            }
+        }
+
         List<InputSplit> splits = new ArrayList<InputSplit>();
         List<PartInfo> partitionInfoList = inputJobInfo.getPartitions();
         if (partitionInfoList == null) {
@@ -91,38 +161,66 @@
             return splits;
         }
 
+        Path[] dirs = FileInputFormat.getInputPaths(jobConf);
+        HCatStorageHandler storageHandler;
+
         //For each matching partition, call getSplits on the underlying InputFormat
         for (PartInfo partitionInfo : partitionInfoList) {
             Job localJob = new Job(jobContext.getConfiguration());
-            HCatInputStorageDriver storageDriver;
             try {
-                storageDriver = getInputDriverInstance(partitionInfo.getInputStorageDriverClass());
+                storageHandler = HCatUtil.getStorageHandler(
+                        jobContext.getConfiguration(),
+                        partitionInfo.getInputStorageHandlerClass(),
+                        tableSD.getSerdeInfo().getSerializationLib(),
+                        tableSD.getInputFormat(),
+                        tableSD.getOutputFormat());
+                if (storageHandler != null) {
+                    Map<String, String> jobProperties = new LinkedHashMap<String, String>();
+                    storageHandler.configureInputJobProperties(
+                            partitionInfo.getTableDesc(), jobProperties);
+                    if (!jobProperties.isEmpty()) {
+                        partitionInfo.getTableDesc().setJobProperties(jobProperties);
+                    }
+                }
            } catch (Exception e) {
-                throw new IOException(e);
+                if (e instanceof HCatException) {
+                    throw (HCatException) e;
+                } else {
+                    throw new HCatException(ErrorType.ERROR_SET_OUTPUT, e);
+                }
            }
 
+            setInputPath(jobContext, partitionInfo.getLocation());
+
             HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
-            for (HCatFieldSchema field : inputJobInfo.getTableInfo().getDataColumns().getFields())
+            for (HCatFieldSchema field :
+                    inputJobInfo.getTableInfo().getDataColumns().getFields())
                 allCols.append(field);
-            for (HCatFieldSchema field : inputJobInfo.getTableInfo().getPartitionColumns().getFields())
+            for (HCatFieldSchema field :
+                    inputJobInfo.getTableInfo().getPartitionColumns().getFields())
                 allCols.append(field);
 
             //Pass all required information to the storage driver
-            initStorageDriver(storageDriver, localJob, partitionInfo, allCols);
+            Utilities.copyTableJobPropertiesToConf(partitionInfo.getTableDesc(),
+                    jobConf);
 
             //Get the input format for the storage driver
-            InputFormat inputFormat =
-                storageDriver.getInputFormat(partitionInfo.getInputStorageDriverProperties());
+            Class inputFormatClass = storageHandler.getInputFormatClass();
+            org.apache.hadoop.mapred.InputFormat inputFormat =
+                    getInputFormat(jobConf, inputFormatClass);
 
             //Call getSplit on the storage drivers InputFormat, create an
             //HCatSplit for each underlying split
-            List<InputSplit> baseSplits = inputFormat.getSplits(localJob);
+            //numSplits is 0 here: it is only a hint to the underlying InputFormat
+            org.apache.hadoop.mapred.InputSplit[] baseSplits =
+                    inputFormat.getSplits(jobConf, 0);
 
-            for (InputSplit split : baseSplits) {
+            for (org.apache.hadoop.mapred.InputSplit split : baseSplits) {
                 splits.add(new HCatSplit(
                         partitionInfo,
                        split,
-                        allCols));
+                        allCols,
+                        tableSD));
             }
         }
 
@@ -141,36 +239,95 @@
      * @throws IOException or InterruptedException
      */
     @Override
-    public RecordReader<WritableComparable, HCatRecord> createRecordReader(InputSplit split,
+    public RecordReader<WritableComparable, HCatRecord>
+    createRecordReader(InputSplit split,
             TaskAttemptContext taskContext)
             throws IOException, InterruptedException {
 
         HCatSplit hcatSplit = (HCatSplit) split;
         PartInfo partitionInfo = hcatSplit.getPartitionInfo();
+        JobContext jobContext = (JobContext) taskContext;
+        InputJobInfo inputJobInfo;
 
-        //If running through a Pig job, the InputJobInfo will not be available in the
-        //backend process context (since HCatLoader works on a copy of the JobContext and does
+        try {
+            inputJobInfo = getJobInfo(jobContext);
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+
+        StorageDescriptor tableSD;
+        HiveMetaStoreClient client = null;
+        JobConf jobConf;
+        try {
+            // To be able to call the storage handlers of hive,
+            // we need to convert the jobContext into a jobConf
+            // 0.18 jobConf (Hive) vs 0.20+ jobContext (HCat)
+            // begin conversion..
+            Configuration conf = jobContext.getConfiguration();
+            jobConf = new JobConf(conf);
+            // ..end of conversion
+
+            client = HCatUtil.createHiveClient(null, conf, HCatBaseInputFormat.class);
+            Table table = client.getTable(inputJobInfo.getDatabaseName(),
+                    inputJobInfo.getTableName());
+            tableSD = table.getSd();
+        } catch (Exception e) {
+            if (e instanceof HCatException) {
+                throw (HCatException) e;
+            } else {
+                throw new HCatException(ErrorType.ERROR_SET_OUTPUT, e);
+            }
+        } finally {
+            if (client != null) {
+                client.close();
+            }
+        }
+
+        //If running through a Pig job,
+        //the InputJobInfo will not be available in the
+        //backend process context (since HCatLoader works
+        //on a copy of the JobContext and does
         //not call HCatInputFormat.setInput in the backend process).
         //So this function should NOT attempt to read the InputJobInfo.
-
-        HCatInputStorageDriver storageDriver;
+        HCatStorageHandler storageHandler;
         try {
-            storageDriver = getInputDriverInstance(partitionInfo.getInputStorageDriverClass());
+            storageHandler = HCatUtil.getStorageHandler(
+                    jobContext.getConfiguration(),
+                    partitionInfo.getInputStorageHandlerClass(),
+                    tableSD.getSerdeInfo().getSerializationLib(),
+                    tableSD.getInputFormat(),
+                    tableSD.getOutputFormat());
+            if (storageHandler != null) {
+                Map<String, String> jobProperties = new LinkedHashMap<String, String>();
+                storageHandler.configureInputJobProperties(
+                        partitionInfo.getTableDesc(), jobProperties);
+                if (!jobProperties.isEmpty()) {
+                    partitionInfo.getTableDesc().setJobProperties(jobProperties);
+                }
+            }
        } catch (Exception e) {
-            throw new IOException(e);
+            if (e instanceof HCatException) {
+                throw (HCatException) e;
+            } else {
+                throw new HCatException(ErrorType.ERROR_SET_OUTPUT, e);
+            }
        }
 
-        //Pass all required information to the storage driver
-        initStorageDriver(storageDriver, taskContext, partitionInfo, hcatSplit.getTableSchema());
+        setInputPath(jobContext, partitionInfo.getLocation());
 
-        //Get the input format for the storage driver
-        InputFormat inputFormat =
-            storageDriver.getInputFormat(partitionInfo.getInputStorageDriverProperties());
+        Class inputFormatClass = storageHandler.getInputFormatClass();
+        org.apache.hadoop.mapred.InputFormat inputFormat =
+                getInputFormat(jobConf, inputFormatClass);
+        Utilities.copyTableJobPropertiesToConf(partitionInfo.getTableDesc(),
+                jobConf);
 
         //Create the underlying input format's record reader and an HCat wrapper
-        RecordReader recordReader =
-            inputFormat.createRecordReader(hcatSplit.getBaseSplit(), taskContext);
+        //need to pass a Reporter to the underlying getRecordReader
+        org.apache.hadoop.mapred.RecordReader recordReader =
+                inputFormat.getRecordReader(hcatSplit.getBaseSplit(), jobConf, null);
 
-        return new HCatRecordReader(storageDriver, recordReader);
+        return new HCatRecordReader(storageHandler, recordReader);
     }
 
     /**
@@ -208,62 +365,61 @@
         return (InputJobInfo) HCatUtil.deserialize(jobString);
     }
 
+    public void setInputPath(JobContext jobContext, String location) throws IOException {
 
-    /**
-     * Initializes the storage driver instance. Passes on the required
-     * schema information, path info and arguments for the supported
-     * features to the storage driver.
-     * @param storageDriver the storage driver
-     * @param context the job context
-     * @param partitionInfo the partition info
-     * @param tableSchema the table level schema
-     * @throws IOException Signals that an I/O exception has occurred.
-     */
-    private void initStorageDriver(HCatInputStorageDriver storageDriver,
-            JobContext context, PartInfo partitionInfo,
-            HCatSchema tableSchema) throws IOException {
+        // ideally we should just call FileInputFormat.setInputPaths() here - but
+        // that won't work since FileInputFormat.setInputPaths() needs
+        // a Job object instead of a JobContext which we are handed here
 
-        storageDriver.setInputPath(context, partitionInfo.getLocation());
+        int length = location.length();
+        int curlyOpen = 0;
+        int pathStart = 0;
+        boolean globPattern = false;
+        List<String> pathStrings = new ArrayList<String>();
 
-        if( partitionInfo.getPartitionSchema() != null ) {
-            storageDriver.setOriginalSchema(context, partitionInfo.getPartitionSchema());
+        // split the location on commas that are not inside a {...} glob
+        for (int i = 0; i < length; i++) {
+            char ch = location.charAt(i);
+            switch (ch) {
+            case '{':
+                curlyOpen++;
+                globPattern = true;
+                break;
+            case '}':
+                curlyOpen--;
+                if (curlyOpen == 0 && globPattern) {
+                    globPattern = false;
+                }
+                break;
+            case ',':
+                if (!globPattern) {
+                    pathStrings.add(location.substring(pathStart, i));
+                    pathStart = i + 1;
+                }
+                break;
+            }
+        }
+        pathStrings.add(location.substring(pathStart, length));
+
+        Path[] paths = StringUtils.stringToPath(pathStrings.toArray(new String[0]));
+
+        Configuration conf = jobContext.getConfiguration();
+        FileSystem fs = FileSystem.get(conf);
+        Path path = paths[0].makeQualified(fs);
+        StringBuilder str = new StringBuilder(StringUtils.escapeString(path.toString()));
+        for (int i = 1; i < paths.length; i++) {
+            str.append(StringUtils.COMMA_STR);
+            path = paths[i].makeQualified(fs);
+            str.append(StringUtils.escapeString(path.toString()));
+        }
+        conf.set("mapred.input.dir", str.toString());
+    }
 
-            Class<? extends HCatInputStorageDriver> driverClass =
-                    (Class<? extends HCatInputStorageDriver>)
-                    Class.forName(inputStorageDriverClass);
-            return driverClass.newInstance();
-        } catch (Exception e) {
-            throw new Exception("error creating storage driver " +
-                    inputStorageDriverClass, e);
-        }
-    }
 }
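
A condensed sketch (not part of the patch) of the old-API read path that getSplits() above now follows: convert the job configuration to a JobConf, instantiate the handler-provided mapred InputFormat reflectively, and ask it for splits. TextInputFormat and the input directory are stand-ins for values the storage handler and partition would normally supply:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class MapRedReadPathSketch {
    @SuppressWarnings("unchecked")
    public static InputSplit[] splitsFor(Configuration conf, String inputDir) throws IOException {
        // New-API JobContext configuration -> old-API JobConf, as done in getSplits()
        JobConf jobConf = new JobConf(conf);
        jobConf.set("mapred.input.dir", inputDir);
        // The storage handler normally supplies the InputFormat class;
        // TextInputFormat is only a placeholder here.
        Class<? extends InputFormat> ifClass = TextInputFormat.class;
        InputFormat<WritableComparable, Writable> inputFormat =
                (InputFormat<WritableComparable, Writable>) ReflectionUtils.newInstance(ifClass, jobConf);
        // numSplits = 0 is only a hint; the underlying format decides
        return inputFormat.getSplits(jobConf, 0);
    }
}
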