Index: core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/BSPJobClient.java	(revision 1470810)
+++ core/src/main/java/org/apache/hama/bsp/BSPJobClient.java	(working copy)
@@ -325,10 +325,21 @@
       // Create the splits for the job
       LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
-      job = partition(job, maxTasks);
+      InputSplit[] splits = job.getInputFormat().getSplits(
+          job,
+          (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
+              : maxTasks);
+
+      job = partition(job, splits, maxTasks);
       maxTasks = job.getInt("hama.partition.count", maxTasks);
 
-      job.setNumBspTask(writeSplits(job, submitSplitFile, maxTasks));
+      if (job.getBoolean("input.has.partitioned", false)) {
+        splits = job.getInputFormat().getSplits(
+            job,
+            (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
+                : maxTasks);
+      }
+      job.setNumBspTask(writeSplits(job, splits, submitSplitFile, maxTasks));
       job.set("bsp.job.split.file", submitSplitFile.toString());
     }
 
@@ -369,15 +380,11 @@
     return launchJob(jobId, job, submitJobFile, fs);
   }
 
-  protected BSPJob partition(BSPJob job, int maxTasks) throws IOException {
+  protected BSPJob partition(BSPJob job, InputSplit[] splits, int maxTasks)
+      throws IOException {
     String inputPath = job.getConfiguration().get(Constants.JOB_INPUT_DIR);
-    Path inputDir = new Path(inputPath);
-    if (fs.isFile(inputDir)) {
-      inputDir = inputDir.getParent();
-    }
-    Path partitionDir = new Path(inputDir + "/partitions");
-
+    Path partitionDir = new Path("/tmp/hama-parts/" + job.getJobID() + "/");
     if (fs.exists(partitionDir)) {
       fs.delete(partitionDir, true);
     }
@@ -386,11 +393,6 @@
       return job;
     }// Early exit for the partitioner job.
 
-    InputSplit[] splits = job.getInputFormat().getSplits(
-        job,
-        (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
-            : maxTasks);
-
     if (inputPath != null) {
       int numSplits = splits.length;
       int numTasks = job.getConfiguration().getInt("bsp.peers.num", 0);
@@ -427,8 +429,7 @@
           job.get(Constants.RUNTIME_PARTITIONING_CLASS));
     }
     BSPJob partitioningJob = new BSPJob(conf);
-    partitioningJob.setInputPath(new Path(job.getConfiguration().get(
-        Constants.JOB_INPUT_DIR)));
+    partitioningJob.setInputPath(new Path(inputPath));
     partitioningJob.setInputFormat(job.getInputFormat().getClass());
     partitioningJob.setInputKeyClass(job.getInputKeyClass());
     partitioningJob.setInputValueClass(job.getInputValueClass());
@@ -439,6 +440,7 @@
     partitioningJob.set("bsp.partitioning.runner.job", "true");
     partitioningJob.getConfiguration().setBoolean(
         Constants.ENABLE_RUNTIME_PARTITIONING, false);
+    partitioningJob.setOutputPath(partitionDir);
 
     boolean isPartitioned = false;
     try {
@@ -453,8 +455,9 @@
         job.setInputPath(new Path(conf
            .get(Constants.RUNTIME_PARTITIONING_DIR)));
       } else {
-        job.setInputPath(new Path(inputDir + "/partitions"));
+        job.setInputPath(partitionDir);
       }
+      job.setBoolean("input.has.partitioned", true);
       job.setInputFormat(SequenceFileInputFormat.class);
     } else {
       LOG.error("Error partitioning the input path.");
@@ -543,13 +546,8 @@
     return codecClass;
   }
 
-  private static int writeSplits(BSPJob job, Path submitSplitFile, int maxTasks)
-      throws IOException {
-    InputSplit[] splits = job.getInputFormat().getSplits(
-        job,
-        (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
-            : maxTasks);
-
+  private static int writeSplits(BSPJob job, InputSplit[] splits,
+      Path submitSplitFile, int maxTasks) throws IOException {
     final DataOutputStream out = writeSplitsFileHeader(job.getConfiguration(),
         submitSplitFile, splits.length);
     try {
Index: core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java	(revision 1470810)
+++ core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java	(working copy)
@@ -57,18 +57,13 @@
 
     this.fs = FileSystem.get(conf);
 
-    Path inputDir = new Path(conf.get(Constants.JOB_INPUT_DIR));
-    if (fs.isFile(inputDir)) {
-      inputDir = inputDir.getParent();
-    }
-
     converter = ReflectionUtils.newInstance(conf.getClass(
         Constants.RUNTIME_PARTITION_RECORDCONVERTER,
         DefaultRecordConverter.class, RecordConverter.class), conf);
     converter.setup(conf);
 
     if (conf.get(Constants.RUNTIME_PARTITIONING_DIR) == null) {
-      this.partitionDir = new Path(inputDir + "/partitions");
+      this.partitionDir = new Path(conf.get("bsp.output.dir"));
     } else {
       this.partitionDir = new Path(conf.get(Constants.RUNTIME_PARTITIONING_DIR));
     }
Index: core/src/test/java/org/apache/hama/bsp/TestFileInputFormat.java
===================================================================
--- core/src/test/java/org/apache/hama/bsp/TestFileInputFormat.java	(revision 1470810)
+++ core/src/test/java/org/apache/hama/bsp/TestFileInputFormat.java	(working copy)
@@ -17,8 +17,13 @@
  */
 package org.apache.hama.bsp;
 
+import java.io.IOException;
+
 import junit.framework.TestCase;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hama.HamaConfiguration;
+
 public class TestFileInputFormat extends TestCase {
 
   public void testComputeGoalSize() throws Exception {
@@ -28,4 +33,24 @@
         && 1200 > input.computeGoalSize(10, 10000));
   }
+
+  public void testSetInputPaths() throws IOException {
+    HamaConfiguration conf = new HamaConfiguration();
+    BSPJob job = new BSPJob(conf);
+
+    String[] files = new String[2];
+    files[0] = "hdfs://hadoop.uta.edu/user/hadoop/employee.txt";
+    files[1] = "hdfs://hadoop.uta.edu/user/hadoop/department.txt";
+
+    FileInputFormat.setInputPaths(job, files[0] + "," + files[1]);
+    Path[] paths = FileInputFormat.getInputPaths(job);
+
+    System.out.println(job.getConfiguration().get("bsp.input.dir"));
+    assertEquals(2, FileInputFormat.getInputPaths(job).length);
+
+    for (int i = 0; i < paths.length; i++) {
+      System.out.println(paths[i]);
+      assertEquals(paths[i].toString(), files[i]);
+    }
+  }
 }
Index: core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
===================================================================
--- core/src/test/java/org/apache/hama/bsp/TestPartitioning.java	(revision 1470810)
+++ core/src/test/java/org/apache/hama/bsp/TestPartitioning.java	(working copy)
@@ -19,8 +19,6 @@
 
 import java.io.IOException;
 
-import junit.framework.TestCase;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -30,19 +28,54 @@
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.Constants;
+import org.apache.hama.HamaCluster;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.message.queue.DiskQueue;
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.util.KeyValuePair;
 
-public class TestPartitioning extends TestCase {
+public class TestPartitioning extends HamaCluster {
 
   public static final Log LOG = LogFactory.getLog(TestPartitioning.class);
+  public static String TMP_OUTPUT = "/tmp/test-example/";
+  public static final String TMP_OUTPUT_PATH = "/tmp/test-example/output.txt";
+  public static Path OUTPUT_PATH = new Path(TMP_OUTPUT);
+
+  protected HamaConfiguration configuration;
+
+  // these variables keep the whole cluster from being rebooted again, since
+  // setup and teardown are called per method.
+
+  public TestPartitioning() {
+    configuration = new HamaConfiguration();
+    configuration.set("bsp.master.address", "localhost");
+    configuration.set("hama.child.redirect.log.console", "true");
+    assertEquals("Make sure master addr is set to localhost:", "localhost",
+        configuration.get("bsp.master.address"));
+    configuration.set("bsp.local.dir", "/tmp/hama-test");
+    configuration.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
+    configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost");
+    configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
+    configuration.set("hama.sync.client.class",
+        org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl.class
+            .getCanonicalName());
+  }
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    super.tearDown();
+  }
+
   public void testPartitioner() throws Exception {
     Configuration conf = new Configuration();
     conf.set("bsp.local.dir", "/tmp/hama-test/partitioning");
-    conf.set("bsp.partitioning.dir", "/tmp/hama-test/partitioning/localtest");
     conf.setBoolean("bsp.input.runtime.partitioning", true);
     BSPJob bsp = new BSPJob(new HamaConfiguration(conf));
     bsp.setJobName("Test partitioning with input");
@@ -51,12 +84,12 @@
     conf.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 600);
     bsp.setInputFormat(TextInputFormat.class);
     bsp.setOutputFormat(NullOutputFormat.class);
-    bsp.setInputPath(new Path("../CHANGES.txt"));
+    FileInputFormat.setInputPaths(bsp, "../CHANGES.txt,../README.txt");
    bsp.setPartitioner(HashPartitioner.class);
 
     assertTrue(bsp.waitForCompletion(true));
 
     FileSystem fs = FileSystem.get(conf);
-    fs.delete(new Path("/tmp/hama-test/partitioning/localtest"), true);
+    fs.delete(OUTPUT_PATH, true);
   }
 
   public static class PartionedBSP extends