Index: core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/BSPJobClient.java	(revision 1226591)
+++ core/src/main/java/org/apache/hama/bsp/BSPJobClient.java	(working copy)
@@ -86,7 +86,7 @@
     JobProfile profile;
     JobStatus status;
     long statustime;
-    
+
     public NetworkedJob() {
     }
 
@@ -305,14 +305,20 @@
     fs.mkdirs(submitJobDir);
     short replication = (short) job.getInt("bsp.submit.replication", 10);
 
+    ClusterStatus clusterStatus = getClusterStatus(true);
+    int maxTasks = clusterStatus.getMaxTasks();
+    if (maxTasks < job.getNumBspTask()) {
+      job.setNumBspTask(maxTasks);
+    }
+
     // only create the splits if we have an input
     if (job.get("bsp.input.dir") != null) {
       // Create the splits for the job
       LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
       if (job.getConf().get("bsp.input.partitioner.class") != null) {
-        job = partition(job);
+        job = partition(job, maxTasks);
       }
-      job.setNumBspTask(writeSplits(job, submitSplitFile));
+      job.setNumBspTask(writeSplits(job, submitSplitFile, maxTasks));
       job.set("bsp.job.split.file", submitSplitFile.toString());
     }
 
@@ -368,9 +374,18 @@
   }
 
   @SuppressWarnings({ "rawtypes", "unchecked" })
-  protected BSPJob partition(BSPJob job) throws IOException {
+  protected BSPJob partition(BSPJob job, int maxTasks) throws IOException {
     InputSplit[] splits = job.getInputFormat().getSplits(job, 0);
-    int numOfTasks = splits.length; // job.getNumBspTask();
+    // An explicit task count below the cluster capacity wins; otherwise run
+    // one task per split, capped at the cluster capacity.
+    int requestedTasks = job.getNumBspTask();
+    int numOfTasks;
+    if (requestedTasks > 1 && maxTasks > requestedTasks) {
+      numOfTasks = requestedTasks;
+    } else {
+      numOfTasks = Math.min(splits.length, maxTasks);
+    }
+
     String input = job.getConf().get("bsp.input.dir");
 
     if (input != null) {
@@ -448,7 +463,7 @@
   }
 
   private String getPartitionName(int i) {
-    return "part-" + String.valueOf(100000 + i).substring(1, 6);
+    return "part-" + String.valueOf(100000 + i).substring(1, 6);
   }
 
   /**
@@ -492,9 +507,17 @@
     return codecClass;
   }
 
-  private int writeSplits(BSPJob job, Path submitSplitFile) throws IOException {
+  private int writeSplits(BSPJob job, Path submitSplitFile, int maxTasks)
+      throws IOException {
     InputSplit[] splits = job.getInputFormat().getSplits(job, 0);
 
+    if (maxTasks < splits.length) {
+      // Refuse to submit rather than silently over-subscribe the cluster;
+      // re-partitioning the input to fit is still to be implemented.
+      throw new IOException("The number of splits (" + splits.length
+          + ") exceeds the maximum number of tasks (" + maxTasks + ")");
+    }
+
     final DataOutputStream out = writeSplitsFileHeader(job.getConf(),
         submitSplitFile, splits.length);
     try {
@@ -589,7 +612,7 @@
         lastReport = report;
       }
     }
-    
+
     LOG.info("The total number of supersteps: " + info.getSuperstepCount());
     // TODO job.getCounters().log(LOG);
     return job.isSuccessful();
Index: examples/src/main/java/org/apache/hama/examples/ShortestPaths.java
===================================================================
--- examples/src/main/java/org/apache/hama/examples/ShortestPaths.java	(revision 1226591)
+++ examples/src/main/java/org/apache/hama/examples/ShortestPaths.java	(working copy)
@@ -199,7 +199,7 @@
   }
 
   public static void printUsage() {
-    System.out.println("Usage: ");
+    System.out.println("Usage: [numTasks]");
   }
 
   public static void main(String[] args) throws IOException,
@@ -221,6 +221,10 @@
     bsp.setOutputPath(new Path(args[1]));
     bsp.setInputPath(new Path(args[2]));
 
+    if (args.length == 4) {
+      bsp.setNumBspTask(Integer.parseInt(args[3]));
+    }
+
     bsp.setBspClass(ShortestPaths.class);
     bsp.setInputFormat(SequenceFileInputFormat.class);
     bsp.setPartitioner(HashPartitioner.class);