Index: src/java/org/apache/hama/bsp/BSPMaster.java =================================================================== --- src/java/org/apache/hama/bsp/BSPMaster.java (revision 1146937) +++ src/java/org/apache/hama/bsp/BSPMaster.java (working copy) @@ -49,7 +49,7 @@ import org.apache.hama.http.HttpServer; import org.apache.hama.ipc.JobSubmissionProtocol; import org.apache.hama.ipc.MasterProtocol; -import org.apache.hama.ipc.WorkerProtocol; +import org.apache.hama.ipc.GroomProtocol; /** * BSPMaster is responsible to control all the groom servers and to manage bsp @@ -113,7 +113,7 @@ private TaskScheduler taskScheduler; // GroomServers cache - protected ConcurrentMap groomServers = new ConcurrentHashMap(); + protected ConcurrentMap groomServers = new ConcurrentHashMap(); private Instructor instructor; @@ -161,7 +161,7 @@ } else if (jip.getStatus().getRunState() == JobStatus.RUNNING) { jip.getStatus().setprogress(ts.getSuperstepCount()); } else if (jip.getStatus().getRunState() == JobStatus.KILLED) { - WorkerProtocol worker = findGroomServer(tmpStatus); + GroomProtocol worker = findGroomServer(tmpStatus); Directive d1 = new DispatchTasksDirective( currentGroomServerPeers(), new GroomServerAction[] { new KillTaskAction(ts.getTaskId()) }); @@ -313,8 +313,8 @@ } Throwable e = null; try { - WorkerProtocol wc = (WorkerProtocol) RPC.waitForProxy( - WorkerProtocol.class, WorkerProtocol.versionID, + GroomProtocol wc = (GroomProtocol) RPC.waitForProxy( + GroomProtocol.class, GroomProtocol.versionID, resolveWorkerAddress(status.getRpcServer()), this.conf); if (null == wc) { LOG.warn("Fail to create Worker client at host " + status.getPeerName()); @@ -350,7 +350,7 @@ private void updateGroomServersKey(GroomServerStatus old, GroomServerStatus newKey) { synchronized (groomServers) { - WorkerProtocol worker = groomServers.remove(old); + GroomProtocol worker = groomServers.remove(old); groomServers.put(newKey, worker); } } @@ -517,7 +517,7 @@ int numGroomServers = groomServers.size(); if (detailed) { groomPeersMap = new HashMap(); - for (Map.Entry entry : groomServers + for (Map.Entry entry : groomServers .entrySet()) { GroomServerStatus s = entry.getKey(); groomPeersMap.put(s.getGroomName(), s.getPeerName()); @@ -537,12 +537,12 @@ } @Override - public WorkerProtocol findGroomServer(GroomServerStatus status) { + public GroomProtocol findGroomServer(GroomServerStatus status) { return groomServers.get(status); } @Override - public Collection findGroomServers() { + public Collection findGroomServers() { return groomServers.values(); } Index: src/java/org/apache/hama/bsp/BSPPeer.java =================================================================== --- src/java/org/apache/hama/bsp/BSPPeer.java (revision 1146937) +++ src/java/org/apache/hama/bsp/BSPPeer.java (working copy) @@ -35,6 +35,7 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC.Server; import org.apache.hama.Constants; +import org.apache.hama.ipc.BSPPeerProtocol; import org.apache.hama.util.Bytes; import org.apache.hama.zookeeper.QuorumPeer; import org.apache.zookeeper.CreateMode; Index: src/java/org/apache/hama/bsp/BSPPeerInterface.java =================================================================== --- src/java/org/apache/hama/bsp/BSPPeerInterface.java (revision 1146937) +++ src/java/org/apache/hama/bsp/BSPPeerInterface.java (working copy) @@ -21,12 +21,13 @@ import java.io.IOException; import org.apache.hama.Constants; +import org.apache.hama.ipc.HamaRPCProtocolVersion; import org.apache.zookeeper.KeeperException; /** * BSP communication interface. */ -public interface BSPPeerInterface extends BSPRPCProtocolVersion, Closeable, +public interface BSPPeerInterface extends HamaRPCProtocolVersion, Closeable, Constants { /** Index: src/java/org/apache/hama/bsp/BSPPeerProtocol.java =================================================================== --- src/java/org/apache/hama/bsp/BSPPeerProtocol.java (revision 1146937) +++ src/java/org/apache/hama/bsp/BSPPeerProtocol.java (working copy) @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.bsp; - -import java.io.Closeable; -import java.io.IOException; - -import org.apache.hama.Constants; - -/** - * Protocol that task child process uses to contact its parent process. - */ -public interface BSPPeerProtocol extends BSPRPCProtocolVersion, Closeable, - Constants { - - /** Called when a child task process starts, to get its task. */ - Task getTask(TaskAttemptID taskid) throws IOException; - - /** - * Periodically called by child to check if parent is still alive. - * - * @return True if the task is known - */ - boolean ping(TaskAttemptID taskid) throws IOException; - - /** - * Report that the task is successfully completed. Failure is assumed if the - * task process exits without calling this. - * - * @param taskid task's id - * @param shouldBePromoted whether to promote the task's output or not - */ - void done(TaskAttemptID taskid, boolean shouldBePromoted) throws IOException; - - /** Report that the task encounted a local filesystem error. */ - void fsError(TaskAttemptID taskId, String message) throws IOException; - - void incrementSuperstepCount(TaskAttemptID taskid) throws IOException; - - /** - * @return the all BSPPeer names. - */ - PeerNames getAllPeerNames(); - -} Index: src/java/org/apache/hama/bsp/BSPRPCProtocolVersion.java =================================================================== --- src/java/org/apache/hama/bsp/BSPRPCProtocolVersion.java (revision 1146937) +++ src/java/org/apache/hama/bsp/BSPRPCProtocolVersion.java (working copy) @@ -1,35 +0,0 @@ -/** - * Copyright 2007 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.bsp; - -import org.apache.hadoop.ipc.VersionedProtocol; - -/** - * RPC Protocol version. - */ -public interface BSPRPCProtocolVersion extends VersionedProtocol { - - /** - * Interface Version History - * - * 0 - Alpha Version - */ - public static final long versionID = 0L; -} Index: src/java/org/apache/hama/bsp/BSPTask.java =================================================================== --- src/java/org/apache/hama/bsp/BSPTask.java (revision 1146937) +++ src/java/org/apache/hama/bsp/BSPTask.java (working copy) @@ -22,6 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hama.ipc.BSPPeerProtocol; import org.apache.zookeeper.KeeperException; /** Index: src/java/org/apache/hama/bsp/GroomServer.java =================================================================== --- src/java/org/apache/hama/bsp/GroomServer.java (revision 1146937) +++ src/java/org/apache/hama/bsp/GroomServer.java (working copy) @@ -54,8 +54,9 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hama.Constants; import org.apache.hama.HamaConfiguration; +import org.apache.hama.ipc.BSPPeerProtocol; import org.apache.hama.ipc.MasterProtocol; -import org.apache.hama.ipc.WorkerProtocol; +import org.apache.hama.ipc.GroomProtocol; import org.apache.log4j.LogManager; /** @@ -66,7 +67,7 @@ * storages. Basically, a groom server and a data node should be run on one * physical node. */ -public class GroomServer implements Runnable, WorkerProtocol, BSPPeerProtocol { +public class GroomServer implements Runnable, GroomProtocol, BSPPeerProtocol { public static final Log LOG = LogFactory.getLog(GroomServer.class); static final String SUBDIR = "groomServer"; @@ -798,8 +799,8 @@ @Override public long getProtocolVersion(String protocol, long clientVersion) throws IOException { - if (protocol.equals(WorkerProtocol.class.getName())) { - return WorkerProtocol.versionID; + if (protocol.equals(GroomProtocol.class.getName())) { + return GroomProtocol.versionID; } else if (protocol.equals(BSPPeerProtocol.class.getName())) { return BSPPeerProtocol.versionID; } else { Index: src/java/org/apache/hama/bsp/GroomServerManager.java =================================================================== --- src/java/org/apache/hama/bsp/GroomServerManager.java (revision 1146937) +++ src/java/org/apache/hama/bsp/GroomServerManager.java (working copy) @@ -20,7 +20,7 @@ import java.util.Collection; import java.util.Map; -import org.apache.hama.ipc.WorkerProtocol; +import org.apache.hama.ipc.GroomProtocol; /** * Manages information about the {@link GroomServer}s in the cluster @@ -41,14 +41,14 @@ * @param groomId The identification value of GroomServer * @return GroomServerStatus */ - WorkerProtocol findGroomServer(GroomServerStatus status); + GroomProtocol findGroomServer(GroomServerStatus status); /** * Find the collection of groom servers. * * @return Collection of groom servers list. */ - Collection findGroomServers(); + Collection findGroomServers(); /** * Collection of GroomServerStatus as the key set. Index: src/java/org/apache/hama/bsp/SimpleTaskScheduler.java =================================================================== --- src/java/org/apache/hama/bsp/SimpleTaskScheduler.java (revision 1146937) +++ src/java/org/apache/hama/bsp/SimpleTaskScheduler.java (working copy) @@ -26,7 +26,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hama.ipc.WorkerProtocol; +import org.apache.hama.ipc.GroomProtocol; /** * A simple task scheduler. @@ -134,7 +134,7 @@ // assembly into actions // List tasks = new ArrayList(); if (jip.getStatus().getRunState() == JobStatus.RUNNING) { - WorkerProtocol worker = groomServerManager.findGroomServer(this.stus); + GroomProtocol worker = groomServerManager.findGroomServer(this.stus); try { // dispatch() to the groom server Directive d1 = new DispatchTasksDirective(groomServerManager Index: src/java/org/apache/hama/bsp/Task.java =================================================================== --- src/java/org/apache/hama/bsp/Task.java (revision 1146937) +++ src/java/org/apache/hama/bsp/Task.java (working copy) @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.apache.hama.ipc.BSPPeerProtocol; /** * Base class for tasks. Index: src/java/org/apache/hama/ipc/BSPPeerProtocol.java =================================================================== --- src/java/org/apache/hama/ipc/BSPPeerProtocol.java (revision 0) +++ src/java/org/apache/hama/ipc/BSPPeerProtocol.java (revision 0) @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.ipc; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.hama.Constants; +import org.apache.hama.bsp.PeerNames; +import org.apache.hama.bsp.Task; +import org.apache.hama.bsp.TaskAttemptID; + +/** + * Protocol that task child process uses to contact its parent process. + */ +public interface BSPPeerProtocol extends HamaRPCProtocolVersion, Closeable, + Constants { + + /** Called when a child task process starts, to get its task. */ + Task getTask(TaskAttemptID taskid) throws IOException; + + /** + * Periodically called by child to check if parent is still alive. + * + * @return True if the task is known + */ + boolean ping(TaskAttemptID taskid) throws IOException; + + /** + * Report that the task is successfully completed. Failure is assumed if the + * task process exits without calling this. + * + * @param taskid task's id + * @param shouldBePromoted whether to promote the task's output or not + */ + void done(TaskAttemptID taskid, boolean shouldBePromoted) throws IOException; + + /** Report that the task encounted a local filesystem error. */ + void fsError(TaskAttemptID taskId, String message) throws IOException; + + void incrementSuperstepCount(TaskAttemptID taskid) throws IOException; + + /** + * @return the all BSPPeer names. + */ + PeerNames getAllPeerNames(); + +} Index: src/java/org/apache/hama/ipc/GroomProtocol.java =================================================================== --- src/java/org/apache/hama/ipc/GroomProtocol.java (revision 0) +++ src/java/org/apache/hama/ipc/GroomProtocol.java (revision 0) @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.ipc; + +import java.io.IOException; + +import org.apache.hama.bsp.Directive; + +/** + * A protocol for BSPMaster talks to GroomServer. + */ +public interface GroomProtocol extends HamaRPCProtocolVersion { + + /** + * Instruct GroomServer performaning tasks. + * + * @param directive instructs a GroomServer performing necessary + * execution. + * @throws IOException + */ + void dispatch(Directive directive) throws IOException; + +} Index: src/java/org/apache/hama/ipc/WorkerProtocol.java =================================================================== --- src/java/org/apache/hama/ipc/WorkerProtocol.java (revision 1146937) +++ src/java/org/apache/hama/ipc/WorkerProtocol.java (working copy) @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.ipc; - -import java.io.IOException; - -import org.apache.hama.bsp.Directive; - -/** - * A protocol for BSPMaster talks to GroomServer. - */ -public interface WorkerProtocol extends HamaRPCProtocolVersion { - - /** - * Instruct GroomServer performaning tasks. - * - * @param directive instructs a GroomServer performing necessary - * execution. - * @throws IOException - */ - void dispatch(Directive directive) throws IOException; - -}