Index: src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
===================================================================
--- src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java (revision 1386736)
+++ src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java (working copy)
@@ -25,25 +25,20 @@
 import java.util.Map;
 
 import junit.framework.Assert;
-import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
 import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
-import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -55,22 +50,29 @@
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hcatalog.HcatTestUtils;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.DefaultHCatRecord;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.junit.Assert.assertTrue;
+
 /**
  * Test for HCatOutputFormat. Writes a partition using HCatOutputFormat and reads
  * it back using HCatInputFormat, checks the column values and counts.
  */
-public abstract class HCatMapReduceTest extends TestCase {
+public abstract class HCatMapReduceTest extends HCatBaseTest {
 
     private static final Logger LOG = LoggerFactory.getLogger(HCatMapReduceTest.class);
-    protected String dbName = "default";
-    protected String tableName = "testHCatMapReduceTable";
+    protected static String dbName = MetaStoreUtils.DEFAULT_DATABASE_NAME;
+    protected static String tableName = "testHCatMapReduceTable";
 
     protected String inputFormat = RCFileInputFormat.class.getName();
     protected String outputFormat = RCFileOutputFormat.class.getName();
@@ -79,52 +81,30 @@
     private static List<HCatRecord> writeRecords = new ArrayList<HCatRecord>();
     private static List<HCatRecord> readRecords = new ArrayList<HCatRecord>();
 
-    protected abstract void initialize() throws Exception;
-
     protected abstract List<FieldSchema> getPartitionKeys();
 
     protected abstract List<FieldSchema> getTableColumns();
 
-    private HiveMetaStoreClient client;
-    protected HiveConf hiveConf;
+    private static FileSystem fs;
 
-    private FileSystem fs;
-    private String thriftUri = null;
-
-    protected Driver driver;
-
-    @Override
-    protected void setUp() throws Exception {
-        hiveConf = new HiveConf(this.getClass());
-
-        //The default org.apache.hadoop.hive.ql.hooks.PreExecutePrinter hook
-        //is present only in the ql/test directory
-        hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
-        hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
-        hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
-        driver = new Driver(hiveConf);
-        SessionState.start(new CliSessionState(hiveConf));
-
-        thriftUri = System.getenv("HCAT_METASTORE_URI");
-
-        if (thriftUri != null) {
-            LOG.info("Using URI {}", thriftUri);
-
-            hiveConf.set("hive.metastore.local", "false");
-            hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUri);
-        }
-
+    @BeforeClass
+    public static void setUpOneTime() throws Exception {
         fs = new LocalFileSystem();
         fs.initialize(fs.getWorkingDirectory().toUri(), new Configuration());
-        initialize();
+        HiveConf hiveConf = new HiveConf();
+        hiveConf.setInt(HCatConstants.HCAT_HIVE_CLIENT_EXPIRY_TIME, 0);
+        // Hack to initialize the cache with a 0 expiry time, causing it to return a new hive client every time.
+        // Otherwise the cache doesn't play well with the second test method, because the client gets closed() in the
+        // tearDown() of the previous test.
+        HCatUtil.getHiveClient(hiveConf);
 
-        client = new HiveMetaStoreClient(hiveConf, null);
-        initTable();
+        MapCreate.writeCount = 0;
+        MapRead.readCount = 0;
     }
 
-    @Override
-    protected void tearDown() throws Exception {
+    @After
+    public void deleteTable() throws Exception {
         try {
             String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
@@ -133,13 +113,10 @@
             e.printStackTrace();
             throw e;
         }
-
-        client.close();
     }
 
-
-    private void initTable() throws Exception {
-
+    @Before
+    public void createTable() throws Exception {
         String databaseName = (dbName == null) ?
             MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
 
         try {
@@ -237,7 +214,24 @@
     Job runMRCreate(Map<String, String> partitionValues,
                     List<HCatFieldSchema> partitionColumns, List<HCatRecord> records,
                     int writeCount, boolean assertWrite) throws Exception {
+        return runMRCreate(partitionValues, partitionColumns, records, writeCount, assertWrite, true);
+    }
+
+    /**
+     * Run a local map reduce job to load data from in-memory records into an HCatalog table
+     * @param partitionValues
+     * @param partitionColumns
+     * @param records data to be written to the HCatalog table
+     * @param writeCount
+     * @param assertWrite
+     * @param asSingleMapTask
+     * @return
+     * @throws Exception
+     */
+    Job runMRCreate(Map<String, String> partitionValues,
+                    List<HCatFieldSchema> partitionColumns, List<HCatRecord> records,
+                    int writeCount, boolean assertWrite, boolean asSingleMapTask) throws Exception {
+
         writeRecords = records;
         MapCreate.writeCount = 0;
@@ -249,11 +243,23 @@
         // input/output settings
         job.setInputFormatClass(TextInputFormat.class);
 
-        Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput");
-        createInputFile(path, writeCount);
+        if (asSingleMapTask) {
+            // One input path would mean only one map task
+            Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput");
+            createInputFile(path, writeCount);
+            TextInputFormat.setInputPaths(job, path);
+        } else {
+            // Create two input paths so that two map tasks get triggered. There could be other ways
+            // to trigger two map tasks.
+            Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput");
+            createInputFile(path, writeCount / 2);
 
-        TextInputFormat.setInputPaths(job, path);
+            Path path2 = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput2");
+            createInputFile(path2, (writeCount - writeCount / 2));
 
+            TextInputFormat.setInputPaths(job, path, path2);
+        }
+
         job.setOutputFormatClass(HCatOutputFormat.class);
 
         OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, tableName, partitionValues);
@@ -294,6 +300,13 @@
         return runMRRead(readCount, null);
     }
 
+    /**
+     * Run a local map reduce job to read records from an HCatalog table and verify that the count is as expected
+     * @param readCount
+     * @param filter
+     * @return
+     * @throws Exception
+     */
     List<HCatRecord> runMRRead(int readCount, String filter) throws Exception {
 
         MapRead.readCount = 0;
Index: src/test/org/apache/hcatalog/mapreduce/HCatBaseTest.java
===================================================================
--- src/test/org/apache/hcatalog/mapreduce/HCatBaseTest.java (revision 1386736)
+++ src/test/org/apache/hcatalog/mapreduce/HCatBaseTest.java (working copy)
@@ -72,10 +72,10 @@
      */
     protected void setUpHiveConf() {
         hiveConf = new HiveConf(this.getClass());
-        hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
-        hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
-        hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
-        hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR);
+        hiveConf.setVar(HiveConf.ConfVars.PREEXECHOOKS, "");
+        hiveConf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, "");
+        hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+        hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEST_WAREHOUSE_DIR);
     }
 
     protected void logAndRegister(PigServer server, String query) throws IOException {
Index: src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java
===================================================================
--- src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java (revision 1386736)
+++ src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java (working copy)
@@ -33,14 +33,19 @@
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
 
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 public class TestHCatPartitioned extends HCatMapReduceTest {
 
-    private List<HCatRecord> writeRecords;
-    private List<HCatFieldSchema> partitionColumns;
+    private static List<HCatRecord> writeRecords;
+    private static List<HCatFieldSchema> partitionColumns;
 
-    @Override
-    protected void initialize() throws Exception {
+    @BeforeClass
+    public static void oneTimeSetUp() throws Exception {
 
         tableName = "testHCatPartitionedTable";
         writeRecords = new ArrayList<HCatRecord>();
@@ -77,6 +82,7 @@
 
     }
 
+    @Test
     public void testHCatPartitionedTable() throws Exception {
 
         Map<String, String> partitionMap = new HashMap<String, String>();
Index: src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
===================================================================
--- src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java (revision 1386736)
+++ src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java (working copy)
@@ -34,31 +34,37 @@
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+
 public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
 
-    private List<HCatRecord> writeRecords;
-    private List<HCatFieldSchema> dataColumns;
+    private static List<HCatRecord> writeRecords;
+    private static List<HCatFieldSchema> dataColumns;
     private static final Logger LOG = LoggerFactory.getLogger(TestHCatDynamicPartitioned.class);
+    private static final int NUM_RECORDS = 20;
+    private static final int NUM_PARTITIONS = 5;
 
-    @Override
-    protected void initialize() throws Exception {
-
+    @BeforeClass
+    public static void generateInputData() throws Exception {
        tableName = "testHCatDynamicPartitionedTable";
-        generateWriteRecords(20, 5, 0);
+        generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
         generateDataColumns();
     }
 
-    private void generateDataColumns() throws HCatException {
+    private static void generateDataColumns() throws HCatException {
         dataColumns = new ArrayList<HCatFieldSchema>();
         dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
         dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
         dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("p1", Constants.STRING_TYPE_NAME, "")));
     }
 
-    private void generateWriteRecords(int max, int mod, int offset) {
+    private static void generateWriteRecords(int max, int mod, int offset) {
         writeRecords = new ArrayList<HCatRecord>();
 
         for (int i = 0; i < max; i++) {
@@ -86,14 +92,30 @@
         return fields;
     }
 
-
+    /**
+     * Run the dynamic partitioning test with a single map task
+     * @throws Exception
+     */
+    @Test
     public void testHCatDynamicPartitionedTable() throws Exception {
+        runHCatDynamicPartitionedTable(true);
+    }
 
-        generateWriteRecords(20, 5, 0);
-        runMRCreate(null, dataColumns, writeRecords, 20, true);
+    /**
+     * Run the dynamic partitioning test but with multiple map tasks.
+     * See HCATALOG-490
+     * @throws Exception
+     */
+    @Test
+    public void testHCatDynamicPartitionedTableMultipleTask() throws Exception {
+        runHCatDynamicPartitionedTable(false);
+    }
 
-        runMRRead(20);
+    protected void runHCatDynamicPartitionedTable(boolean asSingleMapTask) throws Exception {
+        generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+        runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, true, asSingleMapTask);
 
+        runMRRead(NUM_RECORDS);
+
         //Read with partition filter
         runMRRead(4, "p1 = \"0\"");
         runMRRead(8, "p1 = \"1\" or p1 = \"3\"");
@@ -110,14 +132,14 @@
 
         ArrayList<String> res = new ArrayList<String>();
         driver.getResults(res);
-        assertEquals(20, res.size());
+        assertEquals(NUM_RECORDS, res.size());
 
         //Test for duplicate publish
         IOException exc = null;
         try {
-            generateWriteRecords(20, 5, 0);
-            Job job = runMRCreate(null, dataColumns, writeRecords, 20, false);
+            generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+            Job job = runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, false);
 
             if (HcatTestUtils.isHadoop23()) {
                 new FileOutputCommitterContainer(job, null).cleanupJob(job);
             }
Index: src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java
===================================================================
--- src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java (revision 1386736)
+++ src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java (working copy)
@@ -32,14 +32,19 @@
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
 
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 public class TestHCatNonPartitioned extends HCatMapReduceTest {
 
-    private List<HCatRecord> writeRecords;
-    List<HCatFieldSchema> partitionColumns;
+    private static List<HCatRecord> writeRecords;
+    static List<HCatFieldSchema> partitionColumns;
 
-    @Override
-    protected void initialize() throws HCatException {
+    @BeforeClass
+    public static void oneTimeSetUp() throws Exception {
 
         dbName = null; //test if null dbName works ("default" is used)
         tableName = "testHCatNonPartitionedTable";
@@ -75,6 +80,7 @@
 
     }
 
+    @Test
     public void testHCatNonPartitionedTable() throws Exception {
 
         Map<String, String> partitionMap = new HashMap<String, String>();
Index: src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java (revision 1386736)
+++ src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java (working copy)
@@ -32,6 +32,7 @@
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.hadoop.mapred.HCatMapRedUtil;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -195,8 +196,6 @@
             //create base OutputFormat
             org.apache.hadoop.mapred.OutputFormat baseOF = ReflectionUtils.newInstance(storageHandler.getOutputFormatClass(),
                 currTaskContext.getJobConf());
-            //check outputSpecs
-            baseOF.checkOutputSpecs(null, currTaskContext.getJobConf());
             //get Output Committer
             org.apache.hadoop.mapred.OutputCommitter baseOutputCommitter = currTaskContext.getJobConf().getOutputCommitter();
             //create currJobContext the latest so it gets all the config changes
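
A note for reviewers on the JUnit 3 to JUnit 4 migration this patch performs: @BeforeClass methods run once per class and must be static, while @Before/@After wrap every @Test method. That lifecycle difference is why the shared fields and one-time setup methods in these tests become static. Below is a minimal sketch of the lifecycle; the class and method names are hypothetical, not part of this patch.

import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import static junit.framework.Assert.assertEquals;

// Hypothetical illustration class, not part of this patch.
public class LifecycleSketch {
    private static int oneTimeSetUps = 0; // shared state must be static for @BeforeClass to reach it

    @BeforeClass
    public static void oneTimeSetUp() {   // runs once, before any @Test in the class; must be static
        oneTimeSetUps++;
    }

    @Before
    public void perTestSetUp() {          // runs before each @Test, like JUnit 3's setUp()
    }

    @After
    public void perTestTearDown() {       // runs after each @Test, even if it fails, like tearDown()
    }

    @Test
    public void firstTest() {
        assertEquals(1, oneTimeSetUps);
    }

    @Test
    public void secondTest() {
        assertEquals(1, oneTimeSetUps);   // still 1: @BeforeClass did not rerun between tests
    }
}

Because @BeforeClass runs before any test instance exists, anything it touches must be reachable without "this", which is what forces dbName, tableName, writeRecords, and the like to static in the patched tests.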
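Similarly, the multiple-map-task variant (HCATALOG-490) relies on the fact that each input file handed to TextInputFormat yields at least one split, and therefore at least one map task. Judging by the FileAlreadyExistsException import and the removed checkOutputSpecs() call in FileRecordWriterContainer, two concurrent writers is the failure mode being exercised. A standalone sketch of the two-input-paths trick, assuming the era-appropriate new Job(Configuration, String) constructor; the paths and class name are hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// Hypothetical illustration class, not part of this patch.
public class TwoMapTaskSketch {
    public static Job configureTwoMapTasks(Configuration conf) throws Exception {
        Job job = new Job(conf, "two-map-task-sketch");
        job.setInputFormatClass(TextInputFormat.class);
        // Two input files -> at least two input splits -> at least two map tasks,
        // which is the multi-writer scenario the new test path exercises.
        TextInputFormat.setInputPaths(job,
                new Path("mapred/sketchInput1"),   // hypothetical path
                new Path("mapred/sketchInput2"));  // hypothetical path
        // ... set mapper, output format, etc., then submit the job.
        return job;
    }
}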