Index: src/examples/org/apache/hama/examples/PiEstimator.java =================================================================== --- src/examples/org/apache/hama/examples/PiEstimator.java (revision 1142856) +++ src/examples/org/apache/hama/examples/PiEstimator.java (working copy) @@ -31,7 +31,7 @@ import org.apache.hama.bsp.BSP; import org.apache.hama.bsp.BSPJob; import org.apache.hama.bsp.BSPJobClient; -import org.apache.hama.bsp.BSPPeerProtocol; +import org.apache.hama.bsp.BSPPeer; import org.apache.hama.bsp.ClusterStatus; import org.apache.hama.bsp.DoubleMessage; import org.apache.zookeeper.KeeperException; @@ -46,7 +46,7 @@ private String masterTask; private static final int iterations = 10000; - public void bsp(BSPPeerProtocol bspPeer) throws IOException, + public void bsp(BSPPeer bspPeer) throws IOException, KeeperException, InterruptedException { int in = 0, out = 0; for (int i = 0; i < iterations; i++) { @@ -133,7 +133,7 @@ bsp.setNumBspTask(Integer.parseInt(args[0])); } else { // Set to maximum - bsp.setNumBspTask(cluster.getGroomServers()); + bsp.setNumBspTask(cluster.getMaxTasks()); } // Choose one as a master Index: src/examples/org/apache/hama/examples/RandBench.java =================================================================== --- src/examples/org/apache/hama/examples/RandBench.java (revision 1142856) +++ src/examples/org/apache/hama/examples/RandBench.java (working copy) @@ -28,7 +28,7 @@ import org.apache.hama.bsp.BSPJob; import org.apache.hama.bsp.BSPJobClient; import org.apache.hama.bsp.BSPMessage; -import org.apache.hama.bsp.BSPPeerProtocol; +import org.apache.hama.bsp.BSPPeer; import org.apache.hama.bsp.ByteMessage; import org.apache.hama.bsp.ClusterStatus; import org.apache.hama.util.Bytes; @@ -48,7 +48,7 @@ private int nSupersteps; @Override - public void bsp(BSPPeerProtocol bspPeer) throws IOException, + public void bsp(BSPPeer bspPeer) throws IOException, KeeperException, InterruptedException { byte[] dummyData = new byte[sizeOfMsg]; BSPMessage msg = null; @@ -111,7 +111,7 @@ // Set the task size as a number of GroomServer BSPJobClient jobClient = new BSPJobClient(conf); ClusterStatus cluster = jobClient.getClusterStatus(false); - bsp.setNumBspTask(cluster.getGroomServers()); + bsp.setNumBspTask(cluster.getMaxTasks()); long startTime = System.currentTimeMillis(); bsp.waitForCompletion(true); Index: src/examples/org/apache/hama/examples/SerializePrinting.java =================================================================== --- src/examples/org/apache/hama/examples/SerializePrinting.java (revision 1142856) +++ src/examples/org/apache/hama/examples/SerializePrinting.java (working copy) @@ -27,13 +27,13 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.io.Text; import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.BSP; import org.apache.hama.bsp.BSPJob; import org.apache.hama.bsp.BSPJobClient; -import org.apache.hama.bsp.BSPPeerProtocol; +import org.apache.hama.bsp.BSPPeer; import org.apache.hama.bsp.ClusterStatus; import org.apache.zookeeper.KeeperException; @@ -47,7 +47,7 @@ private FileSystem fileSys; private int num; - public void bsp(BSPPeerProtocol bspPeer) throws IOException, + public void bsp(BSPPeer bspPeer) throws IOException, KeeperException, InterruptedException { int i = 0; Index: src/examples/org/apache/hama/examples/graph/PageRank.java =================================================================== --- src/examples/org/apache/hama/examples/graph/PageRank.java (revision 1142856) +++ src/examples/org/apache/hama/examples/graph/PageRank.java (working copy) @@ -30,7 +30,7 @@ import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.BSPJob; import org.apache.hama.bsp.BSPJobClient; -import org.apache.hama.bsp.BSPPeerProtocol; +import org.apache.hama.bsp.BSPPeer; import org.apache.hama.bsp.ClusterStatus; import org.apache.hama.bsp.DoubleMessage; import org.apache.zookeeper.KeeperException; @@ -48,7 +48,7 @@ private String[] peerNames; @Override - public void bsp(BSPPeerProtocol peer) throws IOException, KeeperException, + public void bsp(BSPPeer peer) throws IOException, KeeperException, InterruptedException { String master = conf.get(MASTER_TASK); // setup the datasets @@ -109,7 +109,7 @@ LOG.info("Finished with iteration " + iteration + "!"); } - private double broadcastError(BSPPeerProtocol peer, String master, + private double broadcastError(BSPPeer peer, String master, double error) throws IOException, KeeperException, InterruptedException { peer.send(master, new DoubleMessage("", error)); peer.sync(); @@ -148,7 +148,7 @@ } } - private void sendMessageToNeighbors(BSPPeerProtocol peer, PageRankVertex v) + private void sendMessageToNeighbors(BSPPeer peer, PageRankVertex v) throws IOException { List outgoingEdges = adjacencyList.get(v); for (PageRankVertex adjacent : outgoingEdges) { Index: src/examples/org/apache/hama/examples/graph/PageRankBase.java =================================================================== --- src/examples/org/apache/hama/examples/graph/PageRankBase.java (revision 1142856) +++ src/examples/org/apache/hama/examples/graph/PageRankBase.java (working copy) @@ -39,7 +39,7 @@ import org.apache.hadoop.io.Text; import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.BSP; -import org.apache.hama.bsp.BSPPeerProtocol; +import org.apache.hama.bsp.BSPPeer; public abstract class PageRankBase extends BSP { public static final Log LOG = LogFactory.getLog(PageRankBase.class); @@ -52,7 +52,7 @@ protected static double EPSILON = 0.001; static HashMap> mapAdjacencyList( - Configuration conf, BSPPeerProtocol peer) throws FileNotFoundException, + Configuration conf, BSPPeer peer) throws FileNotFoundException, IOException { FileSystem fs = FileSystem.get(conf); HashMap> adjacencyList = new HashMap>(); @@ -199,7 +199,7 @@ return conf; } - static void savePageRankMap(BSPPeerProtocol peer, Configuration conf, + static void savePageRankMap(BSPPeer peer, Configuration conf, Map tentativePagerank) throws IOException { FileSystem fs = FileSystem.get(conf); Path outPath = new Path(conf.get("out.path") + Path.SEPARATOR + "temp" Index: src/examples/org/apache/hama/examples/graph/ShortestPaths.java =================================================================== --- src/examples/org/apache/hama/examples/graph/ShortestPaths.java (revision 1142856) +++ src/examples/org/apache/hama/examples/graph/ShortestPaths.java (working copy) @@ -32,7 +32,7 @@ import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.BSPJob; import org.apache.hama.bsp.BSPJobClient; -import org.apache.hama.bsp.BSPPeerProtocol; +import org.apache.hama.bsp.BSPPeer; import org.apache.hama.bsp.BooleanMessage; import org.apache.hama.bsp.ClusterStatus; import org.apache.hama.bsp.IntegerMessage; @@ -48,7 +48,7 @@ private String[] peerNames; @Override - public void bsp(BSPPeerProtocol peer) throws IOException, KeeperException, + public void bsp(BSPPeer peer) throws IOException, KeeperException, InterruptedException { LOG.info("Mapping graph into ram..."); // map our input into ram @@ -119,7 +119,7 @@ * @throws KeeperException * @throws InterruptedException */ - private boolean broadcastUpdatesMade(BSPPeerProtocol peer, String master, + private boolean broadcastUpdatesMade(BSPPeer peer, String master, int updates) throws IOException, KeeperException, InterruptedException { peer.send(master, new IntegerMessage(peer.getPeerName(), updates)); peer.sync(); @@ -154,7 +154,7 @@ * @param id The vertex to all adjacent vertices the new cost has to be send. * @throws IOException */ - private void sendMessageToNeighbors(BSPPeerProtocol peer, + private void sendMessageToNeighbors(BSPPeer peer, ShortestPathVertex id) throws IOException { List outgoingEdges = adjacencyList.get(id); for (ShortestPathVertex adjacent : outgoingEdges) { Index: src/examples/org/apache/hama/examples/graph/ShortestPathsBase.java =================================================================== --- src/examples/org/apache/hama/examples/graph/ShortestPathsBase.java (revision 1142856) +++ src/examples/org/apache/hama/examples/graph/ShortestPathsBase.java (working copy) @@ -34,7 +34,7 @@ import org.apache.hadoop.io.Text; import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.BSP; -import org.apache.hama.bsp.BSPPeerProtocol; +import org.apache.hama.bsp.BSPPeer; public abstract class ShortestPathsBase extends BSP { @@ -54,7 +54,7 @@ * @param adjacencyList * @throws IOException */ - static void saveVertexMap(Configuration conf, BSPPeerProtocol peer, + static void saveVertexMap(Configuration conf, BSPPeer peer, Map> adjacencyList) throws IOException { Path outPath = new Path(conf.get(OUT_PATH) + Path.SEPARATOR @@ -106,7 +106,7 @@ * @param adjacencyList * @param vertexLookupMap */ - static void mapAdjacencyList(Configuration conf, BSPPeerProtocol peer, + static void mapAdjacencyList(Configuration conf, BSPPeer peer, Map> adjacencyList, Map vertexLookupMap) throws FileNotFoundException, IOException { Index: src/java/org/apache/hama/bsp/BSPInterface.java =================================================================== --- src/java/org/apache/hama/bsp/BSPInterface.java (revision 1142894) +++ src/java/org/apache/hama/bsp/BSPInterface.java (working copy) @@ -39,7 +39,6 @@ * @throws KeeperException * @throws InterruptedException */ - public void bsp(BSPPeerProtocol bspPeer) throws IOException, KeeperException, + public void bsp(BSPPeer bspPeer) throws IOException, KeeperException, InterruptedException; - } Index: src/java/org/apache/hama/bsp/BSPJobClient.java =================================================================== --- src/java/org/apache/hama/bsp/BSPJobClient.java (revision 1142894) +++ src/java/org/apache/hama/bsp/BSPJobClient.java (working copy) @@ -412,13 +412,9 @@ IOException { BSPJobClient jc = new BSPJobClient(job.getConf()); - // TODO this code must be removed - // when GroomServer supports the multiple tasks. if (job.getNumBspTask() == 0 - || job.getNumBspTask() > jc.getClusterStatus(false).getGroomServers()) { - // If the number of tasks is greater than the number of GroomServer, - // reset the number of tasks as number of GroomServer. - job.setNumBspTask(jc.getClusterStatus(false).getGroomServers()); + || job.getNumBspTask() > jc.getClusterStatus(false).getMaxTasks()) { + job.setNumBspTask(jc.getClusterStatus(false).getMaxTasks()); } RunningJob running = jc.submitJobInternal(job); Index: src/java/org/apache/hama/bsp/BSPMaster.java =================================================================== --- src/java/org/apache/hama/bsp/BSPMaster.java (revision 1142894) +++ src/java/org/apache/hama/bsp/BSPMaster.java (working copy) @@ -62,7 +62,7 @@ private HamaConfiguration conf; /** - * Constants for BSPMaster's status. + * Constants for BSPMaster's status. */ public static enum State { INITIALIZING, RUNNING Index: src/java/org/apache/hama/bsp/BSPPeer.java =================================================================== --- src/java/org/apache/hama/bsp/BSPPeer.java (revision 1142894) +++ src/java/org/apache/hama/bsp/BSPPeer.java (working copy) @@ -72,11 +72,29 @@ private InetSocketAddress peerAddress; private TaskStatus currentTaskStatus; + private TaskAttemptID taskid; + private BSPPeerProtocol umbilical; + + /** + * Protected default constructor for LocalBSPRunner. + */ + protected BSPPeer() { + bspRoot = null; + quorumServers = null; + } + /** * Constructor + * + * @param umbilical + * @param taskid */ - public BSPPeer(Configuration conf) throws IOException { + public BSPPeer(Configuration conf, TaskAttemptID taskid, + BSPPeerProtocol umbilical) throws IOException { this.conf = conf; + this.taskid = taskid; + this.umbilical = umbilical; + String bindAddress = conf.get(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST); int bindPort = conf @@ -89,15 +107,14 @@ // TODO: may require to dynamic reflect the underlying // network e.g. ip address, port. peerAddress = new InetSocketAddress(bindAddress, bindPort); - reinitialize(); } public void reinitialize() { try { LOG.debug("reinitialize(): " + getPeerName()); - server = RPC.getServer(this, peerAddress.getHostName(), peerAddress - .getPort(), conf); + server = RPC.getServer(this, peerAddress.getHostName(), + peerAddress.getPort(), conf); server.start(); LOG.info(" BSPPeer address:" + peerAddress.getHostName() + " port:" + peerAddress.getPort()); @@ -202,7 +219,8 @@ leaveBarrier(); currentTaskStatus.incrementSuperstepCount(); - + umbilical.incrementSuperstepCount(taskid); + startTime = System.currentTimeMillis(); // Clear outgoing queues. clearOutgoingQueues(); @@ -219,8 +237,8 @@ // TODO: This is a quite temporary solution of HAMA-387. // If zk.getChildren() response is slower than 200 milliseconds, // BSP system will be hanged. - - // We have to consider another way to avoid this problem. + + // We have to consider another way to avoid this problem. if ((System.currentTimeMillis() - startTime) < 200) { Thread.sleep(200); // at least wait } @@ -229,8 +247,9 @@ protected boolean enterBarrier() throws KeeperException, InterruptedException { LOG.debug("[" + getPeerName() + "] enter the enterbarrier: " + this.getSuperstepCount()); - zk.create(bspRoot + "/" + getPeerName(), Bytes.toBytes(this - .getSuperstepCount()), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + zk.create(bspRoot + "/" + getPeerName(), + Bytes.toBytes(this.getSuperstepCount()), Ids.OPEN_ACL_UNSAFE, + CreateMode.EPHEMERAL); while (true) { synchronized (mutex) { @@ -322,8 +341,8 @@ private InetSocketAddress getAddress(String peerName) { String[] peerAddrParts = peerName.split(":"); - return new InetSocketAddress(peerAddrParts[0], Integer - .parseInt(peerAddrParts[1])); + return new InetSocketAddress(peerAddrParts[0], + Integer.parseInt(peerAddrParts[1])); } @Override Index: src/java/org/apache/hama/bsp/BSPPeerProtocol.java =================================================================== --- src/java/org/apache/hama/bsp/BSPPeerProtocol.java (revision 1142894) +++ src/java/org/apache/hama/bsp/BSPPeerProtocol.java (working copy) @@ -17,12 +17,16 @@ */ package org.apache.hama.bsp; +import java.io.Closeable; import java.io.IOException; +import org.apache.hama.Constants; + /** * Protocol that task child process uses to contact its parent process. */ -public interface BSPPeerProtocol extends BSPPeerInterface { +public interface BSPPeerProtocol extends BSPRPCProtocolVersion, Closeable, + Constants { /** Called when a child task process starts, to get its task. */ Task getTask(TaskAttemptID taskid) throws IOException; @@ -46,4 +50,11 @@ /** Report that the task encounted a local filesystem error. */ void fsError(TaskAttemptID taskId, String message) throws IOException; + void incrementSuperstepCount(TaskAttemptID taskid) throws IOException; + + /** + * @return the all BSPPeer names. + */ + PeerNames getAllPeerNames(); + } Index: src/java/org/apache/hama/bsp/BSPTask.java =================================================================== --- src/java/org/apache/hama/bsp/BSPTask.java (revision 1142894) +++ src/java/org/apache/hama/bsp/BSPTask.java (working copy) @@ -49,14 +49,14 @@ } @Override - public void run(BSPJob job, BSPPeerProtocol umbilical) + public void run(BSPJob job, BSPPeer bspPeer, BSPPeerProtocol umbilical) throws IOException { BSP bsp = (BSP) ReflectionUtils.newInstance(job.getConf().getClass( "bsp.work.class", BSP.class), job.getConf()); try { - bsp.bsp(umbilical); + bsp.bsp(bspPeer); } catch (IOException e) { LOG.error("Exception during BSP execution!", e); } catch (KeeperException e) { Index: src/java/org/apache/hama/bsp/GroomServer.java =================================================================== --- src/java/org/apache/hama/bsp/GroomServer.java (revision 1142896) +++ src/java/org/apache/hama/bsp/GroomServer.java (working copy) @@ -24,6 +24,7 @@ import java.lang.reflect.Constructor; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; @@ -49,15 +50,14 @@ import org.apache.hadoop.net.DNS; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.DiskChecker; +import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.RunJar; import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hama.Constants; import org.apache.hama.HamaConfiguration; import org.apache.hama.ipc.MasterProtocol; import org.apache.hama.ipc.WorkerProtocol; import org.apache.log4j.LogManager; -import org.apache.zookeeper.KeeperException; /** * A Groom Server (shortly referred to as groom) is a process that performs bsp @@ -68,9 +68,7 @@ * physical node. */ public class GroomServer implements Runnable, WorkerProtocol, BSPPeerProtocol { - public static final Log LOG = LogFactory.getLog(GroomServer.class); - private BSPPeer bspPeer; static final String SUBDIR = "groomServer"; private volatile static int REPORT_INTERVAL = 1 * 1000; @@ -92,6 +90,7 @@ // Attributes String groomServerName; String localHostname; + String groomHostName; InetSocketAddress bspMasterAddr; private Instructor instructor; @@ -121,21 +120,25 @@ InetSocketAddress taskReportAddress; Server taskReportServer = null; -// private BlockingQueue tasksToCleanup = new LinkedBlockingQueue(); + private PeerNames allPeerNames = null; + + // private BlockingQueue tasksToCleanup = new + // LinkedBlockingQueue(); private class DispatchTasksHandler implements DirectiveHandler { public void handle(Directive directive) throws DirectiveException { GroomServerAction[] actions = ((DispatchTasksDirective) directive) .getActions(); - synchronized (bspPeer) { - bspPeer.setAllPeerNames(((DispatchTasksDirective) directive) - .getGroomServerPeers().values()); - } + + allPeerNames = new PeerNames(((DispatchTasksDirective) directive) + .getGroomServerPeers().values()); + if (LOG.isDebugEnabled()) { LOG.debug("Got Response from BSPMaster with " + ((actions != null) ? actions.length : 0) + " actions"); } + if (actions != null) { for (GroomServerAction action : actions) { if (action instanceof LaunchTaskAction) { @@ -216,8 +219,9 @@ } if (localHostname == null) { - this.localHostname = DNS.getDefaultHost(conf.get("bsp.dns.interface", - "default"), conf.get("bsp.dns.nameserver", "default")); + this.localHostname = DNS.getDefaultHost( + conf.get("bsp.dns.interface", "default"), + conf.get("bsp.dns.nameserver", "default")); } // check local disk checkLocalDirs(conf.getStrings("bsp.local.dir")); @@ -230,7 +234,6 @@ this.finishedTasks = new LinkedHashMap(); this.conf.set(Constants.PEER_HOST, localHostname); this.conf.set(Constants.GROOM_RPC_HOST, localHostname); - bspPeer = new BSPPeer(conf); int rpcPort = -1; String rpcAddr = null; @@ -270,8 +273,9 @@ + ":" + taskReportAddress.getPort()); LOG.info("GroomServer up at: " + this.taskReportAddress); - this.groomServerName = "groomd_" + bspPeer.getPeerName().replace(':', '_'); - LOG.info("Starting groom: " + this.groomServerName); + this.groomHostName = rpcAddr; + this.groomServerName = "groomd_" + this.rpcServer.replace(':', '_'); + LOG.info("Starting groom: " + this.rpcServer); DistributedCache.purgeCache(this.conf); @@ -284,7 +288,7 @@ throw new IllegalArgumentException("Error rpc address " + rpcAddr + " port" + rpcPort); if (!this.masterClient.register(new GroomServerStatus(groomServerName, - bspPeer.getPeerName(), cloneAndResetRunningTaskStatuses(), failures, + getBspPeerName(), cloneAndResetRunningTaskStatuses(), failures, maxCurrentTasks, this.rpcServer))) { LOG.error("There is a problem in establishing communication" + " link with BSPMaster"); @@ -376,14 +380,16 @@ Thread.sleep(REPORT_INTERVAL); TaskInProgress tip = e.getValue(); TaskStatus taskStatus = tip.getStatus(); - taskStatus.setProgress(bspPeer.getSuperstepCount()); + + if (taskStatus.getRunState() == TaskStatus.State.RUNNING) { + taskStatus.setProgress(taskStatus.getSuperstepCount()); - if (bspPeer.getLocalQueueSize() == 0 - && bspPeer.getOutgoingQueueSize() == 0 && !tip.runner.isAlive()) { - if (taskStatus.getRunState() != TaskStatus.State.FAILED) { - taskStatus.setRunState(TaskStatus.State.SUCCEEDED); + if (!tip.runner.isAlive()) { + if (taskStatus.getRunState() != TaskStatus.State.FAILED) { + taskStatus.setRunState(TaskStatus.State.SUCCEEDED); + } + taskStatus.setPhase(TaskStatus.Phase.CLEANUP); } - taskStatus.setPhase(TaskStatus.Phase.CLEANUP); } doReport(taskStatus); @@ -451,7 +457,7 @@ */ public void doReport(TaskStatus taskStatus) { GroomServerStatus groomStatus = new GroomServerStatus(groomServerName, - bspPeer.getPeerName(), updateTaskStatus(taskStatus), failures, + getBspPeerName(), updateTaskStatus(taskStatus), failures, maxCurrentTasks, rpcServer); try { boolean ret = masterClient.report(new ReportGroomStatusDirective( @@ -647,7 +653,6 @@ public synchronized void close() throws IOException { this.running = false; this.initialized = false; - bspPeer.close(); cleanupStorage(); this.workerServer.stop(); RPC.stopProxy(masterClient); @@ -728,8 +733,6 @@ public void launchTask() throws IOException { localizeTask(task); taskStatus.setRunState(TaskStatus.State.RUNNING); - bspPeer.setJobConf(jobConf); - bspPeer.setCurrentTaskStatus(taskStatus); this.runner = task.createRunner(GroomServer.this); this.runner.start(); } @@ -806,14 +809,15 @@ } /** - * GroomServer address information. - * * @return bsp peer information in the form of "address:port". */ public String getBspPeerName() { - if (null != bspPeer) - return bspPeer.getPeerName(); - return null; + // TODO Later, peers list should be returned. + return this.groomHostName + ":" + Constants.DEFAULT_PEER_PORT; + } + + public Collection AllPeerNames() { + return allPeerNames.getAllPeerNames(); } /** @@ -841,12 +845,26 @@ defaultConf.addResource(new Path(task.getJobFile())); BSPJob job = new BSPJob(task.getJobID(), task.getJobFile()); + defaultConf.set(Constants.PEER_HOST, args[3]); + defaultConf.setInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT); + + BSPPeer bspPeer = new BSPPeer(defaultConf, taskid, umbilical); + bspPeer.setJobConf(job); + bspPeer.setAllPeerNames(umbilical.getAllPeerNames().getAllPeerNames()); + + TaskStatus taskStatus = new TaskStatus(task.getJobID(), task.getTaskID(), + 0, TaskStatus.State.RUNNING, "running", host, + TaskStatus.Phase.STARTING); + + bspPeer.setCurrentTaskStatus(taskStatus); + try { // use job-specified working directory FileSystem.get(job.getConf()).setWorkingDirectory( job.getWorkingDirectory()); - task.run(job, umbilical); // run the task + task.run(job, bspPeer, umbilical); // run the task + } catch (FSError e) { LOG.fatal("FSError from child", e); umbilical.fsError(taskid, e.getMessage()); @@ -856,6 +874,8 @@ ByteArrayOutputStream baos = new ByteArrayOutputStream(); throwable.printStackTrace(new PrintStream(baos)); } finally { + bspPeer.close(); // close peer. + RPC.stopProxy(umbilical); // Shutting down log4j of the child-vm... // This assumes that on return from Task.run() @@ -875,6 +895,11 @@ } } + public void incrementSuperstepCount(TaskAttemptID taskid) throws IOException { + TaskInProgress tip = tasks.get(taskid); + tip.getStatus().incrementSuperstepCount(); + } + @Override public boolean ping(TaskAttemptID taskid) throws IOException { // TODO Auto-generated method stub @@ -885,62 +910,15 @@ public void done(TaskAttemptID taskid, boolean shouldBePromoted) throws IOException { // TODO Auto-generated method stub - } @Override public void fsError(TaskAttemptID taskId, String message) throws IOException { // TODO Auto-generated method stub - } @Override - public void send(String peerName, BSPMessage msg) throws IOException { - bspPeer.send(peerName, msg); + public PeerNames getAllPeerNames() { + return allPeerNames; } - - @Override - public void put(BSPMessage msg) throws IOException { - bspPeer.put(msg); - } - - @Override - public void put(BSPMessageBundle messages) throws IOException { - bspPeer.put(messages); - } - - @Override - public BSPMessage getCurrentMessage() throws IOException { - return bspPeer.getCurrentMessage(); - } - - @Override - public int getNumCurrentMessages() { - return bspPeer.getNumCurrentMessages(); - } - - @Override - public void sync() throws IOException, KeeperException, InterruptedException { - bspPeer.sync(); - } - - @Override - public long getSuperstepCount() { - return bspPeer.getSuperstepCount(); - } - - @Override - public String getPeerName() { - return bspPeer.getPeerName(); - } - - @Override - public String[] getAllPeerNames() { - return bspPeer.getAllPeerNames(); - } - - @Override - public void clear() { - bspPeer.clear(); - } -} +} \ No newline at end of file Index: src/java/org/apache/hama/bsp/JobProfile.java =================================================================== --- src/java/org/apache/hama/bsp/JobProfile.java (revision 1142894) +++ src/java/org/apache/hama/bsp/JobProfile.java (working copy) @@ -31,7 +31,7 @@ */ public class JobProfile implements Writable { - static { // register a ctor + static { // register actor WritableFactories.setFactory(JobProfile.class, new WritableFactory() { public Writable newInstance() { return new JobProfile(); Index: src/java/org/apache/hama/bsp/LocalBSPRunner.java =================================================================== --- src/java/org/apache/hama/bsp/LocalBSPRunner.java (revision 1142894) +++ src/java/org/apache/hama/bsp/LocalBSPRunner.java (working copy) @@ -48,7 +48,6 @@ * */ public class LocalBSPRunner implements JobSubmissionProtocol { - public static final Log LOG = LogFactory.getLog(LocalBSPRunner.class); private static final String IDENTIFIER = "localrunner"; @@ -65,7 +64,7 @@ .newFixedThreadPool(threadPoolSize); } - protected HashMap localGrooms = new HashMap(); + protected HashMap localGrooms = new HashMap(); protected String jobFile; protected String jobName; @@ -115,7 +114,7 @@ JobStatus.RUNNING); for (int i = 0; i < threadPoolSize; i++) { String name = IDENTIFIER + " " + i; - BSPPeerProtocol localGroom = new LocalGroom(name); + LocalGroom localGroom = new LocalGroom(name); localGrooms.put(name, localGroom); futureList.add(threadPool.submit(new BSPRunner(conf, job, ReflectionUtils .newInstance(job.getBspClass(), conf), localGroom))); @@ -127,7 +126,7 @@ @Override public ClusterStatus getClusterStatus(boolean detailed) throws IOException { Map map = new HashMap(); - for (Entry entry : localGrooms.entrySet()) { + for (Entry entry : localGrooms.entrySet()) { map.put(entry.getKey(), entry.getValue().getPeerName()); } return new ClusterStatus(map, threadPoolSize, threadPoolSize, State.RUNNING); @@ -185,10 +184,9 @@ Configuration conf; BSPJob job; BSP bsp; - BSPPeerProtocol groom; + LocalGroom groom; - public BSPRunner(Configuration conf, BSPJob job, BSP bsp, - BSPPeerProtocol groom) { + public BSPRunner(Configuration conf, BSPJob job, BSP bsp, LocalGroom groom) { super(); this.conf = conf; this.job = job; @@ -199,7 +197,7 @@ public void run() { bsp.setConf(conf); try { - bsp.bsp(groom); + bsp.bsp(groom); } catch (Exception e) { LOG.error("Exception during BSP execution!", e); } @@ -247,15 +245,14 @@ } - class LocalGroom implements BSPPeerProtocol { + class LocalGroom extends BSPPeer { private long superStepCount = 0; private final ConcurrentLinkedQueue localMessageQueue = new ConcurrentLinkedQueue(); // outgoing queue private final Map> outgoingQueues = new ConcurrentHashMap>(); private final String peerName; - public LocalGroom(String peerName) { - super(); + public LocalGroom(String peerName) throws IOException { this.peerName = peerName; } @@ -352,33 +349,8 @@ } @Override - public Task getTask(TaskAttemptID taskid) throws IOException { - return null; - } - - @Override - public boolean ping(TaskAttemptID taskid) throws IOException { - return true; - } - - @Override - public void done(TaskAttemptID taskid, boolean shouldBePromoted) - throws IOException { - - } - - @Override - public void fsError(TaskAttemptID taskId, String message) - throws IOException { - - } - - @Override public void put(BSPMessageBundle messages) throws IOException { - throw new UnsupportedOperationException( - "Messagebundle is not supported by local testing"); } } - } Index: src/java/org/apache/hama/bsp/PeerNames.java =================================================================== --- src/java/org/apache/hama/bsp/PeerNames.java (revision 0) +++ src/java/org/apache/hama/bsp/PeerNames.java (revision 0) @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +/** + * + */ +public class PeerNames implements Writable { + Collection allPeers; + + public PeerNames() { + this.allPeers = new ArrayList(); + } + + public PeerNames(Collection allPeers) { + this.allPeers = allPeers; + } + + public Collection getAllPeerNames() { + return allPeers; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(allPeers.size()); + for (String peerName : allPeers) { + Text.writeString(out, peerName); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + int peersNum = in.readInt(); + for (int i = 0; i < peersNum; i++) { + allPeers.add(Text.readString(in)); + } + } + +} Index: src/java/org/apache/hama/bsp/SimpleTaskScheduler.java =================================================================== --- src/java/org/apache/hama/bsp/SimpleTaskScheduler.java (revision 1142894) +++ src/java/org/apache/hama/bsp/SimpleTaskScheduler.java (working copy) @@ -130,6 +130,7 @@ public void run() { // obtain tasks Task t = jip.obtainNewTask(this.stus, groomNum); + // assembly into actions // List tasks = new ArrayList(); if (jip.getStatus().getRunState() == JobStatus.RUNNING) { Index: src/java/org/apache/hama/bsp/Task.java =================================================================== --- src/java/org/apache/hama/bsp/Task.java (revision 1142894) +++ src/java/org/apache/hama/bsp/Task.java (working copy) @@ -121,9 +121,10 @@ * Run this task as a part of the named job. This method is executed in the * child process. * - * @param umbilical for progress reports + * @param bspPeer for communications + * @param umbilical for communications with GroomServer */ - public abstract void run(BSPJob job, BSPPeerProtocol umbilical) + public abstract void run(BSPJob job, BSPPeer bspPeer, BSPPeerProtocol umbilical) throws IOException; public abstract BSPTaskRunner createRunner(GroomServer groom); Index: src/java/org/apache/hama/bsp/TaskRunner.java =================================================================== --- src/java/org/apache/hama/bsp/TaskRunner.java (revision 1142894) +++ src/java/org/apache/hama/bsp/TaskRunner.java (working copy) @@ -116,6 +116,7 @@ vargs.add(addr.getHostName()); vargs.add(Integer.toString(addr.getPort())); vargs.add(task.getTaskID().toString()); + vargs.add(groomServer.groomHostName); // Run java runChild((String[]) vargs.toArray(new String[0]), workDir); Index: src/test/testjar/ClassSerializePrinting.java =================================================================== --- src/test/testjar/ClassSerializePrinting.java (revision 1142856) +++ src/test/testjar/ClassSerializePrinting.java (working copy) @@ -26,10 +26,10 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.io.Text; import org.apache.hama.bsp.BSP; -import org.apache.hama.bsp.BSPPeerProtocol; +import org.apache.hama.bsp.BSPPeer; import org.apache.zookeeper.KeeperException; public class ClassSerializePrinting { @@ -42,7 +42,7 @@ private FileSystem fileSys; private int num; - public void bsp(BSPPeerProtocol bspPeer) throws IOException, + public void bsp(BSPPeer bspPeer) throws IOException, KeeperException, InterruptedException { int i = 0;