Index: core/src/main/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
===================================================================
--- core/src/main/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (revision 1453760)
+++ core/src/main/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (revision )
@@ -681,14 +681,14 @@
           // no harProcessor, regular operation
           updateTableSchema(client, table, jobInfo.getOutputSchema());
           LOG.info("HAR not is not being used. The table {} has new partitions {}.", table.getTableName(), ptnInfos);
-          partitionsAdded = partitionsToAdd;
-          if (dynamicPartitioningUsed && (partitionsAdded.size()>0)){
+          if (dynamicPartitioningUsed && (partitionsToAdd.size()>0)){
             Path src = new Path(ptnRootLocation);
             moveTaskOutputs(fs, src, src, tblPath, true);
             moveTaskOutputs(fs, src, src, tblPath, false);
             fs.delete(src, true);
           }
           client.add_partitions(partitionsToAdd);
+          partitionsAdded = partitionsToAdd;
         }
       } catch (Exception e) {
         if (partitionsAdded.size() > 0) {
Index: core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
===================================================================
--- core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java (revision 1453760)
+++ core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java (revision )
@@ -19,14 +19,22 @@
 
 package org.apache.hcatalog.mapreduce;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hcatalog.HcatTestUtils;
 import org.apache.hcatalog.common.ErrorType;
 import org.apache.hcatalog.common.HCatConstants;
@@ -135,15 +143,55 @@
     driver.getResults(res);
     assertEquals(NUM_RECORDS, res.size());
-
-    //Test for duplicate publish
-    IOException exc = null;
-    try {
-      generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
-      Job job = runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, false);
-
-      if (HcatTestUtils.isHadoop23()) {
-        new FileOutputCommitterContainer(job, null).cleanupJob(job);
-      }
-    } catch (IOException e) {
-      exc = e;
-    }
+
+    if (!HcatTestUtils.isHadoop23()) {
+      IOException exc = null;
+      try {
+        generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+        Job job = runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, false);
+
+      } catch (IOException e) {
+        exc = e;
+      }
+
+      assertTrue(exc != null);
+      assertTrue(exc instanceof HCatException);
+      assertTrue("Got exception of type [" + ((HCatException) exc).getErrorType().toString()
+        + "] Expected ERROR_PUBLISHING_PARTITION or ERROR_MOVE_FAILED",
+        (ErrorType.ERROR_PUBLISHING_PARTITION == ((HCatException) exc).getErrorType())
+          || (ErrorType.ERROR_MOVE_FAILED == ((HCatException) exc).getErrorType())
+      );
+    }
+  }
+
+  @Test
+  public void testHCatException() throws Exception {
+    generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+    runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, true, true);
+
+    Job job = Job.getInstance();
+    Configuration conf = job.getConfiguration();
+    String attempt = "attempt_200707121733_0001_m_000000_0";
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+
+    TaskAttemptID taskID = TaskAttemptID.forName(attempt);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, tableName, null);
+    HCatOutputFormat.setOutput(job, outputJobInfo);
+
+    // Create directories on FS which has a partitioning structure
+    createPartitionDirectory(job, HCatOutputFormat.getJobInfo(jContext).getLocation(), "p1=4");
+
+    // Run commit job and check for exception
+    OutputCommitter committer = new FileOutputCommitterContainer(jContext, null);
+
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    IOException exc = null;
+    try {
+      committer.commitTask(tContext);
+      committer.commitJob(jContext);
+    } catch (IOException e) {
+      exc = e;
+    }
@@ -155,6 +203,14 @@
       (ErrorType.ERROR_PUBLISHING_PARTITION == ((HCatException) exc).getErrorType())
         || (ErrorType.ERROR_MOVE_FAILED == ((HCatException) exc).getErrorType())
     );
+  }
+
+  private void createPartitionDirectory(Job job1, String location, String partitionDirectory)
+    throws IOException {
+    FileSystem fs = new LocalFileSystem();
+    fs.initialize(fs.getWorkingDirectory().toUri(), job1.getConfiguration());
+    Path baseDirectory = new Path(location).getParent();
+    fs.mkdirs(new Path(baseDirectory.toUri() + "/" + partitionDirectory + "/_temporary/0/task_local_0001_m_000000"));
   }
 
   //TODO 1.0 miniCluster is slow this test times out, make it work