Index: src/test/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java =================================================================== --- src/test/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java (revision 1293367) +++ src/test/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java (working copy) @@ -178,7 +178,6 @@ private void verifyWriteRead(String tablename, String pigSchema, String tableSchema, List data, boolean provideSchemaToStorer) throws IOException, CommandNeedRetryException, ExecException, FrontendException { MockLoader.setData(tablename+"Input", data); - try { createTable(tablename, tableSchema); PigServer server = new PigServer(ExecType.LOCAL, props); @@ -188,13 +187,14 @@ server.registerQuery("STORE A into '"+tablename+"' using org.apache.hcatalog.pig.HCatStorer(" + (provideSchemaToStorer ? "'', '"+pigSchema+"'" : "") + ");"); + ExecJob execJob = server.executeBatch().get(0); if (!execJob.getStatistics().isSuccessful()) { throw new RuntimeException("Import failed", execJob.getException()); } - // test that schema was loaded correctly server.registerQuery("X = load '"+tablename+"' using org.apache.hcatalog.pig.HCatLoader();"); + server.dumpSchema("X"); Iterator it = server.openIterator("X"); int i = 0; while (it.hasNext()) { @@ -281,8 +281,8 @@ data.add(t); } - verifyWriteRead("testMapWithComplexData", pigSchema, tableSchema, data, true); - verifyWriteRead("testMapWithComplexData2", pigSchema, tableSchema, data, false); + verifyWriteRead("testMapWithComplexData", pigSchema, tableSchema, data, true); + verifyWriteRead("testMapWithComplexData2", pigSchema, tableSchema, data, false); } } Index: src/test/org/apache/hcatalog/mapreduce/TestHCatHiveCompatibility.java =================================================================== --- src/test/org/apache/hcatalog/mapreduce/TestHCatHiveCompatibility.java (revision 1293367) +++ src/test/org/apache/hcatalog/mapreduce/TestHCatHiveCompatibility.java (working copy) @@ -91,7 +91,6 @@ // assert that the table created has no hcat instrumentation, and that we're still able to read it. Table table = client.getTable("default", "junit_unparted_noisd"); - assertFalse(table.getParameters().containsKey(HCatConstants.HCAT_ISD_CLASS)); assertTrue(table.getSd().getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS)); PigServer server = new PigServer(ExecType.LOCAL, props); @@ -115,7 +114,6 @@ // assert that the table created still has no hcat instrumentation Table table2 = client.getTable("default", "junit_unparted_noisd"); - assertFalse(table2.getParameters().containsKey(HCatConstants.HCAT_ISD_CLASS)); assertTrue(table2.getSd().getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS)); driver.run("drop table junit_unparted_noisd"); @@ -133,7 +131,6 @@ // assert that the table created has no hcat instrumentation, and that we're still able to read it. 
Table table = client.getTable("default", "junit_parted_noisd"); - assertFalse(table.getParameters().containsKey(HCatConstants.HCAT_ISD_CLASS)); assertTrue(table.getSd().getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS)); PigServer server = new PigServer(ExecType.LOCAL, props); @@ -158,14 +155,12 @@ // assert that the table created still has no hcat instrumentation Table table2 = client.getTable("default", "junit_parted_noisd"); - assertFalse(table2.getParameters().containsKey(HCatConstants.HCAT_ISD_CLASS)); assertTrue(table2.getSd().getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS)); // assert that there is one partition present, and it had hcat instrumentation inserted when it was created. Partition ptn = client.getPartition("default", "junit_parted_noisd", Arrays.asList("42")); assertNotNull(ptn); - assertTrue(ptn.getParameters().containsKey(HCatConstants.HCAT_ISD_CLASS)); assertTrue(ptn.getSd().getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS)); driver.run("drop table junit_unparted_noisd"); } Index: src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java =================================================================== --- src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java (revision 1293367) +++ src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java (working copy) @@ -161,8 +161,7 @@ Partition part = client.getPartition(dbName, tblName, Arrays.asList("p1")); assertNotNull(part); - StorerInfo storer = InitializeInput.extractStorerInfo(part.getSd(),part.getParameters()); - assertEquals(storer.getInputSDClass(), "testInputClass"); + StorerInfo storer = InternalUtil.extractStorerInfo(part.getSd(),part.getParameters()); assertEquals(storer.getProperties().get("hcat.testarg"), "testArgValue"); assertTrue(part.getSd().getLocation().indexOf("p1") != -1); } Index: src/test/org/apache/hcatalog/data/TestHCatRecordSerDe.java =================================================================== --- src/test/org/apache/hcatalog/data/TestHCatRecordSerDe.java (revision 1293367) +++ src/test/org/apache/hcatalog/data/TestHCatRecordSerDe.java (working copy) @@ -141,10 +141,12 @@ System.out.println("FOUR:"+s4.toString()); // Test LazyHCatRecord init and read - LazyHCatRecord s5 = new LazyHCatRecord(o3,testSD.getObjectInspector()); + LazyHCatRecord s5 = new LazyHCatRecord(o3,testSD.getObjectInspector(), + new HashMap()); System.out.println("FIVE:"+s5.toString()); - LazyHCatRecord s6 = new LazyHCatRecord(s4,hrsd.getObjectInspector()); + LazyHCatRecord s6 = new LazyHCatRecord(s4,hrsd.getObjectInspector(), + new HashMap()); System.out.println("SIX:"+s6.toString()); } Index: src/test/org/apache/hcatalog/data/TestLazyHCatRecord.java =================================================================== --- src/test/org/apache/hcatalog/data/TestLazyHCatRecord.java (revision 1293367) +++ src/test/org/apache/hcatalog/data/TestLazyHCatRecord.java (working copy) @@ -53,32 +53,33 @@ private final long LONG_CONST = 5000000000L; private final double DOUBLE_CONST = 3.141592654; private final String STRING_CONST = "hello world"; + private final String PART_CONST = "20120221"; public void testGet() throws Exception { - HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector()); + HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector(), + new HashMap()); assertEquals(INT_CONST, ((Integer)r.get(0)).intValue()); assertEquals(LONG_CONST, ((Long)r.get(1)).longValue()); assertEquals(DOUBLE_CONST, ((Double)r.get(2)).doubleValue()); 
assertEquals(STRING_CONST, (String)r.get(3)); } - // TODO This test fails, but it seems to be an error in the schema, not in - // LazyHCatRecord. It get's an NPE inside getPosition. - /* public void testGetWithName() throws Exception { TypeInfo ti = getTypeInfo(); - HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector(ti)); - HCatSchema schema = HCatSchemaUtils.getHCatSchema(ti); + HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector(ti), + new HashMap()); + HCatSchema schema = HCatSchemaUtils.getHCatSchema(ti) + .get(0).getStructSubSchema(); assertEquals(INT_CONST, ((Integer)r.get("an_int", schema)).intValue()); assertEquals(LONG_CONST, ((Long)r.get("a_long", schema)).longValue()); assertEquals(DOUBLE_CONST, ((Double)r.get("a_double", schema)).doubleValue()); assertEquals(STRING_CONST, (String)r.get("a_string", schema)); } - */ public void testGetAll() throws Exception { - HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector()); + HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector(), + new HashMap()); List list = r.getAll(); assertEquals(INT_CONST, ((Integer)list.get(0)).intValue()); assertEquals(LONG_CONST, ((Long)list.get(1)).longValue()); @@ -87,7 +88,8 @@ } public void testSet() throws Exception { - HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector()); + HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector(), + new HashMap()); boolean sawException = false; try { r.set(3, "Mary had a little lamb"); @@ -98,12 +100,14 @@ } public void testSize() throws Exception { - HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector()); + HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector(), + new HashMap()); assertEquals(4, r.size()); } public void testReadFields() throws Exception { - HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector()); + HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector(), + new HashMap()); boolean sawException = false; try { r.readFields(null); @@ -114,7 +118,8 @@ } public void testWrite() throws Exception { - HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector()); + HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector(), + new HashMap()); boolean sawException = false; try { r.write(null); @@ -125,7 +130,8 @@ } public void testSetWithName() throws Exception { - HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector()); + HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector(), + new HashMap()); boolean sawException = false; try { r.set("fred", null, "bob"); @@ -136,7 +142,8 @@ } public void testRemove() throws Exception { - HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector()); + HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector(), + new HashMap()); boolean sawException = false; try { r.remove(0); @@ -147,7 +154,8 @@ } public void testCopy() throws Exception { - HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector()); + HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector(), + new HashMap()); boolean sawException = false; try { r.copy(null); @@ -157,6 +165,66 @@ assertTrue(sawException); } + public void testGetWritable() throws Exception { + HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector(), + new HashMap()).getWritable(); + assertEquals(INT_CONST, ((Integer)r.get(0)).intValue()); + assertEquals(LONG_CONST, ((Long)r.get(1)).longValue()); + 
assertEquals(DOUBLE_CONST, ((Double)r.get(2)).doubleValue()); + assertEquals(STRING_CONST, (String)r.get(3)); + assertEquals("org.apache.hcatalog.data.DefaultHCatRecord", r.getClass().getName()); + } + + public void testGetPartitioned() throws Exception { + HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector(), + getPartCols()); + assertEquals(INT_CONST, ((Integer)r.get(0)).intValue()); + assertEquals(LONG_CONST, ((Long)r.get(1)).longValue()); + assertEquals(DOUBLE_CONST, ((Double)r.get(2)).doubleValue()); + assertEquals(STRING_CONST, (String)r.get(3)); + assertEquals(PART_CONST, (String)r.get(4)); + } + + public void testGetWithNamePartitioned() throws Exception { + TypeInfo ti = getTypeInfo(); + HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector(ti), + getPartCols()); + HCatSchema schema = HCatSchemaUtils.getHCatSchema(ti) + .get(0).getStructSubSchema(); + assertEquals(INT_CONST, ((Integer)r.get("an_int", schema)).intValue()); + assertEquals(LONG_CONST, ((Long)r.get("a_long", schema)).longValue()); + assertEquals(DOUBLE_CONST, ((Double)r.get("a_double", schema)).doubleValue()); + assertEquals(STRING_CONST, (String)r.get("a_string", schema)); + } + + public void testGetAllPartitioned() throws Exception { + HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector(), + getPartCols()); + List list = r.getAll(); + assertEquals(INT_CONST, ((Integer)list.get(0)).intValue()); + assertEquals(LONG_CONST, ((Long)list.get(1)).longValue()); + assertEquals(DOUBLE_CONST, ((Double)list.get(2)).doubleValue()); + assertEquals(STRING_CONST, (String)list.get(3)); + assertEquals(PART_CONST, (String)r.get(4)); + } + + public void testSizePartitioned() throws Exception { + HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector(), + getPartCols()); + assertEquals(5, r.size()); + } + + public void testGetWritablePartitioned() throws Exception { + HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector(), + getPartCols()).getWritable(); + assertEquals(INT_CONST, ((Integer)r.get(0)).intValue()); + assertEquals(LONG_CONST, ((Long)r.get(1)).longValue()); + assertEquals(DOUBLE_CONST, ((Double)r.get(2)).doubleValue()); + assertEquals(STRING_CONST, (String)r.get(3)); + assertEquals(PART_CONST, (String)r.get(4)); + assertEquals("org.apache.hcatalog.data.DefaultHCatRecord", r.getClass().getName()); + } + private HCatRecord getHCatRecord() throws Exception { List rec_1 = new ArrayList(4); @@ -185,22 +253,19 @@ } - public void testGetWritable() throws Exception { - HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector()).getWritable(); - assertEquals(INT_CONST, ((Integer)r.get(0)).intValue()); - assertEquals(LONG_CONST, ((Long)r.get(1)).longValue()); - assertEquals(DOUBLE_CONST, ((Double)r.get(2)).doubleValue()); - assertEquals(STRING_CONST, (String)r.get(3)); - assertEquals("org.apache.hcatalog.data.DefaultHCatRecord", r.getClass().getName()); - } - - private ObjectInspector getObjectInspector(TypeInfo ti) throws Exception { - return HCatRecordObjectInspectorFactory.getHCatRecordObjectInspector((StructTypeInfo)ti); + return HCatRecordObjectInspectorFactory.getHCatRecordObjectInspector( + (StructTypeInfo)ti); } private ObjectInspector getObjectInspector() throws Exception { - return - HCatRecordObjectInspectorFactory.getHCatRecordObjectInspector((StructTypeInfo)getTypeInfo()); + return HCatRecordObjectInspectorFactory.getHCatRecordObjectInspector( + (StructTypeInfo)getTypeInfo()); } + + private Map getPartCols() { + Map pc = new 
HashMap(1); + pc.put(4, PART_CONST); + return pc; + } } Index: src/java/org/apache/hcatalog/pig/HCatLoader.java =================================================================== --- src/java/org/apache/hcatalog/pig/HCatLoader.java (revision 1293367) +++ src/java/org/apache/hcatalog/pig/HCatLoader.java (working copy) @@ -88,11 +88,12 @@ // the Configuration if (!HCatUtil.checkJobContextIfRunningFromBackend(job)){ HCatInputFormat.setInput(job, - InputJobInfo.create(dbName, - tableName, - getPartitionFilterString(), - hcatServerUri != null ? hcatServerUri : (hcatServerUri = PigHCatUtil.getHCatServerUri(job)), - PigHCatUtil.getHCatServerPrincipal(job))); + InputJobInfo.create(dbName, + tableName, + getPartitionFilterString(), + hcatServerUri != null ? hcatServerUri : + (hcatServerUri = PigHCatUtil.getHCatServerUri(job)), + PigHCatUtil.getHCatServerPrincipal(job))); } // Need to also push projections by calling setOutputSchema on Index: src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java (revision 1293367) +++ src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java (working copy) @@ -137,15 +137,17 @@ * @throws IOException */ static HCatTableInfo valueOf(Table table) throws IOException { - HCatSchema dataColumns = HCatUtil.extractSchemaFromStorageDescriptor(table.getSd()); - StorerInfo storerInfo = InternalUtil.extractStorerInfo(table.getSd(), table.getParameters()); + HCatSchema dataColumns = + HCatUtil.extractSchemaFromStorageDescriptor(table.getSd()); + StorerInfo storerInfo = + InternalUtil.extractStorerInfo(table.getSd(), table.getParameters()); HCatSchema partitionColumns = HCatUtil.getPartitionColumns(table); return new HCatTableInfo(table.getDbName(), - table.getTableName(), - dataColumns, - partitionColumns, - storerInfo, - table); + table.getTableName(), + dataColumns, + partitionColumns, + storerInfo, + table); } @Override Index: src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java (revision 1293367) +++ src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java (working copy) @@ -21,22 +21,49 @@ import java.io.IOException; import java.util.ArrayList; import java.util.LinkedList; +import java.util.Map; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Properties; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.SerDe; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import 
org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; + +import org.apache.hcatalog.common.ErrorType; import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.common.HCatException; import org.apache.hcatalog.common.HCatUtil; import org.apache.hcatalog.data.HCatRecord; import org.apache.hcatalog.data.schema.HCatFieldSchema; import org.apache.hcatalog.data.schema.HCatSchema; -public abstract class HCatBaseInputFormat extends InputFormat { +public abstract class HCatBaseInputFormat + extends InputFormat { /** * get the schema for the HCatRecord data returned by HCatInputFormat. @@ -44,8 +71,13 @@ * @param context the jobContext * @throws IllegalArgumentException */ - public static HCatSchema getOutputSchema(JobContext context) throws Exception { - String os = context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA); + private Class inputFileFormatClass; + + // TODO needs to go in InitializeInput? as part of InputJobInfo + public static HCatSchema getOutputSchema(JobContext context) + throws IOException { + String os = context.getConfiguration().get( + HCatConstants.HCAT_KEY_OUTPUT_SCHEMA); if (os == null) { return getTableSchema(context); } else { @@ -58,10 +90,19 @@ * @param job the job object * @param hcatSchema the schema to use as the consolidated schema */ - public static void setOutputSchema(Job job,HCatSchema hcatSchema) throws Exception { - job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA, HCatUtil.serialize(hcatSchema)); + public static void setOutputSchema(Job job,HCatSchema hcatSchema) + throws IOException { + job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA, + HCatUtil.serialize(hcatSchema)); } + private static + org.apache.hadoop.mapred.InputFormat + getMapRedInputFormat (JobConf job, Class inputFormatClass) throws IOException { + return ( + org.apache.hadoop.mapred.InputFormat) + ReflectionUtils.newInstance(inputFormatClass, job); + } /** * Logically split the set of input files for the job. 
Returns the @@ -91,34 +132,39 @@ return splits; } + HCatStorageHandler storageHandler; + JobConf jobConf; + Configuration conf = jobContext.getConfiguration(); //For each matching partition, call getSplits on the underlying InputFormat for(PartInfo partitionInfo : partitionInfoList) { - Job localJob = new Job(jobContext.getConfiguration()); - HCatInputStorageDriver storageDriver; - try { - storageDriver = getInputDriverInstance(partitionInfo.getInputStorageDriverClass()); - } catch (Exception e) { - throw new IOException(e); - } + jobConf = HCatUtil.getJobConfFromContext(jobContext); + setInputPath(jobConf, partitionInfo.getLocation()); + Map jobProperties = partitionInfo.getJobProperties(); HCatSchema allCols = new HCatSchema(new LinkedList()); - for(HCatFieldSchema field: inputJobInfo.getTableInfo().getDataColumns().getFields()) + for(HCatFieldSchema field: + inputJobInfo.getTableInfo().getDataColumns().getFields()) allCols.append(field); - for(HCatFieldSchema field: inputJobInfo.getTableInfo().getPartitionColumns().getFields()) + for(HCatFieldSchema field: + inputJobInfo.getTableInfo().getPartitionColumns().getFields()) allCols.append(field); - //Pass all required information to the storage driver - initStorageDriver(storageDriver, localJob, partitionInfo, allCols); + HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf); - //Get the input format for the storage driver - InputFormat inputFormat = - storageDriver.getInputFormat(partitionInfo.getInputStorageDriverProperties()); + storageHandler = partitionInfo.getStorageHandler(); - //Call getSplit on the storage drivers InputFormat, create an + //Get the input format + Class inputFormatClass = storageHandler.getInputFormatClass(); + org.apache.hadoop.mapred.InputFormat inputFormat = + getMapRedInputFormat(jobConf, inputFormatClass); + + //Call getSplit on the InputFormat, create an //HCatSplit for each underlying split - List baseSplits = inputFormat.getSplits(localJob); + //NumSplits is 0 for our purposes + org.apache.hadoop.mapred.InputSplit[] baseSplits = + inputFormat.getSplits(jobConf, 0); - for(InputSplit split : baseSplits) { + for(org.apache.hadoop.mapred.InputSplit split : baseSplits) { splits.add(new HCatSplit( partitionInfo, split, @@ -141,36 +187,66 @@ * @throws IOException or InterruptedException */ @Override - public RecordReader createRecordReader(InputSplit split, + public RecordReader + createRecordReader(InputSplit split, TaskAttemptContext taskContext) throws IOException, InterruptedException { HCatSplit hcatSplit = (HCatSplit) split; PartInfo partitionInfo = hcatSplit.getPartitionInfo(); + JobContext jobContext = taskContext; - //If running through a Pig job, the InputJobInfo will not be available in the - //backend process context (since HCatLoader works on a copy of the JobContext and does - //not call HCatInputFormat.setInput in the backend process). - //So this function should NOT attempt to read the InputJobInfo. 
+ HCatStorageHandler storageHandler = partitionInfo.getStorageHandler(); + JobConf jobConf = HCatUtil.getJobConfFromContext(jobContext); - HCatInputStorageDriver storageDriver; + Class inputFormatClass = storageHandler.getInputFormatClass(); + org.apache.hadoop.mapred.InputFormat inputFormat = + getMapRedInputFormat(jobConf, inputFormatClass); + + Map jobProperties = partitionInfo.getJobProperties(); + HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf); + Reporter reporter = InternalUtil.createReporter(taskContext); + org.apache.hadoop.mapred.RecordReader recordReader = + inputFormat.getRecordReader(hcatSplit.getBaseSplit(), jobConf, reporter); + + SerDe serde; try { - storageDriver = getInputDriverInstance(partitionInfo.getInputStorageDriverClass()); + serde = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), + jobContext.getConfiguration()); + +// HCatUtil.logEntrySet(LOG, "props to serde", properties.entrySet()); + + Configuration conf = storageHandler.getConf(); + InternalUtil.initializeInputSerDe(serde, conf, + partitionInfo.getTableInfo()); + } catch (Exception e) { - throw new IOException(e); + throw new IOException("Unable to create objectInspector " + + "for serde class " + storageHandler.getSerDeClass().getName() + + e); } - //Pass all required information to the storage driver - initStorageDriver(storageDriver, taskContext, partitionInfo, hcatSplit.getTableSchema()); + Map partCols = getPartColsByPosition(partitionInfo, + hcatSplit); - //Get the input format for the storage driver - InputFormat inputFormat = - storageDriver.getInputFormat(partitionInfo.getInputStorageDriverProperties()); + HCatRecordReader hcatRecordReader = new HCatRecordReader(storageHandler, + recordReader, + serde, + partCols); + return hcatRecordReader; + } - //Create the underlying input formats record record and an HCat wrapper - RecordReader recordReader = - inputFormat.createRecordReader(hcatSplit.getBaseSplit(), taskContext); + /** gets the partition columns that are not part of the Hive storage */ + private static Map getPartColsByPosition(PartInfo partInfo, + HCatSplit split) + { + Map partCols = new HashMap(); - return new HCatRecordReader(storageDriver,recordReader); + for (String partitionKey : partInfo.getPartitionValues().keySet()) { + partCols.put(split.getSchema().getPosition(partitionKey), + partInfo.getPartitionValues().get(partitionKey)); + } + + return partCols; } /** @@ -179,14 +255,18 @@ * has been called for a JobContext. * @param context the context * @return the table schema - * @throws Exception if HCatInputFromat.setInput has not been called for the current context + * @throws IOException if HCatInputFormat.setInput has not been called + * for the current context */ - public static HCatSchema getTableSchema(JobContext context) throws Exception { + public static HCatSchema getTableSchema(JobContext context) + throws IOException { InputJobInfo inputJobInfo = getJobInfo(context); HCatSchema allCols = new HCatSchema(new LinkedList()); - for(HCatFieldSchema field: inputJobInfo.getTableInfo().getDataColumns().getFields()) + for(HCatFieldSchema field: + inputJobInfo.getTableInfo().getDataColumns().getFields()) allCols.append(field); - for(HCatFieldSchema field: inputJobInfo.getTableInfo().getPartitionColumns().getFields()) + for(HCatFieldSchema field: + inputJobInfo.getTableInfo().getPartitionColumns().getFields()) allCols.append(field); return allCols; } @@ -197,73 +277,74 @@ * exception since that means HCatInputFormat.setInput has not been called. 
* @param jobContext the job context * @return the InputJobInfo object - * @throws Exception the exception + * @throws IOException the exception */ - private static InputJobInfo getJobInfo(JobContext jobContext) throws Exception { - String jobString = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO); + private static InputJobInfo getJobInfo(JobContext jobContext) + throws IOException { + String jobString = jobContext.getConfiguration().get( + HCatConstants.HCAT_KEY_JOB_INFO); if( jobString == null ) { - throw new Exception("job information not found in JobContext. HCatInputFormat.setInput() not called?"); + throw new IOException("job information not found in JobContext." + + " HCatInputFormat.setInput() not called?"); } return (InputJobInfo) HCatUtil.deserialize(jobString); } + private void setInputPath(JobConf jobConf, String location) + throws IOException{ - /** - * Initializes the storage driver instance. Passes on the required - * schema information, path info and arguments for the supported - * features to the storage driver. - * @param storageDriver the storage driver - * @param context the job context - * @param partitionInfo the partition info - * @param tableSchema the table level schema - * @throws IOException Signals that an I/O exception has occurred. - */ - private void initStorageDriver(HCatInputStorageDriver storageDriver, - JobContext context, PartInfo partitionInfo, - HCatSchema tableSchema) throws IOException { + // ideally we should just call FileInputFormat.setInputPaths() here - but + // that won't work since FileInputFormat.setInputPaths() needs + // a Job object instead of a JobContext which we are handed here - storageDriver.setInputPath(context, partitionInfo.getLocation()); + int length = location.length(); + int curlyOpen = 0; + int pathStart = 0; + boolean globPattern = false; + List pathStrings = new ArrayList(); - if( partitionInfo.getPartitionSchema() != null ) { - storageDriver.setOriginalSchema(context, partitionInfo.getPartitionSchema()); + for (int i=0; i driverClass = - (Class) - Class.forName(inputStorageDriverClass); - return driverClass.newInstance(); - } catch(Exception e) { - throw new Exception("error creating storage driver " + - inputStorageDriverClass, e); - } - } - } Index: src/java/org/apache/hcatalog/mapreduce/InputJobInfo.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/InputJobInfo.java (revision 1293367) +++ src/java/org/apache/hcatalog/mapreduce/InputJobInfo.java (working copy) @@ -56,6 +56,9 @@ /** implementation specific job properties */ private Properties properties; + /** job properties */ + private Map jobProperties; + /** * Initializes a new InputJobInfo * for reading data from a table. @@ -70,19 +73,21 @@ * The special string _HOST will be replaced automatically with the correct host name */ public static InputJobInfo create(String databaseName, - String tableName, - String filter, - String serverUri, - String serverKerberosPrincipal) { - return new InputJobInfo(databaseName,tableName,filter,serverUri,serverKerberosPrincipal); + String tableName, + String filter, + String serverUri, + String serverKerberosPrincipal) { + return new InputJobInfo(databaseName, tableName, filter, + serverUri, serverKerberosPrincipal); } private InputJobInfo(String databaseName, - String tableName, - String filter, - String serverUri, - String serverKerberosPrincipal) { - this.databaseName = (databaseName == null) ? 
MetaStoreUtils.DEFAULT_DATABASE_NAME : databaseName; + String tableName, + String filter, + String serverUri, + String serverKerberosPrincipal) { + this.databaseName = (databaseName == null) ? + MetaStoreUtils.DEFAULT_DATABASE_NAME : databaseName; this.tableName = tableName; this.serverUri = serverUri; this.serverKerberosPrincipal = serverKerberosPrincipal; @@ -169,5 +174,4 @@ public Properties getProperties() { return properties; } - } Index: src/java/org/apache/hcatalog/mapreduce/InternalUtil.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/InternalUtil.java (revision 1293367) +++ src/java/org/apache/hcatalog/mapreduce/InternalUtil.java (working copy) @@ -34,25 +34,19 @@ class InternalUtil { static StorerInfo extractStorerInfo(StorageDescriptor sd, Map properties) throws IOException { - String inputSDClass, outputSDClass; - - if (properties.containsKey(HCatConstants.HCAT_ISD_CLASS)){ - inputSDClass = properties.get(HCatConstants.HCAT_ISD_CLASS); - }else{ - // attempt to default to RCFile if the storage descriptor says it's an RCFile - if ((sd.getInputFormat() != null) && (sd.getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS))){ - inputSDClass = HCatConstants.HCAT_RCFILE_ISD_CLASS; - }else{ - throw new IOException("No input storage driver classname found for table, cannot write partition"); - } - } - Properties hcatProperties = new Properties(); for (String key : properties.keySet()){ hcatProperties.put(key, properties.get(key)); } + + // also populate with StorageDescriptor->SerDe.Parameters + for (Map.Entry<String, String> param : + sd.getSerdeInfo().getParameters().entrySet()) { + hcatProperties.put(param.getKey(), param.getValue()); + } - + + return new StorerInfo(null, null, sd.getInputFormat(), sd.getOutputFormat(), sd.getSerdeInfo().getSerializationLib(), properties.get(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_STORAGE), hcatProperties); @@ -120,9 +114,24 @@ //TODO this has to find a better home, it's also hardcoded as default in hive would be nice // if the default was decided by the serde - static void initializeOutputSerDe(SerDe serDe, Configuration conf, OutputJobInfo jobInfo) throws SerDeException { + static void initializeOutputSerDe(SerDe serDe, Configuration conf, + OutputJobInfo jobInfo) + throws SerDeException { + initializeSerDe(serDe, conf, jobInfo.getTableInfo(), + jobInfo.getOutputSchema()); + } + + static void initializeInputSerDe(SerDe serDe, Configuration conf, + HCatTableInfo info) + throws SerDeException { + initializeSerDe(serDe, conf, info, info.getDataColumns()); + } + + static void initializeSerDe(SerDe serDe, Configuration conf, + HCatTableInfo info, HCatSchema s) + throws SerDeException { Properties props = new Properties(); - List fields = HCatUtil.getFieldSchemaList(jobInfo.getOutputSchema().getFields()); + List fields = HCatUtil.getFieldSchemaList(s.getFields()); props.setProperty(org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS, MetaStoreUtils.getColumnNamesFromFieldSchema(fields)); props.setProperty(org.apache.hadoop.hive.serde.Constants.LIST_COLUMN_TYPES, @@ -133,14 +142,12 @@ props.setProperty(org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "1"); //add props from params set in table schema - props.putAll(jobInfo.getTableInfo().getStorerInfo().getProperties()); - //add props from job properties - props.putAll(jobInfo.getProperties()); + props.putAll(info.getStorerInfo().getProperties()); serDe.initialize(conf,props); } -
static Reporter createReporter(TaskAttemptContext context) { +static Reporter createReporter(TaskAttemptContext context) { return new ProgressReporter(context); } Index: src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java (revision 1293367) +++ src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java (working copy) @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -91,9 +92,11 @@ public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context); try { + HiveConf hiveConf = HCatUtil.getHiveConf(null, + context.getConfiguration()); handleDuplicatePublish(context, jobInfo, - HCatOutputFormat.createHiveClient(null,context.getConfiguration()), + HCatUtil.createHiveClient(hiveConf), jobInfo.getTableInfo().getTable()); } catch (MetaException e) { throw new IOException(e); Index: src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java (revision 1293367) +++ src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java (working copy) @@ -20,6 +20,7 @@ import org.apache.hcatalog.common.HCatUtil; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -29,7 +30,8 @@ * artifacts of tables which don't define a SerDe. This StorageHandler assumes * the supplied storage artifacts are for a file-based storage system. 
*/ -public class FosterStorageHandler extends HCatStorageHandler { +public class FosterStorageHandler extends HCatStorageHandler + implements Serializable { public Configuration conf ; @@ -77,21 +79,27 @@ } @Override - public void configureInputJobProperties(TableDesc tableDesc, Map jobProperties) { + public void configureInputJobProperties(TableDesc tableDesc, + Map jobProperties) { } @Override - public void configureOutputJobProperties(TableDesc tableDesc, Map jobProperties) { + public void configureOutputJobProperties(TableDesc tableDesc, + Map jobProperties) { try { - OutputJobInfo jobInfo = (OutputJobInfo)HCatUtil.deserialize(tableDesc.getJobProperties().get(HCatConstants.HCAT_KEY_OUTPUT_INFO)); + OutputJobInfo jobInfo = (OutputJobInfo) + HCatUtil.deserialize(tableDesc.getJobProperties().get( + HCatConstants.HCAT_KEY_OUTPUT_INFO)); String parentPath = jobInfo.getTableInfo().getTableLocation(); - String dynHash = tableDesc.getJobProperties().get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID); + String dynHash = tableDesc.getJobProperties().get( + HCatConstants.HCAT_DYNAMIC_PTN_JOBID); // For dynamic partitioned writes without all keyvalues specified, // we create a temp dir for the associated write job if (dynHash != null){ - parentPath = new Path(parentPath, DYNTEMP_DIR_NAME+dynHash).toString(); + parentPath = new Path(parentPath, + DYNTEMP_DIR_NAME+dynHash).toString(); } String outputLocation; @@ -105,7 +113,9 @@ List values = new ArrayList(); //sort the cols and vals - for(String name: jobInfo.getTableInfo().getPartitionColumns().getFieldNames()) { + for(String name: + jobInfo.getTableInfo(). + getPartitionColumns().getFieldNames()) { String value = jobInfo.getPartitionValues().get(name); int i=0; while(i 0) @@ -119,15 +129,17 @@ jobInfo.setLocation(new Path(parentPath,outputLocation).toString()); //only set output dir if partition is fully materialized - if(jobInfo.getPartitionValues().size() == jobInfo.getTableInfo().getPartitionColumns().size()) { + if(jobInfo.getPartitionValues().size() + == jobInfo.getTableInfo().getPartitionColumns().size()) { jobProperties.put("mapred.output.dir", jobInfo.getLocation()); } //TODO find a better home for this, RCFile specifc jobProperties.put(RCFile.COLUMN_NUMBER_CONF_STR, - Integer.toOctalString(jobInfo.getOutputSchema().getFields().size())); + Integer.toOctalString( + jobInfo.getOutputSchema().getFields().size())); jobProperties.put(HCatConstants.HCAT_KEY_OUTPUT_INFO, - HCatUtil.serialize(jobInfo)); + HCatUtil.serialize(jobInfo)); } catch (IOException e) { throw new IllegalStateException("Failed to set output path",e); } @@ -135,7 +147,8 @@ } @Override - OutputFormatContainer getOutputFormatContainer(org.apache.hadoop.mapred.OutputFormat outputFormat) { + OutputFormatContainer getOutputFormatContainer( + org.apache.hadoop.mapred.OutputFormat outputFormat) { return new FileOutputFormatContainer(outputFormat); } @@ -150,7 +163,8 @@ } @Override - public HiveAuthorizationProvider getAuthorizationProvider() throws HiveException { + public HiveAuthorizationProvider getAuthorizationProvider() + throws HiveException { return new DefaultHiveAuthorizationProvider(); } Index: src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java (revision 1293367) +++ src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java (working copy) @@ -20,6 +20,7 @@ import java.io.IOException; +import 
org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.mapred.HCatMapRedUtil; import org.apache.hadoop.mapreduce.JobContext; @@ -27,6 +28,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.JobStatus.State; import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.common.HCatUtil; /** * Part of the DefaultOutput*Container classes @@ -88,7 +90,9 @@ //Cancel HCat and JobTracker tokens try { - HiveMetaStoreClient client = HCatOutputFormat.createHiveClient(null, context.getConfiguration()); + HiveConf hiveConf = HCatUtil.getHiveConf(null, + context.getConfiguration()); + HiveMetaStoreClient client = HCatUtil.createHiveClient(hiveConf); String tokenStrForm = client.getTokenStrForm(); if(tokenStrForm != null && context.getConfiguration().get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) { client.cancelDelegationToken(tokenStrForm); Index: src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java (revision 1293367) +++ src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java (working copy) @@ -18,71 +18,118 @@ package org.apache.hcatalog.mapreduce; import java.io.IOException; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.SerDe; + +import org.apache.hcatalog.common.HCatUtil; import org.apache.hcatalog.data.DefaultHCatRecord; import org.apache.hcatalog.data.HCatRecord; +import org.apache.hcatalog.data.LazyHCatRecord; -/** The HCat wrapper for the underlying RecordReader, this ensures that the initialize on - * the underlying record reader is done with the underlying split, not with HCatSplit. +/** The HCat wrapper for the underlying RecordReader, + * this ensures that the initialize on + * the underlying record reader is done with the underlying split, + * not with HCatSplit. */ -class HCatRecordReader extends RecordReader - implements org.apache.hadoop.mapred.RecordReader { +class HCatRecordReader extends RecordReader { Log LOG = LogFactory.getLog(HCatRecordReader.class); - int lineCount = 0; + WritableComparable currentKey; + Writable currentValue; /** The underlying record reader to delegate to. */ - private final RecordReader baseRecordReader; + //org.apache.hadoop.mapred. + private final org.apache.hadoop.mapred.RecordReader + baseRecordReader; - /** The storage driver used */ - private final HCatInputStorageDriver storageDriver; + /** The storage handler used */ + private final HCatStorageHandler storageHandler; + private SerDe serde; + + private Map partCols; + /** * Instantiates a new hcat record reader. 
* @param baseRecordReader the base record reader */ - public HCatRecordReader(HCatInputStorageDriver storageDriver, RecordReader baseRecordReader) { - this.baseRecordReader = baseRecordReader; - this.storageDriver = storageDriver; + public HCatRecordReader(HCatStorageHandler storageHandler, + org.apache.hadoop.mapred.RecordReader baseRecordReader, + SerDe serde, + Map partCols) { + this.baseRecordReader = baseRecordReader; + this.storageHandler = storageHandler; + this.serde = serde; + this.partCols = partCols; } - + /* (non-Javadoc) - * @see org.apache.hadoop.mapreduce.RecordReader#initialize(org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext) + * @see org.apache.hadoop.mapreduce.RecordReader#initialize( + * org.apache.hadoop.mapreduce.InputSplit, + * org.apache.hadoop.mapreduce.TaskAttemptContext) */ @Override - public void initialize(InputSplit split, TaskAttemptContext taskContext) + public void initialize(org.apache.hadoop.mapreduce.InputSplit split, + TaskAttemptContext taskContext) throws IOException, InterruptedException { - InputSplit baseSplit = split; + org.apache.hadoop.mapred.InputSplit baseSplit; if( split instanceof HCatSplit ) { baseSplit = ((HCatSplit) split).getBaseSplit(); + } else { + throw new IOException("Not a HCatSplit"); } - baseRecordReader.initialize(baseSplit, taskContext); + Properties properties = new Properties(); + for (Map.Entry<String, String> param : + ((HCatSplit)split).getPartitionInfo() + .getJobProperties().entrySet()) { + properties.setProperty(param.getKey(), param.getValue()); + } } /* (non-Javadoc) * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentKey() */ @Override - public WritableComparable getCurrentKey() throws IOException, InterruptedException { - return baseRecordReader.getCurrentKey(); + public WritableComparable getCurrentKey() + throws IOException, InterruptedException { + return currentKey; } /* (non-Javadoc) * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentValue() */ @Override - public HCatRecord getCurrentValue() throws IOException, InterruptedException { - HCatRecord r = storageDriver.convertToHCatRecord(baseRecordReader.getCurrentKey(),baseRecordReader.getCurrentValue()); - return r; + public HCatRecord getCurrentValue() + throws IOException, InterruptedException { + HCatRecord r; + + try { + r = new DefaultHCatRecord((new LazyHCatRecord( + serde.deserialize(currentValue), + serde.getObjectInspector(), + partCols)).getAll()); + } catch (Exception e) { + throw new IOException("Failed to create HCatRecord " + e); + } + return r; } /* (non-Javadoc) @@ -95,9 +142,6 @@ } catch (IOException e) { LOG.warn(e.getMessage()); LOG.warn(e.getStackTrace()); - } catch (InterruptedException e) { - LOG.warn(e.getMessage()); - LOG.warn(e.getStackTrace()); } return 0.0f; // errored } /* (non-Javadoc) @@ -107,8 +151,13 @@ */ @Override public boolean nextKeyValue() throws IOException, InterruptedException { - lineCount++; - return baseRecordReader.nextKeyValue(); + if (currentKey == null) { + currentKey = baseRecordReader.createKey(); + currentValue = baseRecordReader.createValue(); + } + + return baseRecordReader.next(currentKey, + currentValue); } /* (non-Javadoc) @@ -119,45 +168,4 @@ baseRecordReader.close(); } - @Override - public Object createKey() { - WritableComparable o = null; - try { - o = getCurrentKey(); - } catch (IOException e) { - LOG.warn(e.getMessage()); - LOG.warn(e.getStackTrace()); - } catch (InterruptedException e) { - LOG.warn(e.getMessage()); - LOG.warn(e.getStackTrace()); - } - return o; - } - - @Override -
public Object createValue() { - return new DefaultHCatRecord(); - } - - @Override - public long getPos() throws IOException { - return lineCount; - } - - @Override - public boolean next(Object key, Object value) throws IOException { - try { - if (!nextKeyValue()){ - return false; - } - - ((HCatRecord)value).copy(getCurrentValue()); - - return true; - } catch (InterruptedException e) { - LOG.warn(e.getMessage()); - LOG.warn(e.getStackTrace()); - } - return false; - } } Index: src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java (revision 1293367) +++ src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java (working copy) @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.JobContext; @@ -226,7 +227,9 @@ } static void cancelDelegationTokens(JobContext context, OutputJobInfo outputJobInfo) throws Exception { - HiveMetaStoreClient client = HCatOutputFormat.createHiveClient(null, context.getConfiguration()); + HiveConf hiveConf = HCatUtil.getHiveConf(null, + context.getConfiguration()); + HiveMetaStoreClient client = HCatUtil.createHiveClient(hiveConf); // cancel the deleg. tokens that were acquired for this job now that // we are done - we should cancel if the tokens were acquired by // HCatOutputFormat and not if they were supplied by Oozie. In the latter Index: src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java (revision 1293367) +++ src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java (working copy) @@ -69,11 +69,13 @@ @SuppressWarnings("unchecked") public static void setOutput(Job job, OutputJobInfo outputJobInfo) throws IOException { HiveMetaStoreClient client = null; + HiveConf hiveConf = null; try { Configuration conf = job.getConfiguration(); - client = createHiveClient(null, conf); + hiveConf = HCatUtil.getHiveConf(null, conf); + client = HCatUtil.createHiveClient(hiveConf); Table table = client.getTable(outputJobInfo.getDatabaseName(), outputJobInfo.getTableName()); if (table.getPartitionKeysSize() == 0 ){ @@ -141,7 +143,9 @@ //Serialize the output info into the configuration outputJobInfo.setTableInfo(HCatTableInfo.valueOf(table)); outputJobInfo.setOutputSchema(tableSchema); + harRequested = getHarRequested(hiveConf); outputJobInfo.setHarRequested(harRequested); + maxDynamicPartitions = getMaxDynamicPartitions(hiveConf); outputJobInfo.setMaximumDynamicPartitions(maxDynamicPartitions); HCatUtil.configureOutputStorageHandler(storageHandler,job,outputJobInfo); @@ -222,76 +226,21 @@ return getOutputFormat(context).getOutputCommitter(context); } - //TODO remove url component, everything should be encapsulated in HiveConf - static HiveMetaStoreClient createHiveClient(String url, Configuration conf) throws IOException, MetaException { - HiveConf hiveConf = getHiveConf(url, conf); -// HCatUtil.logHiveConf(LOG, hiveConf); - try { - return new HiveMetaStoreClient(hiveConf); - } catch (MetaException e) { - LOG.error("Error connecting to the metastore (conf follows): "+e.getMessage(), e); - HCatUtil.logHiveConf(LOG, hiveConf); 
- throw e; - } - } + private static int getMaxDynamicPartitions(HiveConf hConf) { + // by default the bounds checking for maximum number of + // dynamic partitions is disabled (-1) + int maxDynamicPartitions = -1; - - static HiveConf getHiveConf(String url, Configuration conf) throws IOException { - HiveConf hiveConf = new HiveConf(HCatOutputFormat.class); - - if( url != null ) { - //User specified a thrift url - - hiveConf.set("hive.metastore.local", "false"); - hiveConf.set(ConfVars.METASTOREURIS.varname, url); - - String kerberosPrincipal = conf.get(HCatConstants.HCAT_METASTORE_PRINCIPAL); - if (kerberosPrincipal == null){ - kerberosPrincipal = conf.get(ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname); - } - if (kerberosPrincipal != null){ - hiveConf.setBoolean(ConfVars.METASTORE_USE_THRIFT_SASL.varname, true); - hiveConf.set(ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, kerberosPrincipal); - } - } else { - //Thrift url is null, copy the hive conf into the job conf and restore it - //in the backend context - - if( conf.get(HCatConstants.HCAT_KEY_HIVE_CONF) == null ) { - conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(hiveConf.getAllProperties())); - } else { - //Copy configuration properties into the hive conf - Properties properties = (Properties) HCatUtil.deserialize(conf.get(HCatConstants.HCAT_KEY_HIVE_CONF)); - - for(Map.Entry prop : properties.entrySet() ) { - if( prop.getValue() instanceof String ) { - hiveConf.set((String) prop.getKey(), (String) prop.getValue()); - } else if( prop.getValue() instanceof Integer ) { - hiveConf.setInt((String) prop.getKey(), (Integer) prop.getValue()); - } else if( prop.getValue() instanceof Boolean ) { - hiveConf.setBoolean((String) prop.getKey(), (Boolean) prop.getValue()); - } else if( prop.getValue() instanceof Long ) { - hiveConf.setLong((String) prop.getKey(), (Long) prop.getValue()); - } else if( prop.getValue() instanceof Float ) { - hiveConf.setFloat((String) prop.getKey(), (Float) prop.getValue()); - } - } - } - - } - - if(conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) { - hiveConf.set("hive.metastore.token.signature", conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE)); - } - - // figure out what the maximum number of partitions allowed is, so we can pass it on to our outputinfo if (HCatConstants.HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED){ - maxDynamicPartitions = hiveConf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS); - }else{ - maxDynamicPartitions = -1; // disables bounds checking for maximum number of dynamic partitions + maxDynamicPartitions = hConf.getIntVar( + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS); } - harRequested = hiveConf.getBoolVar(HiveConf.ConfVars.HIVEARCHIVEENABLED); - return hiveConf; + + return maxDynamicPartitions; } + private static boolean getHarRequested(HiveConf hConf) { + return hConf.getBoolVar(HiveConf.ConfVars.HIVEARCHIVEENABLED); + } + } Index: src/java/org/apache/hcatalog/mapreduce/InitializeInput.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/InitializeInput.java (revision 1293367) +++ src/java/org/apache/hcatalog/mapreduce/InitializeInput.java (working copy) @@ -27,14 +27,30 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; +import org.apache.hadoop.hive.ql.metadata.HiveUtils; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.hive.serde2.Deserializer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; + import org.apache.hcatalog.common.ErrorType; import org.apache.hcatalog.common.HCatConstants; import org.apache.hcatalog.common.HCatException; @@ -91,16 +107,17 @@ client = new HiveMetaStoreClient(hiveConf, null); } Table table = client.getTable(inputJobInfo.getDatabaseName(), - inputJobInfo.getTableName()); + inputJobInfo.getTableName()); List partInfoList = new ArrayList(); + inputJobInfo.setTableInfo(HCatTableInfo.valueOf(table)); if( table.getPartitionKeys().size() != 0 ) { //Partitioned table List parts = client.listPartitionsByFilter(inputJobInfo.getDatabaseName(), - inputJobInfo.getTableName(), - inputJobInfo.getFilter(), - (short) -1); + inputJobInfo.getTableName(), + inputJobInfo.getFilter(), + (short) -1); // Default to 100,000 partitions if hive.metastore.maxpartition is not defined int maxPart = hiveConf.getInt("hcat.metastore.maxpartitions", 100000); @@ -110,19 +127,22 @@ // populate partition info for (Partition ptn : parts){ - PartInfo partInfo = extractPartInfo(ptn.getSd(),ptn.getParameters()); - partInfo.setPartitionValues(createPtnKeyValueMap(table,ptn)); + PartInfo partInfo = extractPartInfo(ptn.getSd(),ptn.getParameters(), + job.getConfiguration(), + inputJobInfo); + partInfo.setPartitionValues(createPtnKeyValueMap(table, ptn)); partInfoList.add(partInfo); } }else{ //Non partitioned table - PartInfo partInfo = extractPartInfo(table.getSd(),table.getParameters()); + PartInfo partInfo = extractPartInfo(table.getSd(),table.getParameters(), + job.getConfiguration(), + inputJobInfo); partInfo.setPartitionValues(new HashMap()); partInfoList.add(partInfo); } inputJobInfo.setPartitions(partInfoList); - inputJobInfo.setTableInfo(HCatTableInfo.valueOf(table)); return HCatUtil.serialize(inputJobInfo); } finally { @@ -154,65 +174,33 @@ return ptnKeyValues; } - static PartInfo extractPartInfo(StorageDescriptor sd, Map parameters) throws IOException{ + static PartInfo extractPartInfo(StorageDescriptor sd, + Map parameters, Configuration conf, + InputJobInfo inputJobInfo) throws IOException{ HCatSchema schema = HCatUtil.extractSchemaFromStorageDescriptor(sd); - String inputStorageDriverClass = null; + StorerInfo storerInfo = InternalUtil.extractStorerInfo(sd,parameters); + Properties hcatProperties = new Properties(); - if (parameters.containsKey(HCatConstants.HCAT_ISD_CLASS)){ - inputStorageDriverClass = parameters.get(HCatConstants.HCAT_ISD_CLASS); - }else{ - // attempt to default to RCFile if the storage descriptor 
says it's an RCFile - if ((sd.getInputFormat() != null) && (sd.getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS))){ - inputStorageDriverClass = HCatConstants.HCAT_RCFILE_ISD_CLASS; - }else{ - throw new IOException("No input storage driver classname found, cannot read partition"); - } - } + HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(conf, + storerInfo); + + // copy the properties from storageHandler to jobProperties + MapjobProperties = HCatUtil.getInputJobProperties( + storageHandler, + inputJobInfo); + for (String key : parameters.keySet()){ if (key.startsWith(HCAT_KEY_PREFIX)){ hcatProperties.put(key, parameters.get(key)); } } - return new PartInfo(schema,inputStorageDriverClass, sd.getLocation(), hcatProperties); + // FIXME + // Bloating partinfo with inputJobInfo is not good + return new PartInfo(schema, storageHandler, + sd.getLocation(), hcatProperties, + jobProperties, inputJobInfo.getTableInfo()); } - - - static StorerInfo extractStorerInfo(StorageDescriptor sd, Map properties) throws IOException { - String inputSDClass, outputSDClass; - - if (properties.containsKey(HCatConstants.HCAT_ISD_CLASS)){ - inputSDClass = properties.get(HCatConstants.HCAT_ISD_CLASS); - }else{ - // attempt to default to RCFile if the storage descriptor says it's an RCFile - if ((sd.getInputFormat() != null) && (sd.getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS))){ - inputSDClass = HCatConstants.HCAT_RCFILE_ISD_CLASS; - }else{ - throw new IOException("No input storage driver classname found for table, cannot write partition"); - } - } - - if (properties.containsKey(HCatConstants.HCAT_OSD_CLASS)){ - outputSDClass = properties.get(HCatConstants.HCAT_OSD_CLASS); - }else{ - // attempt to default to RCFile if the storage descriptor says it's an RCFile - if ((sd.getOutputFormat() != null) && (sd.getOutputFormat().equals(HCatConstants.HIVE_RCFILE_OF_CLASS))){ - outputSDClass = HCatConstants.HCAT_RCFILE_OSD_CLASS; - }else{ - throw new IOException("No output storage driver classname found for table, cannot write partition"); - } - } - - Properties hcatProperties = new Properties(); - for (String key : properties.keySet()){ - if (key.startsWith(HCAT_KEY_PREFIX)){ - hcatProperties.put(key, properties.get(key)); - } - } - - return new StorerInfo(inputSDClass, outputSDClass, hcatProperties); - } - static HiveConf getHiveConf(InputJobInfo iInfo, Configuration conf) throws IOException { Index: src/java/org/apache/hcatalog/mapreduce/PartInfo.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/PartInfo.java (revision 1293367) +++ src/java/org/apache/hcatalog/mapreduce/PartInfo.java (working copy) @@ -21,7 +21,11 @@ import java.util.Map; import java.util.Properties; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.plan.TableDesc; + import org.apache.hcatalog.data.schema.HCatSchema; +import org.apache.hcatalog.mapreduce.HCatStorageHandler; /** The Class used to serialize the partition information read from the metadata server that maps to a partition */ public class PartInfo implements Serializable { @@ -32,8 +36,8 @@ /** The partition schema. 
Index: src/java/org/apache/hcatalog/mapreduce/PartInfo.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/PartInfo.java (revision 1293367)
+++ src/java/org/apache/hcatalog/mapreduce/PartInfo.java (working copy)
@@ -21,7 +21,11 @@
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+
 import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatStorageHandler;
 
 /** The Class used to serialize the partition information read from the metadata server that maps to a partition */
 public class PartInfo implements Serializable {
@@ -32,8 +36,8 @@
   /** The partition schema. */
   private final HCatSchema partitionSchema;
 
-  /** The information about which input storage driver to use */
-  private final String inputStorageDriverClass;
+  /** The information about which input storage handler to use */
+  private final HCatStorageHandler storageHandler;
 
   /** HCat-specific properties set at the partition */
   private final Properties hcatProperties;
@@ -44,18 +48,28 @@
   /** The map of partition key names and their values. */
   private Map partitionValues;
 
+  /** Job properties associated with this partition */
+  Map jobProperties;
+
+  /** the table info associated with this partition */
+  HCatTableInfo tableInfo;
+
   /**
    * Instantiates a new hcat partition info.
    * @param partitionSchema the partition schema
-   * @param inputStorageDriverClass the input storage driver class name
+   * @param storageHandler the storage handler
    * @param location the location
   * @param hcatProperties hcat-specific properties at the partition
   */
-  public PartInfo(HCatSchema partitionSchema, String inputStorageDriverClass, String location, Properties hcatProperties){
+  public PartInfo(HCatSchema partitionSchema, HCatStorageHandler storageHandler,
+      String location, Properties hcatProperties,
+      Map jobProperties, HCatTableInfo tableInfo){
     this.partitionSchema = partitionSchema;
-    this.inputStorageDriverClass = inputStorageDriverClass;
+    this.storageHandler = storageHandler;
     this.location = location;
     this.hcatProperties = hcatProperties;
+    this.jobProperties = jobProperties;
+    this.tableInfo = tableInfo;
   }
 
   /**
@@ -71,8 +85,8 @@
    * Gets the value of input storage driver class name.
    * @return the input storage driver class name
    */
-  public String getInputStorageDriverClass() {
-    return inputStorageDriverClass;
+  public HCatStorageHandler getStorageHandler() {
+    return storageHandler;
   }
 
 
@@ -80,7 +94,7 @@
    * Gets the value of hcatProperties.
    * @return the hcatProperties
    */
-  public Properties getInputStorageDriverProperties() {
+  public Properties getInputStorageHandlerProperties() {
     return hcatProperties;
   }
 
@@ -107,4 +121,12 @@
   public Map getPartitionValues() {
     return partitionValues;
   }
+
+  public Map getJobProperties() {
+    return jobProperties;
+  }
+
+  public HCatTableInfo getTableInfo() {
+    return tableInfo;
+  }
 }
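
Illustrative note (not part of the patch): PartInfo now carries the storage handler, the handler's job properties, and the table info instead of a driver class name. A minimal consumer sketch follows; the class and method names are invented, and the Map generic parameters, which appear stripped in this diff text, are assumed to be &lt;String, String&gt;.

    package org.apache.hcatalog.mapreduce;

    import java.util.Map;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hcatalog.common.HCatUtil;

    public class PartInfoConsumerSketch {
      /**
       * Apply a partition's storage-handler job properties to the JobConf
       * that will drive the underlying input format. getJobProperties() is
       * the new accessor added above; copyJobPropertiesToJobConf() is the
       * HCatUtil helper added later in this patch.
       */
      public static void applyJobProperties(PartInfo partInfo, JobConf jobConf) {
        Map<String, String> jobProperties = partInfo.getJobProperties();
        if (jobProperties != null) {
          HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
        }
      }
    }
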
Index: src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (revision 1293367)
+++ src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (working copy)
@@ -161,11 +161,14 @@
     OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
     try {
-      HiveMetaStoreClient client = HCatOutputFormat.createHiveClient(null, jobContext.getConfiguration());
+      HiveConf hiveConf = HCatUtil.getHiveConf(null,
+          jobContext.getConfiguration());
+      HiveMetaStoreClient client = HCatUtil.createHiveClient(hiveConf);
       // cancel the deleg. tokens that were acquired for this job now that
       // we are done - we should cancel if the tokens were acquired by
-      // HCatOutputFormat and not if they were supplied by Oozie. In the latter
-      // case the HCAT_KEY_TOKEN_SIGNATURE property in the conf will not be set
+      // HCatOutputFormat and not if they were supplied by Oozie.
+      // In the latter case the HCAT_KEY_TOKEN_SIGNATURE property in
+      // the conf will not be set
       String tokenStrForm = client.getTokenStrForm();
       if(tokenStrForm != null && jobContext.getConfiguration().get
           (HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
@@ -280,9 +283,10 @@
     List partitionsAdded = new ArrayList();
     try {
-      client = HCatOutputFormat.createHiveClient(null, conf);
+      HiveConf hiveConf = HCatUtil.getHiveConf(null, conf);
+      client = HCatUtil.createHiveClient(hiveConf);
-      StorerInfo storer = InitializeInput.extractStorerInfo(table.getSd(),table.getParameters());
+      StorerInfo storer = InternalUtil.extractStorerInfo(table.getSd(),table.getParameters());
       updateTableSchema(client, table, jobInfo.getOutputSchema());
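
Illustrative note (not part of the patch): the committer hunks above switch from HCatOutputFormat.createHiveClient() to a two-step flow through the new HCatUtil helpers. A minimal sketch of that flow, with an invented class name:

    package org.apache.hcatalog.mapreduce;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
    import org.apache.hcatalog.common.HCatUtil;

    public class MetastoreClientSketch {
      /**
       * Derive a HiveConf from the job Configuration first (null means no
       * explicit thrift URL, so metastore settings come from, or are stashed
       * in, the Configuration), then open the metastore client from it.
       */
      public static HiveMetaStoreClient openClient(Configuration jobConf)
          throws Exception {
        HiveConf hiveConf = HCatUtil.getHiveConf(null, jobConf);
        return HCatUtil.createHiveClient(hiveConf);
      }
    }
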
Index: src/java/org/apache/hcatalog/mapreduce/HCatSplit.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/HCatSplit.java (revision 1293367)
+++ src/java/org/apache/hcatalog/mapreduce/HCatSplit.java (working copy)
@@ -22,16 +22,21 @@
 import java.io.IOException;
 import java.lang.reflect.Constructor;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.InputSplit;
+
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.schema.HCatSchema;
 
 /** The HCatSplit wrapper around the InputSplit returned by the underlying InputFormat */
-public class HCatSplit extends InputSplit implements Writable,org.apache.hadoop.mapred.InputSplit {
+public class HCatSplit extends InputSplit
+    implements Writable,org.apache.hadoop.mapred.InputSplit {
 
   Log LOG = LogFactory.getLog(HCatSplit.class);
 
@@ -39,10 +44,13 @@
   private PartInfo partitionInfo;
 
   /** The split returned by the underlying InputFormat split. */
-  private InputSplit baseSplit;
+  private org.apache.hadoop.mapred.InputSplit baseMapRedSplit;
 
   /** The schema for the HCatTable */
   private HCatSchema tableSchema;
+
+  private HiveConf hiveConf;
+
   /**
    * Instantiates a new hcat split.
    */
@@ -53,13 +61,16 @@
    * Instantiates a new hcat split.
    *
    * @param partitionInfo the partition info
-   * @param baseSplit the base split
+   * @param baseMapRedSplit the base mapred split
    * @param tableSchema the table level schema
    */
-  public HCatSplit(PartInfo partitionInfo, InputSplit baseSplit, HCatSchema tableSchema) {
-    this.partitionInfo = partitionInfo;
-    this.baseSplit = baseSplit;
-    this.tableSchema = tableSchema;
+  public HCatSplit(PartInfo partitionInfo,
+      org.apache.hadoop.mapred.InputSplit baseMapRedSplit,
+      HCatSchema tableSchema) {
+
+    this.partitionInfo = partitionInfo;
+    this.baseMapRedSplit = baseMapRedSplit;
+    this.tableSchema = tableSchema;
   }
 
   /**
@@ -72,10 +83,10 @@
   /**
    * Gets the underlying InputSplit.
-   * @return the baseSplit
+   * @return the baseMapRedSplit
    */
-  public InputSplit getBaseSplit() {
-    return baseSplit;
+  public org.apache.hadoop.mapred.InputSplit getBaseSplit() {
+    return baseMapRedSplit;
   }
 
   /**
@@ -100,13 +111,10 @@
   @Override
   public long getLength() {
     try {
-      return baseSplit.getLength();
+      return baseMapRedSplit.getLength();
     } catch (IOException e) {
       LOG.warn(e.getMessage());
       LOG.warn(e.getStackTrace());
-    } catch (InterruptedException e) {
-      LOG.warn(e.getMessage());
-      LOG.warn(e.getStackTrace());
     }
     return 0; // we errored
   }
@@ -117,13 +125,10 @@
   @Override
   public String[] getLocations() {
     try {
-      return baseSplit.getLocations();
+      return baseMapRedSplit.getLocations();
     } catch (IOException e) {
      LOG.warn(e.getMessage());
      LOG.warn(e.getStackTrace());
-    } catch (InterruptedException e) {
-      LOG.warn(e.getMessage());
-      LOG.warn(e.getStackTrace());
    }
     return new String[0]; // we errored
   }
@@ -138,21 +143,22 @@
     partitionInfo = (PartInfo) HCatUtil.deserialize(partitionInfoString);
 
     String baseSplitClassName = WritableUtils.readString(input);
-    InputSplit split;
+    org.apache.hadoop.mapred.InputSplit split;
     try{
-      Class splitClass =
-          (Class) Class.forName(baseSplitClassName);
+      Class splitClass =
+          (Class) Class.forName(baseSplitClassName);
 
       //Class.forName().newInstance() does not work if the underlying
       //InputSplit has package visibility
-      Constructor constructor =
+      Constructor
+          constructor =
           splitClass.getDeclaredConstructor(new Class[]{});
       constructor.setAccessible(true);
 
       split = constructor.newInstance();
       // read baseSplit from input
       ((Writable)split).readFields(input);
-      this.baseSplit = split;
+      this.baseMapRedSplit = split;
     }catch(Exception e){
       throw new IOException ("Exception from " + baseSplitClassName, e);
     }
@@ -171,8 +177,8 @@
     // write partitionInfo into output
     WritableUtils.writeString(output, partitionInfoString);
 
-    WritableUtils.writeString(output, baseSplit.getClass().getName());
-    Writable baseSplitWritable = (Writable)baseSplit;
+    WritableUtils.writeString(output, baseMapRedSplit.getClass().getName());
+    Writable baseSplitWritable = (Writable)baseMapRedSplit;
     //write baseSplit into output
     baseSplitWritable.write(output);
 
@@ -180,4 +186,8 @@
     String tableSchemaString = HCatUtil.serialize(tableSchema);
     WritableUtils.writeString(output, tableSchemaString);
   }
+
+  public HCatSchema getSchema() {
+    return tableSchema;
+  }
 }
Index: src/java/org/apache/hcatalog/common/ErrorType.java
===================================================================
--- src/java/org/apache/hcatalog/common/ErrorType.java (revision 1293367)
+++ src/java/org/apache/hcatalog/common/ErrorType.java (working copy)
@@ -26,6 +26,7 @@
 
   ERROR_DB_INIT (1000, "Error initializing database session"),
   ERROR_EXCEED_MAXPART (1001, "Query result exceeded maximum number of partitions allowed"),
+  ERROR_SET_INPUT (1002, "Error setting input information"),
 
   /* HCat Output Format related errors 2000 - 2999 */
   ERROR_INVALID_TABLE (2000, "Table specified does not exist"),
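
Illustrative note (not part of the patch): HCatSplit now wraps an old-API org.apache.hadoop.mapred.InputSplit and exposes a new getSchema() accessor. A small, hypothetical consumer sketch:

    package org.apache.hcatalog.mapreduce;

    import java.util.Arrays;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    public class SplitInspectionSketch {
      private static final Log LOG = LogFactory.getLog(SplitInspectionSketch.class);

      /**
       * getBaseSplit() now hands back the wrapped old-API (mapred) split, so
       * a caller can pass it straight to a mapred InputFormat; length and
       * locations delegate to that split, and getSchema() returns the table
       * schema carried by the HCatSplit.
       */
      public static void describe(HCatSplit split) {
        org.apache.hadoop.mapred.InputSplit base = split.getBaseSplit();
        LOG.info("base split class: " + base.getClass().getName());
        LOG.info("length: " + split.getLength());
        LOG.info("locations: " + Arrays.toString(split.getLocations()));
        LOG.info("table schema: " + split.getSchema());
      }
    }
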
Index: src/java/org/apache/hcatalog/common/HCatUtil.java
===================================================================
--- src/java/org/apache/hcatalog/common/HCatUtil.java (revision 1293367)
+++ src/java/org/apache/hcatalog/common/HCatUtil.java (working copy)
@@ -35,10 +35,12 @@
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -70,13 +72,14 @@
 import org.apache.hcatalog.mapreduce.FosterStorageHandler;
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
 import org.apache.hcatalog.mapreduce.HCatStorageHandler;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
 import org.apache.hcatalog.mapreduce.OutputJobInfo;
 import org.apache.hcatalog.mapreduce.StorerInfo;
 import org.apache.thrift.TException;
 
 public class HCatUtil {
 
-//  static final private Log LOG = LogFactory.getLog(HCatUtil.class);
+  static final private Log LOG = LogFactory.getLog(HCatUtil.class);
 
   public static boolean checkJobContextIfRunningFromBackend(JobContext j) {
     if (j.getConfiguration().get("mapred.task.id", "").equals("")) {
@@ -469,10 +472,10 @@
    */
   public static HCatStorageHandler getStorageHandler(Configuration conf, StorerInfo storerInfo) throws IOException {
     return getStorageHandler(conf,
-        storerInfo.getStorageHandlerClass(),
-        storerInfo.getSerdeClass(),
-        storerInfo.getIfClass(),
-        storerInfo.getOfClass());
+        storerInfo.getStorageHandlerClass(),
+        storerInfo.getSerdeClass(),
+        storerInfo.getIfClass(),
+        storerInfo.getOfClass());
   }
 
   /**
@@ -488,26 +491,29 @@
    * @throws IOException
    */
   public static HCatStorageHandler getStorageHandler(Configuration conf,
-      String storageHandler,
-      String serDe,
-      String inputFormat,
-      String outputFormat) throws IOException {
+      String storageHandler,
+      String serDe,
+      String inputFormat,
+      String outputFormat)
+      throws IOException {
-
     if (storageHandler == null) {
       try {
         return new FosterStorageHandler(inputFormat,
-            outputFormat,
-            serDe);
+            outputFormat,
+            serDe);
       } catch (ClassNotFoundException e) {
-        throw new IOException("Failed to load foster storage handler",e);
+        throw new IOException("Failed to load "
+            + "foster storage handler",e);
      }
    }
 
     try {
-      Class handlerClass = (Class) Class
+      Class handlerClass =
+          (Class) Class
           .forName(storageHandler, true, JavaUtils.getClassLoader());
-      return (HCatStorageHandler)ReflectionUtils.newInstance(handlerClass, conf);
+      return (HCatStorageHandler)ReflectionUtils.newInstance(
+          handlerClass, conf);
     } catch (ClassNotFoundException e) {
       throw new IOException("Error in loading storage handler." + e.getMessage(), e);
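
Illustrative note (not part of the patch): the hunk above keeps the fallback where a table with no declared storage handler gets a FosterStorageHandler built from its raw format and serde class names. A minimal sketch of invoking that fallback; the RCFile class names are only illustrative.

    package org.apache.hcatalog.common;

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hcatalog.mapreduce.HCatStorageHandler;

    public class FosterHandlerSketch {
      /**
       * With a null storage-handler class name, getStorageHandler() wraps the
       * serde/input format/output format in a FosterStorageHandler instead of
       * reflecting a HiveStorageHandler class.
       */
      public static HCatStorageHandler rcfileFosterHandler(Configuration conf)
          throws IOException {
        return HCatUtil.getStorageHandler(conf,
            null, // no storage handler declared on the table
            "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe",
            "org.apache.hadoop.hive.ql.io.RCFileInputFormat",
            "org.apache.hadoop.hive.ql.io.RCFileOutputFormat");
      }
    }
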
@@ -526,27 +532,45 @@
     }
   }
 
-  public static ObjectInspector getObjectInspector(String serdeClassName,
-      Configuration conf, Properties tbl) throws Exception {
-    SerDe s = (SerDe) Class.forName(serdeClassName).newInstance();
-    s.initialize(conf, tbl);
-    return s.getObjectInspector();
-  }
+  public static Map
+      getInputJobProperties(HCatStorageHandler storageHandler,
+          InputJobInfo inputJobInfo) {
+    TableDesc tableDesc = new TableDesc(storageHandler.getSerDeClass(),
+        storageHandler.getInputFormatClass(),
+        storageHandler.getOutputFormatClass(),
+        inputJobInfo.getTableInfo().getStorerInfo().getProperties());
+    if(tableDesc.getJobProperties() == null) {
+      tableDesc.setJobProperties(new HashMap());
+    }
 
-  public static ObjectInspector getHCatRecordObjectInspector(HCatSchema hsch) throws Exception{
-    HCatRecordSerDe hrsd = new HCatRecordSerDe();
-    hrsd.initialize(hsch);
-    return hrsd.getObjectInspector();
+    Map jobProperties = new HashMap();
+    try {
+      tableDesc.getJobProperties().put(
+          HCatConstants.HCAT_KEY_JOB_INFO,
+          HCatUtil.serialize(inputJobInfo));
+
+      storageHandler.configureInputJobProperties(tableDesc,
+          jobProperties);
+
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "Failed to configure StorageHandler",e);
+    }
+
+    return jobProperties;
   }
 
-  public static void configureOutputStorageHandler(HCatStorageHandler storageHandler,
-      JobContext context,
-      OutputJobInfo outputJobInfo) {
-    //TODO replace IgnoreKeyTextOutputFormat with a HiveOutputFormatWrapper in StorageHandler
+
+  public static void
+      configureOutputStorageHandler(HCatStorageHandler storageHandler,
+          JobContext context,
+          OutputJobInfo outputJobInfo) {
+    //TODO replace IgnoreKeyTextOutputFormat with a
+    //HiveOutputFormatWrapper in StorageHandler
     TableDesc tableDesc = new TableDesc(storageHandler.getSerDeClass(),
-        storageHandler.getInputFormatClass(),
-        IgnoreKeyTextOutputFormat.class,
-        outputJobInfo.getTableInfo().getStorerInfo().getProperties());
+        storageHandler.getInputFormatClass(),
+        IgnoreKeyTextOutputFormat.class,
+        outputJobInfo.getTableInfo().getStorerInfo().getProperties());
     if(tableDesc.getJobProperties() == null)
       tableDesc.setJobProperties(new HashMap());
     for (Map.Entry el: context.getConfiguration()) {
@@ -555,15 +579,19 @@
     Map jobProperties = new HashMap();
     try {
-      tableDesc.getJobProperties().put(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
+      tableDesc.getJobProperties().put(
+          HCatConstants.HCAT_KEY_OUTPUT_INFO,
+          HCatUtil.serialize(outputJobInfo));
 
-      storageHandler.configureOutputJobProperties(tableDesc,jobProperties);
+      storageHandler.configureOutputJobProperties(tableDesc,
+          jobProperties);
 
       for(Map.Entry el: jobProperties.entrySet()) {
         context.getConfiguration().set(el.getKey(),el.getValue());
       }
     } catch (IOException e) {
-      throw new IllegalStateException("Failed to configure StorageHandler",e);
+      throw new IllegalStateException(
+          "Failed to configure StorageHandler",e);
     }
   }
 
@@ -579,4 +607,96 @@
     }
   }
 
+  //TODO remove url component, everything should be encapsulated in HiveConf
+  public static HiveMetaStoreClient createHiveClient(HiveConf hiveConf)
+      throws MetaException {
+    return new HiveMetaStoreClient(hiveConf);
+  }
+
+
+  public static HiveConf getHiveConf(String url, Configuration conf)
+      throws IOException {
+    HiveConf hiveConf = new HiveConf();
+
+    if( url != null ) {
+      //User specified a thrift url
+
+      hiveConf.set("hive.metastore.local", "false");
+      hiveConf.set(ConfVars.METASTOREURIS.varname, url);
+
+      String kerberosPrincipal = conf.get(
+          HCatConstants.HCAT_METASTORE_PRINCIPAL);
+      if (kerberosPrincipal == null){
+        kerberosPrincipal = conf.get(
+            ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname);
+      }
+      if (kerberosPrincipal != null){
+        hiveConf.setBoolean(
+            ConfVars.METASTORE_USE_THRIFT_SASL.varname, true);
+        hiveConf.set(
+            ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
+            kerberosPrincipal);
+      }
+    } else {
+      //Thrift url is null, copy the hive conf into
+      //the job conf and restore it
+      //in the backend context
+
+      if( conf.get(HCatConstants.HCAT_KEY_HIVE_CONF) == null ) {
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
+            HCatUtil.serialize(hiveConf.getAllProperties()));
+      } else {
+        //Copy configuration properties into the hive conf
+        Properties properties = (Properties) HCatUtil.deserialize(
+            conf.get(HCatConstants.HCAT_KEY_HIVE_CONF));
+
+        for(Map.Entry prop : properties.entrySet() ) {
+          if( prop.getValue() instanceof String ) {
+            hiveConf.set((String) prop.getKey(), (String) prop.getValue());
+          } else if( prop.getValue() instanceof Integer ) {
+            hiveConf.setInt((String) prop.getKey(),
+                (Integer) prop.getValue());
+          } else if( prop.getValue() instanceof Boolean ) {
+            hiveConf.setBoolean((String) prop.getKey(),
+                (Boolean) prop.getValue());
+          } else if( prop.getValue() instanceof Long ) {
+            hiveConf.setLong((String) prop.getKey(), (Long) prop.getValue());
+          } else if( prop.getValue() instanceof Float ) {
+            hiveConf.setFloat((String) prop.getKey(),
+                (Float) prop.getValue());
+          }
+        }
+      }
+
+    }
+
+    if(conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+      hiveConf.set("hive.metastore.token.signature",
+          conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE));
+    }
+
+    return hiveConf;
+  }
+
+
+  public static JobConf getJobConfFromContext(JobContext jobContext)
+  {
+    JobConf jobConf;
+    // we need to convert the jobContext into a jobConf
+    // 0.18 jobConf (Hive) vs 0.20+ jobContext (HCat)
+    // begin conversion..
+    jobConf = new JobConf(jobContext.getConfiguration());
+    // ..end of conversion
+
+
+    return jobConf;
+  }
+
+  public static void copyJobPropertiesToJobConf(
+      Map jobProperties, JobConf jobConf)
+  {
+    for (Map.Entry entry : jobProperties.entrySet()) {
+      jobConf.set(entry.getKey(), entry.getValue());
+    }
+  }
 }
Index: src/java/org/apache/hcatalog/data/HCatRecordObjectInspectorFactory.java
===================================================================
--- src/java/org/apache/hcatalog/data/HCatRecordObjectInspectorFactory.java (revision 1293367)
+++ src/java/org/apache/hcatalog/data/HCatRecordObjectInspectorFactory.java (working copy)
@@ -68,6 +68,7 @@
         fieldObjectInspectors.add(getStandardObjectInspectorFromTypeInfo(fieldTypeInfos.get(i)));
       }
       oi = new HCatRecordObjectInspector(fieldNames,fieldObjectInspectors);
+      break;
     default:
       // Hmm.. not good,
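
Illustrative note (not part of the patch): the new HCatUtil helpers (getJobConfFromContext, getInputJobProperties, copyJobPropertiesToJobConf) are meant to be combined on the backend. A minimal sketch follows; the class and method names are invented, and the Map generic parameters, stripped in this diff text, are assumed to be &lt;String, String&gt;.

    package org.apache.hcatalog.common;

    import java.util.Map;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hcatalog.mapreduce.HCatStorageHandler;
    import org.apache.hcatalog.mapreduce.InputJobInfo;

    public class JobConfBridgeSketch {
      /**
       * Bridge the new-API JobContext to an old-API JobConf and apply the
       * storage handler's input job properties to it.
       */
      public static JobConf configureBackendJobConf(JobContext context,
          HCatStorageHandler storageHandler, InputJobInfo inputJobInfo) {
        JobConf jobConf = HCatUtil.getJobConfFromContext(context);
        Map<String, String> jobProperties =
            HCatUtil.getInputJobProperties(storageHandler, inputJobInfo);
        HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
        return jobConf;
      }
    }
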
Index: src/java/org/apache/hcatalog/data/DataType.java
===================================================================
--- src/java/org/apache/hcatalog/data/DataType.java (revision 1293367)
+++ src/java/org/apache/hcatalog/data/DataType.java (working copy)
@@ -170,4 +170,4 @@
       return dt1 < dt2 ? -1 : 1;
     }
   }
-}
\ No newline at end of file
+}
Index: src/java/org/apache/hcatalog/data/LazyHCatRecord.java
===================================================================
--- src/java/org/apache/hcatalog/data/LazyHCatRecord.java (revision 1293367)
+++ src/java/org/apache/hcatalog/data/LazyHCatRecord.java (working copy)
@@ -5,6 +5,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -28,9 +29,12 @@
   private Object o;
   private StructObjectInspector soi;
+  private Map partCols;
 
   @Override
   public Object get(int fieldNum) {
+    Object pc = partCols.get(fieldNum);
+    if (pc != null) return pc;
     try {
       StructField fref = soi.getAllStructFieldRefs().get(fieldNum);
       return HCatRecordSerDe.serializeField(
@@ -59,7 +63,7 @@
 
   @Override
   public int size() {
-    return soi.getAllStructFieldRefs().size();
+    return soi.getAllStructFieldRefs().size() + partCols.size();
  }
 
   @Override
@@ -99,17 +103,20 @@
     throw new UnsupportedOperationException("not allowed to run copy() on LazyHCatRecord");
   }
 
-  public LazyHCatRecord(Object o, ObjectInspector oi) throws Exception{
+  public LazyHCatRecord(Object o, ObjectInspector oi,
+      Map partCols)
+      throws Exception {
     if (oi.getCategory() != Category.STRUCT) {
       throw new SerDeException(getClass().toString()
-          + " can only make a lazy hcat record from objects of struct types, but we got: "
+          + " can only make a lazy hcat record from objects of "
+          + "struct types, but we got: "
          + oi.getTypeName());
     }
 
     this.soi = (StructObjectInspector)oi;
     this.o = o;
-
+    this.partCols = partCols;
   }
 
   @Override