Index: hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorer.java =================================================================== --- hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorer.java (revision 1392658) +++ hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorer.java (working copy) @@ -17,6 +17,7 @@ */ package org.apache.hcatalog.pig; +import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -25,6 +26,7 @@ import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hcatalog.HcatTestUtils; import org.apache.hcatalog.mapreduce.HCatBaseTest; +import org.apache.pig.EvalFunc; import org.apache.pig.ExecType; import org.apache.pig.PigException; import org.apache.pig.PigServer; @@ -593,4 +595,62 @@ Assert.assertEquals(0, results.size()); driver.run("drop table employee"); } + + public void testPartitionPublish() + throws IOException, CommandNeedRetryException { + + driver.run("drop table ptn_fail"); + String createTable = "create table ptn_fail(a int, c string) partitioned by (b string) stored as RCFILE"; + int retCode = driver.run(createTable).getResponseCode(); + if (retCode != 0) { + throw new IOException("Failed to create table."); + } + int LOOP_SIZE = 11; + String[] input = new String[LOOP_SIZE]; + + for (int i = 0; i < LOOP_SIZE; i++) { + input[i] = i + "\tmath"; + } + HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, input); + PigServer server = new PigServer(ExecType.LOCAL); + server.setBatchOn(); + server.registerQuery("A = load '" + INPUT_FILE_NAME + + "' as (a:int, c:chararray);"); + server.registerQuery("B = filter A by " + FailEvalFunc.class.getName() + + "($0);"); + server.registerQuery("store B into 'ptn_fail' using " + + HCatStorer.class.getName() + "('b=math');"); + server.executeBatch(); + + String query = "show partitions ptn_fail"; + retCode = driver.run(query).getResponseCode(); + + if (retCode != 0) { + throw new IOException("Error " + retCode + " running query " + + query); + } + + ArrayList res = new ArrayList(); + driver.getResults(res); + Assert.assertEquals(0, res.size()); + + // Make sure the partitions directory is not in hdfs. 
+ Assert.assertTrue((new File(TEST_WAREHOUSE_DIR + "/ptn_fail")).exists()); + Assert.assertFalse((new File(TEST_WAREHOUSE_DIR + "/ptn_fail/b=math")) + .exists()); + } + + public static class FailEvalFunc extends EvalFunc<Boolean> { + + /** + * Always throws an IOException so that any job using this UDF fails. + * + * @param tuple the input tuple + * @return never returns normally + * @throws IOException always, to mimic a task failure + * + * @see org.apache.pig.EvalFunc#exec(org.apache.pig.data.Tuple) + */ + @Override + public Boolean exec(Tuple tuple) throws IOException { + throw new IOException("Eval Func to mimic Failure."); + } + + } } Index: src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (revision 1392658) +++ src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (working copy) @@ -18,6 +18,15 @@ package org.apache.hcatalog.mapreduce; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -52,15 +61,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - /** * Part of the FileOutput*Container classes * See {@link FileOutputFormatContainer} for more information */ @@ -139,59 +139,7 @@ @Override public void abortJob(JobContext jobContext, State state) throws IOException { - org.apache.hadoop.mapred.JobContext - mapRedJobContext = HCatMapRedUtil.createJobContext(jobContext); - if (dynamicPartitioningUsed) { - discoverPartitions(jobContext); - } - - if (getBaseOutputCommitter() != null && !dynamicPartitioningUsed) { - getBaseOutputCommitter().abortJob(mapRedJobContext, state); - } else if (dynamicPartitioningUsed) { - for (JobContext currContext : contextDiscoveredByPath.values()) { - try { - new JobConf(currContext.getConfiguration()).getOutputCommitter().abortJob(currContext, state); - } catch (Exception e) { - throw new IOException(e); - } - } - } - - HiveMetaStoreClient client = null; - try { - HiveConf hiveConf = HCatUtil.getHiveConf(jobContext.getConfiguration()); - client = HCatUtil.getHiveClient(hiveConf); - // cancel the deleg. tokens that were acquired for this job now that - // we are done - we should cancel if the tokens were acquired by - // HCatOutputFormat and not if they were supplied by Oozie.
- // In the latter case the HCAT_KEY_TOKEN_SIGNATURE property in - // the conf will not be set - String tokenStrForm = client.getTokenStrForm(); - if (tokenStrForm != null && jobContext.getConfiguration().get - (HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) { - client.cancelDelegationToken(tokenStrForm); - } - } catch (Exception e) { - if (e instanceof HCatException) { - throw (HCatException) e; - } else { - throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e); - } - } finally { - HCatUtil.closeHiveClientQuietly(client); - } - - Path src; - OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext); - if (dynamicPartitioningUsed) { - src = new Path(getPartitionRootLocation(jobInfo.getLocation(), - jobInfo.getTableInfo().getTable().getPartitionKeysSize())); - } else { - src = new Path(jobInfo.getLocation()); - } - FileSystem fs = src.getFileSystem(jobContext.getConfiguration()); -// LOG.warn("abortJob about to delete ["+src.toString() +"]"); - fs.delete(src, true); + internalAbortJob(jobContext, State.FAILED); } public static final String SUCCEEDED_FILE_NAME = "_SUCCESS"; @@ -205,191 +153,50 @@ @Override public void commitJob(JobContext jobContext) throws IOException { - if (dynamicPartitioningUsed) { - discoverPartitions(jobContext); - // Commit each partition so it gets moved out of the job work dir - for (JobContext context : contextDiscoveredByPath.values()) { - new JobConf(context.getConfiguration()).getOutputCommitter().commitJob(context); - } - } - if (getBaseOutputCommitter() != null && !dynamicPartitioningUsed) { - getBaseOutputCommitter().commitJob(HCatMapRedUtil.createJobContext(jobContext)); - } - // create _SUCCESS FILE if so requested. - OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext); - if (getOutputDirMarking(jobContext.getConfiguration())) { - Path outputPath = new Path(jobInfo.getLocation()); - if (outputPath != null) { - FileSystem fileSys = outputPath.getFileSystem(jobContext.getConfiguration()); - // create a file in the folder to mark it - if (fileSys.exists(outputPath)) { - Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME); - if (!fileSys.exists(filePath)) { // may have been created by baseCommitter.commitJob() - fileSys.create(filePath).close(); - } + try { + if (dynamicPartitioningUsed) { + discoverPartitions(jobContext); + // Commit each partition so it gets moved out of the job work + // dir + for (JobContext context : contextDiscoveredByPath.values()) { + new JobConf(context.getConfiguration()) + .getOutputCommitter().commitJob(context); } } - } - cleanupJob(jobContext); - } - - @Override - public void cleanupJob(JobContext context) throws IOException { - - if (dynamicPartitioningUsed) { - discoverPartitions(context); - } - - - OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context); - Configuration conf = context.getConfiguration(); - Table table = new Table(jobInfo.getTableInfo().getTable()); - Path tblPath = new Path(table.getTTable().getSd().getLocation()); - FileSystem fs = tblPath.getFileSystem(conf); - - if (table.getPartitionKeys().size() == 0) { - //non partitioned table if (getBaseOutputCommitter() != null && !dynamicPartitioningUsed) { - getBaseOutputCommitter().cleanupJob(HCatMapRedUtil.createJobContext(context)); - } else if (dynamicPartitioningUsed) { - for (JobContext currContext : contextDiscoveredByPath.values()) { - try { - JobConf jobConf = new JobConf(currContext.getConfiguration()); - jobConf.getOutputCommitter().cleanupJob(currContext); - } catch (Exception e) { - throw new IOException(e); - } 
- } + getBaseOutputCommitter().commitJob( + HCatMapRedUtil.createJobContext(jobContext)); } - - //Move data from temp directory the actual table directory - //No metastore operation required. - Path src = new Path(jobInfo.getLocation()); - moveTaskOutputs(fs, src, src, tblPath, false); - fs.delete(src, true); - return; - } - - HiveMetaStoreClient client = null; - HCatTableInfo tableInfo = jobInfo.getTableInfo(); - - List partitionsAdded = new ArrayList(); - - try { - HiveConf hiveConf = HCatUtil.getHiveConf(conf); - client = HCatUtil.getHiveClient(hiveConf); - - StorerInfo storer = - InternalUtil.extractStorerInfo(table.getTTable().getSd(), table.getParameters()); - - updateTableSchema(client, table, jobInfo.getOutputSchema()); - - FileStatus tblStat = fs.getFileStatus(tblPath); - String grpName = tblStat.getGroup(); - FsPermission perms = tblStat.getPermission(); - - List partitionsToAdd = new ArrayList(); - if (!dynamicPartitioningUsed) { - partitionsToAdd.add( - constructPartition( - context, jobInfo, - tblPath.toString(), jobInfo.getPartitionValues() - , jobInfo.getOutputSchema(), getStorerParameterMap(storer) - , table, fs - , grpName, perms)); - } else { - for (Entry> entry : partitionsDiscoveredByPath.entrySet()) { - partitionsToAdd.add( - constructPartition( - context, jobInfo, - getPartitionRootLocation(entry.getKey(), entry.getValue().size()), entry.getValue() - , jobInfo.getOutputSchema(), getStorerParameterMap(storer) - , table, fs - , grpName, perms)); - } - } - - //Publish the new partition(s) - if (dynamicPartitioningUsed && harProcessor.isEnabled() && (!partitionsToAdd.isEmpty())) { - - Path src = new Path(ptnRootLocation); - - // check here for each dir we're copying out, to see if it already exists, error out if so - moveTaskOutputs(fs, src, src, tblPath, true); - - moveTaskOutputs(fs, src, src, tblPath, false); - fs.delete(src, true); - - -// for (Partition partition : partitionsToAdd){ -// partitionsAdded.add(client.add_partition(partition)); -// // currently following add_partition instead of add_partitions because latter isn't -// // all-or-nothing and we want to be able to roll back partitions we added if need be. -// } - - try { - client.add_partitions(partitionsToAdd); - partitionsAdded = partitionsToAdd; - } catch (Exception e) { - // There was an error adding partitions : rollback fs copy and rethrow - for (Partition p : partitionsToAdd) { - Path ptnPath = new Path(harProcessor.getParentFSPath(new Path(p.getSd().getLocation()))); - if (fs.exists(ptnPath)) { - fs.delete(ptnPath, true); + registerPartitions(jobContext); + // create _SUCCESS FILE if so requested. 
+ OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext); + if (getOutputDirMarking(jobContext.getConfiguration())) { + Path outputPath = new Path(jobInfo.getLocation()); + if (outputPath != null) { + FileSystem fileSys = outputPath.getFileSystem(jobContext + .getConfiguration()); + // create a file in the folder to mark it + if (fileSys.exists(outputPath)) { + Path filePath = new Path(outputPath, + SUCCEEDED_FILE_NAME); + if (!fileSys.exists(filePath)) { // may have been + // created by + // baseCommitter.commitJob() + fileSys.create(filePath).close(); } } - throw e; } - - } else { - // no harProcessor, regular operation - - // No duplicate partition publish case to worry about because we'll - // get a AlreadyExistsException here if so, and appropriately rollback - - client.add_partitions(partitionsToAdd); - partitionsAdded = partitionsToAdd; - - if (dynamicPartitioningUsed && (partitionsAdded.size() > 0)) { - Path src = new Path(ptnRootLocation); - moveTaskOutputs(fs, src, src, tblPath, false); - fs.delete(src, true); - } - } - - if (getBaseOutputCommitter() != null && !dynamicPartitioningUsed) { - getBaseOutputCommitter().cleanupJob(HCatMapRedUtil.createJobContext(context)); - } - - if (Security.getInstance().isSecurityEnabled()) { - Security.getInstance().cancelToken(client, context); - } - } catch (Exception e) { - - if (partitionsAdded.size() > 0) { - try { - //baseCommitter.cleanupJob failed, try to clean up the metastore - for (Partition p : partitionsAdded) { - client.dropPartition(tableInfo.getDatabaseName(), - tableInfo.getTableName(), p.getValues()); - } - } catch (Exception te) { - //Keep cause as the original exception - throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e); - } - } - - if (e instanceof HCatException) { - throw (HCatException) e; - } else { - throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e); - } } finally { - HCatUtil.closeHiveClientQuietly(client); + cancelDelegationTokens(jobContext); } } + @Override + public void cleanupJob(JobContext context) throws IOException { + throw new IOException("The method cleanupJob is deprecated and should not be called."); + } + private String getPartitionRootLocation(String ptnLocn, int numPtnKeys) { if (ptnRootLocation == null) { // we only need to calculate it once, it'll be the same for other partitions in this job. @@ -478,7 +285,6 @@ } else { partition.getSd().setLocation(partPath.toString()); } - return partition; } @@ -701,4 +507,198 @@ } } + private void registerPartitions(JobContext context) throws IOException{ + if (dynamicPartitioningUsed){ + discoverPartitions(context); + } + OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context); + Configuration conf = context.getConfiguration(); + Table table = new Table(jobInfo.getTableInfo().getTable()); + Path tblPath = new Path(table.getTTable().getSd().getLocation()); + FileSystem fs = tblPath.getFileSystem(conf); + + if( table.getPartitionKeys().size() == 0 ) { + //Move data from temp directory the actual table directory + //No metastore operation required. 
+ Path src = new Path(jobInfo.getLocation()); + moveTaskOutputs(fs, src, src, tblPath, false); + fs.delete(src, true); + return; + } + + HiveMetaStoreClient client = null; + HCatTableInfo tableInfo = jobInfo.getTableInfo(); + List partitionsAdded = new ArrayList(); + try { + HiveConf hiveConf = HCatUtil.getHiveConf(conf); + client = HCatUtil.getHiveClient(hiveConf); + StorerInfo storer = InternalUtil.extractStorerInfo(table.getTTable().getSd(),table.getParameters()); + + FileStatus tblStat = fs.getFileStatus(tblPath); + String grpName = tblStat.getGroup(); + FsPermission perms = tblStat.getPermission(); + + List partitionsToAdd = new ArrayList(); + if (!dynamicPartitioningUsed){ + partitionsToAdd.add( + constructPartition( + context,jobInfo, + tblPath.toString(), jobInfo.getPartitionValues() + ,jobInfo.getOutputSchema(), getStorerParameterMap(storer) + ,table, fs + ,grpName,perms)); + }else{ + for (Entry> entry : partitionsDiscoveredByPath.entrySet()){ + partitionsToAdd.add( + constructPartition( + context,jobInfo, + getPartitionRootLocation(entry.getKey(),entry.getValue().size()), entry.getValue() + ,jobInfo.getOutputSchema(), getStorerParameterMap(storer) + ,table, fs + ,grpName,perms)); + } + } + + ArrayList> ptnInfos = new ArrayList>(); + for(Partition ptn : partitionsToAdd){ + ptnInfos.add(InternalUtil.createPtnKeyValueMap(new Table(tableInfo.getTable()), ptn)); + } + + //Publish the new partition(s) + if (dynamicPartitioningUsed && harProcessor.isEnabled() && (!partitionsToAdd.isEmpty())){ + + Path src = new Path(ptnRootLocation); + // check here for each dir we're copying out, to see if it + // already exists, error out if so + moveTaskOutputs(fs, src, src, tblPath,true); + moveTaskOutputs(fs, src, src, tblPath,false); + fs.delete(src, true); + try { + updateTableSchema(client, table, jobInfo.getOutputSchema()); + LOG.info("The table {} has new partitions {}.", table.getTableName(),ptnInfos); + client.add_partitions(partitionsToAdd); + partitionsAdded = partitionsToAdd; + } catch (Exception e){ + // There was an error adding partitions : rollback fs copy and rethrow + for (Partition p : partitionsToAdd){ + Path ptnPath = new Path(harProcessor.getParentFSPath(new Path(p.getSd().getLocation()))); + if (fs.exists(ptnPath)){ + fs.delete(ptnPath,true); + } + } + throw e; + } + + }else{ + // no harProcessor, regular operation + // No duplicate partition publish case to worry about because we'll + // get a AlreadyExistsException here if so, and appropriately rollback + updateTableSchema(client, table, jobInfo.getOutputSchema()); + LOG.info("The table {} has new partitions {}.", table.getTableName(),ptnInfos); + client.add_partitions(partitionsToAdd); + partitionsAdded = partitionsToAdd; + if (dynamicPartitioningUsed && (partitionsAdded.size()>0)){ + Path src = new Path(ptnRootLocation); + moveTaskOutputs(fs, src, src, tblPath,false); + fs.delete(src, true); + } + } + } catch (Exception e) { + if (partitionsAdded.size() > 0) { + try { + // baseCommitter.cleanupJob failed, try to clean up the + // metastore + for (Partition p : partitionsAdded) { + client.dropPartition(tableInfo.getDatabaseName(), + tableInfo.getTableName(), p.getValues()); + } + } catch (Exception te) { + // Keep cause as the original exception + throw new HCatException( + ErrorType.ERROR_PUBLISHING_PARTITION, e); + } + } + if (e instanceof HCatException) { + throw (HCatException) e; + } else { + throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e); + } + } finally { + HCatUtil.closeHiveClientQuietly(client); + 
} + } + + /** + * This method exists to ensure unit tests run with Pig 0.8 and + * 0.9 versions. The cleanupJob method is deprecated but, Pig 0.8 and + * 0.9 call cleanupJob method. Hence this method is used by both abortJob + * and cleanupJob methods. + * @param JobContext The job context. + * @throws java.io.IOException + */ + private void internalAbortJob(JobContext context, State state) throws IOException{ + try { + if (dynamicPartitioningUsed) { + discoverPartitions(context); + } + org.apache.hadoop.mapred.JobContext mapRedJobContext = HCatMapRedUtil + .createJobContext(context); + if (getBaseOutputCommitter() != null && !dynamicPartitioningUsed) { + getBaseOutputCommitter().abortJob(mapRedJobContext, state); + } else if (dynamicPartitioningUsed) { + for (JobContext currContext : contextDiscoveredByPath.values()) { + try { + new JobConf(currContext.getConfiguration()) + .getOutputCommitter().abortJob(currContext, + state); + } catch (Exception e) { + throw new IOException(e); + } + } + } + Path src; + OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context); + if (dynamicPartitioningUsed) { + src = new Path(getPartitionRootLocation(jobInfo.getLocation() + .toString(), jobInfo.getTableInfo().getTable() + .getPartitionKeysSize())); + } else { + src = new Path(jobInfo.getLocation()); + } + FileSystem fs = src.getFileSystem(context.getConfiguration()); + LOG.info("Job failed. Cleaning up temporary directory [{}].", src); + fs.delete(src, true); + } finally { + cancelDelegationTokens(context); + } + } + + private void cancelDelegationTokens(JobContext context) throws IOException{ + LOG.info("Cancelling deletgation token for the job."); + HiveMetaStoreClient client = null; + try { + HiveConf hiveConf = HCatUtil + .getHiveConf(context.getConfiguration()); + client = HCatUtil.getHiveClient(hiveConf); + // cancel the deleg. tokens that were acquired for this job now that + // we are done - we should cancel if the tokens were acquired by + // HCatOutputFormat and not if they were supplied by Oozie. + // In the latter case the HCAT_KEY_TOKEN_SIGNATURE property in + // the conf will not be set + String tokenStrForm = client.getTokenStrForm(); + if (tokenStrForm != null + && context.getConfiguration().get( + HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) { + client.cancelDelegationToken(tokenStrForm); + } + } catch (MetaException e) { + LOG.warn("MetaException while cancelling delegation token.",e ); + } catch (TException e) { + LOG.warn("TException while cancelling delegation token.", e); + } finally { + HCatUtil.closeHiveClientQuietly(client); + } + } + + } Index: src/java/org/apache/hcatalog/mapreduce/InitializeInput.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/InitializeInput.java (revision 1392658) +++ src/java/org/apache/hcatalog/mapreduce/InitializeInput.java (working copy) @@ -27,7 +27,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.ql.metadata.Table; @@ -41,49 +40,52 @@ import org.slf4j.LoggerFactory; /** - * The Class which handles querying the metadata server using the MetaStoreClient. 
The list of - * partitions matching the partition filter is fetched from the server and the information is - * serialized and written into the JobContext configuration. The inputInfo is also updated with - * info required in the client process context. + * The Class which handles querying the metadata server using the + * MetaStoreClient. The list of partitions matching the partition filter is + * fetched from the server and the information is serialized and written into + * the JobContext configuration. The inputInfo is also updated with info + * required in the client process context. */ public class InitializeInput { - private static final Logger LOG = LoggerFactory.getLogger(InitializeInput.class); + private static final Logger LOG = LoggerFactory + .getLogger(InitializeInput.class); /** - * Set the input to use for the Job. This queries the metadata server with the specified - * partition predicates, gets the matching partitions, and puts the information in the job - * configuration object. + * Set the input to use for the Job. This queries the metadata server with + * the specified partition predicates, gets the matching partitions, and + * puts the information in the job configuration object. * - * To ensure a known InputJobInfo state, only the database name, table name, filter, and - * properties are preserved. All other modification from the given InputJobInfo are discarded. + * To ensure a known InputJobInfo state, only the database name, table name, + * filter, and properties are preserved. All other modification from the + * given InputJobInfo are discarded. * - * After calling setInput, InputJobInfo can be retrieved from the job configuration as follows: - * {code} - * InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize( - * job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO)); - * {code} + * After calling setInput, InputJobInfo can be retrieved from the job + * configuration as follows: {code} InputJobInfo inputInfo = (InputJobInfo) + * HCatUtil.deserialize( + * job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO)); {code} * * @param job the job object * @param theirInputJobInfo information on the Input to read * @throws Exception */ - public static void setInput(Job job, InputJobInfo theirInputJobInfo) throws Exception { - InputJobInfo inputJobInfo = InputJobInfo.create( - theirInputJobInfo.getDatabaseName(), - theirInputJobInfo.getTableName(), - theirInputJobInfo.getFilter()); + public static void setInput(Job job, InputJobInfo theirInputJobInfo) + throws Exception { + InputJobInfo inputJobInfo = InputJobInfo + .create(theirInputJobInfo.getDatabaseName(), + theirInputJobInfo.getTableName(), + theirInputJobInfo.getFilter()); inputJobInfo.getProperties().putAll(theirInputJobInfo.getProperties()); - job.getConfiguration().set( - HCatConstants.HCAT_KEY_JOB_INFO, - HCatUtil.serialize(getInputJobInfo(job, inputJobInfo, null))); + job.getConfiguration().set(HCatConstants.HCAT_KEY_JOB_INFO, + HCatUtil.serialize(getInputJobInfo(job, inputJobInfo, null))); } /** - * Returns the given InputJobInfo after populating with data queried from the metadata service. + * Returns the given InputJobInfo after populating with data queried from + * the metadata service. 
*/ - private static InputJobInfo getInputJobInfo( - Job job, InputJobInfo inputJobInfo, String locationFilter) throws Exception { + private static InputJobInfo getInputJobInfo(Job job, + InputJobInfo inputJobInfo, String locationFilter) throws Exception { HiveMetaStoreClient client = null; HiveConf hiveConf = null; @@ -94,40 +96,48 @@ hiveConf = new HiveConf(HCatInputFormat.class); } client = HCatUtil.getHiveClient(hiveConf); - Table table = HCatUtil.getTable(client, inputJobInfo.getDatabaseName(), - inputJobInfo.getTableName()); + Table table = HCatUtil + .getTable(client, inputJobInfo.getDatabaseName(), + inputJobInfo.getTableName()); List partInfoList = new ArrayList(); inputJobInfo.setTableInfo(HCatTableInfo.valueOf(table.getTTable())); if (table.getPartitionKeys().size() != 0) { - //Partitioned table - List parts = client.listPartitionsByFilter(inputJobInfo.getDatabaseName(), - inputJobInfo.getTableName(), - inputJobInfo.getFilter(), - (short) -1); + // Partitioned table + List parts = client.listPartitionsByFilter( + inputJobInfo.getDatabaseName(), + inputJobInfo.getTableName(), inputJobInfo.getFilter(), + (short) -1); - // Default to 100,000 partitions if hive.metastore.maxpartition is not defined - int maxPart = hiveConf.getInt("hcat.metastore.maxpartitions", 100000); + // Default to 100,000 partitions if hive.metastore.maxpartition + // is not defined + int maxPart = hiveConf.getInt("hcat.metastore.maxpartitions", + 100000); if (parts != null && parts.size() > maxPart) { - throw new HCatException(ErrorType.ERROR_EXCEED_MAXPART, "total number of partitions is " + parts.size()); + throw new HCatException(ErrorType.ERROR_EXCEED_MAXPART, + "total number of partitions is " + parts.size()); } // populate partition info for (Partition ptn : parts) { - HCatSchema schema = HCatUtil.extractSchema( - new org.apache.hadoop.hive.ql.metadata.Partition(table, ptn)); + HCatSchema schema = HCatUtil + .extractSchema(new org.apache.hadoop.hive.ql.metadata.Partition( + table, ptn)); PartInfo partInfo = extractPartInfo(schema, ptn.getSd(), - ptn.getParameters(), job.getConfiguration(), inputJobInfo); - partInfo.setPartitionValues(createPtnKeyValueMap(table, ptn)); + ptn.getParameters(), job.getConfiguration(), + inputJobInfo); + partInfo.setPartitionValues(InternalUtil + .createPtnKeyValueMap(table, ptn)); partInfoList.add(partInfo); } } else { - //Non partitioned table + // Non partitioned table HCatSchema schema = HCatUtil.extractSchema(table); - PartInfo partInfo = extractPartInfo(schema, table.getTTable().getSd(), - table.getParameters(), job.getConfiguration(), inputJobInfo); + PartInfo partInfo = extractPartInfo(schema, table.getTTable() + .getSd(), table.getParameters(), + job.getConfiguration(), inputJobInfo); partInfo.setPartitionValues(new HashMap()); partInfoList.add(partInfo); } @@ -140,38 +150,19 @@ } - private static Map createPtnKeyValueMap(Table table, Partition ptn) throws IOException { - List values = ptn.getValues(); - if (values.size() != table.getPartitionKeys().size()) { - throw new IOException("Partition values in partition inconsistent with table definition, table " - + table.getTableName() + " has " - + table.getPartitionKeys().size() - + " partition keys, partition has " + values.size() + "partition values"); - } + private static PartInfo extractPartInfo(HCatSchema schema, + StorageDescriptor sd, Map parameters, + Configuration conf, InputJobInfo inputJobInfo) throws IOException { - Map ptnKeyValues = new HashMap(); - - int i = 0; - for (FieldSchema schema : 
table.getPartitionKeys()) { - // CONCERN : the way this mapping goes, the order *needs* to be preserved for table.getPartitionKeys() and ptn.getValues() - ptnKeyValues.put(schema.getName().toLowerCase(), values.get(i)); - i++; - } - - return ptnKeyValues; - } - - private static PartInfo extractPartInfo(HCatSchema schema, StorageDescriptor sd, - Map parameters, Configuration conf, - InputJobInfo inputJobInfo) throws IOException { - StorerInfo storerInfo = InternalUtil.extractStorerInfo(sd, parameters); Properties hcatProperties = new Properties(); - HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(conf, storerInfo); + HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(conf, + storerInfo); // copy the properties from storageHandler to jobProperties - Map jobProperties = HCatUtil.getInputJobProperties(storageHandler, inputJobInfo); + Map jobProperties = HCatUtil.getInputJobProperties( + storageHandler, inputJobInfo); for (String key : parameters.keySet()) { hcatProperties.put(key, parameters.get(key)); @@ -179,7 +170,7 @@ // FIXME // Bloating partinfo with inputJobInfo is not good return new PartInfo(schema, storageHandler, sd.getLocation(), - hcatProperties, jobProperties, inputJobInfo.getTableInfo()); + hcatProperties, jobProperties, inputJobInfo.getTableInfo()); } } Index: src/java/org/apache/hcatalog/mapreduce/InternalUtil.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/InternalUtil.java (revision 1392658) +++ src/java/org/apache/hcatalog/mapreduce/InternalUtil.java (working copy) @@ -21,7 +21,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; @@ -48,6 +50,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -185,4 +188,30 @@ + " but found " + split.getClass().getName()); } } + + + static Map createPtnKeyValueMap(Table table, Partition ptn) + throws IOException { + List values = ptn.getValues(); + if (values.size() != table.getPartitionKeys().size()) { + throw new IOException( + "Partition values in partition inconsistent with table definition, table " + + table.getTableName() + " has " + + table.getPartitionKeys().size() + + " partition keys, partition has " + values.size() + + "partition values"); + } + + Map ptnKeyValues = new HashMap(); + + int i = 0; + for (FieldSchema schema : table.getPartitionKeys()) { + // CONCERN : the way this mapping goes, the order *needs* to be + // preserved for table.getPartitionKeys() and ptn.getValues() + ptnKeyValues.put(schema.getName().toLowerCase(), values.get(i)); + i++; + } + + return ptnKeyValues; + } } Index: src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java =================================================================== --- src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java (revision 1392658) +++ src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java (working copy) @@ -155,7 +155,7 @@ public void publishTest(Job job) throws Exception { OutputCommitter committer = new 
FileOutputCommitterContainer(job, null); - committer.cleanupJob(job); + committer.commitJob(job); Partition part = client.getPartition(dbName, tblName, Arrays.asList("p1")); assertNotNull(part); Index: src/test/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java =================================================================== --- src/test/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java (revision 0) +++ src/test/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java (working copy) @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.mapreduce; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import junit.framework.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.io.RCFileInputFormat; +import org.apache.hadoop.hive.ql.io.RCFileOutputFormat; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MiniMRCluster; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hcatalog.NoExitSecurityManager; +import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer; +import org.apache.hcatalog.data.DefaultHCatRecord; +import org.apache.hcatalog.data.HCatRecord; +import org.apache.hcatalog.data.schema.HCatFieldSchema; +import org.apache.hcatalog.data.schema.HCatSchema; +import org.apache.hcatalog.data.schema.HCatSchemaUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestHCatPartitionPublish { + private static Configuration mrConf = null; + private static FileSystem fs = null; + private static MiniMRCluster mrCluster = null; + private static boolean isServerRunning = false; + private static final int msPort = 20101; + private static HiveConf hcatConf; 
+ private static HiveMetaStoreClient msc; + private static SecurityManager securityManager; + + @BeforeClass + public static void setup() throws Exception { + Configuration conf = new Configuration(true); + fs = FileSystem.get(conf); + System.setProperty("hadoop.log.dir", new File(fs.getWorkingDirectory() + .toString(), "/logs").getAbsolutePath()); + // LocalJobRunner does not work with mapreduce OutputCommitter. So need + // to use MiniMRCluster. MAPREDUCE-2350 + mrCluster = new MiniMRCluster(1, fs.getUri().toString(), 1, null, null, + new JobConf(conf)); + mrConf = mrCluster.createJobConf(); + + if (isServerRunning) { + return; + } + + MetaStoreUtils.startMetaStore(msPort, ShimLoader + .getHadoopThriftAuthBridge()); + isServerRunning = true; + securityManager = System.getSecurityManager(); + System.setSecurityManager(new NoExitSecurityManager()); + + hcatConf = new HiveConf(TestHCatPartitionPublish.class); + hcatConf.set("hive.metastore.local", "false"); + hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + + msPort); + hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTRETRIES, 3); + + hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname, + HCatSemanticAnalyzer.class.getName()); + hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, + "false"); + msc = new HiveMetaStoreClient(hcatConf, null); + System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " "); + System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " "); + } + + @AfterClass + public static void tearDown() throws IOException { + if (mrCluster != null) { + mrCluster.shutdown(); + } + System.setSecurityManager(securityManager); + isServerRunning = false; + } + + @Test + public void testPartitionPublish() throws Exception { + String dbName = "default"; + String tableName = "testHCatPartitionedTable"; + createTable(null, tableName); + + Map partitionMap = new HashMap(); + partitionMap.put("part1", "p1value1"); + partitionMap.put("part0", "p0value1"); + + ArrayList hcatTableColumns = new ArrayList(); + for (FieldSchema fs : getTableColumns()) { + hcatTableColumns.add(HCatSchemaUtils.getHCatFieldSchema(fs)); + } + + runMRCreateFail(dbName, tableName, partitionMap, hcatTableColumns); + List ptns = msc.listPartitionNames(dbName, tableName, + (short) 10); + Assert.assertEquals(0, ptns.size()); + Table table = msc.getTable(dbName, tableName); + Assert.assertTrue(table != null); + // Also make sure that the directory has been deleted in the table + // location. + Assert.assertFalse(fs.exists(new Path(table.getSd().getLocation() + + "/part1=p1value1/part0=p0value1"))); + } + + void runMRCreateFail( + String dbName, String tableName, Map partitionValues, + List columns) throws Exception { + + Job job = new Job(mrConf, "hcat mapreduce write fail test"); + job.setJarByClass(this.getClass()); + job.setMapperClass(TestHCatPartitionPublish.MapFail.class); + + // input/output settings + job.setInputFormatClass(TextInputFormat.class); + + Path path = new Path(fs.getWorkingDirectory(), + "mapred/testHCatMapReduceInput"); + // The write count does not matter, as the map will fail in its first + // call. 
+ createInputFile(path, 5); + + TextInputFormat.setInputPaths(job, path); + job.setOutputFormatClass(HCatOutputFormat.class); + OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, tableName, + partitionValues); + HCatOutputFormat.setOutput(job, outputJobInfo); + + job.setMapOutputKeyClass(BytesWritable.class); + job.setMapOutputValueClass(DefaultHCatRecord.class); + + job.setNumReduceTasks(0); + + HCatOutputFormat.setSchema(job, new HCatSchema(columns)); + + boolean success = job.waitForCompletion(true); + Assert.assertTrue(success == false); + } + + private void createInputFile(Path path, int rowCount) throws IOException { + if (fs.exists(path)) { + fs.delete(path, true); + } + FSDataOutputStream os = fs.create(path); + for (int i = 0; i < rowCount; i++) { + os.writeChars(i + "\n"); + } + os.close(); + } + + public static class MapFail extends + Mapper { + + @Override + public void map(LongWritable key, Text value, Context context) + throws IOException, InterruptedException { + { + throw new IOException("Exception to mimic job failure."); + } + } + } + + private void createTable(String dbName, String tableName) throws Exception { + String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME + : dbName; + try { + msc.dropTable(databaseName, tableName); + } catch (Exception e) { + } // can fail with NoSuchObjectException + + Table tbl = new Table(); + tbl.setDbName(databaseName); + tbl.setTableName(tableName); + tbl.setTableType("MANAGED_TABLE"); + StorageDescriptor sd = new StorageDescriptor(); + sd.setCols(getTableColumns()); + tbl.setPartitionKeys(getPartitionKeys()); + tbl.setSd(sd); + sd.setBucketCols(new ArrayList(2)); + sd.setSerdeInfo(new SerDeInfo()); + sd.getSerdeInfo().setName(tbl.getTableName()); + sd.getSerdeInfo().setParameters(new HashMap()); + sd.getSerdeInfo().getParameters().put( + org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, + "1"); + sd.getSerdeInfo().setSerializationLib(ColumnarSerDe.class.getName()); + sd.setInputFormat(RCFileInputFormat.class.getName()); + sd.setOutputFormat(RCFileOutputFormat.class.getName()); + + Map tableParams = new HashMap(); + tbl.setParameters(tableParams); + + msc.createTable(tbl); + } + + protected List getPartitionKeys() { + List fields = new ArrayList(); + // Defining partition names in unsorted order + fields.add(new FieldSchema("PaRT1", Constants.STRING_TYPE_NAME, "")); + fields.add(new FieldSchema("part0", Constants.STRING_TYPE_NAME, "")); + return fields; + } + + protected List getTableColumns() { + List fields = new ArrayList(); + fields.add(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")); + fields.add(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")); + return fields; + } + +}
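
A note on the shape of the FileOutputCommitterContainer change: partition registration now happens only in commitJob(), via the new registerPartitions(), while abortJob() and the now-disabled cleanupJob() path funnel through internalAbortJob(), which merely deletes the job's temporary output and cancels delegation tokens. The sketch below is a minimal, hypothetical committer illustrating that split; the class name, the scratchDir field, and registerPartitionsWithMetastore() are illustrative and are not HCatalog APIs.

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/** Minimal sketch: publish metadata only on commit, clean up only on abort. */
public class PublishOnCommitCommitter extends OutputCommitter {

    private final Path scratchDir; // job-scoped temporary output location (illustrative)

    public PublishOnCommitCommitter(Path scratchDir) {
        this.scratchDir = scratchDir;
    }

    @Override
    public void setupJob(JobContext context) throws IOException {
    }

    @Override
    public void setupTask(TaskAttemptContext context) throws IOException {
    }

    @Override
    public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
        return false;
    }

    @Override
    public void commitTask(TaskAttemptContext context) throws IOException {
    }

    @Override
    public void abortTask(TaskAttemptContext context) throws IOException {
    }

    @Override
    public void commitJob(JobContext context) throws IOException {
        // Only a successful job reaches this point, so it is safe to make the
        // output visible here (in the patch: registerPartitions + _SUCCESS marker).
        registerPartitionsWithMetastore(context);
    }

    @Override
    public void abortJob(JobContext context, JobStatus.State state) throws IOException {
        // A failed or killed job removes its scratch output and nothing else;
        // since nothing was registered yet, there is no metadata to roll back.
        FileSystem fs = scratchDir.getFileSystem(context.getConfiguration());
        fs.delete(scratchDir, true);
    }

    private void registerPartitionsWithMetastore(JobContext context) throws IOException {
        // Placeholder for metastore publication (e.g. add_partitions in HCatalog).
    }
}

Publishing only from commitJob() means a failed job can never leave partitions registered against data that abortJob() subsequently deletes, which is exactly what the new TestHCatStorer case and TestHCatPartitionPublish assert.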
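
registerPartitions() first moves the task output under the table path and only then calls add_partitions(); if a later step fails after partitions were added, the catch block drops them again so the metastore never points at data the job did not publish. The helper below isolates that compensate-on-failure pattern; publishOrRollback() is an illustrative wrapper, while add_partitions(), dropPartition(), and getValues() are the same metastore client calls the patch uses.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Partition;

/** Sketch of the publish-then-rollback pattern in registerPartitions(). */
class PartitionPublisher {

    static void publishOrRollback(HiveMetaStoreClient client, String dbName,
            String tableName, List<Partition> partitionsToAdd) throws Exception {
        List<Partition> partitionsAdded = new ArrayList<Partition>();
        try {
            client.add_partitions(partitionsToAdd);
            partitionsAdded = partitionsToAdd;
            // ... any further commit-time work happens here ...
        } catch (Exception e) {
            // Compensate: drop whatever was registered so the metastore stays
            // consistent with the (rolled back) filesystem state, then rethrow.
            for (Partition p : partitionsAdded) {
                client.dropPartition(dbName, tableName, p.getValues());
            }
            throw e;
        }
    }
}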
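
Delegation-token handling is now funneled through cancelDelegationTokens(), called from the finally paths of both commitJob() and internalAbortJob(), so a token that HCatOutputFormat acquired is cancelled however the job ends. The guard below condenses the rule spelled out in the code comments: cancel only when HCatConstants.HCAT_KEY_TOKEN_SIGNATURE is present in the job configuration, i.e. not when the token was supplied externally (for example by Oozie). The wrapper class and method are illustrative; getTokenStrForm() and cancelDelegationToken() are the client calls used in the patch, and the signature key is passed in rather than hard-coding the constant's value.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

/** Sketch of the "cancel only tokens we acquired ourselves" guard. */
class DelegationTokenGuard {

    static void cancelIfAcquiredByHCat(HiveMetaStoreClient client, Configuration conf,
            String tokenSignatureKey) throws Exception {
        String tokenStrForm = client.getTokenStrForm();
        // When an external workflow engine (e.g. Oozie) supplies the token, the
        // signature key is absent and the token must remain valid for the caller.
        if (tokenStrForm != null && conf.get(tokenSignatureKey) != null) {
            client.cancelDelegationToken(tokenStrForm);
        }
    }
}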
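
InternalUtil.createPtnKeyValueMap(), moved from InitializeInput so the committer can log partition key/value pairs, zips table.getPartitionKeys() against ptn.getValues() and, as the in-code comment warns, depends on both lists sharing the table-definition order. The plain-Java sketch below shows that mapping on bare string lists; zipPartitionKeyValues() is an illustrative stand-in for the real method, and it uses a LinkedHashMap for readable ordering where the patch uses a HashMap.

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch of the order-sensitive partition key/value zip. */
class PartitionKeyValues {

    static Map<String, String> zipPartitionKeyValues(List<String> partitionKeys,
            List<String> partitionValues) throws IOException {
        if (partitionKeys.size() != partitionValues.size()) {
            throw new IOException("Partition has " + partitionValues.size()
                    + " values but the table declares " + partitionKeys.size()
                    + " partition keys");
        }
        Map<String, String> keyValues = new LinkedHashMap<String, String>();
        for (int i = 0; i < partitionKeys.size(); i++) {
            // Key names are lower-cased; position i of the values list must
            // correspond to partition key i of the table definition.
            keyValues.put(partitionKeys.get(i).toLowerCase(), partitionValues.get(i));
        }
        return keyValues;
    }
}

For example, keys [PaRT1, part0] with values [p1value1, p0value1] (the combination used in TestHCatPartitionPublish) map to {part1=p1value1, part0=p0value1}.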
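
TestHCatPartitionPublish starts a MiniMRCluster because, as its setup comment notes, LocalJobRunner does not drive the mapreduce OutputCommitter (MAPREDUCE-2350), and the whole point of the test is to see abortJob() fire. A condensed version of that fixture is sketched below; it reuses only the constructor and createJobConf() call that appear in the test, and the surrounding class is illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;

/** Sketch: run committer tests on a real (mini) MR cluster, not LocalJobRunner. */
class MiniClusterFixture {

    static JobConf startMiniCluster() throws Exception {
        Configuration conf = new Configuration(true);
        FileSystem fs = FileSystem.get(conf);
        // One tracker and one directory are enough; what matters is that the
        // framework, unlike LocalJobRunner, invokes commitJob/abortJob on the
        // job's OutputCommitter (see MAPREDUCE-2350).
        MiniMRCluster cluster = new MiniMRCluster(1, fs.getUri().toString(), 1,
                null, null, new JobConf(conf));
        return cluster.createJobConf();
    }
}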
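
Both new tests force the job to fail early (FailEvalFunc in the Pig test, MapFail in the MapReduce test) and then verify that nothing was published: the metastore lists no partitions and the would-be partition directory is gone, while the table itself still exists. The helper below bundles those checks; assertNothingPublished() is an illustrative name, and it assumes a connected HiveMetaStoreClient and FileSystem like the ones the test class sets up.

import java.util.List;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

/** Sketch of the post-failure assertions made by the new tests. */
class PartitionPublishAssertions {

    static void assertNothingPublished(HiveMetaStoreClient msc, FileSystem fs,
            String dbName, String tableName, Path expectedPartitionDir) throws Exception {
        // -1 asks the metastore for all partition names.
        List<String> partitions = msc.listPartitionNames(dbName, tableName, (short) -1);
        if (!partitions.isEmpty()) {
            throw new AssertionError("Partitions were published for a failed job: " + partitions);
        }
        if (fs.exists(expectedPartitionDir)) {
            throw new AssertionError("Partition directory survived abortJob: " + expectedPartitionDir);
        }
    }
}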