Index: src/test/org/apache/hcatalog/pig/TestHCatStorer.java =================================================================== --- src/test/org/apache/hcatalog/pig/TestHCatStorer.java (revision 1367361) +++ src/test/org/apache/hcatalog/pig/TestHCatStorer.java (working copy) @@ -17,6 +17,7 @@ */ package org.apache.hcatalog.pig; +import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -30,6 +31,7 @@ import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hcatalog.HcatTestUtils; +import org.apache.pig.EvalFunc; import org.apache.pig.ExecType; import org.apache.pig.PigException; import org.apache.pig.PigServer; @@ -37,6 +39,7 @@ import org.apache.pig.data.Tuple; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.util.LogUtils; +import org.junit.Assert; public class TestHCatStorer extends TestCase { private static final String TEST_DATA_DIR = System.getProperty("user.dir") + @@ -639,4 +642,57 @@ assertEquals(0, results.size()); driver.run("drop table employee"); } + + public void testPartitionPublish() throws IOException, CommandNeedRetryException { + + driver.run("drop table ptn_fail"); + String createTable = "create table ptn_fail(a int, c string) partitioned by (b string) stored as RCFILE"; + int retCode = driver.run(createTable).getResponseCode(); + if(retCode != 0) { + throw new IOException("Failed to create table."); + } + int LOOP_SIZE = 11; + String[] input = new String[LOOP_SIZE]; + + for(int i = 0; i < LOOP_SIZE; i++) { + input[i] = i + "\tmath"; + } + HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, input); + PigServer server = new PigServer(ExecType.LOCAL); + server.setBatchOn(); + server.registerQuery("A = load '"+ INPUT_FILE_NAME+"' as (a:int, c:chararray);"); + server.registerQuery("B = filter A by " + FailEvalFunc.class.getName() + "($0);"); + server.registerQuery("store B into 'ptn_fail' using "+HCatStorer.class.getName()+"('b=math');"); + server.executeBatch(); + + String query = "show partitions ptn_fail"; + retCode = driver.run(query).getResponseCode(); + + if( retCode != 0 ) { + throw new IOException("Error " + retCode + " running query " + query); + } + + ArrayList<String> res = new ArrayList<String>(); + driver.getResults(res); + assertEquals(0, res.size()); + + //Make sure the partition directory was not created under the table directory in the warehouse.
+ Assert.assertTrue((new File(TEST_WAREHOUSE_DIR + "/ptn_fail")).exists()); + Assert.assertFalse((new File(TEST_WAREHOUSE_DIR + "/ptn_fail/b=math")).exists()); + } + + public static class FailEvalFunc extends EvalFunc<Boolean> { + + /** * Throws an IOException on every invocation so that the store job fails. * @param tuple the input tuple (ignored) * @return never returns normally * @throws IOException always * @see org.apache.pig.EvalFunc#exec(org.apache.pig.data.Tuple) */ + @Override + public Boolean exec(Tuple tuple) throws IOException { + throw new IOException("EvalFunc throwing to mimic a job failure."); + } + + } + } Index: src/test/org/apache/hcatalog/mapreduce/TestSequenceFileReadWrite.java =================================================================== --- src/test/org/apache/hcatalog/mapreduce/TestSequenceFileReadWrite.java (revision 1367361) +++ src/test/org/apache/hcatalog/mapreduce/TestSequenceFileReadWrite.java (working copy) @@ -167,7 +167,7 @@ HCatOutputFormat.setSchema(job, getSchema()); job.setNumReduceTasks(0); assertTrue(job.waitForCompletion(true)); - new FileOutputCommitterContainer(job, null).cleanupJob(job); + new FileOutputCommitterContainer(job, null).commitJob(job); assertTrue(job.isSuccessful()); server.setBatchOn(); @@ -211,7 +211,7 @@ job.setOutputFormatClass(HCatOutputFormat.class); HCatOutputFormat.setSchema(job, getSchema()); assertTrue(job.waitForCompletion(true)); - new FileOutputCommitterContainer(job, null).cleanupJob(job); + new FileOutputCommitterContainer(job, null).commitJob(job); assertTrue(job.isSuccessful()); server.setBatchOn(); Index: src/test/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java =================================================================== --- src/test/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java (revision 0) +++ src/test/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java (revision 0) @@ -0,0 +1,236 @@ +package org.apache.hcatalog.mapreduce; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import junit.framework.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.io.RCFileInputFormat; +import org.apache.hadoop.hive.ql.io.RCFileOutputFormat; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MiniMRCluster; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hcatalog.NoExitSecurityManager; +import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer; +import org.apache.hcatalog.data.DefaultHCatRecord; +import org.apache.hcatalog.data.HCatRecord; +import
org.apache.hcatalog.data.schema.HCatFieldSchema; +import org.apache.hcatalog.data.schema.HCatSchema; +import org.apache.hcatalog.data.schema.HCatSchemaUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestHCatPartitionPublish { + private static File workDir; + private static Configuration mrConf = null; + private static FileSystem fs = null; + private static MiniMRCluster mrCluster = null; + private static boolean isServerRunning = false; + private static final int msPort = 20101; + private static HiveConf hcatConf; + private static HiveMetaStoreClient msc; + private static SecurityManager securityManager; + + @BeforeClass + public static void setup() throws Exception { + createWorkDir(); + Configuration conf = new Configuration(true); + fs = FileSystem.get(conf); + System.setProperty("hadoop.log.dir", new File(workDir, "/logs").getAbsolutePath()); + // LocalJobRunner does not work with mapreduce OutputCommitter. So need + // to use MiniMRCluster. MAPREDUCE-2350 + mrCluster = new MiniMRCluster(1, fs.getUri().toString(), 1, null, null, + new JobConf(conf)); + mrConf = mrCluster.createJobConf(); + + if(isServerRunning) { + return; + } + + MetaStoreUtils.startMetaStore(msPort, ShimLoader.getHadoopThriftAuthBridge()); + isServerRunning = true; + securityManager = System.getSecurityManager(); + System.setSecurityManager(new NoExitSecurityManager()); + + hcatConf = new HiveConf(TestHCatPartitionPublish.class); + hcatConf.set("hive.metastore.local", "false"); + hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + msPort); + hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTRETRIES, 3); + + hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname, HCatSemanticAnalyzer.class.getName()); + hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + msc = new HiveMetaStoreClient(hcatConf, null); + System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " "); + System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " "); + } + + private static void createWorkDir() throws IOException { + String testDir = System.getProperty("test.data.dir", "./build/test"); + testDir = testDir + "/test_hcat_test_ptn" + Math.abs(new Random().nextLong()) + "/"; + workDir = new File(new File(testDir).getCanonicalPath()); + FileUtil.fullyDelete(workDir); + workDir.mkdirs(); + } + + @AfterClass + public static void tearDown() throws IOException { + if (mrCluster != null) { + mrCluster.shutdown(); + } + FileUtil.fullyDelete(workDir); + System.setSecurityManager(securityManager); + isServerRunning = false; + } + + @Test + public void testPartitionPublish() throws Exception { + String dbName = "default"; + String tableName = "testHCatPartitionedTable"; + createTable(null, tableName); + + Map<String, String> partitionMap = new HashMap<String, String>(); + partitionMap.put("part1", "p1value1"); + partitionMap.put("part0", "p0value1"); + + ArrayList<HCatFieldSchema> hcatTableColumns = new ArrayList<HCatFieldSchema>(); + for(FieldSchema fieldSchema : getTableColumns()){ + hcatTableColumns.add(HCatSchemaUtils.getHCatFieldSchema(fieldSchema)); + } + + runMRCreateFail(dbName, tableName, partitionMap, hcatTableColumns); + List<String> ptns = msc.listPartitionNames(dbName, tableName, (short)10); + Assert.assertEquals(0, ptns.size()); + Table table = msc.getTable(dbName, tableName); + Assert.assertNotNull(table); + // Also make sure that the partition directory has been deleted from the table location.
+ Assert.assertFalse(fs.exists(new Path(table.getSd().getLocation() + "/part1=p1value1/part0=p0value1"))); + } + + void runMRCreateFail(String dbName, String tableName, Map<String, String> partitionValues, + List<HCatFieldSchema> columns) throws Exception { + + Job job = new Job(mrConf, "hcat mapreduce write fail test"); + job.setJarByClass(this.getClass()); + job.setMapperClass(TestHCatPartitionPublish.MapFail.class); + + // input/output settings + job.setInputFormatClass(TextInputFormat.class); + + Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput"); + // The row count does not matter, as the map will fail on its first call. + createInputFile(path, 5); + + TextInputFormat.setInputPaths(job, path); + job.setOutputFormatClass(HCatOutputFormat.class); + OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, tableName, partitionValues); + HCatOutputFormat.setOutput(job, outputJobInfo); + + job.setMapOutputKeyClass(BytesWritable.class); + job.setMapOutputValueClass(DefaultHCatRecord.class); + + job.setNumReduceTasks(0); + + HCatOutputFormat.setSchema(job, new HCatSchema(columns)); + + boolean success = job.waitForCompletion(true); + Assert.assertFalse(success); + } + + private void createInputFile(Path path, int rowCount) throws IOException { + if (fs.exists(path)) { + fs.delete(path, true); + } + FSDataOutputStream os = fs.create(path); + for (int i = 0; i < rowCount; i++) { + os.writeChars(i + "\n"); + } + os.close(); + } + + public static class MapFail extends + Mapper<LongWritable, Text, BytesWritable, HCatRecord> { + + @Override + public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { + throw new IOException("Exception to mimic job failure."); + } + } + + private void createTable(String dbName, String tableName) throws Exception { + String databaseName = (dbName == null) ?
MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName; + try { + msc.dropTable(databaseName, tableName); + } catch(Exception e) { + // ignore: dropTable throws NoSuchObjectException if the table does not exist + } + + Table tbl = new Table(); + tbl.setDbName(databaseName); + tbl.setTableName(tableName); + tbl.setTableType("MANAGED_TABLE"); + StorageDescriptor sd = new StorageDescriptor(); + sd.setCols(getTableColumns()); + tbl.setPartitionKeys(getPartitionKeys()); + tbl.setSd(sd); + sd.setBucketCols(new ArrayList<String>(2)); + sd.setSerdeInfo(new SerDeInfo()); + sd.getSerdeInfo().setName(tbl.getTableName()); + sd.getSerdeInfo().setParameters(new HashMap<String, String>()); + sd.getSerdeInfo().getParameters().put( + org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "1"); + sd.getSerdeInfo().setSerializationLib(ColumnarSerDe.class.getName()); + sd.setInputFormat(RCFileInputFormat.class.getName()); + sd.setOutputFormat(RCFileOutputFormat.class.getName()); + + Map<String, String> tableParams = new HashMap<String, String>(); + tbl.setParameters(tableParams); + + msc.createTable(tbl); + } + + protected List<FieldSchema> getPartitionKeys() { + List<FieldSchema> fields = new ArrayList<FieldSchema>(); + // Define the partition keys deliberately in mixed case and non-alphabetical order + fields.add(new FieldSchema("PaRT1", Constants.STRING_TYPE_NAME, "")); + fields.add(new FieldSchema("part0", Constants.STRING_TYPE_NAME, "")); + return fields; + } + + protected List<FieldSchema> getTableColumns() { + List<FieldSchema> fields = new ArrayList<FieldSchema>(); + fields.add(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")); + fields.add(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")); + return fields; + } + +} Index: src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java =================================================================== --- src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java (revision 1367361) +++ src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java (working copy) @@ -154,7 +154,7 @@ public void publishTest(Job job) throws Exception { OutputCommitter committer = new FileOutputCommitterContainer(job,null); - committer.cleanupJob(job); + committer.commitJob(job); Partition part = client.getPartition(dbName, tblName, Arrays.asList("p1")); assertNotNull(part); Index: src/java/org/apache/hcatalog/pig/HCatStorer.java =================================================================== --- src/java/org/apache/hcatalog/pig/HCatStorer.java (revision 1367361) +++ src/java/org/apache/hcatalog/pig/HCatStorer.java (working copy) @@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobStatus.State; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.security.Credentials; @@ -156,7 +157,7 @@ //Calling it from here so that the partition publish happens. //This call needs to be removed after MAPREDUCE-1447 is fixed. getOutputFormat().getOutputCommitter(HCatHadoopShims.Instance.get().createTaskAttemptContext( - job.getConfiguration(), new TaskAttemptID())).cleanupJob(job); + job.getConfiguration(), new TaskAttemptID())).commitJob(job); } catch (IOException e) { throw new IOException("Failed to cleanup job",e); } catch (InterruptedException e) { @@ -164,4 +165,23 @@ } } } + + @Override + public void cleanupOnFailure(String location, Job job) throws IOException { + if (job.getConfiguration().get("mapred.job.tracker", "") + .equalsIgnoreCase("local")) { + try { + // This call needs to be removed after MAPREDUCE-1447 is fixed.
+ getOutputFormat().getOutputCommitter( + HCatHadoopShims.Instance.get() + .createTaskAttemptContext( + job.getConfiguration(), + new TaskAttemptID())).abortJob(job, State.FAILED); + } catch (IOException e) { + throw new IOException("Failed to abort job", e); + } catch (InterruptedException e) { + throw new IOException("Failed to abort job", e); + } + } + } } Index: src/java/org/apache/hcatalog/mapreduce/InternalUtil.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/InternalUtil.java (revision 1367361) +++ src/java/org/apache/hcatalog/mapreduce/InternalUtil.java (working copy) @@ -21,7 +21,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; @@ -45,6 +47,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -56,7 +59,7 @@ for (String key : properties.keySet()){ hcatProperties.put(key, properties.get(key)); } - + // also populate with StorageDescriptor->SerDe.Parameters for (Map.Entry<String, String> param : sd.getSerdeInfo().getParameters().entrySet()) { @@ -132,20 +135,20 @@ //TODO this has to find a better home, it's also hardcoded as default in hive would be nice // if the default was decided by the serde - static void initializeOutputSerDe(SerDe serDe, Configuration conf, - OutputJobInfo jobInfo) + static void initializeOutputSerDe(SerDe serDe, Configuration conf, + OutputJobInfo jobInfo) throws SerDeException { - initializeSerDe(serDe, conf, jobInfo.getTableInfo(), - jobInfo.getOutputSchema()); + initializeSerDe(serDe, conf, jobInfo.getTableInfo(), + jobInfo.getOutputSchema()); } - static void initializeInputSerDe(SerDe serDe, Configuration conf, + static void initializeInputSerDe(SerDe serDe, Configuration conf, HCatTableInfo info, HCatSchema s) throws SerDeException { - initializeSerDe(serDe, conf, info, s); + initializeSerDe(serDe, conf, info, s); } - static void initializeSerDe(SerDe serDe, Configuration conf, + static void initializeSerDe(SerDe serDe, Configuration conf, HCatTableInfo info, HCatSchema s) throws SerDeException { Properties props = new Properties(); @@ -183,4 +186,25 @@ + " but found " + split.getClass().getName()); } } + + static Map<String, String> createPtnKeyValueMap(Table table, Partition ptn) throws IOException { + List<String> values = ptn.getValues(); + if( values.size() != table.getPartitionKeys().size() ) { + throw new IOException("Partition values in partition inconsistent with table definition, table " + + table.getTableName() + " has " + + table.getPartitionKeys().size() + + " partition keys, partition has " + values.size() + " partition values" ); + } + + Map<String, String> ptnKeyValues = new HashMap<String, String>(); + + int i = 0; + for(FieldSchema schema : table.getPartitionKeys()) { + // CONCERN: this mapping relies on table.getPartitionKeys() and ptn.getValues() preserving the same order + ptnKeyValues.put(schema.getName().toLowerCase(), values.get(i)); + i++; + } + + return ptnKeyValues; + } } Index: src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
=================================================================== --- src/java/org/apache/hcatalog/mapreduce/InitializeInput.java (revision 1367361) +++ src/java/org/apache/hcatalog/mapreduce/InitializeInput.java (working copy) @@ -27,7 +27,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; @@ -103,7 +102,7 @@ PartInfo partInfo = extractPartInfo(ptn.getSd(),ptn.getParameters(), job.getConfiguration(), inputJobInfo); - partInfo.setPartitionValues(createPtnKeyValueMap(table, ptn)); + partInfo.setPartitionValues(InternalUtil.createPtnKeyValueMap(table, ptn)); partInfoList.add(partInfo); } @@ -124,27 +123,6 @@ } - private static Map createPtnKeyValueMap(Table table, Partition ptn) throws IOException{ - List values = ptn.getValues(); - if( values.size() != table.getPartitionKeys().size() ) { - throw new IOException("Partition values in partition inconsistent with table definition, table " - + table.getTableName() + " has " - + table.getPartitionKeys().size() - + " partition keys, partition has " + values.size() + "partition values" ); - } - - Map ptnKeyValues = new HashMap(); - - int i = 0; - for(FieldSchema schema : table.getPartitionKeys()) { - // CONCERN : the way this mapping goes, the order *needs* to be preserved for table.getPartitionKeys() and ptn.getValues() - ptnKeyValues.put(schema.getName().toLowerCase(), values.get(i)); - i++; - } - - return ptnKeyValues; - } - static PartInfo extractPartInfo(StorageDescriptor sd, Map parameters, Configuration conf, InputJobInfo inputJobInfo) throws IOException{ Index: src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (revision 1367361) +++ src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (working copy) @@ -136,61 +136,7 @@ @Override public void abortJob(JobContext jobContext, State state) throws IOException { - org.apache.hadoop.mapred.JobContext - mapRedJobContext = HCatMapRedUtil.createJobContext(jobContext); - if (dynamicPartitioningUsed){ - discoverPartitions(jobContext); - } - - if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) { - getBaseOutputCommitter().abortJob(mapRedJobContext, state); - } - else if (dynamicPartitioningUsed){ - for(JobContext currContext : contextDiscoveredByPath.values()){ - try { - new JobConf(currContext.getConfiguration()).getOutputCommitter().abortJob(currContext, state); - } catch (Exception e) { - throw new IOException(e); - } - } - } - - HiveMetaStoreClient client = null; - try { - HiveConf hiveConf = HCatUtil.getHiveConf(jobContext.getConfiguration()); - client = HCatUtil.createHiveClient(hiveConf); - // cancel the deleg. tokens that were acquired for this job now that - // we are done - we should cancel if the tokens were acquired by - // HCatOutputFormat and not if they were supplied by Oozie. 
- // In the latter case the HCAT_KEY_TOKEN_SIGNATURE property in - // the conf will not be set - String tokenStrForm = client.getTokenStrForm(); - if(tokenStrForm != null && jobContext.getConfiguration().get - (HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) { - client.cancelDelegationToken(tokenStrForm); - } - } catch(Exception e) { - if( e instanceof HCatException ) { - throw (HCatException) e; - } else { - throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e); - } - } finally { - HCatUtil.closeHiveClientQuietly(client); - } - - Path src; - OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext); - if (dynamicPartitioningUsed){ - src = new Path(getPartitionRootLocation( - jobInfo.getLocation().toString(),jobInfo.getTableInfo().getTable().getPartitionKeysSize() - )); - }else{ - src = new Path(jobInfo.getLocation()); - } - FileSystem fs = src.getFileSystem(jobContext.getConfiguration()); -// LOG.warn("abortJob about to delete ["+src.toString() +"]"); - fs.delete(src, true); + abortJobInternal(jobContext, state); } public static final String SUCCEEDED_FILE_NAME = "_SUCCESS"; @@ -204,187 +150,44 @@ @Override public void commitJob(JobContext jobContext) throws IOException { - if (dynamicPartitioningUsed){ - discoverPartitions(jobContext); - } - if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) { - getBaseOutputCommitter().commitJob(HCatMapRedUtil.createJobContext(jobContext)); - } - // create _SUCCESS FILE if so requested. - OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext); - if(getOutputDirMarking(jobContext.getConfiguration())) { - Path outputPath = new Path(jobInfo.getLocation()); - if (outputPath != null) { - FileSystem fileSys = outputPath.getFileSystem(jobContext.getConfiguration()); - // create a file in the folder to mark it - if (fileSys.exists(outputPath)) { - Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME); - if(!fileSys.exists(filePath)) { // may have been created by baseCommitter.commitJob() - fileSys.create(filePath).close(); - } - } - } - } - cleanupJob(jobContext); - } - - @Override - public void cleanupJob(JobContext context) throws IOException { - - if (dynamicPartitioningUsed){ - discoverPartitions(context); - } - - - OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context); - Configuration conf = context.getConfiguration(); - Table table = jobInfo.getTableInfo().getTable(); - Path tblPath = new Path(table.getSd().getLocation()); - FileSystem fs = tblPath.getFileSystem(conf); - - if( table.getPartitionKeys().size() == 0 ) { - //non partitioned table - if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) { - getBaseOutputCommitter().cleanupJob(HCatMapRedUtil.createJobContext(context)); - } - else if (dynamicPartitioningUsed){ - for(JobContext currContext : contextDiscoveredByPath.values()){ - try { - JobConf jobConf = new JobConf(currContext.getConfiguration()); - jobConf.getOutputCommitter().cleanupJob(currContext); - } catch (Exception e) { - throw new IOException(e); - } - } - } - - //Move data from temp directory the actual table directory - //No metastore operation required. 
- Path src = new Path(jobInfo.getLocation()); - moveTaskOutputs(fs, src, src, tblPath, false); - fs.delete(src, true); - return; - } - - HiveMetaStoreClient client = null; - HCatTableInfo tableInfo = jobInfo.getTableInfo(); - - List partitionsAdded = new ArrayList(); - try { - HiveConf hiveConf = HCatUtil.getHiveConf(conf); - client = HCatUtil.createHiveClient(hiveConf); - - StorerInfo storer = InternalUtil.extractStorerInfo(table.getSd(),table.getParameters()); - - updateTableSchema(client, table, jobInfo.getOutputSchema()); - - FileStatus tblStat = fs.getFileStatus(tblPath); - String grpName = tblStat.getGroup(); - FsPermission perms = tblStat.getPermission(); - - List partitionsToAdd = new ArrayList(); - if (!dynamicPartitioningUsed){ - partitionsToAdd.add( - constructPartition( - context, - tblPath.toString(), jobInfo.getPartitionValues() - ,jobInfo.getOutputSchema(), getStorerParameterMap(storer) - ,table, fs - ,grpName,perms)); - }else{ - for (Entry> entry : partitionsDiscoveredByPath.entrySet()){ - partitionsToAdd.add( - constructPartition( - context, - getPartitionRootLocation(entry.getKey(),entry.getValue().size()), entry.getValue() - ,jobInfo.getOutputSchema(), getStorerParameterMap(storer) - ,table, fs - ,grpName,perms)); - } + if (dynamicPartitioningUsed) { + discoverPartitions(jobContext); } - - //Publish the new partition(s) - if (dynamicPartitioningUsed && harProcessor.isEnabled() && (!partitionsToAdd.isEmpty())){ - - Path src = new Path(ptnRootLocation); - - // check here for each dir we're copying out, to see if it already exists, error out if so - moveTaskOutputs(fs, src, src, tblPath,true); - - moveTaskOutputs(fs, src, src, tblPath,false); - fs.delete(src, true); - - -// for (Partition partition : partitionsToAdd){ -// partitionsAdded.add(client.add_partition(partition)); -// // currently following add_partition instead of add_partitions because latter isn't -// // all-or-nothing and we want to be able to roll back partitions we added if need be. -// } - - try { - client.add_partitions(partitionsToAdd); - partitionsAdded = partitionsToAdd; - } catch (Exception e){ - // There was an error adding partitions : rollback fs copy and rethrow - for (Partition p : partitionsToAdd){ - Path ptnPath = new Path(harProcessor.getParentFSPath(new Path(p.getSd().getLocation()))); - if (fs.exists(ptnPath)){ - fs.delete(ptnPath,true); + if (getBaseOutputCommitter() != null && !dynamicPartitioningUsed) { + getBaseOutputCommitter().commitJob( + HCatMapRedUtil.createJobContext(jobContext)); + } + registerPartitions(jobContext); + // create _SUCCESS FILE if so requested. 
+ OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext); + if (getOutputDirMarking(jobContext.getConfiguration())) { + Path outputPath = new Path(jobInfo.getLocation()); + if (outputPath != null) { + FileSystem fileSys = outputPath.getFileSystem(jobContext + .getConfiguration()); + // create a file in the folder to mark it + if (fileSys.exists(outputPath)) { + Path filePath = new Path(outputPath, + SUCCEEDED_FILE_NAME); + if (!fileSys.exists(filePath)) { // may have been + // created by + // baseCommitter.commitJob() + fileSys.create(filePath).close(); } } - throw e; } - - }else{ - // no harProcessor, regular operation - - // No duplicate partition publish case to worry about because we'll - // get a AlreadyExistsException here if so, and appropriately rollback - - client.add_partitions(partitionsToAdd); - partitionsAdded = partitionsToAdd; - - if (dynamicPartitioningUsed && (partitionsAdded.size()>0)){ - Path src = new Path(ptnRootLocation); - moveTaskOutputs(fs, src, src, tblPath,false); - fs.delete(src, true); - } - } - - if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) { - getBaseOutputCommitter().cleanupJob(HCatMapRedUtil.createJobContext(context)); - } - - if(Security.getInstance().isSecurityEnabled()) { - Security.getInstance().cancelToken(client, context); - } - } catch (Exception e) { - - if( partitionsAdded.size() > 0 ) { - try { - //baseCommitter.cleanupJob failed, try to clean up the metastore - for (Partition p : partitionsAdded){ - client.dropPartition(tableInfo.getDatabaseName(), - tableInfo.getTableName(), p.getValues()); - } - } catch(Exception te) { - //Keep cause as the original exception - throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e); - } - } - - if( e instanceof HCatException ) { - throw (HCatException) e; - } else { - throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e); - } } finally { - HCatUtil.closeHiveClientQuietly(client); + cancelDelegationTokens(jobContext); } } + @Override + public void cleanupJob(JobContext context) throws IOException { + abortJobInternal(context, State.FAILED); + } + private String getPartitionRootLocation(String ptnLocn,int numPtnKeys) { if (ptnRootLocation == null){ // we only need to calculate it once, it'll be the same for other partitions in this job. @@ -542,6 +345,7 @@ //Update table schema to add the newly added columns table.getSd().setCols(tableColumns); client.alter_table(table.getDbName(), table.getTableName(), table); + LOG.info("The columns {} have been added to the table {}.", newColumns, table.getTableName()); } } @@ -681,4 +485,194 @@ } } + private void registerPartitions(JobContext context) throws IOException{ + if (dynamicPartitioningUsed){ + discoverPartitions(context); + } + OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context); + Configuration conf = context.getConfiguration(); + Table table = jobInfo.getTableInfo().getTable(); + Path tblPath = new Path(table.getSd().getLocation()); + FileSystem fs = tblPath.getFileSystem(conf); + + if( table.getPartitionKeys().size() == 0 ) { + //Move data from the temp directory to the actual table directory. + //No metastore operation is required.
+ Path src = new Path(jobInfo.getLocation()); + moveTaskOutputs(fs, src, src, tblPath, false); + fs.delete(src, true); + return; + } + + HiveMetaStoreClient client = null; + HCatTableInfo tableInfo = jobInfo.getTableInfo(); + List<Partition> partitionsAdded = new ArrayList<Partition>(); + try { + HiveConf hiveConf = HCatUtil.getHiveConf(conf); + client = HCatUtil.createHiveClient(hiveConf); + StorerInfo storer = InternalUtil.extractStorerInfo(table.getSd(),table.getParameters()); + + FileStatus tblStat = fs.getFileStatus(tblPath); + String grpName = tblStat.getGroup(); + FsPermission perms = tblStat.getPermission(); + + List<Partition> partitionsToAdd = new ArrayList<Partition>(); + if (!dynamicPartitioningUsed){ + partitionsToAdd.add( + constructPartition( + context, + tblPath.toString(), jobInfo.getPartitionValues() + ,jobInfo.getOutputSchema(), getStorerParameterMap(storer) + ,table, fs + ,grpName,perms)); + }else{ + for (Entry<String, Map<String, String>> entry : partitionsDiscoveredByPath.entrySet()){ + partitionsToAdd.add( + constructPartition( + context, + getPartitionRootLocation(entry.getKey(),entry.getValue().size()), entry.getValue() + ,jobInfo.getOutputSchema(), getStorerParameterMap(storer) + ,table, fs + ,grpName,perms)); + } + } + + ArrayList<Map<String, String>> ptnInfos = new ArrayList<Map<String, String>>(); + for(Partition ptn : partitionsToAdd){ + ptnInfos.add(InternalUtil.createPtnKeyValueMap(tableInfo.getTable(), ptn)); + } + + //Publish the new partition(s) + if (dynamicPartitioningUsed && harProcessor.isEnabled() && (!partitionsToAdd.isEmpty())){ + + Path src = new Path(ptnRootLocation); + // check here for each dir we're copying out, to see if it + // already exists, error out if so + moveTaskOutputs(fs, src, src, tblPath,true); + moveTaskOutputs(fs, src, src, tblPath,false); + fs.delete(src, true); + try { + updateTableSchema(client, table, jobInfo.getOutputSchema()); + client.add_partitions(partitionsToAdd); + LOG.info("The table {} has new partitions {}.", table.getTableName(), ptnInfos); + partitionsAdded = partitionsToAdd; + } catch (Exception e){ + // There was an error adding partitions: roll back the fs copy and rethrow + for (Partition p : partitionsToAdd){ + Path ptnPath = new Path(harProcessor.getParentFSPath(new Path(p.getSd().getLocation()))); + if (fs.exists(ptnPath)){ + fs.delete(ptnPath,true); + } + } + throw e; + } + + }else{ + // no harProcessor, regular operation + // No duplicate partition publish case to worry about because we'll + // get an AlreadyExistsException here if so, and appropriately rollback + updateTableSchema(client, table, jobInfo.getOutputSchema()); + client.add_partitions(partitionsToAdd); + LOG.info("The table {} has new partitions {}.", table.getTableName(), ptnInfos); + partitionsAdded = partitionsToAdd; + if (dynamicPartitioningUsed && (partitionsAdded.size()>0)){ + Path src = new Path(ptnRootLocation); + moveTaskOutputs(fs, src, src, tblPath,false); + fs.delete(src, true); + } + } + } catch (Exception e) { + if( partitionsAdded.size() > 0 ) { + try { + //Partition publish failed; try to clean up the partitions already added to the metastore + for (Partition p : partitionsAdded){ + client.dropPartition(tableInfo.getDatabaseName(), + tableInfo.getTableName(), p.getValues()); + } + } catch(Exception te) { + //Keep cause as the original exception + throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e); + } + } + if( e instanceof HCatException ) { + throw (HCatException) e; + } else { + throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e); + } } finally { + HCatUtil.closeHiveClientQuietly(client); + } + } + + /** + * This method exists to ensure
backward compatibility with Pig 0.8 and + * 0.9. Although the cleanupJob method is deprecated, Pig 0.8 and 0.9 + * still call it, so both abortJob and cleanupJob delegate to this method. + * @param context the job context + * @param state the final job state to pass to the underlying committer's abortJob + * @throws java.io.IOException + */ + private void abortJobInternal(JobContext context, State state) throws IOException { + try { + if (dynamicPartitioningUsed) { + discoverPartitions(context); + } + org.apache.hadoop.mapred.JobContext mapRedJobContext = HCatMapRedUtil + .createJobContext(context); + if (getBaseOutputCommitter() != null && !dynamicPartitioningUsed) { + getBaseOutputCommitter().abortJob(mapRedJobContext, state); + } else if (dynamicPartitioningUsed) { + for (JobContext currContext : contextDiscoveredByPath.values()) { + try { + new JobConf(currContext.getConfiguration()) + .getOutputCommitter().abortJob(currContext, state); + } catch (Exception e) { + throw new IOException(e); + } + } + } + Path src; + OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context); + if (dynamicPartitioningUsed) { + src = new Path(getPartitionRootLocation(jobInfo.getLocation() + .toString(), jobInfo.getTableInfo().getTable() + .getPartitionKeysSize())); + } else { + src = new Path(jobInfo.getLocation()); + } + FileSystem fs = src.getFileSystem(context.getConfiguration()); + LOG.info("Job failed. Cleaning up temporary directory [{}].", src); + fs.delete(src, true); + } finally { + cancelDelegationTokens(context); + } + } + + private void cancelDelegationTokens(JobContext context) throws IOException { + LOG.info("Cancelling delegation token for the job."); + HiveMetaStoreClient client = null; + try { + HiveConf hiveConf = HCatUtil + .getHiveConf(context.getConfiguration()); + client = HCatUtil.createHiveClient(hiveConf); + // cancel the deleg. tokens that were acquired for this job now that + // we are done - we should cancel if the tokens were acquired by + // HCatOutputFormat and not if they were supplied by Oozie. + // In the latter case the HCAT_KEY_TOKEN_SIGNATURE property in + // the conf will not be set + String tokenStrForm = client.getTokenStrForm(); + if (tokenStrForm != null + && context.getConfiguration().get( + HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) { + client.cancelDelegationToken(tokenStrForm); + } + } catch (MetaException e) { + LOG.warn("MetaException while cancelling delegation token.", e); + } catch (TException e) { + LOG.warn("TException while cancelling delegation token.", e); + } finally { + HCatUtil.closeHiveClientQuietly(client); + } + } }
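Editor's note (illustration only, not part of the patch): InternalUtil.createPtnKeyValueMap above zips Table.getPartitionKeys() with Partition.getValues() by position, lower-casing the key names, and the inline CONCERN comment flags that this only works if both lists preserve the same order. Below is a minimal standalone Java sketch of that positional mapping, using plain collections and a hypothetical class name (PartitionKeyValueExample) so it compiles without Hive on the classpath.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionKeyValueExample {

    // Zips partition key names (lower-cased) with partition values by position,
    // mirroring the map that createPtnKeyValueMap builds from the metastore objects.
    static Map<String, String> zipKeysAndValues(List<String> keyNames, List<String> values) {
        if (keyNames.size() != values.size()) {
            throw new IllegalArgumentException("Expected " + keyNames.size()
                    + " partition values but got " + values.size());
        }
        Map<String, String> ptnKeyValues = new LinkedHashMap<String, String>();
        for (int i = 0; i < keyNames.size(); i++) {
            ptnKeyValues.put(keyNames.get(i).toLowerCase(), values.get(i));
        }
        return ptnKeyValues;
    }

    public static void main(String[] args) {
        // Key names in mixed case and non-alphabetical order, as in
        // TestHCatPartitionPublish.getPartitionKeys().
        List<String> keys = new ArrayList<String>();
        keys.add("PaRT1");
        keys.add("part0");
        List<String> values = new ArrayList<String>();
        values.add("p1value1");
        values.add("p0value1");
        // Prints {part1=p1value1, part0=p0value1}
        System.out.println(zipKeysAndValues(keys, values));
    }
}

As in the patch, the pairing is purely positional: if the order of the partition key list and the value list ever diverged, keys would silently be matched with the wrong values, which is exactly the concern noted in the code.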