Index: src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java =================================================================== --- src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java (revision 1341205) +++ src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java (working copy) @@ -21,10 +21,12 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; +import java.net.URI; import java.util.Random; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; @@ -118,7 +120,26 @@ String inputFile = createInputFile(fileContent); FileInputFormat.setInputPaths(job, new Path(inputFile)); + //Test for merging of configs + DistributedCache.addFileToClassPath(new Path(inputFile), job.getConfiguration(), fs); + String dummyFile = createInputFile("dummy file"); + DistributedCache.addFileToClassPath(new Path(dummyFile), configurer.getJob("out1") + .getConfiguration(), fs); + // duplicate of the value. Merging should remove duplicates + DistributedCache.addFileToClassPath(new Path(inputFile), configurer.getJob("out2") + .getConfiguration(), fs); + configurer.configure(); + + // Verify if the configs are merged + Path[] fileClassPaths = DistributedCache.getFileClassPaths(job.getConfiguration()); + Assert.assertArrayEquals(new Path[] {new Path(inputFile), new Path(dummyFile)}, + fileClassPaths); + URI[] expectedCacheFiles = new URI[] {new Path(inputFile).makeQualified(fs).toUri(), + new Path(dummyFile).makeQualified(fs).toUri()}; + URI[] cacheFiles = DistributedCache.getCacheFiles(job.getConfiguration()); + Assert.assertArrayEquals(expectedCacheFiles, cacheFiles); + Assert.assertTrue(job.waitForCompletion(true)); Path textOutPath = new Path(outDir, "out1/part-m-00000"); Index: src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java (revision 1341205) +++ src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java (working copy) @@ -22,14 +22,18 @@ import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; -import java.util.HashSet; +import java.util.Arrays; +import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; @@ -43,7 +47,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskInputOutputContext; import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.util.StringUtils; /** * The MultiOutputFormat class simplifies writing output data to multiple @@ -134,14 +137,19 @@ private static final String CONF_VALUE_DELIM = ";;"; private static final String COMMA_DELIM = ","; private static final List configsToOverride = new ArrayList(); - private static final List configsToMerge = new ArrayList(); + private static final Map configsToMerge = new HashMap(); static { configsToOverride.add("mapred.output.dir"); - configsToMerge.add(JobContext.JOB_NAMENODES); - configsToMerge.add("tmpfiles"); - configsToMerge.add("tmpjars"); - configsToMerge.add("tmparchives"); + configsToOverride.add(DistributedCache.CACHE_SYMLINK); + configsToMerge.put(JobContext.JOB_NAMENODES, COMMA_DELIM); + configsToMerge.put("tmpfiles", COMMA_DELIM); + configsToMerge.put("tmpjars", COMMA_DELIM); + configsToMerge.put("tmparchives", COMMA_DELIM); + configsToMerge.put(DistributedCache.CACHE_ARCHIVES, COMMA_DELIM); + configsToMerge.put(DistributedCache.CACHE_FILES, COMMA_DELIM); + configsToMerge.put("mapred.job.classpath.archives", System.getProperty("path.separator")); + configsToMerge.put("mapred.job.classpath.files", System.getProperty("path.separator")); } /** @@ -264,8 +272,8 @@ String value = conf.getValue(); String jobValue = userConf.getRaw(key); if (jobValue == null || !jobValue.equals(value)) { - if (configsToMerge.contains(key)) { - String mergedValue = getMergedConfValue(jobValue, value); + if (configsToMerge.containsKey(key)) { + String mergedValue = getMergedConfValue(jobValue, value, configsToMerge.get(key)); userConf.set(key, mergedValue); } else { if (configsToOverride.contains(key)) { @@ -280,18 +288,18 @@ userConf.set(getAliasConfName(alias), builder.toString()); } - private static String getMergedConfValue(String originalValues, String newValues) { + private static String getMergedConfValue(String originalValues, String newValues, String separator) { if (originalValues == null) { return newValues; } - Set mergedValues = new HashSet(); - mergedValues.addAll(StringUtils.getStringCollection(originalValues)); - mergedValues.addAll(StringUtils.getStringCollection(newValues)); + Set mergedValues = new LinkedHashSet(); + mergedValues.addAll(Arrays.asList(StringUtils.split(originalValues, separator))); + mergedValues.addAll(Arrays.asList(StringUtils.split(newValues, separator))); StringBuilder builder = new StringBuilder(originalValues.length() + newValues.length() + 2); for (String value : mergedValues) { - builder.append(value).append(COMMA_DELIM); + builder.append(value).append(separator); } - return builder.substring(0, builder.length() - COMMA_DELIM.length()); + return builder.substring(0, builder.length() - separator.length()); } private static String getAliasConfName(String alias) {