Index: src/examples/org/apache/hama/examples/ExampleDriver.java =================================================================== --- src/examples/org/apache/hama/examples/ExampleDriver.java (revision 1072852) +++ src/examples/org/apache/hama/examples/ExampleDriver.java (working copy) @@ -27,6 +27,7 @@ ProgramDriver pgd = new ProgramDriver(); try { pgd.addClass("pi", PiEstimator.class, "Pi Estimator"); + pgd.addClass("bench", RandBench.class, "Random Communication Benchmark"); pgd.addClass("test", SerializePrinting.class, "Serialize Printing Test"); pgd.driver(args); Index: src/examples/org/apache/hama/examples/RandBench.java =================================================================== --- src/examples/org/apache/hama/examples/RandBench.java (revision 0) +++ src/examples/org/apache/hama/examples/RandBench.java (revision 0) @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hama.examples; + +import java.io.IOException; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.bsp.BSP; +import org.apache.hama.bsp.BSPJob; +import org.apache.hama.bsp.BSPJobClient; +import org.apache.hama.bsp.BSPMessage; +import org.apache.hama.bsp.BSPPeerProtocol; +import org.apache.hama.bsp.ClusterStatus; +import org.apache.hama.util.Bytes; +import org.apache.zookeeper.KeeperException; +import org.mortbay.log.Log; + +public class RandBench { + private static final String SIZEOFMSG = "msg.size"; + private static final String N_COMMUNICATIONS = "communications.num"; + private static final String N_SUPERSTEPS = "supersteps.num"; + + public static class RandBSP extends BSP { + private Configuration conf; + private Random r = new Random(); + private int sizeOfMsg; + private int nCommunications; + private int nSupersteps; + + @Override + public void bsp(BSPPeerProtocol bspPeer) throws IOException, + KeeperException, InterruptedException { + byte[] dummyData = new byte[sizeOfMsg]; + BSPMessage msg = null; + String[] peers = bspPeer.getAllPeerNames(); + String peerName = bspPeer.getPeerName(); + + for (int i = 0; i < nSupersteps; i++) { + + for (int j = 0; j < nCommunications; j++) { + String tPeer = peers[r.nextInt(peers.length)]; + String tag = peerName + " to " + tPeer; + msg = new BSPMessage(Bytes.toBytes(tag), dummyData); + bspPeer.send(tPeer, msg); + } + + bspPeer.sync(); + + if ((nSupersteps - 1) == i) { + // finalize + BSPMessage received; + while ((received = bspPeer.getCurrentMessage()) != null) { + Log.info(Bytes.toString(received.getTag()) + " - " + + received.getData().length + " Bytes"); + } + } + } + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + this.sizeOfMsg = conf.getInt(SIZEOFMSG, 1); + this.nCommunications = conf.getInt(N_COMMUNICATIONS, 1); + this.nSupersteps = conf.getInt(N_SUPERSTEPS, 1); + } + + 
@Override + public Configuration getConf() { + return conf; + } + + } + + public static void main(String[] args) throws Exception { + if (args.length < 3) { + System.out.println("Usage: <sizeOfMsg> <nCommunications> <nSupersteps>"); + System.exit(-1); + } + + // BSP job configuration + HamaConfiguration conf = new HamaConfiguration(); + + conf.setInt(SIZEOFMSG, Integer.parseInt(args[0])); + conf.setInt(N_COMMUNICATIONS, Integer.parseInt(args[1])); + conf.setInt(N_SUPERSTEPS, Integer.parseInt(args[2])); + + BSPJob bsp = new BSPJob(conf, RandBench.class); + // Set the job name + bsp.setJobName("Random Communication Benchmark"); + bsp.setBspClass(RandBSP.class); + + // Set the task size as a number of GroomServer + BSPJobClient jobClient = new BSPJobClient(conf); + ClusterStatus cluster = jobClient.getClusterStatus(false); + bsp.setNumBspTask(cluster.getGroomServers()); + + long startTime = System.currentTimeMillis(); + bsp.waitForCompletion(true); + System.out.println("Job Finished in " + + (double) (System.currentTimeMillis() - startTime) / 1000.0 + + " seconds"); + } +} Index: src/java/org/apache/hama/Constants.java =================================================================== --- src/java/org/apache/hama/Constants.java (revision 1072852) +++ src/java/org/apache/hama/Constants.java (working copy) @@ -46,7 +46,7 @@ /** Default port region server listens on. */ public static final int DEFAULT_PEER_PORT = 61000; - public static final long ATLEAST_WAIT_TIME = 100; + public static final long ATLEAST_WAIT_TIME = 1000; public static final String PEER_ID = "bsp.peer.id"; /** Parameter name for what groom server implementation to use. 
*/ Index: src/java/org/apache/hama/bsp/BSPJob.java =================================================================== --- src/java/org/apache/hama/bsp/BSPJob.java (revision 1072852) +++ src/java/org/apache/hama/bsp/BSPJob.java (working copy) @@ -140,7 +140,7 @@ // ///////////////////////////////////// // Methods for Job Control // ///////////////////////////////////// - public float progress() throws IOException { + public long progress() throws IOException { ensureState(JobState.RUNNING); return info.progress(); } Index: src/java/org/apache/hama/bsp/BSPJobClient.java =================================================================== --- src/java/org/apache/hama/bsp/BSPJobClient.java (revision 1072852) +++ src/java/org/apache/hama/bsp/BSPJobClient.java (working copy) @@ -35,7 +35,6 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UnixUserGroupInformation; -import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hama.HamaConfiguration; @@ -109,7 +108,7 @@ } @Override - public float progress() throws IOException { + public long progress() throws IOException { ensureFreshStatus(); return status.progress(); } @@ -358,10 +357,10 @@ while (!job.isComplete()) { Thread.sleep(1000); - String report = "bsp: " + StringUtils.formatPercent(job.progress(), 0); + String report = "Current supersteps number: " + job.progress(); if (!report.equals(lastReport)) { - LOG.debug(report); + LOG.info(report); lastReport = report; } } Index: src/java/org/apache/hama/bsp/BSPMaster.java =================================================================== --- src/java/org/apache/hama/bsp/BSPMaster.java (revision 1072852) +++ src/java/org/apache/hama/bsp/BSPMaster.java (working copy) @@ -188,9 +188,7 @@ WorkerProtocol.class, WorkerProtocol.versionID, resolveWorkerAddress(status.getRpcServer()), this.conf); if (null == wc) { - LOG - .warn("Fail to 
create Worker client at host " - + status.getPeerName()); + LOG.warn("Fail to create Worker client at host " + status.getPeerName()); return false; } // TODO: need to check if peer name has changed @@ -237,6 +235,7 @@ } // update GroomServerStatus hold in groomServers cache. GroomServerStatus fstus = directive.getStatus(); + // groomServers cache contains groom server status reported back if (groomServers.containsKey(fstus)) { GroomServerStatus ustus = null; @@ -247,6 +246,7 @@ break; } }// for + if (null != ustus) { List tlist = ustus.getTaskReports(); for (TaskStatus ts : tlist) { @@ -254,9 +254,13 @@ TaskInProgress tip = jip.findTaskInProgress(((TaskAttemptID) ts .getTaskId()).getTaskID()); - jip.completedTask(tip, ts); - LOG.info("JobInProgress id:" + jip.getJobID() + " status:" - + jip.getStatus()); + + if(ts.getRunState() == TaskStatus.State.SUCCEEDED) { + jip.completedTask(tip, ts); + } else if (ts.getRunState() == TaskStatus.State.RUNNING) { + // do nothing + } + if (jip.getStatus().getRunState() == JobStatus.SUCCEEDED) { for (JobInProgressListener listener : jobInProgressListeners) { try { @@ -265,6 +269,9 @@ LOG.error("Fail to alter scheduler a job is moved.", ioe); } } + + } else if (jip.getStatus().getRunState() == JobStatus.RUNNING) { + jip.getStatus().setprogress(ts.getSuperstepCount()); } } } else { Index: src/java/org/apache/hama/bsp/GroomServer.java =================================================================== --- src/java/org/apache/hama/bsp/GroomServer.java (revision 1072852) +++ src/java/org/apache/hama/bsp/GroomServer.java (working copy) @@ -65,7 +65,7 @@ private BSPPeer bspPeer; static final String SUBDIR = "groomServer"; - private volatile static int REPORT_INTERVAL = 60 * 1000; + private volatile static int REPORT_INTERVAL = 1 * 1000; Configuration conf; @@ -102,7 +102,8 @@ // new nexus between GroomServer and BSPMaster // holds/ manage all tasks - //List tasksList = new CopyOnWriteArrayList(); + // List tasksList = new + // 
CopyOnWriteArrayList(); private String rpcServer; private Server workerServer; @@ -132,8 +133,9 @@ } if (localHostname == null) { - this.localHostname = DNS.getDefaultHost(conf.get("bsp.dns.interface", - "default"), conf.get("bsp.dns.nameserver", "default")); + this.localHostname = DNS.getDefaultHost( + conf.get("bsp.dns.interface", "default"), + conf.get("bsp.dns.nameserver", "default")); } // check local disk checkLocalDirs(conf.getStrings("bsp.local.dir")); @@ -165,6 +167,7 @@ LOG.info("Worker rpc server --> " + rpcServer); } + @SuppressWarnings("deprecation") String address = NetUtils.getServerAddress(conf, "bsp.groom.report.bindAddress", "bsp.groom.report.port", "bsp.groom.report.address"); @@ -363,11 +366,11 @@ Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/" + task.getTaskID() + "/" + "job.jar"); systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile); - + HamaConfiguration conf = new HamaConfiguration(); conf.addResource(localJobFile); jobConf = new BSPJob(conf, task.getJobID().toString()); - + Path jarFile = new Path(jobConf.getJar()); jobConf.setJar(localJarFile.toString()); @@ -599,7 +602,9 @@ // Check state of a Task while (true) { try { - Thread.sleep(1000); + taskStatus.setProgress(bspPeer.getSuperstepCount()); + doReport(this.taskStatus); + Thread.sleep(REPORT_INTERVAL); } catch (InterruptedException e) { e.printStackTrace(); } @@ -607,6 +612,7 @@ if (bspPeer.getLocalQueueSize() == 0 && bspPeer.getOutgoingQueueSize() == 0 && !runner.isAlive()) { taskStatus.setRunState(TaskStatus.State.SUCCEEDED); + taskStatus.setPhase(TaskStatus.Phase.CLEANUP); doReport(this.taskStatus); break; } @@ -618,9 +624,9 @@ * Update and report refresh status back to BSPMaster. 
*/ private void doReport(TaskStatus taskStatus) { - GroomServerStatus gss = new GroomServerStatus(groomServerName, bspPeer - .getPeerName(), updateTaskStatus(taskStatus), failures, maxCurrentTasks, - rpcServer); + GroomServerStatus gss = new GroomServerStatus(groomServerName, + bspPeer.getPeerName(), updateTaskStatus(taskStatus), failures, + maxCurrentTasks, rpcServer); try { boolean ret = masterClient.report(new Directive(gss)); if (!ret) { @@ -635,15 +641,18 @@ private List updateTaskStatus(TaskStatus taskStatus) { List tlist = new ArrayList(); - synchronized(runningTasks){ - synchronized(finishedTasks){ - TaskInProgress tip = runningTasks.remove(taskStatus.getTaskId()); - taskStatus.setProgress(1f); - taskStatus.setRunState(TaskStatus.State.SUCCEEDED); - taskStatus.setPhase(TaskStatus.Phase.CLEANUP); + synchronized (runningTasks) { + + if (taskStatus.getRunState() == TaskStatus.State.SUCCEEDED) { + synchronized (finishedTasks) { + TaskInProgress tip = runningTasks.remove(taskStatus.getTaskId()); + tlist.add((TaskStatus) taskStatus.clone()); + finishedTasks.put(taskStatus.getTaskId(), tip); + } + } else if (taskStatus.getRunState() == TaskStatus.State.RUNNING) { tlist.add((TaskStatus) taskStatus.clone()); - finishedTasks.put(taskStatus.getTaskId(), tip); } + } return tlist; } Index: src/java/org/apache/hama/bsp/JobInProgress.java =================================================================== --- src/java/org/apache/hama/bsp/JobInProgress.java (revision 1072852) +++ src/java/org/apache/hama/bsp/JobInProgress.java (working copy) @@ -77,7 +77,7 @@ this.localFs = FileSystem.getLocal(conf); this.jobFile = jobFile; this.master = master; - this.status = new JobStatus(jobId, null, 0.0f, 0.0f, JobStatus.State.PREP.value()); + this.status = new JobStatus(jobId, null, 0L, 0L, JobStatus.State.PREP.value()); this.startTime = System.currentTimeMillis(); this.superstepCounter = 0; this.restartCount = 0; @@ -183,7 +183,7 @@ // Update job status this.status = new 
JobStatus(this.status.getJobID(), this.profile.getUser(), - 1.0f, 1.0f, JobStatus.RUNNING); + 0L, 0L, JobStatus.RUNNING); tasksInited = true; LOG.debug("Job is initialized."); @@ -233,7 +233,7 @@ if (allDone) { this.status = new JobStatus(this.status.getJobID(), this.profile - .getUser(), 1.0f, 1.0f, 1.0f, JobStatus.SUCCEEDED, superstepCounter); + .getUser(), superstepCounter, superstepCounter, superstepCounter, JobStatus.SUCCEEDED, superstepCounter); this.finishTime = System.currentTimeMillis(); this.status.setFinishTime(this.finishTime); @@ -258,7 +258,7 @@ LOG.debug(">> JobInProgress.kill() step."); if (status.getRunState() != JobStatus.FAILED) { this.status = new JobStatus(status.getJobID(), this.profile.getUser(), - 1.0f, 1.0f, 1.0f, JobStatus.FAILED); + 0L, 0L, 0L, JobStatus.FAILED); this.finishTime = System.currentTimeMillis(); this.status.setFinishTime(this.finishTime); // Index: src/java/org/apache/hama/bsp/JobStatus.java =================================================================== --- src/java/org/apache/hama/bsp/JobStatus.java (revision 1072852) +++ src/java/org/apache/hama/bsp/JobStatus.java (working copy) @@ -21,13 +21,16 @@ import java.io.DataOutput; import java.io.IOException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableFactories; import org.apache.hadoop.io.WritableFactory; public class JobStatus implements Writable, Cloneable { - + public static final Log LOG = LogFactory.getLog(JobStatus.class); + static { WritableFactories.setFactory(JobStatus.class, new WritableFactory() { public Writable newInstance() { @@ -58,9 +61,9 @@ public static final int KILLED = 5; private BSPJobID jobid; - private float progress; - private float cleanupProgress; - private float setupProgress; + private long progress; + private long cleanupProgress; + private long setupProgress; private volatile State state;// 
runState in enum private int runState; private long startTime; @@ -73,22 +76,22 @@ public JobStatus() { } - public JobStatus(BSPJobID jobid, String user, float progress, int runState) { - this(jobid, user, progress, 0.0f, runState); + public JobStatus(BSPJobID jobid, String user, long progress, int runState) { + this(jobid, user, progress, 0, runState); } - public JobStatus(BSPJobID jobid, String user, float progress, float cleanupProgress, + public JobStatus(BSPJobID jobid, String user, long progress, long cleanupProgress, int runState) { - this(jobid, user, 0.0f, progress, cleanupProgress, runState); + this(jobid, user, 0, progress, cleanupProgress, runState); } - public JobStatus(BSPJobID jobid, String user, float setupProgress, float progress, - float cleanupProgress, int runState) { - this(jobid, user, 0.0f, progress, cleanupProgress, runState, 0); + public JobStatus(BSPJobID jobid, String user, long setupProgress, long progress, + long cleanupProgress, int runState) { + this(jobid, user, 0, progress, cleanupProgress, runState, 0); } - public JobStatus(BSPJobID jobid, String user, float setupProgress, float progress, - float cleanupProgress, int runState, long superstepCount) { + public JobStatus(BSPJobID jobid, String user, long setupProgress, long progress, + long cleanupProgress, int runState, long superstepCount) { this.jobid = jobid; this.setupProgress = setupProgress; this.progress = progress; @@ -103,28 +106,28 @@ return jobid; } - public synchronized float progress() { + public synchronized long progress() { return progress; } - synchronized void setprogress(float p) { - this.progress = (float) Math.min(1.0, Math.max(0.0, p)); + synchronized void setprogress(long p) { + this.progress = p; } - public synchronized float cleanupProgress() { + public synchronized long cleanupProgress() { return cleanupProgress; } - synchronized void setCleanupProgress(float p) { - this.cleanupProgress = (float) Math.min(1.0, Math.max(0.0, p)); + synchronized void 
setCleanupProgress(long p) { + this.cleanupProgress = p; } - public synchronized float setupProgress() { + public synchronized long setupProgress() { return setupProgress; } - synchronized void setSetupProgress(float p) { - this.setupProgress = (float) Math.min(1.0, Math.max(0.0, p)); + synchronized void setSetupProgress(long p) { + this.setupProgress = p; } public JobStatus.State getState(){ @@ -203,9 +206,9 @@ public synchronized void write(DataOutput out) throws IOException { jobid.write(out); - out.writeFloat(setupProgress); - out.writeFloat(progress); - out.writeFloat(cleanupProgress); + out.writeLong(setupProgress); + out.writeLong(progress); + out.writeLong(cleanupProgress); out.writeInt(runState); out.writeLong(startTime); out.writeLong(finishTime); @@ -217,9 +220,9 @@ public synchronized void readFields(DataInput in) throws IOException { this.jobid = new BSPJobID(); jobid.readFields(in); - this.setupProgress = in.readFloat(); - this.progress = in.readFloat(); - this.cleanupProgress = in.readFloat(); + this.setupProgress = in.readLong(); + this.progress = in.readLong(); + this.cleanupProgress = in.readLong(); this.runState = in.readInt(); this.startTime = in.readLong(); this.finishTime = in.readLong(); Index: src/java/org/apache/hama/bsp/RunningJob.java =================================================================== --- src/java/org/apache/hama/bsp/RunningJob.java (revision 1072852) +++ src/java/org/apache/hama/bsp/RunningJob.java (working copy) @@ -60,7 +60,7 @@ * @return the progress of the job's tasks. * @throws IOException */ - public float progress() throws IOException; + public long progress() throws IOException; /** * Check if the job is finished or not. This is a non-blocking call.