Index: src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java
===================================================================
--- src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java (revision 1343946)
+++ src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java (working copy)
@@ -21,10 +21,12 @@
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.net.URI;
 import java.util.Random;
 import java.util.StringTokenizer;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -118,7 +120,26 @@
         String inputFile = createInputFile(fileContent);
         FileInputFormat.setInputPaths(job, new Path(inputFile));
 
+        //Test for merging of configs
+        DistributedCache.addFileToClassPath(new Path(inputFile), job.getConfiguration(), fs);
+        String dummyFile = createInputFile("dummy file");
+        DistributedCache.addFileToClassPath(new Path(dummyFile), configurer.getJob("out1")
+            .getConfiguration(), fs);
+        // duplicate of the value. Merging should remove duplicates
+        DistributedCache.addFileToClassPath(new Path(inputFile), configurer.getJob("out2")
+            .getConfiguration(), fs);
+
         configurer.configure();
+
+        // Verify if the configs are merged
+        Path[] fileClassPaths = DistributedCache.getFileClassPaths(job.getConfiguration());
+        Assert.assertArrayEquals(new Path[] {new Path(inputFile), new Path(dummyFile)},
+            fileClassPaths);
+        URI[] expectedCacheFiles = new URI[] {new Path(inputFile).makeQualified(fs).toUri(),
+            new Path(dummyFile).makeQualified(fs).toUri()};
+        URI[] cacheFiles = DistributedCache.getCacheFiles(job.getConfiguration());
+        Assert.assertArrayEquals(expectedCacheFiles, cacheFiles);
+
         Assert.assertTrue(job.waitForCompletion(true));
 
         Path textOutPath = new Path(outDir, "out1/part-m-00000");
Index: src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java (revision 1343946)
+++ src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java (working copy)
@@ -22,14 +22,18 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashSet;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -43,7 +47,8 @@
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The MultiOutputFormat class simplifies writing output data to multiple
@@ -128,20 +133,26 @@
  */
 public class MultiOutputFormat extends OutputFormat<Writable, Writable> {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(MultiOutputFormat.class.getName());
     private static final String MO_ALIASES = "mapreduce.multiout.aliases";
     private static final String MO_ALIAS = "mapreduce.multiout.alias";
     private static final String CONF_KEY_DELIM = "%%";
= "%%"; private static final String CONF_VALUE_DELIM = ";;"; private static final String COMMA_DELIM = ","; private static final List configsToOverride = new ArrayList(); - private static final List configsToMerge = new ArrayList(); + private static final Map configsToMerge = new HashMap(); static { configsToOverride.add("mapred.output.dir"); - configsToMerge.add(JobContext.JOB_NAMENODES); - configsToMerge.add("tmpfiles"); - configsToMerge.add("tmpjars"); - configsToMerge.add("tmparchives"); + configsToOverride.add(DistributedCache.CACHE_SYMLINK); + configsToMerge.put(JobContext.JOB_NAMENODES, COMMA_DELIM); + configsToMerge.put("tmpfiles", COMMA_DELIM); + configsToMerge.put("tmpjars", COMMA_DELIM); + configsToMerge.put("tmparchives", COMMA_DELIM); + configsToMerge.put(DistributedCache.CACHE_ARCHIVES, COMMA_DELIM); + configsToMerge.put(DistributedCache.CACHE_FILES, COMMA_DELIM); + configsToMerge.put("mapred.job.classpath.archives", System.getProperty("path.separator")); + configsToMerge.put("mapred.job.classpath.files", System.getProperty("path.separator")); } /** @@ -204,6 +215,7 @@ @Override public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { for (String alias : getOutputFormatAliases(context)) { + LOGGER.debug("Calling checkOutputSpecs for alias: " + alias); JobContext aliasContext = getJobContext(alias, context); OutputFormat outputFormat = getOutputFormatInstance(aliasContext); outputFormat.checkOutputSpecs(aliasContext); @@ -264,8 +276,8 @@ String value = conf.getValue(); String jobValue = userConf.getRaw(key); if (jobValue == null || !jobValue.equals(value)) { - if (configsToMerge.contains(key)) { - String mergedValue = getMergedConfValue(jobValue, value); + if (configsToMerge.containsKey(key)) { + String mergedValue = getMergedConfValue(jobValue, value, configsToMerge.get(key)); userConf.set(key, mergedValue); } else { if (configsToOverride.contains(key)) { @@ -280,18 +292,18 @@ userConf.set(getAliasConfName(alias), builder.toString()); } - private static String getMergedConfValue(String originalValues, String newValues) { + private static String getMergedConfValue(String originalValues, String newValues, String separator) { if (originalValues == null) { return newValues; } - Set mergedValues = new HashSet(); - mergedValues.addAll(StringUtils.getStringCollection(originalValues)); - mergedValues.addAll(StringUtils.getStringCollection(newValues)); + Set mergedValues = new LinkedHashSet(); + mergedValues.addAll(Arrays.asList(StringUtils.split(originalValues, separator))); + mergedValues.addAll(Arrays.asList(StringUtils.split(newValues, separator))); StringBuilder builder = new StringBuilder(originalValues.length() + newValues.length() + 2); for (String value : mergedValues) { - builder.append(value).append(COMMA_DELIM); + builder.append(value).append(separator); } - return builder.substring(0, builder.length() - COMMA_DELIM.length()); + return builder.substring(0, builder.length() - separator.length()); } private static String getAliasConfName(String alias) { @@ -422,6 +434,7 @@ baseRecordWriters = new LinkedHashMap(); String[] aliases = getOutputFormatAliases(context); for (String alias : aliases) { + LOGGER.info("Creating record writer for alias: " + alias); TaskAttemptContext aliasContext = getTaskAttemptContext(alias, context); Configuration aliasConf = aliasContext.getConfiguration(); // Create output directory if not already created. 
@@ -455,7 +468,9 @@
 
         @Override
         public void close(TaskAttemptContext context) throws IOException, InterruptedException {
-            for (BaseRecordWriterContainer baseRWContainer : baseRecordWriters.values()) {
+            for (Entry<String, BaseRecordWriterContainer> entry : baseRecordWriters.entrySet()) {
+                BaseRecordWriterContainer baseRWContainer = entry.getValue();
+                LOGGER.info("Closing record writer for alias: " + entry.getKey());
                 baseRWContainer.getRecordWriter().close(baseRWContainer.getContext());
             }
         }
@@ -490,6 +505,7 @@
             outputCommitters = new LinkedHashMap<String, BaseOutputCommitterContainer>();
             String[] aliases = getOutputFormatAliases(context);
             for (String alias : aliases) {
+                LOGGER.info("Creating output committer for alias: " + alias);
                 TaskAttemptContext aliasContext = getTaskAttemptContext(alias, context);
                 OutputCommitter baseCommitter = getOutputFormatInstance(aliasContext)
                     .getOutputCommitter(aliasContext);
@@ -501,6 +517,7 @@
         @Override
         public void setupJob(JobContext jobContext) throws IOException {
             for (String alias : outputCommitters.keySet()) {
+                LOGGER.info("Calling setupJob for alias: " + alias);
                 BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
                 outputContainer.getBaseCommitter().setupJob(outputContainer.getContext());
             }
@@ -509,6 +526,7 @@
         @Override
         public void setupTask(TaskAttemptContext taskContext) throws IOException {
             for (String alias : outputCommitters.keySet()) {
+                LOGGER.info("Calling setupTask for alias: " + alias);
                 BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
                 outputContainer.getBaseCommitter().setupTask(outputContainer.getContext());
             }
@@ -533,6 +551,7 @@
                 OutputCommitter baseCommitter = outputContainer.getBaseCommitter();
                 TaskAttemptContext committerContext = outputContainer.getContext();
                 if (baseCommitter.needsTaskCommit(committerContext)) {
+                    LOGGER.info("Calling commitTask for alias: " + alias);
                     baseCommitter.commitTask(committerContext);
                 }
             }
@@ -541,6 +560,7 @@
         @Override
         public void abortTask(TaskAttemptContext taskContext) throws IOException {
             for (String alias : outputCommitters.keySet()) {
+                LOGGER.info("Calling abortTask for alias: " + alias);
                 BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
                 outputContainer.getBaseCommitter().abortTask(outputContainer.getContext());
             }
@@ -549,6 +569,7 @@
         @Override
         public void commitJob(JobContext jobContext) throws IOException {
             for (String alias : outputCommitters.keySet()) {
+                LOGGER.info("Calling commitJob for alias: " + alias);
                 BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
                 outputContainer.getBaseCommitter().commitJob(outputContainer.getContext());
             }
@@ -557,6 +578,7 @@
         @Override
         public void abortJob(JobContext jobContext, State state) throws IOException {
             for (String alias : outputCommitters.keySet()) {
+                LOGGER.info("Calling abortJob for alias: " + alias);
                 BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
                 outputContainer.getBaseCommitter().abortJob(outputContainer.getContext(), state);
             }
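
Note on usage: a minimal, self-contained sketch of the flow this patch enables, assuming the JobConfigurer API as exercised in the test above (createConfigurer, addOutputFormat, getJob, configure). The class name, output paths, and jar names below are hypothetical; check exact signatures against the MultiOutputFormat javadoc rather than this sketch.

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hcatalog.mapreduce.MultiOutputFormat;
import org.apache.hcatalog.mapreduce.MultiOutputFormat.JobConfigurer;

public class MultiOutUsageSketch {
    public static void main(String[] args) throws Exception {
        Job job = new Job();
        FileSystem fs = FileSystem.get(job.getConfiguration());

        // Two aliases writing through different output formats; in the mapper,
        // records are routed with MultiOutputFormat.write(alias, key, value, context).
        JobConfigurer configurer = MultiOutputFormat.createConfigurer(job);
        configurer.addOutputFormat("out1", TextOutputFormat.class, Text.class, IntWritable.class);
        configurer.addOutputFormat("out2", SequenceFileOutputFormat.class, Text.class, IntWritable.class);
        TextOutputFormat.setOutputPath(configurer.getJob("out1"), new Path("/tmp/out1"));
        SequenceFileOutputFormat.setOutputPath(configurer.getJob("out2"), new Path("/tmp/out2"));

        // Alias-specific DistributedCache entries. Before this patch, cache file
        // and classpath keys set on an alias job were not in configsToMerge, so
        // they were not propagated into the submitted job's configuration.
        DistributedCache.addFileToClassPath(new Path("/tmp/lib/out1-only.jar"),
                configurer.getJob("out1").getConfiguration(), fs);
        DistributedCache.addFileToClassPath(new Path("/tmp/lib/shared.jar"),
                configurer.getJob("out2").getConfiguration(), fs);

        // Folds each alias config back into the parent job, merging configsToMerge
        // keys with their per-key separator and dropping duplicate entries.
        configurer.configure();
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The test above asserts exactly this behavior: classpath entries added on two alias jobs, one a duplicate of the parent's, end up as a single deduplicated list in the parent configuration.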
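The behavioral core of the change is getMergedConfValue: LinkedHashSet in place of HashSet preserves first-seen order (which matters for classpath entries), and the per-key separator lets the mapred.job.classpath.* keys be joined with the platform path separator instead of a comma. A standalone sketch of the same merge semantics; MergeDemo and the sample values are illustrative only:

import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

import org.apache.commons.lang.StringUtils;

public class MergeDemo {
    // Order-preserving, duplicate-free merge of two delimited config values,
    // mirroring the patched getMergedConfValue.
    static String merge(String originalValues, String newValues, String separator) {
        if (originalValues == null) {
            return newValues;
        }
        Set<String> merged = new LinkedHashSet<String>();
        merged.addAll(Arrays.asList(StringUtils.split(originalValues, separator)));
        merged.addAll(Arrays.asList(StringUtils.split(newValues, separator)));
        return StringUtils.join(merged, separator);
    }

    public static void main(String[] args) {
        // Comma-delimited keys such as tmpjars or mapred.cache.files:
        System.out.println(merge("a.jar,b.jar", "b.jar,c.jar", ","));
        // prints: a.jar,b.jar,c.jar
        // Classpath keys use the platform path separator instead:
        System.out.println(merge("x.jar:y.jar", "y.jar:z.jar", ":"));
        // prints: x.jar:y.jar:z.jar
    }
}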