Index: src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
===================================================================
--- src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java	(revision 1384153)
+++ src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java	(working copy)
@@ -55,6 +55,8 @@
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hcatalog.HcatTestUtils;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.DefaultHCatRecord;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
@@ -69,7 +71,7 @@
 public abstract class HCatMapReduceTest extends TestCase {

     private static final Logger LOG = LoggerFactory.getLogger(HCatMapReduceTest.class);
-    protected String dbName = "default";
+    protected String dbName = MetaStoreUtils.DEFAULT_DATABASE_NAME;
     protected String tableName = "testHCatMapReduceTable";

     protected String inputFormat = RCFileInputFormat.class.getName();
@@ -79,8 +81,6 @@
     private static List<HCatRecord> writeRecords = new ArrayList<HCatRecord>();
     private static List<HCatRecord> readRecords = new ArrayList<HCatRecord>();

-    protected abstract void initialize() throws Exception;
-
     protected abstract List<FieldSchema> getPartitionKeys();

     protected abstract List<FieldSchema> getTableColumns();
@@ -95,13 +95,14 @@

     @Override
     protected void setUp() throws Exception {
+        super.setUp();
         hiveConf = new HiveConf(this.getClass());

         //The default org.apache.hadoop.hive.ql.hooks.PreExecutePrinter hook
         //is present only in the ql/test directory
-        hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
-        hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
-        hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+        hiveConf.setVar(HiveConf.ConfVars.PREEXECHOOKS, "");
+        hiveConf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, "");
+        hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
         driver = new Driver(hiveConf);
         SessionState.start(new CliSessionState(hiveConf));

@@ -110,21 +111,31 @@
         if (thriftUri != null) {
             LOG.info("Using URI {}", thriftUri);

-            hiveConf.set("hive.metastore.local", "false");
-            hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUri);
+            hiveConf.setBoolean("hive.metastore.local", false);
+            hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, thriftUri);
         }

         fs = new LocalFileSystem();
         fs.initialize(fs.getWorkingDirectory().toUri(), new Configuration());

-        initialize();
+        client = new HiveMetaStoreClient(hiveConf, null);

-        client = new HiveMetaStoreClient(hiveConf, null);
+        hiveConf.setInt(HCatConstants.HCAT_HIVE_CLIENT_EXPIRY_TIME, 0);
+        // Hack: initialize the cache with a 0 expiry time, causing it to return a new hive client
+        // every time. Otherwise the cache doesn't play well with the second test method, since the
+        // client gets closed() in the tearDown() of the previous test.
+        HCatUtil.getHiveClient(hiveConf);

+        MapCreate.writeCount = 0;
+        MapRead.readCount = 0;
+
+        initTable();
     }

     @Override
     protected void tearDown() throws Exception {
+        super.tearDown();
         try {
             String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
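
Note: for context on the client-cache hack above, here is a minimal standalone sketch of what the zero expiry time buys the tests. It assumes HCatUtil.getHiveClient() serves HiveMetaStoreClient instances from an expiring cache honoring HCAT_HIVE_CLIENT_EXPIRY_TIME (the class and constant names come from this patch; the driver class itself is illustrative only):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
    import org.apache.hcatalog.common.HCatConstants;
    import org.apache.hcatalog.common.HCatUtil;

    public class ClientCacheExpirySketch {
        public static void main(String[] args) throws Exception {
            HiveConf hiveConf = new HiveConf(ClientCacheExpirySketch.class);
            // A zero expiry should make every lookup a cache miss...
            hiveConf.setInt(HCatConstants.HCAT_HIVE_CLIENT_EXPIRY_TIME, 0);
            HiveMetaStoreClient first = HCatUtil.getHiveClient(hiveConf);
            HiveMetaStoreClient second = HCatUtil.getHiveClient(hiveConf);
            // ...so a close() issued in one test's tearDown() cannot
            // invalidate the client handed to the next test's setUp().
            System.out.println("distinct clients: " + (first != second));
            first.close();
            second.close();
        }
    }
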
@@ -135,6 +146,7 @@
         }

         client.close();
+        driver.close();
     }

@@ -237,7 +249,24 @@
     Job runMRCreate(Map<String, String> partitionValues, List<HCatFieldSchema> partitionColumns,
                     List<HCatRecord> records, int writeCount, boolean assertWrite) throws Exception {
+        return runMRCreate(partitionValues, partitionColumns, records, writeCount, assertWrite, true);
+    }

+    /**
+     * Run a local map reduce job to load data from in-memory records into an HCatalog table.
+     * @param partitionValues static partition values, or null when partitioning dynamically
+     * @param partitionColumns schema of the columns being written
+     * @param records data to be written to the HCatalog table
+     * @param writeCount number of records to write
+     * @param assertWrite whether to assert the write counter after the job
+     * @param asSingleMapTask whether the job should run as a single map task
+     * @return the completed Job
+     * @throws Exception
+     */
+    Job runMRCreate(Map<String, String> partitionValues,
+                    List<HCatFieldSchema> partitionColumns, List<HCatRecord> records,
+                    int writeCount, boolean assertWrite, boolean asSingleMapTask) throws Exception {
+
         writeRecords = records;
         MapCreate.writeCount = 0;

@@ -249,11 +278,23 @@
         // input/output settings
         job.setInputFormatClass(TextInputFormat.class);

-        Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput");
-        createInputFile(path, writeCount);
+        if (asSingleMapTask) {
+            // One input path would mean only one map task
+            Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput");
+            createInputFile(path, writeCount);
+            TextInputFormat.setInputPaths(job, path);
+        } else {
+            // Create two input paths so that two map tasks get triggered. There could be other ways
+            // to trigger two map tasks.
+            Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput");
+            createInputFile(path, writeCount / 2);

-        TextInputFormat.setInputPaths(job, path);
+            Path path2 = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput2");
+            createInputFile(path2, (writeCount - writeCount / 2));

+            TextInputFormat.setInputPaths(job, path, path2);
+        }
+
         job.setOutputFormatClass(HCatOutputFormat.class);

         OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, tableName, partitionValues);
@@ -294,6 +335,13 @@
         return runMRRead(readCount, null);
     }

+    /**
+     * Run a local map reduce job to read records from an HCatalog table and verify the count is as expected.
+     * @param readCount expected number of records to read
+     * @param filter partition filter expression, or null to read everything
+     * @return the records that were read
+     * @throws Exception
+     */
     List<HCatRecord> runMRRead(int readCount, String filter) throws Exception {

         MapRead.readCount = 0;
Index: src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java
===================================================================
--- src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java	(revision 1384153)
+++ src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java	(working copy)
@@ -40,7 +40,7 @@
     private List<HCatFieldSchema> partitionColumns;

     @Override
-    protected void initialize() throws Exception {
+    protected void setUp() throws Exception {
         tableName = "testHCatPartitionedTable";
         writeRecords = new ArrayList<HCatRecord>();

@@ -56,6 +56,7 @@
         partitionColumns = new ArrayList<HCatFieldSchema>();
         partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
         partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
+        super.setUp();
     }
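
Note: the subclass setUp() methods in this patch all follow the same ordering, which the removal of initialize() depends on: populate the table metadata fields first, then call super.setUp() last, because the base class ends its setUp() by calling initTable() against those fields. A schematic sketch of that contract (class and method names here are illustrative, not from the patch):

    // Schematic only: the base class consumes fields the subclass must set
    // before delegating upward, so super.setUp() has to be the *last* call.
    abstract class BaseTableTest {
        protected String tableName;

        protected void setUp() throws Exception {
            // ... connect to the metastore, reset counters ...
            initTable(); // reads tableName, so it must already be set
        }

        private void initTable() { /* drop and recreate tableName */ }
    }

    class ConcreteTableTest extends BaseTableTest {
        @Override
        protected void setUp() throws Exception {
            tableName = "myTestTable"; // configure first
            super.setUp();             // then let the base class create the table
        }
    }
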
Index: src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
===================================================================
--- src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java	(revision 1384153)
+++ src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java	(working copy)
@@ -42,13 +42,15 @@
     private List<HCatRecord> writeRecords;
     private List<HCatFieldSchema> dataColumns;
     private static final Logger LOG = LoggerFactory.getLogger(TestHCatDynamicPartitioned.class);
+    private static final int NUM_RECORDS = 20;
+    private static final int NUM_PARTITIONS = 5;

     @Override
-    protected void initialize() throws Exception {
-
+    protected void setUp() throws Exception {
         tableName = "testHCatDynamicPartitionedTable";
-        generateWriteRecords(20, 5, 0);
+        generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
         generateDataColumns();
+        super.setUp();
     }

     private void generateDataColumns() throws HCatException {
@@ -86,14 +88,28 @@
         return fields;
     }

+    /**
+     * Run the dynamic partitioning test, but with a single map task.
+     * @throws Exception
+     */
     public void testHCatDynamicPartitionedTable() throws Exception {
+        runHCatDynamicPartitionedTable(true);
+    }

-        generateWriteRecords(20, 5, 0);
-        runMRCreate(null, dataColumns, writeRecords, 20, true);
+    /**
+     * Run the dynamic partitioning test, but with multiple map tasks. See HCATALOG-490.
+     * @throws Exception
+     */
+    public void testHCatDynamicPartitionedTableMultipleTask() throws Exception {
+        runHCatDynamicPartitionedTable(false);
+    }

-        runMRRead(20);
+    public void runHCatDynamicPartitionedTable(boolean asSingleMapTask) throws Exception {
+        generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+        runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, true, asSingleMapTask);

+        runMRRead(NUM_RECORDS);
+
         //Read with partition filter
         runMRRead(4, "p1 = \"0\"");
         runMRRead(8, "p1 = \"1\" or p1 = \"3\"");
@@ -110,14 +126,14 @@

         ArrayList<String> res = new ArrayList<String>();
         driver.getResults(res);
-        assertEquals(20, res.size());
+        assertEquals(NUM_RECORDS, res.size());

         //Test for duplicate publish
         IOException exc = null;
         try {
-            generateWriteRecords(20, 5, 0);
-            Job job = runMRCreate(null, dataColumns, writeRecords, 20, false);
+            generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+            Job job = runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, false);

             if (HcatTestUtils.isHadoop23()) {
                 new FileOutputCommitterContainer(job, null).cleanupJob(job);
             }
Index: src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java
===================================================================
--- src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java	(revision 1384153)
+++ src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java	(working copy)
@@ -39,7 +39,7 @@
     List<HCatFieldSchema> partitionColumns;

     @Override
-    protected void initialize() throws HCatException {
+    protected void setUp() throws Exception {
         dbName = null; //test if null dbName works ("default" is used)
         tableName = "testHCatNonPartitionedTable";

@@ -57,6 +57,7 @@
         partitionColumns = new ArrayList<HCatFieldSchema>();
         partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
         partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
+        super.setUp();
     }

     @Override
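
Note: the filter strings passed to runMRRead() above (e.g. "p1 = \"0\"") exercise HCatalog's partition pruning. A sketch of how such a filtered read is wired up, assuming the three-argument InputJobInfo.create(dbName, tableName, filter) overload available at this revision; the class and job name are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hcatalog.mapreduce.HCatInputFormat;
    import org.apache.hcatalog.mapreduce.InputJobInfo;

    public class FilteredReadSketch {
        public static Job configure(Configuration conf) throws Exception {
            Job job = new Job(conf, "hcat-filtered-read");
            // The filter is a predicate over partition columns only, so
            // "p1 = \"0\"" selects exactly one of the five dynamic
            // partitions written by the test above.
            HCatInputFormat.setInput(job,
                InputJobInfo.create("default", "testHCatDynamicPartitionedTable", "p1 = \"0\""));
            job.setInputFormatClass(HCatInputFormat.class);
            return job;
        }
    }
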
Index: src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java	(revision 1384153)
+++ src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java	(working copy)
@@ -32,6 +32,7 @@
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.hadoop.mapred.HCatMapRedUtil;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -196,8 +197,11 @@
             org.apache.hadoop.mapred.OutputFormat baseOF = ReflectionUtils.newInstance(
                     storageHandler.getOutputFormatClass(), currTaskContext.getJobConf());
             //check outputSpecs
-            baseOF.checkOutputSpecs(null, currTaskContext.getJobConf());
-            //get Output Committer
+            try {
+                baseOF.checkOutputSpecs(null, currTaskContext.getJobConf());
+            } catch (FileAlreadyExistsException e) {
+                // HCATALOG-490: The directory may already exist, if created by another map task in the same job. Ignore it.
+            }
+            //get Output Committer
             org.apache.hadoop.mapred.OutputCommitter baseOutputCommitter = currTaskContext.getJobConf().getOutputCommitter();
             //create currJobContext the latest so it gets all the config changes
             org.apache.hadoop.mapred.JobContext currJobContext = HCatMapRedUtil.createJobContext(currTaskContext);
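
Note: a standalone sketch of the failure mode the try/catch above guards against. With two map tasks dynamically writing to the same partition, the second task's checkOutputSpecs() call sees the directory the first task already created. The reproduction below uses the plain mapred TextOutputFormat against a local path; the class name and /tmp path are chosen for illustration:

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileAlreadyExistsException;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextOutputFormat;

    public class OutputSpecRaceSketch {
        public static void main(String[] args) throws Exception {
            JobConf conf = new JobConf();
            FileSystem fs = FileSystem.getLocal(conf);
            Path partitionDir = new Path("/tmp/hcat-sketch/p1=0");
            FileOutputFormat.setOutputPath(conf, partitionDir);

            fs.mkdirs(partitionDir); // stands in for the first map task creating the dir

            try {
                // The second task's spec check trips over the existing directory...
                new TextOutputFormat<Text, Text>().checkOutputSpecs(fs, conf);
            } catch (FileAlreadyExistsException e) {
                // ...which is benign for dynamic partitioning: each task writes
                // its own distinct files beneath the shared partition directory.
            }
        }
    }
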