Index: src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java
===================================================================
--- src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java	(revision 1363556)
+++ src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java	(working copy)
@@ -341,4 +341,25 @@
     driver.getResults(res);
     assertEquals(70, res.size());
   }
+
+  public void testPartitionPublish() throws Exception {
+
+    Map<String, String> partitionMap = new HashMap<String, String>();
+    partitionMap.put("part1", "p1value1");
+    partitionMap.put("part0", "p0value1");
+
+    runMRCreateFail(partitionMap, partitionColumns, writeRecords);
+
+    String query = "show partitions " + tableName;
+    int retCode = driver.run(query).getResponseCode();
+
+    if( retCode != 0 ) {
+      throw new Exception("Error " + retCode + " running query " + query);
+    }
+
+    ArrayList<String> res = new ArrayList<String>();
+    driver.getResults(res);
+    assertEquals(0, res.size());
+
+  }
 }
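[Review note — not part of the patch] testPartitionPublish asserts through the Hive driver that "show partitions" returns nothing after the deliberately failed write. A minimal sketch of an equivalent check that goes straight to the metastore client follows; the class and method names here are illustrative only, and the HiveConf is assumed to point at the same metastore the test harness uses.

import java.util.List;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Partition;

// Hypothetical helper: counts the partitions the metastore currently knows
// about for a table. After runMRCreateFail the expectation is that this
// returns 0 for the test table.
public class PartitionPublishCheck {

  public static int publishedPartitionCount(HiveConf hiveConf, String dbName,
      String tableName) throws Exception {
    HiveMetaStoreClient client = new HiveMetaStoreClient(hiveConf);
    try {
      // (short) -1 means no limit on the number of partitions returned.
      List<Partition> parts = client.listPartitions(dbName, tableName, (short) -1);
      return parts.size();
    } finally {
      client.close();
    }
  }
}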
Index: src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
===================================================================
--- src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java	(revision 1363556)
+++ src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java	(working copy)
@@ -51,6 +51,7 @@
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
@@ -215,6 +216,18 @@
     }
   }
 
+  public static class MapFail extends
+      Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+
+    @Override
+    public void map(LongWritable key, Text value, Context context
+    ) throws IOException, InterruptedException {
+      {
+        throw new IOException("Exception to mimic job failure.");
+      }
+    }
+  }
+
   public static class MapRead extends
       Mapper<WritableComparable, HCatRecord, BytesWritable, Text> {
 
@@ -325,7 +338,45 @@
     return readRecords;
   }
 
+  void runMRCreateFail(Map<String, String> partitionValues,
+      List<HCatFieldSchema> partitionColumns, List<HCatRecord> records) throws Exception {
+
+    Configuration conf = new Configuration();
+    Job job = new Job(conf, "hcat mapreduce write fail test");
+    job.setJarByClass(this.getClass());
+    job.setMapperClass(HCatMapReduceTest.MapFail.class);
+
+    // input/output settings
+    job.setInputFormatClass(TextInputFormat.class);
+
+    Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput");
+    //The write count does not matter, as the map will fail in its first call.
+    createInputFile(path, 5);
+
+    TextInputFormat.setInputPaths(job, path);
+
+    job.setOutputFormatClass(HCatOutputFormat.class);
+
+    OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, tableName, partitionValues);
+    HCatOutputFormat.setOutput(job, outputJobInfo);
+
+    job.setMapOutputKeyClass(BytesWritable.class);
+    job.setMapOutputValueClass(DefaultHCatRecord.class);
+
+    job.setNumReduceTasks(0);
+
+    HCatOutputFormat.setSchema(job, new HCatSchema(partitionColumns));
+
+    boolean success = job.waitForCompletion(true);
+    Assert.assertTrue(success == false);
+
+    FileOutputCommitterContainer committer = new FileOutputCommitterContainer(job,null);
+    committer.abortJob(job, JobStatus.State.FAILED);
+    committer.cleanupJob(new JobContext(job.getConfiguration(), job.getJobID()));
+
+  }
+
   protected HCatSchema getTableSchema() throws Exception {
 
     Configuration conf = new Configuration();
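[Review note — not part of the patch] In runMRCreateFail the committer's abortJob/cleanupJob calls run only after the assertion, so an unexpected job success would skip them and leave scratch output behind for later tests. A possible restructuring is sketched below; it reuses only calls that already appear in the patch and is assumed to sit inside HCatMapReduceTest, where the imports added above are available. This is a suggestion, not part of the change.

  // Sketch only: assert the job failed, but always run abort/cleanup so a
  // failed assertion cannot leave the temporary output directory behind.
  private void assertFailedAndCleanUp(Job job, boolean success) throws Exception {
    try {
      Assert.assertFalse("job was expected to fail", success);
    } finally {
      FileOutputCommitterContainer committer = new FileOutputCommitterContainer(job, null);
      committer.abortJob(job, JobStatus.State.FAILED);
      committer.cleanupJob(new JobContext(job.getConfiguration(), job.getJobID()));
    }
  }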
Index: src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java	(revision 1363556)
+++ src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java	(working copy)
@@ -284,7 +284,25 @@
         FsPermission perms = tblStat.getPermission();

         List<Partition> partitionsToAdd = new ArrayList<Partition>();

-        if (!dynamicPartitioningUsed){
+        if (dynamicPartitioningUsed) {
+            for (Entry<String, Map<String, String>> entry : partitionsDiscoveredByPath
+                    .entrySet()) {
+                partitionsToAdd.add(constructPartition(
+                        context,
+                        getPartitionRootLocation(entry.getKey(), entry
+                                .getValue().size()), entry.getValue(),
+                        jobInfo.getOutputSchema(),
+                        getStorerParameterMap(storer), table, fs, grpName,
+                        perms));
+            }
+        } else {
+            Path partPath = new Path(tblPath.toString());
+            for (FieldSchema partKey : table.getPartitionKeys()) {
+                partPath = constructPartialPartPath(partPath, partKey
+                        .getName().toLowerCase(),
+                        jobInfo.getPartitionValues());
+            }
+            if(fs.exists(partPath)){
           partitionsToAdd.add(
               constructPartition(
                   context,
@@ -292,65 +310,66 @@
                   ,jobInfo.getOutputSchema(), getStorerParameterMap(storer)
                   ,table, fs
                   ,grpName,perms));
-        }else{
-          for (Entry<String,Map<String,String>> entry : partitionsDiscoveredByPath.entrySet()){
-            partitionsToAdd.add(
-                constructPartition(
-                    context,
-                    getPartitionRootLocation(entry.getKey(),entry.getValue().size()), entry.getValue()
-                    ,jobInfo.getOutputSchema(), getStorerParameterMap(storer)
-                    ,table, fs
-                    ,grpName,perms));
             }
         }

-        //Publish the new partition(s)
-        if (dynamicPartitioningUsed && harProcessor.isEnabled() && (!partitionsToAdd.isEmpty())){
+        //Publish the new partition(s) if any
+        if (!partitionsToAdd.isEmpty()) {
+            if (dynamicPartitioningUsed && harProcessor.isEnabled()) {

-          Path src = new Path(ptnRootLocation);
+                Path src = new Path(ptnRootLocation);

-          // check here for each dir we're copying out, to see if it already exists, error out if so
-          moveTaskOutputs(fs, src, src, tblPath,true);
+                // check here for each dir we're copying out, to see if it
+                // already exists, error out if so
+                moveTaskOutputs(fs, src, src, tblPath, true);

-          moveTaskOutputs(fs, src, src, tblPath,false);
-          fs.delete(src, true);
+                moveTaskOutputs(fs, src, src, tblPath, false);
+                fs.delete(src, true);

+                // for (Partition partition : partitionsToAdd){
+                // partitionsAdded.add(client.add_partition(partition));
+                // // currently following add_partition instead of
+                // add_partitions because latter isn't
+                // // all-or-nothing and we want to be able to roll back
+                // partitions we added if need be.
+                // }
-//          for (Partition partition : partitionsToAdd){
-//            partitionsAdded.add(client.add_partition(partition));
-//            // currently following add_partition instead of add_partitions because latter isn't
-//            // all-or-nothing and we want to be able to roll back partitions we added if need be.
-//          }
-
-          try {
-            client.add_partitions(partitionsToAdd);
-            partitionsAdded = partitionsToAdd;
-          } catch (Exception e){
-            // There was an error adding partitions : rollback fs copy and rethrow
-            for (Partition p : partitionsToAdd){
-              Path ptnPath = new Path(harProcessor.getParentFSPath(new Path(p.getSd().getLocation())));
-              if (fs.exists(ptnPath)){
-                fs.delete(ptnPath,true);
+                try {
+                    LOG.info("Publishing partitions to the metastore.");
+                    client.add_partitions(partitionsToAdd);
+                    partitionsAdded = partitionsToAdd;
+                } catch (Exception e) {
+                    // There was an error adding partitions : rollback fs
+                    // copy and rethrow
+                    for (Partition p : partitionsToAdd) {
+                        Path ptnPath = new Path(
+                                harProcessor.getParentFSPath(new Path(p
+                                        .getSd().getLocation())));
+                        if (fs.exists(ptnPath)) {
+                            fs.delete(ptnPath, true);
+                        }
                     }
+                    throw e;
                 }
-            throw e;
-          }
-        }else{
-          // no harProcessor, regular operation
+            } else {
+                // no harProcessor, regular operation

-          // No duplicate partition publish case to worry about because we'll
-          // get a AlreadyExistsException here if so, and appropriately rollback
+                // No duplicate partition publish case to worry about
+                // because we'll
+                // get a AlreadyExistsException here if so, and
+                // appropriately rollback

-          client.add_partitions(partitionsToAdd);
-          partitionsAdded = partitionsToAdd;
+                client.add_partitions(partitionsToAdd);
+                partitionsAdded = partitionsToAdd;

-          if (dynamicPartitioningUsed && (partitionsAdded.size()>0)){
-            Path src = new Path(ptnRootLocation);
-            moveTaskOutputs(fs, src, src, tblPath,false);
-            fs.delete(src, true);
+                if (dynamicPartitioningUsed && (partitionsAdded.size() > 0)) {
+                    Path src = new Path(ptnRootLocation);
+                    moveTaskOutputs(fs, src, src, tblPath, false);
+                    fs.delete(src, true);
+                }
+            }
-          }
         }

         if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {