Index: core/src/main/java/org/apache/hama/bsp/BSPJobClient.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (revision 1226591) +++ core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (working copy) @@ -86,7 +86,7 @@ JobProfile profile; JobStatus status; long statustime; - + public NetworkedJob() { } @@ -305,14 +305,20 @@ fs.mkdirs(submitJobDir); short replication = (short) job.getInt("bsp.submit.replication", 10); + ClusterStatus clusterStatus = getClusterStatus(true); + int maxTasks = clusterStatus.getMaxTasks(); + if (maxTasks < job.getNumBspTask()) { + job.setNumBspTask(maxTasks); + } + // only create the splits if we have an input if (job.get("bsp.input.dir") != null) { // Create the splits for the job LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile)); if (job.getConf().get("bsp.input.partitioner.class") != null) { - job = partition(job); + job = partition(job, maxTasks); } - job.setNumBspTask(writeSplits(job, submitSplitFile)); + job.setNumBspTask(writeSplits(job, submitSplitFile, maxTasks)); job.set("bsp.job.split.file", submitSplitFile.toString()); } @@ -368,9 +374,12 @@ } @SuppressWarnings({ "rawtypes", "unchecked" }) - protected BSPJob partition(BSPJob job) throws IOException { - InputSplit[] splits = job.getInputFormat().getSplits(job, 0); - int numOfTasks = splits.length; // job.getNumBspTask(); + protected BSPJob partition(BSPJob job, int maxTasks) throws IOException { + InputSplit[] splits = job.getInputFormat().getSplits( + job, + (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask() + : maxTasks); + String input = job.getConf().get("bsp.input.dir"); if (input != null) { @@ -396,7 +405,7 @@ RecordReader sampleReader = inputFormat.getRecordReader(splits[0], job); List writers = new ArrayList( - numOfTasks); + splits.length); CompressionType compressionType = getOutputCompressionType(job); Class outputCompressorClass = getOutputCompressorClass( @@ -408,7 +417,7 @@ } try { - for (int i = 0; i < numOfTasks; i++) { + for (int i = 0; i < splits.length; i++) { Path p = new Path(partitionedPath, getPartitionName(i)); if (codec == null) { writers.add(SequenceFile.createWriter(fs, job.getConf(), p, @@ -429,7 +438,7 @@ Object value = recordReader.createValue(); while (recordReader.next(key, value)) { int index = Math.abs(partitioner.getPartition(key, value, - numOfTasks)); + splits.length)); writers.get(index).append(key, value); } LOG.debug("Done with split " + i); @@ -447,8 +456,12 @@ return job; } + private boolean isProperSize(int numBspTask, int maxTasks) { + return (numBspTask > 1 && numBspTask < maxTasks); + } + private String getPartitionName(int i) { - return "part-" + String.valueOf(100000 + i).substring(1, 6); + return "part-" + String.valueOf(100000 + i).substring(1, 6); } /** @@ -492,8 +505,12 @@ return codecClass; } - private int writeSplits(BSPJob job, Path submitSplitFile) throws IOException { - InputSplit[] splits = job.getInputFormat().getSplits(job, 0); + private int writeSplits(BSPJob job, Path submitSplitFile, int maxTasks) + throws IOException { + InputSplit[] splits = job.getInputFormat().getSplits( + job, + (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask() + : maxTasks); final DataOutputStream out = writeSplitsFileHeader(job.getConf(), submitSplitFile, splits.length); @@ -589,7 +606,7 @@ lastReport = report; } } - + LOG.info("The total number of supersteps: " + info.getSuperstepCount()); // TODO job.getCounters().log(LOG); return job.isSuccessful(); Index: examples/src/main/java/org/apache/hama/examples/ShortestPaths.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/ShortestPaths.java (revision 1226591) +++ examples/src/main/java/org/apache/hama/examples/ShortestPaths.java (working copy) @@ -199,7 +199,7 @@ } public static void printUsage() { - System.out.println("Usage: "); + System.out.println("Usage: [numTasks]"); } public static void main(String[] args) throws IOException, @@ -221,6 +221,10 @@ bsp.setOutputPath(new Path(args[1])); bsp.setInputPath(new Path(args[2])); + if(args.length == 4) { + bsp.setNumBspTask(Integer.parseInt(args[3])); + } + bsp.setBspClass(ShortestPaths.class); bsp.setInputFormat(SequenceFileInputFormat.class); bsp.setPartitioner(HashPartitioner.class);