Index: src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/WriteTextPartitioned.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/WriteTextPartitioned.java (revision 1304165) +++ src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/WriteTextPartitioned.java (revision ) @@ -94,8 +94,7 @@ if (principalID != null) conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID); Job job = new Job(conf, "WriteTextPartitioned"); - HCatInputFormat.setInput(job, InputJobInfo.create(dbName, - inputTableName, filter)); + HCatInputFormat.setInput(job, dbName, inputTableName, null); // initialize HCatOutputFormat job.setInputFormatClass(HCatInputFormat.class); Index: src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/TypeDataCheck.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/TypeDataCheck.java (revision 1304165) +++ src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/TypeDataCheck.java (revision ) @@ -149,8 +149,7 @@ } Job job = new Job(conf, "typedatacheck"); // initialize HCatInputFormat - HCatInputFormat.setInput(job, InputJobInfo.create( - dbName, tableName, null)); + HCatInputFormat.setInput(job, dbName, tableName, null); HCatSchema s = HCatInputFormat.getTableSchema(job); job.getConfiguration().set(SCHEMA_KEY, schemaStr); job.getConfiguration().set(DELIM, outputdelim); Index: src/test/org/apache/hcatalog/pig/TestHCatLoader.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/test/org/apache/hcatalog/pig/TestHCatLoader.java (revision 1304165) +++ src/test/org/apache/hcatalog/pig/TestHCatLoader.java (revision ) @@ -19,6 +19,7 @@ import java.io.File; import java.io.IOException; +import java.io.RandomAccessFile; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -34,10 +35,12 @@ import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.mapreduce.Job; import org.apache.hcatalog.HcatTestUtils; import org.apache.hcatalog.data.Pair; import org.apache.pig.ExecType; import org.apache.pig.PigServer; +import org.apache.pig.ResourceStatistics; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.impl.logicalLayer.schema.Schema; @@ -53,9 +56,10 @@ private static final String BASIC_TABLE = "junit_unparted_basic"; private static final String COMPLEX_TABLE = "junit_unparted_complex"; private static final String PARTITIONED_TABLE = "junit_parted_basic"; - private static Driver driver; + private static final String SPECIFIC_SIZE_TABLE = "junit_specific_size"; - private static int guardTestCount = 5; // ugh, instantiate using introspection in guardedSetupBeforeClass + private static Driver driver; + private static int guardTestCount = 6; // ugh, instantiate using introspection in guardedSetupBeforeClass private static boolean setupHasRun = false; private static Map> basicInputData; @@ -113,8 +117,8 @@ + "phnos array>"); createTable(PARTITIONED_TABLE,"a int, b string","bkt string"); + createTable(SPECIFIC_SIZE_TABLE, "a int, b string"); - 
int LOOP_SIZE = 3; String[] input = new String[LOOP_SIZE*LOOP_SIZE]; basicInputData = new HashMap>(); @@ -140,7 +144,9 @@ server.setBatchOn(); server.registerQuery("A = load '"+BASIC_FILE_NAME+"' as (a:int, b:chararray);"); - server.registerQuery("store A into '"+BASIC_TABLE+"' using org.apache.hcatalog.pig.HCatStorer();"); + server.registerQuery("store A into '" + BASIC_TABLE + "' using org.apache.hcatalog.pig.HCatStorer();"); + server.registerQuery("store A into '" + SPECIFIC_SIZE_TABLE + "' using org.apache.hcatalog.pig.HCatStorer();"); + server.registerQuery("B = foreach A generate a,b;"); server.registerQuery("B2 = filter B by a < 2;"); server.registerQuery("store B2 into '"+PARTITIONED_TABLE+"' using org.apache.hcatalog.pig.HCatStorer('bkt=0');"); @@ -158,6 +164,7 @@ dropTable(BASIC_TABLE); dropTable(COMPLEX_TABLE); dropTable(PARTITIONED_TABLE); + dropTable(SPECIFIC_SIZE_TABLE); } protected void guardedTearDownAfterClass() throws Exception { @@ -375,5 +382,19 @@ numTuplesRead++; } assertEquals(basicInputData.size(),numTuplesRead); + } + + public void testGetInputBytes() throws Exception { + File file = new File(TEST_WAREHOUSE_DIR + "/" + SPECIFIC_SIZE_TABLE + "/part-m-00000"); + file.deleteOnExit(); + RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw"); + randomAccessFile.setLength(2L * 1024 * 1024 * 1024); + + Job job = new Job(); + HCatLoader hCatLoader = new HCatLoader(); + hCatLoader.setUDFContextSignature(this.getName()); + hCatLoader.setLocation(SPECIFIC_SIZE_TABLE, job); + ResourceStatistics statistics = hCatLoader.getStatistics(file.getAbsolutePath(), job); + assertEquals(2048, (long) statistics.getmBytes()); } } Index: src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/ReadJson.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/ReadJson.java (revision 1304165) +++ src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/ReadJson.java (revision ) @@ -93,8 +93,7 @@ if(principalID != null) conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID); Job job = new Job(conf, "ReadJson"); - HCatInputFormat.setInput(job, InputJobInfo.create( - dbName, tableName, null)); + HCatInputFormat.setInput(job, dbName, tableName, null); // initialize HCatOutputFormat job.setInputFormatClass(HCatInputFormat.class); Index: src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java (revision 1304165) +++ src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java (revision ) @@ -25,25 +25,19 @@ import java.util.Map; import junit.framework.Assert; -import junit.framework.TestCase; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.cli.CliSessionState; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; 
import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.io.RCFileInputFormat; import org.apache.hadoop.hive.ql.io.RCFileOutputFormat; -import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; @@ -53,27 +47,25 @@ import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; -import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.BaseHCatTestCase; import org.apache.hcatalog.data.DefaultHCatRecord; import org.apache.hcatalog.data.HCatRecord; import org.apache.hcatalog.data.schema.HCatFieldSchema; import org.apache.hcatalog.data.schema.HCatSchema; -import org.apache.hcatalog.rcfile.RCFileInputDriver; -import org.apache.hcatalog.rcfile.RCFileOutputDriver; +import org.junit.After; +import org.junit.Before; /** * Test for HCatOutputFormat. Writes a partition using HCatOutputFormat and reads * it back using HCatInputFormat, checks the column values and counts. */ -public abstract class HCatMapReduceTest extends TestCase { +public abstract class HCatMapReduceTest extends BaseHCatTestCase { protected String dbName = "default"; protected String tableName = "testHCatMapReduceTable"; protected String inputFormat = RCFileInputFormat.class.getName(); protected String outputFormat = RCFileOutputFormat.class.getName(); - protected String inputSD = RCFileInputDriver.class.getName(); - protected String outputSD = RCFileOutputDriver.class.getName(); protected String serdeClass = ColumnarSerDe.class.getName(); private static List writeRecords = new ArrayList(); @@ -85,46 +77,19 @@ protected abstract List getTableColumns(); - private HiveMetaStoreClient client; - protected HiveConf hiveConf; - private FileSystem fs; - private String thriftUri = null; - protected Driver driver; - - @Override - protected void setUp() throws Exception { - hiveConf = new HiveConf(this.getClass()); - - //The default org.apache.hadoop.hive.ql.hooks.PreExecutePrinter hook - //is present only in the ql/test directory - hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); - driver = new Driver(hiveConf); - SessionState.start(new CliSessionState(hiveConf)); - - thriftUri = System.getenv("HCAT_METASTORE_URI"); - - if( thriftUri != null ) { - System.out.println("Using URI " + thriftUri); - - hiveConf.set("hive.metastore.local", "false"); - hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUri); - } - + @Before + public void setup() throws Exception { fs = new LocalFileSystem(); fs.initialize(fs.getWorkingDirectory().toUri(), new Configuration()); initialize(); - - client = new HiveMetaStoreClient(hiveConf, null); initTable(); } - @Override - protected void tearDown() throws Exception { + @After + public void tearDown() throws Exception { try { String databaseName = (dbName == null) ? 
MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName; @@ -293,8 +258,7 @@ job.setInputFormatClass(HCatInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); - InputJobInfo inputJobInfo = InputJobInfo.create(dbName,tableName,filter); - HCatInputFormat.setInput(job, inputJobInfo); + HCatInputFormat.setInput(job, dbName, tableName, filter); job.setMapOutputKeyClass(BytesWritable.class); job.setMapOutputValueClass(Text.class); @@ -325,8 +289,7 @@ job.setInputFormatClass(HCatInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); - InputJobInfo inputJobInfo = InputJobInfo.create(dbName,tableName,null); - HCatInputFormat.setInput(job, inputJobInfo); + HCatInputFormat.setInput(job, dbName, tableName, null); return HCatInputFormat.getTableSchema(job); } Index: src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java (revision 1304165) +++ src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java (revision ) @@ -33,6 +33,8 @@ import org.apache.hcatalog.data.schema.HCatFieldSchema; import org.apache.hcatalog.data.schema.HCatSchema; import org.apache.hcatalog.data.schema.HCatSchemaUtils; +import org.junit.Assert; +import org.junit.Test; public class TestHCatPartitioned extends HCatMapReduceTest { @@ -74,7 +76,7 @@ return fields; } - + @Test public void testHCatPartitionedTable() throws Exception { Map partitionMap = new HashMap(); @@ -95,9 +97,9 @@ exc = e; } - assertTrue(exc != null); - assertTrue(exc instanceof HCatException); - assertEquals(ErrorType.ERROR_DUPLICATE_PARTITION, ((HCatException) exc).getErrorType()); + Assert.assertTrue(exc != null); + Assert.assertTrue(exc instanceof HCatException); + Assert.assertEquals(ErrorType.ERROR_DUPLICATE_PARTITION, ((HCatException) exc).getErrorType()); //Test for publish with invalid partition key name exc = null; @@ -110,9 +112,9 @@ exc = e; } - assertTrue(exc != null); - assertTrue(exc instanceof HCatException); - assertEquals(ErrorType.ERROR_MISSING_PARTITION_KEY, ((HCatException) exc).getErrorType()); + Assert.assertTrue(exc != null); + Assert.assertTrue(exc instanceof HCatException); + Assert.assertEquals(ErrorType.ERROR_MISSING_PARTITION_KEY, ((HCatException) exc).getErrorType()); //Test for null partition value map @@ -123,7 +125,7 @@ exc = e; } - assertTrue(exc == null); + Assert.assertTrue(exc == null); // assertTrue(exc instanceof HCatException); // assertEquals(ErrorType.ERROR_PUBLISHING_PARTITION, ((HCatException) exc).getErrorType()); // With Dynamic partitioning, this isn't an error that the keyValues specified didn't values @@ -147,7 +149,7 @@ HCatSchema tableSchema = getTableSchema(); - assertEquals(3, tableSchema.getFields().size()); + Assert.assertEquals(3, tableSchema.getFields().size()); //Update partition schema to have 3 fields partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", Constants.STRING_TYPE_NAME, ""))); @@ -172,11 +174,11 @@ tableSchema = getTableSchema(); //assert that c3 has got added to table schema - assertEquals(4, tableSchema.getFields().size()); - assertEquals("c1", tableSchema.getFields().get(0).getName()); - assertEquals("c2", tableSchema.getFields().get(1).getName()); - assertEquals("c3", tableSchema.getFields().get(2).getName()); - assertEquals("part1", tableSchema.getFields().get(3).getName()); + Assert.assertEquals(4, 
tableSchema.getFields().size()); + Assert.assertEquals("c1", tableSchema.getFields().get(0).getName()); + Assert.assertEquals("c2", tableSchema.getFields().get(1).getName()); + Assert.assertEquals("c3", tableSchema.getFields().get(2).getName()); + Assert.assertEquals("part1", tableSchema.getFields().get(3).getName()); //Test that changing column data type fails partitionMap.clear(); @@ -193,9 +195,9 @@ exc = e; } - assertTrue(exc != null); - assertTrue(exc instanceof HCatException); - assertEquals(ErrorType.ERROR_SCHEMA_TYPE_MISMATCH, ((HCatException) exc).getErrorType()); + Assert.assertTrue(exc != null); + Assert.assertTrue(exc instanceof HCatException); + Assert.assertEquals(ErrorType.ERROR_SCHEMA_TYPE_MISMATCH, ((HCatException) exc).getErrorType()); //Test that partition key is not allowed in data partitionColumns = new ArrayList(); @@ -224,14 +226,14 @@ } List records= runMRRead(20,"part1 = \"p1value6\""); - assertEquals(20, records.size()); + Assert.assertEquals(20, records.size()); Integer i =0; for(HCatRecord rec : records){ - assertEquals(4, rec.size()); - assertTrue(rec.get(0).equals(i)); - assertTrue(rec.get(1).equals("c2value"+i)); - assertTrue(rec.get(2).equals("c3value"+i)); - assertTrue(rec.get(3).equals("p1value6")); + Assert.assertEquals(4, rec.size()); + Assert.assertTrue(rec.get(0).equals(i)); + Assert.assertTrue(rec.get(1).equals("c2value" + i)); + Assert.assertTrue(rec.get(2).equals("c3value" + i)); + Assert.assertTrue(rec.get(3).equals("p1value6")); i++; } } @@ -241,7 +243,7 @@ HCatSchema tableSchema = getTableSchema(); - assertEquals(4, tableSchema.getFields().size()); + Assert.assertEquals(4, tableSchema.getFields().size()); partitionColumns = new ArrayList(); partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""))); @@ -272,9 +274,9 @@ exc = e; } - assertTrue(exc != null); - assertTrue(exc instanceof HCatException); - assertEquals(ErrorType.ERROR_SCHEMA_COLUMN_MISMATCH, ((HCatException) exc).getErrorType()); + Assert.assertTrue(exc != null); + Assert.assertTrue(exc instanceof HCatException); + Assert.assertEquals(ErrorType.ERROR_SCHEMA_COLUMN_MISMATCH, ((HCatException) exc).getErrorType()); partitionColumns = new ArrayList(); @@ -310,6 +312,6 @@ ArrayList res = new ArrayList(); driver.getResults(res); - assertEquals(70, res.size()); + Assert.assertEquals(70, res.size()); } } Index: src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/StoreNumbers.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/StoreNumbers.java (revision 1304165) +++ src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/StoreNumbers.java (revision ) @@ -177,8 +177,7 @@ Job job = new Job(conf, "storenumbers"); // initialize HCatInputFormat - HCatInputFormat.setInput(job, InputJobInfo.create( - dbName, tableName, null)); + HCatInputFormat.setInput(job, dbName, tableName, null); // initialize HCatOutputFormat HCatOutputFormat.setOutput(job, OutputJobInfo.create( dbName, outputTableName, outputPartitionKvps)); Index: src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/WriteRC.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/WriteRC.java (revision 1304165) +++ 
src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/WriteRC.java (revision ) @@ -94,8 +94,7 @@ if (principalID != null) conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID); Job job = new Job(conf, "WriteRC"); - HCatInputFormat.setInput(job, InputJobInfo.create(dbName, - inputTableName, null)); + HCatInputFormat.setInput(job, dbName, inputTableName, null); // initialize HCatOutputFormat job.setInputFormatClass(HCatInputFormat.class); Index: src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/ReadWrite.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/ReadWrite.java (revision 1304165) +++ src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/ReadWrite.java (revision ) @@ -85,8 +85,7 @@ if (principalID != null) conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID); Job job = new Job(conf, "ReadWrite"); - HCatInputFormat.setInput(job, InputJobInfo.create(dbName, - inputTableName, null)); + HCatInputFormat.setInput(job, dbName, inputTableName, null); // initialize HCatOutputFormat job.setInputFormatClass(HCatInputFormat.class); Index: src/java/org/apache/hcatalog/pig/HCatLoader.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/java/org/apache/hcatalog/pig/HCatLoader.java (revision 1304165) +++ src/java/org/apache/hcatalog/pig/HCatLoader.java (revision ) @@ -31,12 +31,14 @@ import org.apache.hcatalog.data.Pair; import org.apache.hcatalog.data.schema.HCatSchema; import org.apache.hcatalog.mapreduce.HCatInputFormat; +import org.apache.hcatalog.mapreduce.InitializeInput; import org.apache.hcatalog.mapreduce.InputJobInfo; import org.apache.pig.Expression; import org.apache.pig.Expression.BinaryExpression; import org.apache.pig.LoadFunc; import org.apache.pig.PigException; import org.apache.pig.ResourceSchema; +import org.apache.pig.ResourceStatistics; import org.apache.pig.impl.util.UDFContext; /** @@ -87,10 +89,7 @@ // in the hadoop front end mapred.task.id property will not be set in // the Configuration if (!HCatUtil.checkJobContextIfRunningFromBackend(job)){ - HCatInputFormat.setInput(job, - InputJobInfo.create(dbName, - tableName, - getPartitionFilterString())); + HCatInputFormat.setInput(job, dbName, tableName, getPartitionFilterString()); } // Need to also push projections by calling setOutputSchema on @@ -175,6 +174,22 @@ // store this in the udf context so we can get it later storeInUDFContext(signature, PARTITION_FILTER, partitionFilterString); + } + + /** + * Get statistics about the data to be loaded. Only input data size is implemented at this time. 
+ */ + @Override + public ResourceStatistics getStatistics(String location, Job job) throws IOException { + try { + ResourceStatistics stats = new ResourceStatistics(); + InputJobInfo inputJobInfo = InitializeInput.getInputJobInfo( + job.getConfiguration(), dbName, tableName, partitionFilterString); + stats.setmBytes(getSizeInBytes(inputJobInfo) / 1024 / 1024); + return stats; + } catch (Exception e) { + throw new IOException(e); + } } private String getPartitionFilterString() { Index: src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/WriteText.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/WriteText.java (revision 1304165) +++ src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/WriteText.java (revision ) @@ -103,8 +103,7 @@ if (principalID != null) conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID); Job job = new Job(conf, "WriteText"); - HCatInputFormat.setInput(job, InputJobInfo.create(dbName, - inputTableName, null)); + HCatInputFormat.setInput(job, dbName, inputTableName, null); // initialize HCatOutputFormat job.setInputFormatClass(HCatInputFormat.class); Index: src/java/org/apache/hcatalog/pig/HCatBaseLoader.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/java/org/apache/hcatalog/pig/HCatBaseLoader.java (revision 1304165) +++ src/java/org/apache/hcatalog/pig/HCatBaseLoader.java (revision ) @@ -22,10 +22,15 @@ import java.util.List; import java.util.Properties; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hcatalog.data.HCatRecord; import org.apache.hcatalog.data.schema.HCatSchema; +import org.apache.hcatalog.mapreduce.InputJobInfo; +import org.apache.hcatalog.mapreduce.PartInfo; import org.apache.pig.LoadFunc; import org.apache.pig.LoadMetadata; import org.apache.pig.LoadPushDown; @@ -114,4 +119,30 @@ props.put(key, value); } + /** + * Gets the size of inputs. This is accomplished by summing the size of all + * input paths on supported FileSystems. Locations whose size cannot be determined + * are ignored. + */ + protected static long getSizeInBytes(InputJobInfo inputJobInfo) throws IOException { + Configuration conf = new Configuration(); + long sizeInBytes = 0; + + for (PartInfo partInfo : inputJobInfo.getPartitions()) { + try { + Path p = new Path(partInfo.getLocation()); + if (p.getFileSystem(conf).isFile(p)) { + sizeInBytes += p.getFileSystem(conf).getFileStatus(p).getLen(); + } else { + for (FileStatus child : p.getFileSystem(conf).listStatus(p)) { + sizeInBytes += child.getLen(); + } + } + } catch (IOException e) { + // Report size to the extent possible. 
+ } + } + + return sizeInBytes; + } } Index: src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java (revision 1304165) +++ src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java (revision ) @@ -30,18 +30,14 @@ * the specified partition predicates, gets the matching partitions, puts * the information in the conf object. The inputInfo object is updated with * information needed in the client context - * @param job the job object - * @param inputJobInfo the input info for table to read * @throws IOException the exception in communicating with the metadata server */ - public static void setInput(Job job, - InputJobInfo inputJobInfo) throws IOException { + public static void setInput(Job job, String dbName, String tableName, String filter) + throws IOException { try { - InitializeInput.setInput(job, inputJobInfo); + InitializeInput.setInput(job, dbName, tableName, filter); } catch (Exception e) { throw new IOException(e); } } - - } Index: src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java (revision 1304165) +++ src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java (revision ) @@ -38,7 +38,6 @@ import org.apache.hcatalog.data.transfer.ReaderContext; import org.apache.hcatalog.data.transfer.state.StateProvider; import org.apache.hcatalog.mapreduce.HCatInputFormat; -import org.apache.hcatalog.mapreduce.InputJobInfo; /** This reader reads via {@link HCatInputFormat} * @@ -58,11 +57,9 @@ @Override public ReaderContext prepareRead() throws HCatException { - try { - Job job = new Job(conf); + Job job = new Job(conf); - InputJobInfo jobInfo = InputJobInfo.create(re.getDbName(), re.getTableName(), re.getFilterString()); - HCatInputFormat.setInput(job, jobInfo); + HCatInputFormat.setInput(job, re.getDbName(), re.getTableName(), re.getFilterString()); HCatInputFormat hcif = new HCatInputFormat(); ReaderContext cntxt = new ReaderContext(); cntxt.setInputSplits(hcif.getSplits(new JobContext(job.getConfiguration(), null))); Index: src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/SumNumbers.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/SumNumbers.java (revision 1304165) +++ src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/SumNumbers.java (revision ) @@ -159,8 +159,7 @@ if(principalID != null) conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID); Job job = new Job(conf, "sumnumbers"); - HCatInputFormat.setInput(job, InputJobInfo.create( - dbName, tableName, null)); + HCatInputFormat.setInput(job, dbName, tableName, null); // initialize HCatOutputFormat job.setInputFormatClass(HCatInputFormat.class); Index: src/test/org/apache/hcatalog/data/TestReaderWriter.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/test/org/apache/hcatalog/data/TestReaderWriter.java (revision 1304165) +++ 
src/test/org/apache/hcatalog/data/TestReaderWriter.java (revision ) @@ -32,13 +32,10 @@ import java.util.Map.Entry; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.cli.CliSessionState; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.ql.CommandNeedRetryException; -import org.apache.hadoop.hive.ql.Driver; -import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hcatalog.BaseHCatTestCase; import org.apache.hcatalog.common.HCatException; import org.apache.hcatalog.data.transfer.DataTransferFactory; import org.apache.hcatalog.data.transfer.HCatReader; @@ -50,17 +47,16 @@ import org.junit.Assert; import org.junit.Test; -public class TestReaderWriter { +public class TestReaderWriter extends BaseHCatTestCase { + private static final String DB_NAME = "default"; + private static final String TABLE_NAME = "junit_testreaderwriter"; + private static final String FILTER = ""; @Test public void test() throws MetaException, CommandNeedRetryException, IOException, ClassNotFoundException { - - HiveConf conf = new HiveConf(getClass()); - Driver driver = new Driver(conf); - SessionState.start(new CliSessionState(conf)); - driver.run("drop table mytbl"); - driver.run("create table mytbl (a string, b int)"); - Iterator> itr = conf.iterator(); + driver.run("drop table " + TABLE_NAME); + driver.run("create table " + TABLE_NAME + " (a string, b int)"); + Iterator> itr = hiveConf.iterator(); Map map = new HashMap(); while(itr.hasNext()){ Entry kv = itr.next(); @@ -108,7 +104,7 @@ private WriterContext runsInMaster(Map config) throws HCatException { WriteEntity.Builder builder = new WriteEntity.Builder(); - WriteEntity entity = builder.withTable("mytbl").build(); + WriteEntity entity = builder.withTable(TABLE_NAME).build(); HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config); WriterContext info = writer.prepareWrite(); return info; @@ -117,7 +113,8 @@ private ReaderContext runsInMaster(Map config, boolean bogus) throws HCatException { ReadEntity.Builder builder = new ReadEntity.Builder(); - ReadEntity entity = builder.withTable("mytbl").build(); + ReadEntity entity = + builder.withDatabase(DB_NAME).withTable(TABLE_NAME).withFilter(FILTER).build(); HCatReader reader = DataTransferFactory.getHCatReader(entity, config); ReaderContext cntxt = reader.prepareRead(); return cntxt; @@ -148,7 +145,7 @@ private void commit(Map config, boolean status, WriterContext context) throws IOException { WriteEntity.Builder builder = new WriteEntity.Builder(); - WriteEntity entity = builder.withTable("mytbl").build(); + WriteEntity entity = builder.withDatabase(DB_NAME).withTable(TABLE_NAME).build(); HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config); if(status){ writer.commit(context); @@ -169,7 +166,7 @@ int i = 0; @Override public boolean hasNext() { - return i++ < 100 ? 
true : false; + return i++ < 100; } @Override Index: src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/SimpleRead.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/SimpleRead.java (revision 1304165) +++ src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/SimpleRead.java (revision ) @@ -87,8 +87,7 @@ if(principalID != null) conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID); Job job = new Job(conf, "SimpleRead"); - HCatInputFormat.setInput(job, InputJobInfo.create( - dbName, tableName, null)); + HCatInputFormat.setInput(job, dbName, tableName, null); // initialize HCatOutputFormat job.setInputFormatClass(HCatInputFormat.class); Index: src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/GroupByAge.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/GroupByAge.java (revision 1304165) +++ src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/GroupByAge.java (revision ) @@ -105,8 +105,7 @@ if (principalID != null) conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID); Job job = new Job(conf, "GroupByAge"); - HCatInputFormat.setInput(job, InputJobInfo.create(dbName, - inputTableName, null)); + HCatInputFormat.setInput(job, dbName, inputTableName, null); // initialize HCatOutputFormat job.setInputFormatClass(HCatInputFormat.class); Index: src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/StoreDemo.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/StoreDemo.java (revision 1304165) +++ src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/StoreDemo.java (revision ) @@ -113,8 +113,7 @@ conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID); Job job = new Job(conf, "storedemo"); // initialize HCatInputFormat - HCatInputFormat.setInput(job, InputJobInfo.create( - dbName, tableName, null)); + HCatInputFormat.setInput(job, dbName, tableName, null); // initialize HCatOutputFormat HCatOutputFormat.setOutput(job, OutputJobInfo.create( dbName, outputTableName, outputPartitionKvps)); Index: src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/WriteJson.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/WriteJson.java (revision 1304165) +++ src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/WriteJson.java (revision ) @@ -92,8 +92,7 @@ if (principalID != null) conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID); Job job = new Job(conf, "WriteJson"); - HCatInputFormat.setInput(job, InputJobInfo.create(dbName, - inputTableName, null)); + HCatInputFormat.setInput(job, dbName, inputTableName, null); // initialize HCatOutputFormat job.setInputFormatClass(HCatInputFormat.class); Index: src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- 
src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java (revision 1304165) +++ src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java (revision ) @@ -32,6 +32,8 @@ import org.apache.hcatalog.data.HCatRecord; import org.apache.hcatalog.data.schema.HCatFieldSchema; import org.apache.hcatalog.data.schema.HCatSchemaUtils; +import org.junit.Assert; +import org.junit.Test; public class TestHCatNonPartitioned extends HCatMapReduceTest { @@ -74,7 +76,7 @@ return fields; } - + @Test public void testHCatNonPartitionedTable() throws Exception { Map partitionMap = new HashMap(); @@ -88,9 +90,9 @@ exc = e; } - assertTrue(exc != null); - assertTrue(exc instanceof HCatException); - assertEquals(ErrorType.ERROR_NON_EMPTY_TABLE, ((HCatException) exc).getErrorType()); + Assert.assertTrue(exc != null); + Assert.assertTrue(exc instanceof HCatException); + Assert.assertEquals(ErrorType.ERROR_NON_EMPTY_TABLE, ((HCatException) exc).getErrorType()); //Test for publish with invalid partition key name exc = null; @@ -103,9 +105,9 @@ exc = e; } - assertTrue(exc != null); - assertTrue(exc instanceof HCatException); - assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) exc).getErrorType()); + Assert.assertTrue(exc != null); + Assert.assertTrue(exc instanceof HCatException); + Assert.assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) exc).getErrorType()); //Read should get 10 rows runMRRead(10); @@ -117,14 +119,10 @@ private void hiveReadTest() throws Exception { String query = "select * from " + tableName; - int retCode = driver.run(query).getResponseCode(); + Assert.assertEquals(0, driver.run(query).getResponseCode()); - if( retCode != 0 ) { - throw new Exception("Error " + retCode + " running query " + query); - } - ArrayList res = new ArrayList(); driver.getResults(res); - assertEquals(10, res.size()); + Assert.assertEquals(10, res.size()); } } Index: src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/ReadRC.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/ReadRC.java (revision 1304165) +++ src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/ReadRC.java (revision ) @@ -93,10 +93,9 @@ if(principalID != null) conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID); Job job = new Job(conf, "ReadRC"); - HCatInputFormat.setInput(job, InputJobInfo.create( - dbName, tableName, null)); + HCatInputFormat.setInput(job, dbName, tableName, null); // initialize HCatOutputFormat - + job.setInputFormatClass(HCatInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setJarByClass(ReadRC.class); Index: src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/StoreComplex.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/StoreComplex.java (revision 1304165) +++ src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/StoreComplex.java (revision ) @@ -102,8 +102,7 @@ Job job = new Job(conf, "storecomplex"); // initialize HCatInputFormat - HCatInputFormat.setInput(job, InputJobInfo.create( - dbName, tableName, null)); + HCatInputFormat.setInput(job, dbName, tableName, null); // initialize HCatOutputFormat HCatOutputFormat.setOutput(job, OutputJobInfo.create( dbName, outputTableName, 
outputPartitionKvps)); Index: src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/ReadText.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/ReadText.java (revision 1304165) +++ src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/ReadText.java (revision ) @@ -103,8 +103,7 @@ if(principalID != null) conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID); Job job = new Job(conf, "ReadText"); - HCatInputFormat.setInput(job, InputJobInfo.create( - dbName, tableName, null)); + HCatInputFormat.setInput(job, dbName, tableName, null); // initialize HCatOutputFormat job.setInputFormatClass(HCatInputFormat.class); Index: src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/HBaseReadWrite.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/HBaseReadWrite.java (revision 1304165) +++ src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/HBaseReadWrite.java (revision ) @@ -166,8 +166,7 @@ if (!succ) return 1; job = new Job(conf, "HBaseRead"); - HCatInputFormat.setInput(job, InputJobInfo.create(dbName, tableName, - null)); + HCatInputFormat.setInput(job, dbName, tableName, null); job.setInputFormatClass(HCatInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); Index: src/java/org/apache/hcatalog/mapreduce/InitializeInput.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/java/org/apache/hcatalog/mapreduce/InitializeInput.java (revision 1304165) +++ src/java/org/apache/hcatalog/mapreduce/InitializeInput.java (revision ) @@ -24,33 +24,14 @@ import java.util.Map; import java.util.Properties; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.MetaStoreUtils; -import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; -import org.apache.hadoop.hive.ql.io.HiveOutputFormat; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; -import org.apache.hadoop.hive.ql.metadata.HiveUtils; -import org.apache.hadoop.hive.ql.plan.TableDesc; -import org.apache.hadoop.hive.serde.Constants; -import org.apache.hadoop.hive.serde2.Deserializer; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; - import org.apache.hcatalog.common.ErrorType; import org.apache.hcatalog.common.HCatConstants; import org.apache.hcatalog.common.HCatException; @@ -64,50 +45,59 @@ * info required in the client process context. 
*/ public class InitializeInput { + /** The prefix for keys used for storage driver arguments */ + public static final String HCAT_KEY_PREFIX = "hcat."; - + - private static final Log LOG = LogFactory.getLog(InitializeInput.class); + // Key is db+table+filter + private static Map inputJobInfoMap = new HashMap(); - + - /** The prefix for keys used for storage driver arguments */ - static final String HCAT_KEY_PREFIX = "hcat."; private static HiveConf hiveConf; - private static HiveMetaStoreClient createHiveMetaClient(Configuration conf) throws Exception { + /** + * As a utility class, all functionality is exposed through static methods. + */ + private InitializeInput() {} + private static HiveMetaStoreClient createHiveMetaClient(Configuration conf) throws Exception { - hiveConf = HCatUtil.getHiveConf(conf); - return new HiveMetaStoreClient(hiveConf, null); + hiveConf = HCatUtil.getHiveConf(conf); + return new HiveMetaStoreClient(hiveConf, null); } /** * Set the input to use for the Job. This queries the metadata server with the specified partition predicates, * gets the matching partitions, puts the information in the configuration object. - * @param job the job object - * @param inputJobInfo information on the Input to read - * @throws Exception */ - public static void setInput(Job job, InputJobInfo inputJobInfo) throws Exception { - - //* Create and initialize an InputJobInfo object - //* Serialize the InputJobInfo and save in the Job's Configuration object - + public static void setInput(Job job, String db, String table, String filter) throws Exception { job.getConfiguration().set( - HCatConstants.HCAT_KEY_JOB_INFO, + HCatConstants.HCAT_KEY_JOB_INFO, - getSerializedHcatKeyJobInfo(job, inputJobInfo,null)); + HCatUtil.serialize(getInputJobInfo(job.getConfiguration(), db, table, filter))); } - public static String getSerializedHcatKeyJobInfo(Job job, InputJobInfo inputJobInfo, String locationFilter) throws Exception { - //* Create and initialize an InputJobInfo object + /** + * Query the HiveMetaStore for information about the table, and partitions matching the + * locationFilter. + * @param dbName database name; "default" is used if null + * @param filter partition filter; "" is used if null + * @return populated and serialized {@link InputJobInfo} + */ + public static InputJobInfo getInputJobInfo(Configuration conf, String dbName, String tableName, String filter) throws Exception { + String db = dbName == null ? "default" : dbName; + String f = filter == null ? 
"" : filter; - HiveMetaStoreClient client = null; + if (inputJobInfoMap.containsKey(db+tableName+f)) { + return inputJobInfoMap.get(db+tableName+f); + } + InputJobInfo inputJobInfo = InputJobInfo.create(db, tableName, f); + HiveMetaStoreClient client = null; try { - if (job != null){ - client = createHiveMetaClient(job.getConfiguration()); + if (conf != null){ + client = createHiveMetaClient(conf); } else { hiveConf = new HiveConf(HCatInputFormat.class); client = new HiveMetaStoreClient(hiveConf, null); } - Table table = client.getTable(inputJobInfo.getDatabaseName(), - inputJobInfo.getTableName()); + Table table = client.getTable(db, tableName); List partInfoList = new ArrayList(); @@ -128,8 +118,7 @@ // populate partition info for (Partition ptn : parts){ PartInfo partInfo = extractPartInfo(ptn.getSd(),ptn.getParameters(), - job.getConfiguration(), - inputJobInfo); + conf, inputJobInfo); partInfo.setPartitionValues(createPtnKeyValueMap(table, ptn)); partInfoList.add(partInfo); } @@ -137,22 +126,20 @@ }else{ //Non partitioned table PartInfo partInfo = extractPartInfo(table.getSd(),table.getParameters(), - job.getConfiguration(), - inputJobInfo); + conf, inputJobInfo); partInfo.setPartitionValues(new HashMap()); partInfoList.add(partInfo); } inputJobInfo.setPartitions(partInfoList); - - return HCatUtil.serialize(inputJobInfo); + inputJobInfoMap.put(dbName+tableName+filter, inputJobInfo); + return inputJobInfo; } finally { if (client != null ) { client.close(); } } - } - + private static Map createPtnKeyValueMap(Table table, Partition ptn) throws IOException{ List values = ptn.getValues(); if( values.size() != table.getPartitionKeys().size() ) { @@ -174,7 +161,7 @@ return ptnKeyValues; } - static PartInfo extractPartInfo(StorageDescriptor sd, + private static PartInfo extractPartInfo(StorageDescriptor sd, Map parameters, Configuration conf, InputJobInfo inputJobInfo) throws IOException{ HCatSchema schema = HCatUtil.extractSchemaFromStorageDescriptor(sd); Index: src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java (revision 1304165) +++ src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java (revision ) @@ -20,16 +20,10 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.serde.Constants; import org.apache.hcatalog.common.ErrorType; import org.apache.hcatalog.common.HCatConstants; @@ -38,6 +32,8 @@ import org.apache.hcatalog.data.HCatRecord; import org.apache.hcatalog.data.schema.HCatFieldSchema; import org.apache.hcatalog.data.schema.HCatSchemaUtils; +import org.junit.Assert; +import org.junit.Test; public class TestHCatDynamicPartitioned extends HCatMapReduceTest { @@ -87,7 +83,7 @@ return fields; } - + @Test public void testHCatDynamicPartitionedTable() throws Exception { generateWriteRecords(20,5,0); @@ -111,7 +107,7 @@ ArrayList res = new ArrayList(); driver.getResults(res); - assertEquals(20, res.size()); + Assert.assertEquals(20, 
res.size()); //Test for duplicate publish @@ -123,12 +119,12 @@ exc = e; } - assertTrue(exc != null); - assertTrue(exc instanceof HCatException); - assertTrue( "Got exception of type ["+((HCatException) exc).getErrorType().toString() + Assert.assertTrue(exc != null); + Assert.assertTrue(exc instanceof HCatException); + Assert.assertTrue("Got exception of type [" + ((HCatException) exc).getErrorType().toString() + "] Expected ERROR_PUBLISHING_PARTITION or ERROR_MOVE_FAILED", - (ErrorType.ERROR_PUBLISHING_PARTITION == ((HCatException) exc).getErrorType()) - || (ErrorType.ERROR_MOVE_FAILED == ((HCatException) exc).getErrorType()) + (ErrorType.ERROR_PUBLISHING_PARTITION == ((HCatException) exc).getErrorType()) + || (ErrorType.ERROR_MOVE_FAILED == ((HCatException) exc).getErrorType()) ); } @@ -149,11 +145,11 @@ } if (HCatConstants.HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED){ - assertTrue(exc != null); - assertTrue(exc instanceof HCatException); - assertEquals(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS, ((HCatException) exc).getErrorType()); + Assert.assertTrue(exc != null); + Assert.assertTrue(exc instanceof HCatException); + Assert.assertEquals(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS, ((HCatException) exc).getErrorType()); }else{ - assertTrue(exc == null); + Assert.assertTrue(exc == null); runMRRead(maxParts+5); } } Index: storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java (revision 1304165) +++ storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java (revision ) @@ -71,7 +71,7 @@ @Test public void TestSnapshotConversion() throws Exception{ Initialize(); - String tableName = newTableName("mytableOne"); + String tableName = newTableName("mytableone"); String databaseName = newTableName("mydatabase"); String fullyQualTableName = databaseName + "." + tableName; String db_dir = getTestDir() + "/hbasedb"; @@ -87,15 +87,14 @@ cmdResponse = hcatDriver.run(tableQuery); assertEquals(0, cmdResponse.getResponseCode()); - InputJobInfo inputInfo = InputJobInfo.create(databaseName, tableName, null); Configuration conf = new Configuration(hcatConf); conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(getHiveConf().getAllProperties())); Job job = new Job(conf); - inputInfo.getProperties().setProperty(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, "dummysnapshot"); - InitializeInput.setInput(job, inputInfo); + InitializeInput.setInput(job, databaseName, tableName, null); String modifiedInputInfo = job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO); - inputInfo = (InputJobInfo) HCatUtil.deserialize(modifiedInputInfo); + InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(modifiedInputInfo); + inputInfo.getProperties().setProperty(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, "dummysnapshot"); Map revMap = new HashMap(); revMap.put("cf1", 3L); @@ -110,7 +109,7 @@ cmdResponse = hcatDriver.run(dropTable); assertEquals(0, cmdResponse.getResponseCode()); - tableName = newTableName("mytableTwo"); + tableName = newTableName("mytabletwo"); fullyQualTableName = databaseName + "." 
+ tableName; tableQuery = "CREATE TABLE " + fullyQualTableName + "(key string, value1 string, value2 string) STORED BY " + @@ -121,11 +120,10 @@ revMap.clear(); revMap.put("cf1", 3L); hbaseSnapshot = new TableSnapshot(fullyQualTableName, revMap, -1); - inputInfo = InputJobInfo.create(databaseName, tableName, null); - inputInfo.getProperties().setProperty(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, "dummysnapshot"); - InitializeInput.setInput(job, inputInfo); + InitializeInput.setInput(job, databaseName, tableName, null); modifiedInputInfo = job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO); inputInfo = (InputJobInfo) HCatUtil.deserialize(modifiedInputInfo); + inputInfo.getProperties().setProperty(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, "dummysnapshot"); hcatSnapshot = HBaseRevisionManagerUtil.convertSnapshot(hbaseSnapshot, inputInfo.getTableInfo()); assertEquals(hcatSnapshot.getRevision("value1"), 3); assertEquals(hcatSnapshot.getRevision("value2"), 3); Index: storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManager.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManager.java (revision 1304165) +++ storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManager.java (revision ) @@ -92,7 +92,7 @@ manager.open(); ZKUtil zkutil = new ZKUtil(sb.toString(), "/rm_base"); - String tableName = newTableName("testTable"); + String tableName = newTableName("testtable"); List columnFamilies = Arrays.asList("cf1", "cf2", "cf3"); Transaction txn = manager.beginWriteTransaction(tableName, columnFamilies); @@ -140,7 +140,7 @@ manager.open(); ZKUtil zkutil = new ZKUtil(host + ':' + port, "/rm_base"); - String tableName = newTableName("testTable"); + String tableName = newTableName("testtable"); List columnFamilies = Arrays.asList("cf1", "cf2", "cf3"); Transaction txn = manager.beginWriteTransaction(tableName, columnFamilies); List cfs = zkutil.getColumnFamiliesOfTable(tableName); @@ -202,7 +202,7 @@ ZKBasedRevisionManager manager = new ZKBasedRevisionManager(); manager.initialize(props); manager.open(); - String tableName = newTableName("testTable"); + String tableName = newTableName("testtable"); List columnFamilies = Arrays.asList("cf1", "cf2"); Transaction txn = manager.beginWriteTransaction(tableName, columnFamilies, 40); @@ -227,7 +227,7 @@ ZKBasedRevisionManager manager = new ZKBasedRevisionManager(); manager.initialize(props); manager.open(); - String tableName = newTableName("testTable"); + String tableName = newTableName("testtable"); List cfOne = Arrays.asList("cf1", "cf2"); List cfTwo = Arrays.asList("cf2", "cf3"); Transaction tsx1 = manager.beginWriteTransaction(tableName, cfOne); Index: storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java (revision 1304165) +++ storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java (revision ) @@ -590,9 +590,7 @@ job.setJarByClass(this.getClass()); job.setMapperClass(MapReadAbortedTransaction.class); job.setInputFormatClass(HCatInputFormat.class); - InputJobInfo inputJobInfo = 
InputJobInfo.create(databaseName, - tableName, null); - HCatInputFormat.setInput(job, inputJobInfo); + HCatInputFormat.setInput(job, databaseName, tableName, null); job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job, outputDir); job.setMapOutputKeyClass(BytesWritable.class); Index: storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java (revision 1304165) +++ storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java (revision ) @@ -354,9 +354,7 @@ job.setJarByClass(this.getClass()); job.setMapperClass(MapReadAbortedTransaction.class); job.setInputFormatClass(HCatInputFormat.class); - InputJobInfo inputJobInfo = InputJobInfo.create(databaseName, - tableName, null); - HCatInputFormat.setInput(job, inputJobInfo); + HCatInputFormat.setInput(job, databaseName, tableName, null); job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job, outputDir); job.setMapOutputKeyClass(BytesWritable.class); Index: storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java (revision 1304165) +++ storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java (revision ) @@ -211,9 +211,7 @@ MapReadHTable.resetCounters(); job.setInputFormatClass(HCatInputFormat.class); - InputJobInfo inputJobInfo = InputJobInfo.create(databaseName, tableName, - null); - HCatInputFormat.setInput(job, inputJobInfo); + HCatInputFormat.setInput(job, databaseName, tableName, null); job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job, outputDir); job.setMapOutputKeyClass(BytesWritable.class); @@ -274,10 +272,8 @@ job.setJarByClass(this.getClass()); job.setMapperClass(MapReadProjHTable.class); job.setInputFormatClass(HCatInputFormat.class); - InputJobInfo inputJobInfo = InputJobInfo.create( - MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName, null); HCatInputFormat.setOutputSchema(job, getProjectionSchema()); - HCatInputFormat.setInput(job, inputJobInfo); + HCatInputFormat.setInput(job, MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName, null); job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job, outputDir); job.setMapOutputKeyClass(BytesWritable.class); @@ -334,12 +330,10 @@ job.setMapperClass(MapReadProjectionHTable.class); job.setInputFormat(HBaseInputFormat.class); - InputJobInfo inputJobInfo = InputJobInfo.create( - MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName, null); //Configure projection schema job.set(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA, HCatUtil.serialize(getProjectionSchema())); Job newJob = new Job(job); - HCatInputFormat.setInput(newJob, inputJobInfo); + HCatInputFormat.setInput(newJob, MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName, null); String inputJobString = newJob.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO); InputJobInfo info = (InputJobInfo) HCatUtil.deserialize(inputJobString); job.set(HCatConstants.HCAT_KEY_JOB_INFO, inputJobString); @@ -406,9 +400,7 @@ 
job.setMapperClass(MapReadHTable.class); MapReadHTable.resetCounters(); job.setInputFormatClass(HCatInputFormat.class); - InputJobInfo inputJobInfo = InputJobInfo.create( - MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName, null); - HCatInputFormat.setInput(job, inputJobInfo); + HCatInputFormat.setInput(job, MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName, null); job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job, outputDir); job.setMapOutputKeyClass(BytesWritable.class); @@ -469,9 +461,7 @@ job.setJarByClass(this.getClass()); job.setMapperClass(MapReadHTableRunningAbort.class); job.setInputFormatClass(HCatInputFormat.class); - InputJobInfo inputJobInfo = InputJobInfo.create( - MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName, null); - HCatInputFormat.setInput(job, inputJobInfo); + HCatInputFormat.setInput(job, MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName, null); job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job, outputDir); job.setMapOutputKeyClass(BytesWritable.class);
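For reference, a minimal client-side sketch of the API as changed by this patch: HCatInputFormat.setInput now takes the database, table, and partition filter directly instead of an InputJobInfo, and HCatLoader surfaces input size through Pig's ResourceStatistics. This is an illustrative sketch only, not part of the patch; the table name, signature string, and driver class are hypothetical, and the calls assume the new setInput(Job, String, String, String) and getStatistics(String, Job) signatures introduced above.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hcatalog.mapreduce.HCatInputFormat;
    import org.apache.hcatalog.pig.HCatLoader;
    import org.apache.pig.ResourceStatistics;

    public class SetInputExample {
        public static void main(String[] args) throws Exception {
            Job job = new Job(new Configuration(), "example");

            // Old form (removed by this patch):
            //   HCatInputFormat.setInput(job, InputJobInfo.create("default", "mytable", null));
            // New form: pass database, table, and filter directly; a null filter means no filter.
            HCatInputFormat.setInput(job, "default", "mytable", null);
            job.setInputFormatClass(HCatInputFormat.class);

            // HCatLoader now reports input size (in MB) via Pig's ResourceStatistics,
            // mirroring the testGetInputBytes case above; names here are illustrative.
            HCatLoader loader = new HCatLoader();
            loader.setUDFContextSignature("example-signature");
            loader.setLocation("mytable", job);
            ResourceStatistics stats = loader.getStatistics("mytable", job);
            System.out.println("input size (MB): " + stats.getmBytes());
        }
    }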