commit 5967b907b68184f44ed0685f3ff815db4649de79
Author: Satish Mittal
Date:   Thu Feb 6 13:33:32 2014 +0000

    HIVE-6109: Support customized location for EXTERNAL tables created by Dynamic Partitioning

diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
index 2ee50b3..3e91de0 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
@@ -121,6 +121,7 @@ private HCatConstants() { // restrict instantiation
   public static final String HCAT_DYNAMIC_PTN_JOBID = HCAT_KEY_OUTPUT_BASE + "dynamic.jobid";
   public static final boolean HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED = false;
+  public static final String HCAT_DYNAMIC_CUSTOM_PATTERN = "hcat.dynamic.partitioning.custom.pattern";

   // Message Bus related properties.
   public static final String HCAT_DEFAULT_TOPIC_PREFIX = "hcat";
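For orientation, a client opts into this feature by setting the new property before calling HCatOutputFormat.setOutput(); as the committer changes below show, the table must be EXTERNAL and the write must use dynamic partitioning for the pattern to take effect. A minimal, hypothetical driver sketch (the database, table, and column names are invented; schema and mapper setup are omitted):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hive.hcatalog.common.HCatConstants;
import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;

// Hypothetical driver fragment, not part of the patch.
public class EnableCustomPatternSketch {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "write-web-logs");
    // The property key is the constant added above.
    job.getConfiguration().set(HCatConstants.HCAT_DYNAMIC_CUSTOM_PATTERN, "data/${dt}/${country}");
    // null partition values => fully dynamic partitioning; "default"/"web_logs" are made up.
    OutputJobInfo info = OutputJobInfo.create("default", "web_logs", null);
    HCatOutputFormat.setOutput(job, info);   // setOutput() reads the pattern (HCatOutputFormat hunk below)
    job.setOutputFormatClass(HCatOutputFormat.class);
  }
}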
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
index a5ae1be..0a7f9b0 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
@@ -69,10 +69,13 @@
   private static final String TEMP_DIR_NAME = "_temporary";
   private static final String LOGS_DIR_NAME = "_logs";
+  /** The directory under which data is initially written for a partitioned table */
+  static final String DYNTEMP_DIR_NAME = "_DYN";

   private static final Logger LOG = LoggerFactory.getLogger(FileOutputCommitterContainer.class);

   private final boolean dynamicPartitioningUsed;
   private boolean partitionsDiscovered;
+  private final boolean customDynamicLocationUsed;

   private Map<String, Map<String, String>> partitionsDiscoveredByPath;
   private Map<String, JobContext> contextDiscoveredByPath;
@@ -97,6 +100,14 @@ public FileOutputCommitterContainer(JobContext context,
     this.partitionsDiscovered = !dynamicPartitioningUsed;
     cachedStorageHandler = HCatUtil.getStorageHandler(context.getConfiguration(),
       jobInfo.getTableInfo().getStorerInfo());
+    Table table = new Table(jobInfo.getTableInfo().getTable());
+    if (dynamicPartitioningUsed && Boolean.valueOf((String)table.getProperty("EXTERNAL"))
+        && jobInfo.getCustomDynamicPath() != null
+        && jobInfo.getCustomDynamicPath().length() > 0) {
+      customDynamicLocationUsed = true;
+    } else {
+      customDynamicLocationUsed = false;
+    }
   }

   @Override
@@ -164,8 +175,12 @@ public void abortJob(JobContext jobContext, State state) throws IOException {
       Path src;
       OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
       if (dynamicPartitioningUsed) {
-        src = new Path(getPartitionRootLocation(jobInfo.getLocation(), jobInfo.getTableInfo().getTable()
-            .getPartitionKeysSize()));
+        if (!customDynamicLocationUsed) {
+          src = new Path(getPartitionRootLocation(jobInfo.getLocation(), jobInfo.getTableInfo().getTable()
+              .getPartitionKeysSize()));
+        } else {
+          src = new Path(getCustomPartitionRootLocation(jobInfo, jobContext.getConfiguration()));
+        }
       } else {
         src = new Path(jobInfo.getLocation());
       }
@@ -235,7 +250,26 @@ public void cleanupJob(JobContext context) throws IOException {
     throw new IOException("The method cleanupJob is deprecated and should not be called.");
   }

+  private String getCustomPartitionRootLocation(OutputJobInfo jobInfo, Configuration conf) {
+    if (ptnRootLocation == null) {
+      // we only need to calculate it once, it'll be the same for other partitions in this job.
+      String parentPath = jobInfo.getTableInfo().getTableLocation();
+      if (jobInfo.getCustomDynamicRoot() != null
+          && jobInfo.getCustomDynamicRoot().length() > 0) {
+        parentPath = new Path(parentPath, jobInfo.getCustomDynamicRoot()).toString();
+      }
+      Path ptnRoot = new Path(parentPath, DYNTEMP_DIR_NAME
+          + conf.get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID));
+      ptnRootLocation = ptnRoot.toString();
+    }
+    return ptnRootLocation;
+  }
+
   private String getPartitionRootLocation(String ptnLocn, int numPtnKeys) {
+    if (customDynamicLocationUsed) {
+      return null;
+    }
+
     if (ptnRootLocation == null) {
       // we only need to calculate it once, it'll be the same for other partitions in this job.
       Path ptnRoot = new Path(ptnLocn);
@@ -255,6 +289,7 @@ private String getPartitionRootLocation(String ptnLocn, int numPtnKeys) {
    * @param jobInfo The OutputJobInfo.
    * @param partLocnRoot The table-equivalent location root of the partition
    *                     (temporary dir if dynamic partition, table dir if static)
+   * @param dynPartPath The path of the dynamic partition being created
    * @param partKVs The keyvalue pairs that form the partition
    * @param outputSchema The output schema for the partition
    * @param params The parameters to store inside the partition
@@ -268,7 +303,7 @@ private String getPartitionRootLocation(String ptnLocn, int numPtnKeys) {
   private Partition constructPartition(
     JobContext context, OutputJobInfo jobInfo,
-    String partLocnRoot, Map<String, String> partKVs,
+    String partLocnRoot, String dynPartPath, Map<String, String> partKVs,
     HCatSchema outputSchema, Map<String, String> params,
     Table table, FileSystem fs,
     String grpName, FsPermission perms) throws IOException {
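As a rough worked example of getCustomPartitionRootLocation() above (all values invented), the temporary root for a job is the table location, plus the optional custom root, plus the _DYN&lt;job id&gt; directory:

import org.apache.hadoop.fs.Path;

// Illustrative only: assumed table location, custom root, and job id.
public class CustomRootSketch {
  public static void main(String[] args) {
    String tableLocation = "hdfs://nn:8020/warehouse/web_logs";   // assumed table location
    String customDynamicRoot = "mapred/externalDynamicOutput";    // OutputJobInfo.getCustomDynamicRoot()
    String jobId = "0.3386462763";                                // HCatConstants.HCAT_DYNAMIC_PTN_JOBID value

    Path parent = new Path(tableLocation, customDynamicRoot);
    Path ptnRoot = new Path(parent, "_DYN" + jobId);              // DYNTEMP_DIR_NAME + job id
    System.out.println(ptnRoot);
    // hdfs://nn:8020/warehouse/web_logs/mapred/externalDynamicOutput/_DYN0.3386462763
  }
}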
@@ -292,7 +327,10 @@ private Partition constructPartition(
     // Sets permissions and group name on partition dirs and files.

     Path partPath;
-    if (Boolean.valueOf((String)table.getProperty("EXTERNAL"))
+    if (customDynamicLocationUsed) {
+      partPath = new Path(dynPartPath);
+    } else if (!dynamicPartitioningUsed
+        && Boolean.valueOf((String)table.getProperty("EXTERNAL"))
         && jobInfo.getLocation() != null && jobInfo.getLocation().length() > 0) {
       // honor external table that specifies the location
       partPath = new Path(jobInfo.getLocation());
@@ -315,7 +353,7 @@ private Partition constructPartition(
     // Set the location in the StorageDescriptor
     if (dynamicPartitioningUsed) {
-      String dynamicPartitionDestination = getFinalDynamicPartitionDestination(table, partKVs);
+      String dynamicPartitionDestination = getFinalDynamicPartitionDestination(table, partKVs, jobInfo);
       if (harProcessor.isEnabled()) {
         harProcessor.exec(context, partition, partPath);
         partition.getSd().setLocation(
@@ -344,14 +382,25 @@ private void applyGroupAndPerms(FileSystem fs, Path dir, FsPermission permission
     }
   }

-  private String getFinalDynamicPartitionDestination(Table table, Map<String, String> partKVs) {
-    // file:///tmp/hcat_junit_warehouse/employee/_DYN0.7770480401313761/emp_country=IN/emp_state=KA ->
-    // file:///tmp/hcat_junit_warehouse/employee/emp_country=IN/emp_state=KA
+  private String getFinalDynamicPartitionDestination(Table table, Map<String, String> partKVs,
+      OutputJobInfo jobInfo) {
     Path partPath = new Path(table.getTTable().getSd().getLocation());
-    for (FieldSchema partKey : table.getPartitionKeys()) {
-      partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs);
+    if (!customDynamicLocationUsed) {
+      // file:///tmp/hcat_junit_warehouse/employee/_DYN0.7770480401313761/emp_country=IN/emp_state=KA ->
+      // file:///tmp/hcat_junit_warehouse/employee/emp_country=IN/emp_state=KA
+      for (FieldSchema partKey : table.getPartitionKeys()) {
+        partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs);
+      }
+
+      return partPath.toString();
+    } else {
+      // if custom root specified, update the parent path
+      if (jobInfo.getCustomDynamicRoot() != null
+          && jobInfo.getCustomDynamicRoot().length() > 0) {
+        partPath = new Path(partPath, jobInfo.getCustomDynamicRoot());
+      }
+      return new Path(partPath, HCatFileUtil.resolveCustomPath(jobInfo, partKVs, false)).toString();
     }
-    return partPath.toString();
   }

   private Map<String, String> getStorerParameterMap(StorerInfo storer) {
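And the matching worked example for the custom branch of getFinalDynamicPartitionDestination() (same invented values): the final partition directory is the table location, plus the custom root, plus the pattern remainder with values substituted by HCatFileUtil.resolveCustomPath(..., false):

import org.apache.hadoop.fs.Path;

// Illustrative only; table location and partition value are invented.
public class FinalDestinationSketch {
  public static void main(String[] args) {
    Path tableLocation = new Path("hdfs://nn:8020/warehouse/web_logs"); // table SD location
    Path withRoot = new Path(tableLocation, "mapred/externalDynamicOutput"); // + customDynamicRoot
    Path finalDest = new Path(withRoot, "IN");                          // + "${p1}" resolved for p1=IN
    System.out.println(finalDest);
    // hdfs://nn:8020/warehouse/web_logs/mapred/externalDynamicOutput/IN
  }
}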
@@ -480,8 +529,11 @@ private void moveTaskOutputs(FileSystem fs,
         if (LOG.isDebugEnabled()) {
           LOG.debug("Moving directory: " + file + " to " + parentDir);
         }
-        if (!fs.rename(file, parentDir)) {
-          final String msg = "Failed to move file: " + file + " to " + parentDir;
+
+        // If custom dynamic location provided, need to rename to final output path
+        Path dstPath = !customDynamicLocationUsed ? parentDir : finalOutputPath;
+        if (!fs.rename(file, dstPath)) {
+          final String msg = "Failed to move file: " + file + " to " + dstPath;
           LOG.error(msg);
           throw new HCatException(ErrorType.ERROR_MOVE_FAILED, msg);
         }
@@ -576,7 +628,12 @@ private void discoverPartitions(JobContext context) throws IOException {
         for (FileStatus st : status) {
           LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<String, String>();
-          Warehouse.makeSpecFromName(fullPartSpec, st.getPath());
+          if (!customDynamicLocationUsed) {
+            Warehouse.makeSpecFromName(fullPartSpec, st.getPath());
+          } else {
+            HCatFileUtil.getPartKeyValuesForCustomLocation(fullPartSpec, jobInfo,
+              st.getPath().toString());
+          }
           partitionsDiscoveredByPath.put(st.getPath().toString(), fullPartSpec);
           JobConf jobConf = (JobConf)context.getConfiguration();
           JobContext currContext = HCatMapRedUtil.createJobContext(
@@ -636,7 +693,7 @@ private void registerPartitions(JobContext context) throws IOException{
           partitionsToAdd.add(
             constructPartition(
               context,jobInfo,
-              tblPath.toString(), jobInfo.getPartitionValues()
+              tblPath.toString(), null, jobInfo.getPartitionValues()
               ,jobInfo.getOutputSchema(), getStorerParameterMap(storer)
               ,table, fs
               ,grpName,perms));
@@ -645,7 +702,8 @@ private void registerPartitions(JobContext context) throws IOException{
           partitionsToAdd.add(
             constructPartition(
               context,jobInfo,
-              getPartitionRootLocation(entry.getKey(),entry.getValue().size()), entry.getValue()
+              getPartitionRootLocation(entry.getKey(),entry.getValue().size())
+              ,entry.getKey(), entry.getValue()
               ,jobInfo.getOutputSchema(), getStorerParameterMap(storer)
               ,table, fs
               ,grpName,perms));
@@ -659,13 +717,16 @@ private void registerPartitions(JobContext context) throws IOException{

       //Publish the new partition(s)
       if (dynamicPartitioningUsed && harProcessor.isEnabled() && (!partitionsToAdd.isEmpty())){
-
-        Path src = new Path(ptnRootLocation);
-        // check here for each dir we're copying out, to see if it
-        // already exists, error out if so
-        moveTaskOutputs(fs, src, src, tblPath, true);
-        moveTaskOutputs(fs, src, src, tblPath, false);
-        fs.delete(src, true);
+        if (!customDynamicLocationUsed) {
+          Path src = new Path(ptnRootLocation);
+          // check here for each dir we're copying out, to see if it
+          // already exists, error out if so
+          moveTaskOutputs(fs, src, src, tblPath, true);
+          moveTaskOutputs(fs, src, src, tblPath, false);
+          fs.delete(src, true);
+        } else {
+          moveCustomLocationTaskOutputs(fs, table, hiveConf);
+        }
         try {
           updateTableSchema(client, table, jobInfo.getOutputSchema());
           LOG.info("HAR is being used. The table {} has new partitions {}.", table.getTableName(), ptnInfos);
@@ -687,10 +748,14 @@ private void registerPartitions(JobContext context) throws IOException{
         updateTableSchema(client, table, jobInfo.getOutputSchema());
         LOG.info("HAR not is not being used. The table {} has new partitions {}.", table.getTableName(), ptnInfos);
         if (dynamicPartitioningUsed && (partitionsToAdd.size()>0)){
-          Path src = new Path(ptnRootLocation);
-          moveTaskOutputs(fs, src, src, tblPath, true);
-          moveTaskOutputs(fs, src, src, tblPath, false);
-          fs.delete(src, true);
+          if (!customDynamicLocationUsed) {
+            Path src = new Path(ptnRootLocation);
+            moveTaskOutputs(fs, src, src, tblPath, true);
+            moveTaskOutputs(fs, src, src, tblPath, false);
+            fs.delete(src, true);
+          } else {
+            moveCustomLocationTaskOutputs(fs, table, hiveConf);
+          }
         }
         client.add_partitions(partitionsToAdd);
         partitionsAdded = partitionsToAdd;
@@ -720,6 +785,24 @@ private void registerPartitions(JobContext context) throws IOException{
     }
   }

+  private void moveCustomLocationTaskOutputs(FileSystem fs, Table table, Configuration conf)
+    throws IOException {
+    // In the case of custom dynamic partitions, we can't just move the sub-tree of the partition
+    // root directory, since the partition locations contain a regex pattern. We need to first find
+    // the final destination of each partition and move its output.
+    for (Entry<String, Map<String, String>> entry : partitionsDiscoveredByPath.entrySet()) {
+      Path src = new Path(entry.getKey());
+      Path destPath = new Path(getFinalDynamicPartitionDestination(table, entry.getValue(), jobInfo));
+      moveTaskOutputs(fs, src, src, destPath, true);
+      moveTaskOutputs(fs, src, src, destPath, false);
+    }
+    // delete the parent temp directory of all custom dynamic partitions
+    Path parentPath = new Path(getCustomPartitionRootLocation(jobInfo, conf));
+    if (fs.exists(parentPath)) {
+      fs.delete(parentPath, true);
+    }
+  }
+
   private void cancelDelegationTokens(JobContext context) throws IOException{
     LOG.info("Cancelling deletgation token for the job.");
     HiveMetaStoreClient client = null;
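At commit time, discoverPartitions() and moveCustomLocationTaskOutputs() above have to map each discovered temporary directory back to a partition spec. That works because the write path keeps the ${...} wrapper around each partition value (see the FosterStorageHandler change below). A standalone sketch of the extraction done by HCatFileUtil.getPartKeyValuesForCustomLocation() (added later in this patch), with an invented job id and value:

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Standalone mirror of the reverse mapping, for illustration only.
public class PartSpecFromPathDemo {
  private static final Pattern P = Pattern.compile("(\\$\\{)([^\\s/\\{\\}\\\\]+)(\\})");

  public static void main(String[] args) {
    String customDynamicPath = "${p1}";                      // pattern remainder stored in OutputJobInfo
    String discoveredPath =                                  // a per-partition temp dir found at commit time
        "hdfs://nn:8020/warehouse/web_logs/mapred/externalDynamicOutput/_DYN0.3386462763/${IN}";

    Map<String, String> partSpec = new LinkedHashMap<String, String>();
    Matcher names = P.matcher(customDynamicPath);
    Matcher values = P.matcher(discoveredPath);
    while (names.find() && values.find()) {
      partSpec.put(names.group(2), values.group(2));         // column name -> column value
    }
    System.out.println(partSpec);                            // {p1=IN}
  }
}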
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
index 288b7a3..1407c8c 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
@@ -49,9 +49,6 @@ public class FosterStorageHandler extends DefaultStorageHandler {

   public Configuration conf;

-  /** The directory under which data is initially written for a partitioned table */
-  protected static final String DYNTEMP_DIR_NAME = "_DYN";
-
   /** The directory under which data is initially written for a non partitioned table */
   protected static final String TEMP_DIR_NAME = "_TEMP";

@@ -118,17 +115,28 @@ public void configureOutputJobProperties(TableDesc tableDesc,
       // For dynamic partitioned writes without all keyvalues specified,
       // we create a temp dir for the associated write job
       if (dynHash != null) {
-        parentPath = new Path(parentPath,
-          DYNTEMP_DIR_NAME + dynHash).toString();
+        // if external table and custom root specified, update the parent path
+        if (Boolean.valueOf((String)tableDesc.getProperties().get("EXTERNAL"))
+            && jobInfo.getCustomDynamicRoot() != null
+            && jobInfo.getCustomDynamicRoot().length() > 0) {
+          parentPath = new Path(parentPath, jobInfo.getCustomDynamicRoot()).toString();
+        }
+        parentPath = new Path(parentPath, FileOutputCommitterContainer.DYNTEMP_DIR_NAME + dynHash).toString();
       }

       String outputLocation;

-      if ((dynHash == null)
+      if ((dynHash != null)
+          && Boolean.valueOf((String)tableDesc.getProperties().get("EXTERNAL"))
+          && jobInfo.getCustomDynamicPath() != null
+          && jobInfo.getCustomDynamicPath().length() > 0) {
+        // dynamic partitioning with custom path; resolve the custom path
+        // using partition column values
+        outputLocation = HCatFileUtil.resolveCustomPath(jobInfo, null, true);
+      } else if ((dynHash == null)
           && Boolean.valueOf((String)tableDesc.getProperties().get("EXTERNAL"))
           && jobInfo.getLocation() != null && jobInfo.getLocation().length() > 0) {
         // honor custom location for external table apart from what metadata specifies
-        // only if we're not using dynamic partitioning - see HIVE-5011
         outputLocation = jobInfo.getLocation();
       } else if (dynHash == null && jobInfo.getPartitionValues().size() == 0) {
         // For non-partitioned tables, we send them to the temp dir
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatFileUtil.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatFileUtil.java
new file mode 100644
index 0000000..875f036
--- /dev/null
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatFileUtil.java
@@ -0,0 +1,113 @@
+package org.apache.hive.hcatalog.mapreduce;
+
+import java.net.URI;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.Path;
+
+public class HCatFileUtil {
+
+  // regex of the form: ${column name}. The following characters are not allowed in a column name:
+  // whitespace characters, /, {, }, \
+  private static final Pattern customPathPattern = Pattern.compile("(\\$\\{)([^\\s/\\{\\}\\\\]+)(\\})");
+
+  // Parses the custom dynamic path and replaces each occurrence of a column name
+  // within the regex pattern with its corresponding value, if provided.
+  public static String resolveCustomPath(OutputJobInfo jobInfo,
+      Map<String, String> dynPartKVs, boolean createRegexPath) {
+    // get custom path string
+    String customPath = jobInfo.getCustomDynamicPath();
+    // create matcher for custom path
+    Matcher matcher = customPathPattern.matcher(customPath);
+    // get the set of all partition columns in custom path
+    HashSet<String> partColumns = new HashSet<String>();
+    Map<String, String> partKVs = dynPartKVs != null ? dynPartKVs :
+      jobInfo.getPartitionValues();
+
+    // build the final custom path string by replacing each column name with
+    // its value, if provided
+    StringBuilder sb = new StringBuilder();
+    int previousEndIndex = 0;
+    while (matcher.find()) {
+      // append the path substring since previous match
+      sb.append(customPath.substring(previousEndIndex, matcher.start()));
+      if (createRegexPath) {
+        // append the first group within pattern: "${"
+        sb.append(matcher.group(1));
+      }
+
+      // column name is the second group from current match
+      String columnName = matcher.group(2).toLowerCase();
+      partColumns.add(columnName);
+
+      // find the value of matched column
+      String columnValue = partKVs.get(columnName);
+      // if column value is provided, replace column name with value
+      if (columnValue != null) {
+        sb.append(columnValue);
+      } else {
+        sb.append("__HIVE_DEFAULT_PARTITION__");
+      }
+
+      if (createRegexPath) {
+        // append the third group within pattern: "}"
+        sb.append(matcher.group(3));
+      }
+
+      // update previousEndIndex
+      previousEndIndex = matcher.end();
+    }
+
+    // append the trailing path string, if any
+    if (previousEndIndex < customPath.length()) {
+      sb.append(customPath.substring(previousEndIndex, customPath.length()));
+    }
+
+    // validate that the set of partition columns found in the custom path matches
+    // the set of dynamic partitions
+    if (partColumns.size() != jobInfo.getDynamicPartitioningKeys().size()) {
+      throw new IllegalArgumentException("Unable to configure custom dynamic location, "
+        + " mismatch between number of dynamic partition columns obtained[" + partColumns.size()
+        + "] and number of dynamic partition columns required["
+        + jobInfo.getDynamicPartitioningKeys().size() + "]");
+    }
+
+    return sb.toString();
+  }
+
+  public static void getPartKeyValuesForCustomLocation(Map<String, String> partSpec,
+      OutputJobInfo jobInfo, String partitionPath) {
+    // create matchers for the custom path string as well as the actual dynamic partition path created
+    Matcher customPathMatcher = customPathPattern.matcher(jobInfo.getCustomDynamicPath());
+    Matcher dynamicPathMatcher = customPathPattern.matcher(partitionPath);
+
+    while (customPathMatcher.find() && dynamicPathMatcher.find()) {
+      // get column name from custom path matcher and column value from dynamic path matcher
+      partSpec.put(customPathMatcher.group(2), dynamicPathMatcher.group(2));
+    }
+
+    // add any partition key values provided as part of job info
+    partSpec.putAll(jobInfo.getPartitionValues());
+  }
+
+  public static void setCustomPath(String customPathFormat, OutputJobInfo jobInfo) {
+    // find the root of all custom paths from the custom pattern. The root is the
+    // largest prefix of the input pattern string that doesn't match customPathPattern
+    Path customPath = new Path(customPathFormat);
+    URI customURI = customPath.toUri();
+    while (customPath != null && !customPath.toString().isEmpty()) {
+      Matcher m = customPathPattern.matcher(customPath.toString());
+      if (!m.find()) {
+        break;
+      }
+      customPath = customPath.getParent();
+    }
+
+    URI rootURI = customPath.toUri();
+    URI childURI = rootURI.relativize(customURI);
+    jobInfo.setCustomDynamicLocation(rootURI.getPath(), childURI.getPath());
+  }
+}
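resolveCustomPath() is a plain ${column} -> value substitution over the stored pattern remainder. A self-contained mirror of that substitution (outside HCatalog, with invented column names) shows both the createRegexPath == false form used for the final partition location and the == true form used for the temporary write path; note the real method additionally throws IllegalArgumentException when the number of distinct ${...} columns does not match the job's dynamic partition keys, which is not reproduced here:

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustration only; does not use OutputJobInfo.
public class ResolveCustomPathDemo {
  private static final Pattern P = Pattern.compile("(\\$\\{)([^\\s/\\{\\}\\\\]+)(\\})");

  static String resolve(String customPath, Map<String, String> partKVs, boolean keepMarkers) {
    StringBuilder sb = new StringBuilder();
    int prev = 0;
    Matcher m = P.matcher(customPath);
    while (m.find()) {
      sb.append(customPath, prev, m.start());
      if (keepMarkers) sb.append(m.group(1));          // "${"
      String value = partKVs.get(m.group(2).toLowerCase());
      sb.append(value != null ? value : "__HIVE_DEFAULT_PARTITION__");
      if (keepMarkers) sb.append(m.group(3));          // "}"
      prev = m.end();
    }
    sb.append(customPath.substring(prev));
    return sb.toString();
  }

  public static void main(String[] args) {
    Map<String, String> kvs = new HashMap<String, String>();
    kvs.put("country", "IN");
    kvs.put("state", "KA");
    System.out.println(resolve("data/${country}/${state}", kvs, false)); // data/IN/KA (final partition dir)
    System.out.println(resolve("data/${country}/${state}", kvs, true));  // data/${IN}/${KA} (temp-dir form;
    // the markers are kept so getPartKeyValuesForCustomLocation can parse the values back out at commit time)
  }
}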
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java
index 78e77e8..2e1b0fb 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java
@@ -159,6 +159,11 @@ public static void setOutput(Configuration conf, Credentials credentials,
         }
         conf.set(HCatConstants.HCAT_DYNAMIC_PTN_JOBID, dynHash);
+        // if a custom pattern is set in case of dynamic partitioning, configure the custom path
+        String customPattern = conf.get(HCatConstants.HCAT_DYNAMIC_CUSTOM_PATTERN);
+        if (customPattern != null) {
+          HCatFileUtil.setCustomPath(customPattern, outputJobInfo);
+        }
       }

       outputJobInfo.setPartitionValues(valueMap);
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputJobInfo.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputJobInfo.java
index b63bdc2..ee32066 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputJobInfo.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputJobInfo.java
@@ -49,6 +49,12 @@
   /** The location of the partition being written */
   private String location;
+
+  /** The root location of custom dynamic partitions being written */
+  private String customDynamicRoot;
+
+  /** The relative path of custom dynamic partitions being written */
+  private String customDynamicPath;

   /** The partition values to publish to, if used for output*/
   private Map<String, String> partitionValues;
@@ -165,6 +171,28 @@ public String getLocation() {
   public void setLocation(String location) {
     this.location = location;
   }
+
+  /**
+   * @param customDynamicRoot the custom root location for dynamic partitions
+   * @param customDynamicPath the custom relative path for dynamic partitions
+   */
+  void setCustomDynamicLocation(String customDynamicRoot, String customDynamicPath) {
+    this.customDynamicRoot = customDynamicRoot;
+    this.customDynamicPath = customDynamicPath;
+  }
+
+  /**
+   * @return the root location for custom dynamic partitions
+   */
+  String getCustomDynamicRoot() {
+    return customDynamicRoot;
+  }
+
+  /**
+   * @return the relative path of the custom location for dynamic partitions
+   */
+  String getCustomDynamicPath() {
+    return customDynamicPath;
+  }

   /**
    * Sets the value of partitionValues
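The two new OutputJobInfo fields are populated by HCatFileUtil.setCustomPath(), which walks up the configured pattern until no ${...} token remains and splits it there. A hedged sketch using the pattern from the new test below (the database and table names are invented; the getters are package-private, so the expected field values are shown as comments):

import org.apache.hive.hcatalog.mapreduce.HCatFileUtil;
import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;

// Illustration only.
public class SetCustomPathSketch {
  public static void main(String[] args) {
    OutputJobInfo info = OutputJobInfo.create("default", "web_logs", null); // hypothetical table
    HCatFileUtil.setCustomPath("mapred/externalDynamicOutput/${p1}", info);
    // customDynamicRoot -> "mapred/externalDynamicOutput"  (largest prefix free of ${...} tokens)
    // customDynamicPath -> "${p1}"                         (resolved per partition later)
  }
}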
diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java
index 77bdb9d..d31c37b 100644
--- hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java
+++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java
@@ -84,6 +84,7 @@ protected abstract List<FieldSchema> getTableColumns();

   private static FileSystem fs;
+  private String externalTableLocation = null;

   protected Boolean isTableExternal() {
     return false;
   }
@@ -123,6 +124,12 @@ public void deleteTable() throws Exception {
       String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;

       client.dropTable(databaseName, tableName);
+      // in case of an external table, drop the table contents as well
+      if (isTableExternal() && (externalTableLocation != null)) {
+        if (fs.exists(new Path(externalTableLocation))) {
+          fs.delete(new Path(externalTableLocation), true);
+        }
+      }
     } catch (Exception e) {
       e.printStackTrace();
       throw e;
@@ -167,6 +174,9 @@ public void createTable() throws Exception {
     sd.setOutputFormat(outputFormat());

     Map<String, String> tableParams = new HashMap<String, String>();
+    if (isTableExternal()) {
+      tableParams.put("EXTERNAL", "TRUE");
+    }
     tbl.setParameters(tableParams);

     client.createTable(tbl);
@@ -234,7 +244,8 @@ public void map(WritableComparable key, HCatRecord value, Context context
   Job runMRCreate(Map<String, String> partitionValues,
     List<HCatFieldSchema> partitionColumns, List<HCatRecord> records,
     int writeCount, boolean assertWrite) throws Exception {
-    return runMRCreate(partitionValues, partitionColumns, records, writeCount, assertWrite, true);
+    return runMRCreate(partitionValues, partitionColumns, records, writeCount, assertWrite,
+      true, null);
   }

   /**
@@ -250,7 +261,8 @@ Job runMRCreate(Map<String, String> partitionValues,
    */
   Job runMRCreate(Map<String, String> partitionValues,
     List<HCatFieldSchema> partitionColumns, List<HCatRecord> records,
-    int writeCount, boolean assertWrite, boolean asSingleMapTask) throws Exception {
+    int writeCount, boolean assertWrite, boolean asSingleMapTask,
+    String customDynamicPathPattern) throws Exception {

     writeRecords = records;
     MapCreate.writeCount = 0;
@@ -283,6 +295,9 @@ Job runMRCreate(Map<String, String> partitionValues,
     job.setOutputFormatClass(HCatOutputFormat.class);

     OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, tableName, partitionValues);
+    if (customDynamicPathPattern != null) {
+      job.getConfiguration().set(HCatConstants.HCAT_DYNAMIC_CUSTOM_PATTERN, customDynamicPathPattern);
+    }
     HCatOutputFormat.setOutput(job, outputJobInfo);

     job.setMapOutputKeyClass(BytesWritable.class);
@@ -313,6 +328,10 @@ Job runMRCreate(Map<String, String> partitionValues,
       Assert.assertEquals(writeCount, MapCreate.writeCount);
     }

+    if (isTableExternal()) {
+      externalTableLocation = outputJobInfo.getTableInfo().getTableLocation();
+    }
+
     return job;
   }
diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
index d8b69c2..5defc94 100644
--- hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
+++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
@@ -101,7 +101,7 @@ protected static void generateWriteRecords(int max, int mod, int offset) {
    */
   @Test
   public void testHCatDynamicPartitionedTable() throws Exception {
-    runHCatDynamicPartitionedTable(true);
+    runHCatDynamicPartitionedTable(true, null);
   }

   /**
@@ -110,12 +110,13 @@ public void testHCatDynamicPartitionedTable() throws Exception {
    */
   @Test
   public void testHCatDynamicPartitionedTableMultipleTask() throws Exception {
-    runHCatDynamicPartitionedTable(false);
+    runHCatDynamicPartitionedTable(false, null);
   }

-  protected void runHCatDynamicPartitionedTable(boolean asSingleMapTask) throws Exception {
+  protected void runHCatDynamicPartitionedTable(boolean asSingleMapTask,
+      String customDynamicPathPattern) throws Exception {
     generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
-    runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, true, asSingleMapTask);
+    runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, true,
+      asSingleMapTask, customDynamicPathPattern);

     runMRRead(NUM_RECORDS);
@@ -142,7 +143,8 @@ protected void runHCatDynamicPartitionedTable(boolean asSingleMapTask) throws Ex
     IOException exc = null;
     try {
       generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
-      Job job = runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, false);
+      Job job = runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, false,
+        true, customDynamicPathPattern);

       if (HCatUtil.isHadoop23()) {
         Assert.assertTrue(job.isSuccessful()==false);
diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java
index 36c7945..0838765 100644
--- hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java
+++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java
@@ -19,6 +19,9 @@
  */
 package org.apache.hive.hcatalog.mapreduce;

+import org.junit.BeforeClass;
+import org.junit.Test;
+
 public class TestHCatExternalDynamicPartitioned extends TestHCatDynamicPartitioned {

   @Override
@@ -26,4 +29,20 @@ protected Boolean isTableExternal() {
     return true;
   }

+  @BeforeClass
+  public static void generateInputData() throws Exception {
+    tableName = "testHCatExternalDynamicPartitionedTable";
+    generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+    generateDataColumns();
+  }
+
+  /**
+   * Run the dynamic partitioning test against an external table with a custom partition location pattern.
+   * @throws Exception
+   */
+  @Test
+  public void testHCatExternalDynamicCustomLocation() throws Exception {
+    runHCatDynamicPartitionedTable(true, "mapred/externalDynamicOutput/${p1}");
+  }
+
 }
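As a closing illustration (not part of the patch), this is roughly where the new test's data should land, assuming the external test table lives under the usual junit warehouse directory and one of the dynamic p1 values written by the test is 0:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical follow-up check: with the custom pattern, partitions should appear under the
// pattern layout rather than the default p1=<value> directories. The table location is invented.
public class VerifyCustomLayout {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    Path tableLocation = new Path("/tmp/hcat_junit_warehouse/testHCatExternalDynamicPartitionedTable");
    System.out.println(fs.exists(new Path(tableLocation, "mapred/externalDynamicOutput/0"))); // expected: true
    System.out.println(fs.exists(new Path(tableLocation, "p1=0")));                           // expected: false
  }
}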