Index: bin/hama
===================================================================
--- bin/hama (리비전 934620)
+++ bin/hama (작업 사본)
@@ -162,10 +162,10 @@
# figure out which class to run
if [ "$COMMAND" = "bspmaster" ] ; then
- CLASS='org.apache.hama.HamaMaster'
+ CLASS='org.apache.hama.bsp.BSPMaster'
BSP_OPTS="$BSP_OPTS $BSP_BSPMASTER_OPTS"
elif [ "$COMMAND" = "groom" ] ; then
- CLASS='org.apache.hama.graph.GroomServer'
+ CLASS='org.apache.hama.bsp.GroomServer'
BSP_OPTS="$BSP_OPTS $BSP_GROOMSERVER_OPTS"
elif [ "$COMMAND" = "version" ] ; then
CLASS=org.apache.bsp.util.VersionInfo
Index: conf/hama-default.xml
===================================================================
--- conf/hama-default.xml (리비전 934620)
+++ conf/hama-default.xml (작업 사본)
@@ -31,7 +31,19 @@
hama.groom.port
40020
- The port an groom server binds to.
+ The port an groom server binds to.
+
+
+
+ bspd.system.dir
+ ${hadoop.tmp.dir}/bsp/system
+ The shared directory where BSP stores control files.
+
+
+ bspd.groom.local.dir
+ ${hadoop.tmp.dir}/groomserver/local
+ local directory for temporal store
+
\ No newline at end of file
Index: src/java/groomserver-default.xml
===================================================================
--- src/java/groomserver-default.xml (리비전 934620)
+++ src/java/groomserver-default.xml (작업 사본)
@@ -1,13 +0,0 @@
-
-
-
-
-
-
-
-
-
- hama.groomserver.local.dir
- ${hadoop.tmp.dir}/groomserver/local
-
-
Index: src/java/org/apache/hama/HamaMaster.java
===================================================================
--- src/java/org/apache/hama/HamaMaster.java (리비전 934620)
+++ src/java/org/apache/hama/HamaMaster.java (작업 사본)
@@ -1,215 +0,0 @@
-/**
- * Copyright 2009 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hama.graph.JobID;
-import org.apache.hama.graph.JobStatus;
-import org.apache.hama.ipc.HeartbeatResponse;
-import org.apache.hama.ipc.InterTrackerProtocol;
-import org.apache.hama.ipc.JobSubmissionProtocol;
-
-public class HamaMaster implements JobSubmissionProtocol, InterTrackerProtocol {
- static{
- Configuration.addDefaultResource("groomserver-default.xml");
- }
-
- public static final Log LOG = LogFactory.getLog(HamaMaster.class);
-
- private HamaConfiguration conf;
- public static enum State { INITIALIZING, RUNNING }
- State state = State.INITIALIZING;
-
- String masterIdentifier;
-
- private Server interTrackerServer;
-
- FileSystem fs = null;
- Path systemDir = null;
-
- // system directories are world-wide readable and owner readable
- final static FsPermission SYSTEM_DIR_PERMISSION =
- FsPermission.createImmutable((short) 0733); // rwx-wx-wx
-
- // system files should have 700 permission
- final static FsPermission SYSTEM_FILE_PERMISSION =
- FsPermission.createImmutable((short) 0700); // rwx------
-
- private static final int FS_ACCESS_RETRY_PERIOD = 10000;
-
- private int nextJobId = 1;
-
- public HamaMaster(HamaConfiguration conf, String identifier) throws IOException, InterruptedException {
- this.conf = conf;
-
- this.masterIdentifier = identifier;
-
- InetSocketAddress addr = getAddress(conf);
- this.interTrackerServer = RPC.getServer(this, addr.getHostName(), addr.getPort(), conf);
-
- while (!Thread.currentThread().isInterrupted()) {
- try {
- if (fs == null) {
- fs = FileSystem.get(conf);
- }
- // clean up the system dir, which will only work if hdfs is out of
- // safe mode
- if(systemDir == null) {
- systemDir = new Path(getSystemDir());
- }
-
- LOG.info("Cleaning up the system directory");
- fs.delete(systemDir, true);
- if (FileSystem.mkdirs(fs, systemDir,
- new FsPermission(SYSTEM_DIR_PERMISSION))) {
- break;
- }
- LOG.error("Mkdirs failed to create " + systemDir);
-
- } catch (AccessControlException ace) {
- LOG.warn("Failed to operate on mapred.system.dir (" + systemDir
- + ") because of permissions.");
- LOG.warn("Manually delete the mapred.system.dir (" + systemDir
- + ") and then start the JobTracker.");
- LOG.warn("Bailing out ... ");
- throw ace;
- } catch (IOException ie) {
- LOG.info("problem cleaning system directory: " + systemDir, ie);
- }
- Thread.sleep(FS_ACCESS_RETRY_PERIOD);
- }
-
- // deleteLocalFiles(SUBDIR);
- }
-
- public static HamaMaster startMaster(HamaConfiguration conf) throws IOException,
- InterruptedException {
- return startTracker(conf, generateNewIdentifier());
- }
-
- public static HamaMaster startTracker(HamaConfiguration conf, String identifier)
- throws IOException, InterruptedException {
-
- HamaMaster result = null;
- result = new HamaMaster(conf, identifier);
-
- return result;
- }
-
- public static InetSocketAddress getAddress(Configuration conf) {
- String hamaMasterStr = conf.get("hama.master.address", "localhost:40000");
- return NetUtils.createSocketAddr(hamaMasterStr);
- }
-
- public int getPort() {
- return this.conf.getInt("hama.master.port", 0);
- }
-
- public HamaConfiguration getConfiguration() {
- return this.conf;
- }
-
- private static SimpleDateFormat getDateFormat() {
- return new SimpleDateFormat("yyyyMMddHHmm");
- }
-
- private static String generateNewIdentifier() {
- return getDateFormat().format(new Date());
- }
-
- public void offerService() throws InterruptedException, IOException {
- this.interTrackerServer.start();
-
- synchronized (this) {
- state = State.RUNNING;
- }
- LOG.info("Starting RUNNING");
-
- this.interTrackerServer.join();
- LOG.info("Stopped interTrackerServer");
- }
-
-
- public static void main(String [] args) {
- StringUtils.startupShutdownMessage(HamaMaster.class, args, LOG);
- if (args.length != 0) {
- System.out.println("usage: HamaMaster");
- System.exit(-1);
- }
-
- try {
- HamaConfiguration conf = new HamaConfiguration();
- HamaMaster master = startMaster(conf);
- master.offerService();
- } catch (Throwable e) {
- LOG.fatal(StringUtils.stringifyException(e));
- System.exit(-1);
- }
- }
-
- @Override
- public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
- if (protocol.equals(InterTrackerProtocol.class.getName())) {
- return InterTrackerProtocol.versionID;
- } else if (protocol.equals(JobSubmissionProtocol.class.getName())){
- return JobSubmissionProtocol.versionID;
- } else {
- throw new IOException("Unknown protocol to job tracker: " + protocol);
- }
- }
-
- @Override
- public HeartbeatResponse heartbeat(short responseId) {
- LOG.debug(">>> return the heartbeat message.");
- return new HeartbeatResponse((short)1);
- }
-
- @Override
- public String getSystemDir() {
- Path sysDir = new Path(conf.get("mapred.system.dir", "/tmp/hadoop/mapred/system"));
- return fs.makeQualified(sysDir).toString();
- }
-
- @Override
- public JobID getNewJobId() throws IOException {
- return new JobID(this.masterIdentifier, nextJobId++);
- }
-
- @Override
- public JobStatus submitJob(JobID jobName) throws IOException {
-
- return null;
- }
-}
Index: src/java/org/apache/hama/bsp/BSPMaster.java
===================================================================
--- src/java/org/apache/hama/bsp/BSPMaster.java (리비전 0)
+++ src/java/org/apache/hama/bsp/BSPMaster.java (리비전 0)
@@ -0,0 +1,235 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.ipc.HeartbeatResponse;
+import org.apache.hama.ipc.InterTrackerProtocol;
+import org.apache.hama.ipc.JobSubmissionProtocol;
+
+/**
+ * BSPMaster is responsible to control all the bsp peers and to manage bsp jobs.
+ */
+public class BSPMaster implements JobSubmissionProtocol, InterTrackerProtocol {
+ static {
+ Configuration.addDefaultResource("hama-default.xml");
+ }
+
+ public static final Log LOG = LogFactory.getLog(BSPMaster.class);
+
+ private HamaConfiguration conf;
+
+ public static enum State {
+ INITIALIZING, RUNNING
+ }
+
+ State state = State.INITIALIZING;
+
+ String masterIdentifier;
+
+ private Server interTrackerServer;
+
+ FileSystem fs = null;
+ Path systemDir = null;
+
+ // system directories are world-wide readable and owner readable
+ final static FsPermission SYSTEM_DIR_PERMISSION = FsPermission
+ .createImmutable((short) 0733); // rwx-wx-wx
+
+ // system files should have 700 permission
+ final static FsPermission SYSTEM_FILE_PERMISSION = FsPermission
+ .createImmutable((short) 0700); // rwx------
+
+ private static final int FS_ACCESS_RETRY_PERIOD = 10000;
+
+ private int nextJobId = 1;
+
+ public BSPMaster(HamaConfiguration conf, String identifier)
+ throws IOException, InterruptedException {
+ this.conf = conf;
+
+ this.masterIdentifier = identifier;
+
+ InetSocketAddress addr = getAddress(conf);
+ this.interTrackerServer = RPC.getServer(this, addr.getHostName(), addr
+ .getPort(), conf);
+
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ if (fs == null) {
+ fs = FileSystem.get(conf);
+ }
+ // clean up the system dir, which will only work if hdfs is out of
+ // safe mode
+ if (systemDir == null) {
+ systemDir = new Path(getSystemDir());
+ }
+
+ LOG.info("Cleaning up the system directory");
+ fs.delete(systemDir, true);
+ if (FileSystem.mkdirs(fs, systemDir, new FsPermission(
+ SYSTEM_DIR_PERMISSION))) {
+ break;
+ }
+ LOG.error("Mkdirs failed to create " + systemDir);
+
+ } catch (AccessControlException ace) {
+ LOG.warn("Failed to operate on bspd.system.dir (" + systemDir
+ + ") because of permissions.");
+ LOG.warn("Manually delete the bspd.system.dir (" + systemDir
+ + ") and then start the JobTracker.");
+ LOG.warn("Bailing out ... ");
+ throw ace;
+ } catch (IOException ie) {
+ LOG.info("problem cleaning system directory: " + systemDir, ie);
+ }
+ Thread.sleep(FS_ACCESS_RETRY_PERIOD);
+ }
+
+ // deleteLocalFiles(SUBDIR);
+ }
+
+ public static BSPMaster startMaster(HamaConfiguration conf)
+ throws IOException, InterruptedException {
+ return startTracker(conf, generateNewIdentifier());
+ }
+
+ public static BSPMaster startTracker(HamaConfiguration conf, String identifier)
+ throws IOException, InterruptedException {
+
+ BSPMaster result = null;
+ result = new BSPMaster(conf, identifier);
+
+ return result;
+ }
+
+ public static InetSocketAddress getAddress(Configuration conf) {
+ String hamaMasterStr = conf.get("hama.master.address", "localhost:40000");
+ return NetUtils.createSocketAddr(hamaMasterStr);
+ }
+
+ public int getPort() {
+ return this.conf.getInt("hama.master.port", 0);
+ }
+
+ public HamaConfiguration getConfiguration() {
+ return this.conf;
+ }
+
+ private static SimpleDateFormat getDateFormat() {
+ return new SimpleDateFormat("yyyyMMddHHmm");
+ }
+
+ /**
+ *
+ * @return
+ */
+ private static String generateNewIdentifier() {
+ return getDateFormat().format(new Date());
+ }
+
+ public void offerService() throws InterruptedException, IOException {
+ this.interTrackerServer.start();
+
+ synchronized (this) {
+ state = State.RUNNING;
+ }
+ LOG.info("Starting RUNNING");
+
+ this.interTrackerServer.join();
+ LOG.info("Stopped interTrackerServer");
+ }
+
+ public static void main(String[] args) {
+ StringUtils.startupShutdownMessage(BSPMaster.class, args, LOG);
+ if (args.length != 0) {
+ System.out.println("usage: HamaMaster");
+ System.exit(-1);
+ }
+
+ try {
+ HamaConfiguration conf = new HamaConfiguration();
+ BSPMaster master = startMaster(conf);
+ master.offerService();
+ } catch (Throwable e) {
+ LOG.fatal(StringUtils.stringifyException(e));
+ System.exit(-1);
+ }
+ }
+
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ if (protocol.equals(InterTrackerProtocol.class.getName())) {
+ return InterTrackerProtocol.versionID;
+ } else if (protocol.equals(JobSubmissionProtocol.class.getName())) {
+ return JobSubmissionProtocol.versionID;
+ } else {
+ throw new IOException("Unknown protocol to job tracker: " + protocol);
+ }
+ }
+
+ /**
+ * A RPC method for transmitting each peer status from peer to master.
+ */
+ @Override
+ public HeartbeatResponse heartbeat(short responseId) {
+ LOG.debug(">>> return the heartbeat message.");
+ return new HeartbeatResponse((short) 1);
+ }
+
+ /**
+ * Return system directory to which BSP store control files.
+ */
+ @Override
+ public String getSystemDir() {
+ Path sysDir = new Path(conf
+ .get("bspd.system.dir", "/tmp/hadoop/bsp/system"));
+ return fs.makeQualified(sysDir).toString();
+ }
+
+ /**
+ * This method returns new job id. The returned job id increases sequentially.
+ */
+ public JobID getNewJobId() throws IOException {
+ return new JobID(this.masterIdentifier, nextJobId++);
+ }
+
+ public JobStatus submitJob(JobID jobName) throws IOException {
+
+ return null;
+ }
+}
Index: src/java/org/apache/hama/bsp/GroomServer.java
===================================================================
--- src/java/org/apache/hama/bsp/GroomServer.java (리비전 0)
+++ src/java/org/apache/hama/bsp/GroomServer.java (리비전 0)
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.DNS;
+import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.ipc.HeartbeatResponse;
+import org.apache.hama.ipc.InterTrackerProtocol;
+
+public class GroomServer implements Runnable {
+ public static final Log LOG = LogFactory.getLog(GroomServer.class);
+
+ static {
+ Configuration.addDefaultResource("hama-default.xml");
+ }
+
+ static enum State {
+ NORMAL, COMPUTE, SYNC, BARRIER, STALE, INTERRUPTED, DENIED
+ };
+
+ HamaConfiguration conf;
+
+ volatile boolean running = true;
+ volatile boolean shuttingDown = false;
+ boolean justInited = true;
+
+ String groomserverName;
+ String localHostname;
+
+ InetSocketAddress masterAddr;
+ InterTrackerProtocol jobClient;
+ BSPPeer bspPeer;
+
+ short heartbeatResponseId = -1;
+ private volatile int heartbeatInterval = 3 * 1000;
+
+ private LocalDirAllocator localDirAllocator;
+ Path systemDirectory = null;
+ FileSystem systemFS = null;
+
+ public GroomServer(HamaConfiguration conf) throws IOException {
+ this.conf = conf;
+ masterAddr = BSPMaster.getAddress(conf);
+
+ FileSystem local = FileSystem.getLocal(conf);
+ this.localDirAllocator = new LocalDirAllocator("bspd.groom.local.dir");
+
+ initialize();
+ }
+
+ synchronized void initialize() throws IOException {
+ if (this.conf.get("slave.host.name") != null) {
+ this.localHostname = conf.get("slave.host.name");
+ }
+
+ if (localHostname == null) {
+ this.localHostname = DNS.getDefaultHost(conf.get(
+ "bspd.groom.dns.interface", "default"), conf.get(
+ "bspd.groom.dns.nameserver", "default"));
+ }
+
+ checkLocalDirs(conf.getStrings("bspd.groom.local.dir"));
+ deleteLocalFiles("groomserver");
+
+ this.groomserverName = "groomd_" + localHostname;
+ LOG.info("Starting tracker " + this.groomserverName);
+
+ DistributedCache.purgeCache(this.conf);
+
+ this.jobClient = (InterTrackerProtocol) RPC.waitForProxy(
+ InterTrackerProtocol.class, InterTrackerProtocol.versionID, masterAddr,
+ conf);
+ this.running = true;
+ // this.bspPeer = new BSPPeer(this.conf);
+ }
+
+ private static void checkLocalDirs(String[] localDirs)
+ throws DiskErrorException {
+ boolean writable = false;
+
+ if (localDirs != null) {
+ for (int i = 0; i < localDirs.length; i++) {
+ try {
+ DiskChecker.checkDir(new File(localDirs[i]));
+ writable = true;
+ } catch (DiskErrorException e) {
+ LOG.warn("Graph Processor local " + e.getMessage());
+ }
+ }
+ }
+
+ if (!writable)
+ throw new DiskErrorException("all local directories are not writable");
+ }
+
+ public String[] getLocalDirs() {
+ return conf.getStrings("bspd.groom.local.dir");
+ }
+
+ public void deleteLocalFiles() throws IOException {
+ String[] localDirs = getLocalDirs();
+ for (int i = 0; i < localDirs.length; i++) {
+ FileSystem.getLocal(this.conf).delete(new Path(localDirs[i]), true);
+ }
+ }
+
+ public void deleteLocalFiles(String subdir) throws IOException {
+ String[] localDirs = getLocalDirs();
+ for (int i = 0; i < localDirs.length; i++) {
+ FileSystem.getLocal(this.conf).delete(new Path(localDirs[i], subdir), true);
+ }
+ }
+
+ public void cleanupStorage() throws IOException {
+ deleteLocalFiles();
+ }
+
+ private void startCleanupThreads() throws IOException {
+
+ }
+
+ public State offerService() throws Exception {
+ long lastHeartbeat = 0;
+
+ while (running && !shuttingDown) {
+ try {
+ long now = System.currentTimeMillis();
+
+ long waitTime = heartbeatInterval - (now - lastHeartbeat);
+ if (waitTime > 0) {
+ // sleeps for the wait time
+ Thread.sleep(waitTime);
+ }
+
+ if (justInited) {
+ String dir = jobClient.getSystemDir();
+ if (dir == null) {
+ throw new IOException("Failed to get system directory");
+ }
+ systemDirectory = new Path(dir);
+ systemFS = systemDirectory.getFileSystem(conf);
+ }
+
+ // Send the heartbeat and process the jobtracker's directives
+ HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
+
+ // Note the time when the heartbeat returned, use this to decide when to
+ // send the
+ // next heartbeat
+ lastHeartbeat = System.currentTimeMillis();
+
+ justInited = false;
+ } catch (InterruptedException ie) {
+ LOG.info("Interrupted. Closing down.");
+ return State.INTERRUPTED;
+ } catch (DiskErrorException de) {
+ String msg = "Exiting task tracker for disk error:\n"
+ + StringUtils.stringifyException(de);
+ LOG.error(msg);
+
+ return State.STALE;
+ } catch (RemoteException re) {
+ return State.DENIED;
+ } catch (Exception except) {
+ String msg = "Caught exception: "
+ + StringUtils.stringifyException(except);
+ LOG.error(msg);
+ }
+ }
+
+ return State.NORMAL;
+ }
+
+ private class WalkerLauncher extends Thread {
+ // TODO:
+ }
+
+ private HeartbeatResponse transmitHeartBeat(long now) throws IOException {
+ HeartbeatResponse heartbeatResponse = jobClient
+ .heartbeat(heartbeatResponseId);
+ return heartbeatResponse;
+ }
+
+ public void run() {
+ try {
+ startCleanupThreads();
+ boolean denied = false;
+ while (running && !shuttingDown && !denied) {
+ boolean staleState = false;
+ try {
+ while (running && !staleState && !shuttingDown && !denied) {
+ try {
+ State osState = offerService();
+ if (osState == State.STALE) {
+ staleState = true;
+ } else if (osState == State.DENIED) {
+ denied = true;
+ }
+ } catch (Exception e) {
+ if (!shuttingDown) {
+ LOG.info("Lost connection to GraphProcessor [" + masterAddr
+ + "]. Retrying...", e);
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException ie) {
+ }
+ }
+ }
+ }
+ } finally {
+ // close();
+ }
+
+ if (shuttingDown) {
+ return;
+ }
+ LOG.warn("Reinitializing local state");
+ initialize();
+ }
+ } catch (IOException ioe) {
+ LOG.error("Got fatal exception while reinitializing TaskTracker: "
+ + StringUtils.stringifyException(ioe));
+ return;
+ }
+ }
+
+ public synchronized void shutdown() throws IOException {
+ shuttingDown = true;
+ close();
+ }
+
+ public synchronized void close() throws IOException {
+ this.running = false;
+
+ cleanupStorage();
+
+ // shutdown RPC connections
+ RPC.stopProxy(jobClient);
+ }
+
+ public static void main(String[] args) {
+ StringUtils.startupShutdownMessage(GroomServer.class, args, LOG);
+ if (args.length != 0) {
+ System.out.println("usage: GroomServer");
+ System.exit(-1);
+ }
+
+ try {
+ HamaConfiguration conf = new HamaConfiguration();
+ new GroomServer(conf).run();
+ } catch (Throwable e) {
+ LOG.fatal(StringUtils.stringifyException(e));
+ System.exit(-1);
+ }
+ }
+}
Index: src/java/org/apache/hama/bsp/ID.java
===================================================================
--- src/java/org/apache/hama/bsp/ID.java (리비전 934620)
+++ src/java/org/apache/hama/bsp/ID.java (작업 사본)
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hama.graph;
+package org.apache.hama.bsp;
import java.io.DataInput;
import java.io.DataOutput;
Index: src/java/org/apache/hama/bsp/JobID.java
===================================================================
--- src/java/org/apache/hama/bsp/JobID.java (리비전 934620)
+++ src/java/org/apache/hama/bsp/JobID.java (작업 사본)
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hama.graph;
+package org.apache.hama.bsp;
import java.io.DataInput;
Index: src/java/org/apache/hama/bsp/JobStatus.java
===================================================================
--- src/java/org/apache/hama/bsp/JobStatus.java (리비전 934620)
+++ src/java/org/apache/hama/bsp/JobStatus.java (작업 사본)
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hama.graph;
+package org.apache.hama.bsp;
import java.io.DataInput;
Index: src/java/org/apache/hama/bsp/TaskAttemptID.java
===================================================================
--- src/java/org/apache/hama/bsp/TaskAttemptID.java (리비전 934620)
+++ src/java/org/apache/hama/bsp/TaskAttemptID.java (작업 사본)
@@ -16,12 +16,14 @@
* limitations under the License.
*/
-package org.apache.hama.graph;
+package org.apache.hama.bsp;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.hama.bsp.ID;
+
public class TaskAttemptID extends ID {
protected static final String ATTEMPT = "attempt";
private TaskID taskId;
Index: src/java/org/apache/hama/bsp/TaskID.java
===================================================================
--- src/java/org/apache/hama/bsp/TaskID.java (리비전 934620)
+++ src/java/org/apache/hama/bsp/TaskID.java (작업 사본)
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hama.graph;
+package org.apache.hama.bsp;
import java.io.DataInput;
import java.io.DataOutput;
Index: src/java/org/apache/hama/graph/GroomServer.java
===================================================================
--- src/java/org/apache/hama/graph/GroomServer.java (리비전 934620)
+++ src/java/org/apache/hama/graph/GroomServer.java (작업 사본)
@@ -1,289 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.graph;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.net.DNS;
-import org.apache.hadoop.util.DiskChecker;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.HamaMaster;
-import org.apache.hama.bsp.BSPPeer;
-import org.apache.hama.ipc.HeartbeatResponse;
-import org.apache.hama.ipc.InterTrackerProtocol;
-
-public class GroomServer implements Runnable {
- public static final Log LOG = LogFactory.getLog(GroomServer.class);
-
- static {
- Configuration.addDefaultResource("groomserver-default.xml");
- }
-
- static enum State {
- NORMAL, COMPUTE, SYNC, BARRIER, STALE, INTERRUPTED, DENIED
- };
-
- HamaConfiguration conf;
-
- volatile boolean running = true;
- volatile boolean shuttingDown = false;
- boolean justInited = true;
-
- String groomserverName;
- String localHostname;
-
- InetSocketAddress masterAddr;
- InterTrackerProtocol jobClient;
- BSPPeer bspPeer;
-
- short heartbeatResponseId = -1;
- private volatile int heartbeatInterval = 3 * 1000;
-
- private LocalDirAllocator localDirAllocator;
- Path systemDirectory = null;
- FileSystem systemFS = null;
-
- public GroomServer(HamaConfiguration conf) throws IOException {
- this.conf = conf;
- masterAddr = HamaMaster.getAddress(conf);
-
- FileSystem local = FileSystem.getLocal(conf);
- this.localDirAllocator = new LocalDirAllocator("hama.groomserver.local.dir");
-
- initialize();
- }
-
- synchronized void initialize() throws IOException {
- if (this.conf.get("slave.host.name") != null) {
- this.localHostname = conf.get("slave.host.name");
- }
-
- if (localHostname == null) {
- this.localHostname = DNS.getDefaultHost(conf.get(
- "hama.groomserver.dns.interface", "default"), conf.get(
- "hama.groomserver.dns.nameserver", "default"));
- }
-
- checkLocalDirs(conf.getStrings("hama.groomserver.local.dir"));
- deleteLocalFiles("groomserver");
-
- this.groomserverName = "groomserver_" + localHostname;
- LOG.info("Starting tracker " + this.groomserverName);
-
- DistributedCache.purgeCache(this.conf);
-
- this.jobClient = (InterTrackerProtocol) RPC.waitForProxy(
- InterTrackerProtocol.class, InterTrackerProtocol.versionID, masterAddr,
- conf);
- this.running = true;
- // this.bspPeer = new BSPPeer(this.conf);
- }
-
- private static void checkLocalDirs(String[] localDirs)
- throws DiskErrorException {
- boolean writable = false;
-
- if (localDirs != null) {
- for (int i = 0; i < localDirs.length; i++) {
- try {
- DiskChecker.checkDir(new File(localDirs[i]));
- writable = true;
- } catch (DiskErrorException e) {
- LOG.warn("Graph Processor local " + e.getMessage());
- }
- }
- }
-
- if (!writable)
- throw new DiskErrorException("all local directories are not writable");
- }
-
- public String[] getLocalDirs() {
- return conf.getStrings("hama.groomserver.local.dir");
- }
-
- public void deleteLocalFiles() throws IOException {
- String[] localDirs = getLocalDirs();
- for (int i = 0; i < localDirs.length; i++) {
- FileSystem.getLocal(this.conf).delete(new Path(localDirs[i]));
- }
- }
-
- public void deleteLocalFiles(String subdir) throws IOException {
- String[] localDirs = getLocalDirs();
- for (int i = 0; i < localDirs.length; i++) {
- FileSystem.getLocal(this.conf).delete(new Path(localDirs[i], subdir));
- }
- }
-
- public void cleanupStorage() throws IOException {
- deleteLocalFiles();
- }
-
- private void startCleanupThreads() throws IOException {
-
- }
-
- public State offerService() throws Exception {
- long lastHeartbeat = 0;
-
- while (running && !shuttingDown) {
- try {
- long now = System.currentTimeMillis();
-
- long waitTime = heartbeatInterval - (now - lastHeartbeat);
- if (waitTime > 0) {
- // sleeps for the wait time
- Thread.sleep(waitTime);
- }
-
- if (justInited) {
- String dir = jobClient.getSystemDir();
- if (dir == null) {
- throw new IOException("Failed to get system directory");
- }
- systemDirectory = new Path(dir);
- systemFS = systemDirectory.getFileSystem(conf);
- }
-
- // Send the heartbeat and process the jobtracker's directives
- HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
-
- // Note the time when the heartbeat returned, use this to decide when to
- // send the
- // next heartbeat
- lastHeartbeat = System.currentTimeMillis();
-
- justInited = false;
- } catch (InterruptedException ie) {
- LOG.info("Interrupted. Closing down.");
- return State.INTERRUPTED;
- } catch (DiskErrorException de) {
- String msg = "Exiting task tracker for disk error:\n"
- + StringUtils.stringifyException(de);
- LOG.error(msg);
-
- return State.STALE;
- } catch (RemoteException re) {
- return State.DENIED;
- } catch (Exception except) {
- String msg = "Caught exception: "
- + StringUtils.stringifyException(except);
- LOG.error(msg);
- }
- }
-
- return State.NORMAL;
- }
-
- private class WalkerLauncher extends Thread {
- // TODO:
- }
-
- private HeartbeatResponse transmitHeartBeat(long now) throws IOException {
- HeartbeatResponse heartbeatResponse = jobClient
- .heartbeat(heartbeatResponseId);
- return heartbeatResponse;
- }
-
- public void run() {
- try {
- startCleanupThreads();
- boolean denied = false;
- while (running && !shuttingDown && !denied) {
- boolean staleState = false;
- try {
- while (running && !staleState && !shuttingDown && !denied) {
- try {
- State osState = offerService();
- if (osState == State.STALE) {
- staleState = true;
- } else if (osState == State.DENIED) {
- denied = true;
- }
- } catch (Exception e) {
- if (!shuttingDown) {
- LOG.info("Lost connection to GraphProcessor [" + masterAddr
- + "]. Retrying...", e);
- try {
- Thread.sleep(5000);
- } catch (InterruptedException ie) {
- }
- }
- }
- }
- } finally {
- // close();
- }
-
- if (shuttingDown) {
- return;
- }
- LOG.warn("Reinitializing local state");
- initialize();
- }
- } catch (IOException ioe) {
- LOG.error("Got fatal exception while reinitializing TaskTracker: "
- + StringUtils.stringifyException(ioe));
- return;
- }
- }
-
- public synchronized void shutdown() throws IOException {
- shuttingDown = true;
- close();
- }
-
- public synchronized void close() throws IOException {
- this.running = false;
-
- cleanupStorage();
-
- // shutdown RPC connections
- RPC.stopProxy(jobClient);
- }
-
- public static void main(String[] args) {
- StringUtils.startupShutdownMessage(GroomServer.class, args, LOG);
- if (args.length != 0) {
- System.out.println("usage: GroomServer");
- System.exit(-1);
- }
-
- try {
- HamaConfiguration conf = new HamaConfiguration();
- new GroomServer(conf).run();
- } catch (Throwable e) {
- LOG.fatal(StringUtils.stringifyException(e));
- System.exit(-1);
- }
- }
-}
Index: src/java/org/apache/hama/graph/ID.java
===================================================================
--- src/java/org/apache/hama/graph/ID.java (리비전 934620)
+++ src/java/org/apache/hama/graph/ID.java (작업 사본)
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.graph;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.WritableComparable;
-
-public abstract class ID implements WritableComparable {
- protected static final char SEPARATOR = '_';
- protected int id;
-
- public ID(int id) {
- this.id = id;
- }
-
- protected ID() {
- }
-
- public int getId() {
- return id;
- }
-
- @Override
- public String toString() {
- return String.valueOf(id);
- }
-
- @Override
- public int hashCode() {
- return Integer.valueOf(id).hashCode();
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o)
- return true;
- if (o == null)
- return false;
- if (o.getClass() == this.getClass()) {
- ID that = (ID) o;
- return this.id == that.id;
- } else
- return false;
- }
-
- public int compareTo(ID that) {
- return this.id - that.id;
- }
-
- public void readFields(DataInput in) throws IOException {
- this.id = in.readInt();
- }
-
- public void write(DataOutput out) throws IOException {
- out.writeInt(id);
- }
-}
Index: src/java/org/apache/hama/graph/JobClient.java
===================================================================
--- src/java/org/apache/hama/graph/JobClient.java (리비전 934620)
+++ src/java/org/apache/hama/graph/JobClient.java (작업 사본)
@@ -27,7 +27,7 @@
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hama.HamaConfiguration;
-import org.apache.hama.HamaMaster;
+import org.apache.hama.bsp.BSPMaster;
import org.apache.hama.ipc.JobSubmissionProtocol;
public class JobClient extends Configured {
@@ -38,7 +38,7 @@
}
static {
- Configuration.addDefaultResource("groomserver-default.xml");
+ Configuration.addDefaultResource("hama-default.xml");
}
private JobSubmissionProtocol jobSubmitClient;
@@ -53,7 +53,7 @@
public void init(HamaConfiguration conf) throws IOException {
String tracker = conf.get("hama.master.address", "local");
- this.jobSubmitClient = createRPCProxy(HamaMaster.getAddress(conf), conf);
+ this.jobSubmitClient = createRPCProxy(BSPMaster.getAddress(conf), conf);
}
private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
Index: src/java/org/apache/hama/graph/JobContext.java
===================================================================
--- src/java/org/apache/hama/graph/JobContext.java (리비전 934620)
+++ src/java/org/apache/hama/graph/JobContext.java (작업 사본)
@@ -26,6 +26,7 @@
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
+import org.apache.hama.bsp.JobID;
/**
* A read-only view of the job that is provided to the tasks while they are
Index: src/java/org/apache/hama/graph/JobID.java
===================================================================
--- src/java/org/apache/hama/graph/JobID.java (리비전 934620)
+++ src/java/org/apache/hama/graph/JobID.java (작업 사본)
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.graph;
-
-import java.io.DataInput;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.text.NumberFormat;
-
-import org.apache.hadoop.io.Text;
-
-public class JobID extends ID implements Comparable {
- protected static final String JOB = "job";
- private final Text jtIdentifier;
-
- protected static final NumberFormat idFormat = NumberFormat.getInstance();
- static {
- idFormat.setGroupingUsed(false);
- idFormat.setMinimumIntegerDigits(4);
- }
-
- public JobID(String jtIdentifier, int id) {
- super(id);
- this.jtIdentifier = new Text(jtIdentifier);
- }
-
- public JobID() {
- jtIdentifier = new Text();
- }
-
- public String getJtIdentifier() {
- return jtIdentifier.toString();
- }
-
- @Override
- public boolean equals(Object o) {
- if (!super.equals(o))
- return false;
-
- JobID that = (JobID) o;
- return this.jtIdentifier.equals(that.jtIdentifier);
- }
-
- @Override
- public int compareTo(ID o) {
- JobID that = (JobID) o;
- int jtComp = this.jtIdentifier.compareTo(that.jtIdentifier);
- if (jtComp == 0) {
- return this.id - that.id;
- } else
- return jtComp;
- }
-
- public StringBuilder appendTo(StringBuilder builder) {
- builder.append(SEPARATOR);
- builder.append(jtIdentifier);
- builder.append(SEPARATOR);
- builder.append(idFormat.format(id));
- return builder;
- }
-
- @Override
- public int hashCode() {
- return jtIdentifier.hashCode() + id;
- }
-
- @Override
- public String toString() {
- return appendTo(new StringBuilder(JOB)).toString();
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- super.readFields(in);
- this.jtIdentifier.readFields(in);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
- jtIdentifier.write(out);
- }
-
- public static JobID forName(String str) throws IllegalArgumentException {
- if (str == null)
- return null;
- try {
- String[] parts = str.split("_");
- if (parts.length == 3) {
- if (parts[0].equals(JOB)) {
- return new JobID(parts[1], Integer.parseInt(parts[2]));
- }
- }
- } catch (Exception ex) {
- }
- throw new IllegalArgumentException("JobId string : " + str
- + " is not properly formed");
- }
-}
Index: src/java/org/apache/hama/graph/JobStatus.java
===================================================================
--- src/java/org/apache/hama/graph/JobStatus.java (리비전 934620)
+++ src/java/org/apache/hama/graph/JobStatus.java (작업 사본)
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.graph;
-
-import java.io.DataInput;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableFactories;
-import org.apache.hadoop.io.WritableFactory;
-
-public class JobStatus implements Writable, Cloneable {
-
- static {
- WritableFactories.setFactory(JobStatus.class, new WritableFactory() {
- public Writable newInstance() {
- return new JobStatus();
- }
- });
- }
-
- public static final int RUNNING = 1;
- public static final int SUCCEEDED = 2;
- public static final int FAILED = 3;
- public static final int PREP = 4;
- public static final int KILLED = 5;
-
- private JobID jobid;
- private float progress;
- private float cleanupProgress;
- private float setupProgress;
- private int runState;
- private long startTime;
- private String schedulingInfo = "NA";
-
- public JobStatus() {
- }
-
- public JobStatus(JobID jobid, float progress, int runState) {
- this(jobid, progress, 0.0f, runState);
- }
-
- public JobStatus(JobID jobid, float progress, float cleanupProgress,
- int runState) {
- this(jobid, 0.0f, progress, cleanupProgress, runState);
- }
-
- public JobStatus(JobID jobid, float setupProgress, float progress,
- float cleanupProgress, int runState) {
- this.jobid = jobid;
- this.setupProgress = setupProgress;
- this.progress = progress;
- this.cleanupProgress = cleanupProgress;
- this.runState = runState;
- }
-
- public JobID getJobID() {
- return jobid;
- }
-
- public synchronized float progress() {
- return progress;
- }
-
- synchronized void setprogress(float p) {
- this.progress = (float) Math.min(1.0, Math.max(0.0, p));
- }
-
- public synchronized float cleanupProgress() {
- return cleanupProgress;
- }
-
- synchronized void setCleanupProgress(float p) {
- this.cleanupProgress = (float) Math.min(1.0, Math.max(0.0, p));
- }
-
- public synchronized float setupProgress() {
- return setupProgress;
- }
-
- synchronized void setSetupProgress(float p) {
- this.setupProgress = (float) Math.min(1.0, Math.max(0.0, p));
- }
-
- public synchronized int getRunState() {
- return runState;
- }
-
- public synchronized void setRunState(int state) {
- this.runState = state;
- }
-
- synchronized void setStartTime(long startTime) {
- this.startTime = startTime;
- }
-
- synchronized public long getStartTime() {
- return startTime;
- }
-
- @Override
- public Object clone() {
- try {
- return super.clone();
- } catch (CloneNotSupportedException cnse) {
- throw new InternalError(cnse.toString());
- }
- }
-
- public synchronized String getSchedulingInfo() {
- return schedulingInfo;
- }
-
- public synchronized void setSchedulingInfo(String schedulingInfo) {
- this.schedulingInfo = schedulingInfo;
- }
-
- public synchronized boolean isJobComplete() {
- return (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED || runState == JobStatus.KILLED);
- }
-
- public synchronized void write(DataOutput out) throws IOException {
- jobid.write(out);
- out.writeFloat(setupProgress);
- out.writeFloat(progress);
- out.writeFloat(cleanupProgress);
- out.writeInt(runState);
- out.writeLong(startTime);
- Text.writeString(out, schedulingInfo);
- }
-
- public synchronized void readFields(DataInput in) throws IOException {
- this.jobid = new JobID();
- jobid.readFields(in);
- this.setupProgress = in.readFloat();
- this.progress = in.readFloat();
- this.cleanupProgress = in.readFloat();
- this.runState = in.readInt();
- this.startTime = in.readLong();
- this.schedulingInfo = Text.readString(in);
- }
-}
Index: src/java/org/apache/hama/graph/TaskAttemptContext.java
===================================================================
--- src/java/org/apache/hama/graph/TaskAttemptContext.java (리비전 934620)
+++ src/java/org/apache/hama/graph/TaskAttemptContext.java (작업 사본)
@@ -22,6 +22,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Progressable;
+import org.apache.hama.bsp.TaskAttemptID;
/**
* The context for task attempts.
Index: src/java/org/apache/hama/graph/TaskAttemptID.java
===================================================================
--- src/java/org/apache/hama/graph/TaskAttemptID.java (리비전 934620)
+++ src/java/org/apache/hama/graph/TaskAttemptID.java (작업 사본)
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hama.graph;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-public class TaskAttemptID extends ID {
- protected static final String ATTEMPT = "attempt";
- private TaskID taskId;
-
- public TaskAttemptID(TaskID taskId, int id) {
- super(id);
- if (taskId == null) {
- throw new IllegalArgumentException("taskId cannot be null");
- }
- this.taskId = taskId;
- }
-
- public TaskAttemptID(String jtIdentifier, int jobId, boolean isMatrixTask,
- int taskId, int id) {
- this(new TaskID(jtIdentifier, jobId, isMatrixTask, taskId), id);
- }
-
- public TaskAttemptID() {
- taskId = new TaskID();
- }
-
- public JobID getJobID() {
- return taskId.getJobID();
- }
-
- public TaskID getTaskID() {
- return taskId;
- }
-
- @Override
- public boolean equals(Object o) {
- if (!super.equals(o))
- return false;
-
- TaskAttemptID that = (TaskAttemptID) o;
- return this.taskId.equals(that.taskId);
- }
-
- protected StringBuilder appendTo(StringBuilder builder) {
- return taskId.appendTo(builder).append(SEPARATOR).append(id);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- super.readFields(in);
- taskId.readFields(in);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
- taskId.write(out);
- }
-
- @Override
- public int hashCode() {
- return taskId.hashCode() * 5 + id;
- }
-
- @Override
- public int compareTo(ID o) {
- TaskAttemptID that = (TaskAttemptID) o;
- int tipComp = this.taskId.compareTo(that.taskId);
- if (tipComp == 0) {
- return this.id - that.id;
- } else
- return tipComp;
- }
-
- @Override
- public String toString() {
- return appendTo(new StringBuilder(ATTEMPT)).toString();
- }
-
- public static TaskAttemptID forName(String str)
- throws IllegalArgumentException {
- if (str == null)
- return null;
- try {
- String[] parts = str.split(Character.toString(SEPARATOR));
- if (parts.length == 6) {
- if (parts[0].equals(ATTEMPT)) {
- boolean isMatrixTask = false;
- if (parts[3].equals("m"))
- isMatrixTask = true;
- else if (parts[3].equals("g"))
- isMatrixTask = false;
- else
- throw new Exception();
-
- return new TaskAttemptID(parts[1], Integer.parseInt(parts[2]),
- isMatrixTask, Integer.parseInt(parts[4]), Integer
- .parseInt(parts[5]));
- }
- }
- } catch (Exception ex) {
- // fall below
- }
- throw new IllegalArgumentException("TaskAttemptId string : " + str
- + " is not properly formed");
- }
-}
Index: src/java/org/apache/hama/graph/TaskID.java
===================================================================
--- src/java/org/apache/hama/graph/TaskID.java (리비전 934620)
+++ src/java/org/apache/hama/graph/TaskID.java (작업 사본)
@@ -1,139 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hama.graph;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.text.NumberFormat;
-
-public class TaskID extends ID {
- protected static final String TASK = "task";
- protected static final NumberFormat idFormat = NumberFormat.getInstance();
- static {
- idFormat.setGroupingUsed(false);
- idFormat.setMinimumIntegerDigits(6);
- }
-
- private JobID jobId;
- private boolean isMatrixTask;
-
- public TaskID(JobID jobId, boolean isMatrixTask, int id) {
- super(id);
- if (jobId == null) {
- throw new IllegalArgumentException("jobId cannot be null");
- }
- this.jobId = jobId;
- this.isMatrixTask = isMatrixTask;
- }
-
- public TaskID(String jtIdentifier, int jobId, boolean isGraphTask, int id) {
- this(new JobID(jtIdentifier, jobId), isGraphTask, id);
- }
-
- public TaskID() {
- jobId = new JobID();
- }
-
- /** Returns the {@link JobID} object that this tip belongs to */
- public JobID getJobID() {
- return jobId;
- }
-
- public boolean isGraphTask() {
- return isMatrixTask;
- }
-
- @Override
- public boolean equals(Object o) {
- if (!super.equals(o))
- return false;
-
- TaskID that = (TaskID) o;
- return this.isMatrixTask == that.isMatrixTask
- && this.jobId.equals(that.jobId);
- }
-
- @Override
- public int compareTo(ID o) {
- TaskID that = (TaskID) o;
- int jobComp = this.jobId.compareTo(that.jobId);
- if (jobComp == 0) {
- if (this.isMatrixTask == that.isMatrixTask) {
- return this.id - that.id;
- } else
- return this.isMatrixTask ? -1 : 1;
- } else {
- return jobComp;
- }
- }
-
- @Override
- public String toString() {
- return appendTo(new StringBuilder(TASK)).toString();
- }
-
- protected StringBuilder appendTo(StringBuilder builder) {
- return jobId.appendTo(builder).append(SEPARATOR).append(
- isMatrixTask ? 'm' : 'g').append(SEPARATOR).append(idFormat.format(id));
- }
-
- @Override
- public int hashCode() {
- return jobId.hashCode() * 524287 + id;
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- super.readFields(in);
- jobId.readFields(in);
- isMatrixTask = in.readBoolean();
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
- jobId.write(out);
- out.writeBoolean(isMatrixTask);
- }
-
- public static TaskID forName(String str) throws IllegalArgumentException {
- if (str == null)
- return null;
- try {
- String[] parts = str.split("_");
- if (parts.length == 5) {
- if (parts[0].equals(TASK)) {
- boolean isMatrixTask = false;
- if (parts[3].equals("m"))
- isMatrixTask = true;
- else if (parts[3].equals("g"))
- isMatrixTask = false;
- else
- throw new Exception();
- return new TaskID(parts[1], Integer.parseInt(parts[2]), isMatrixTask,
- Integer.parseInt(parts[4]));
- }
- }
- } catch (Exception ex) {
- }
- throw new IllegalArgumentException("TaskId string : " + str
- + " is not properly formed");
- }
-}
Index: src/java/org/apache/hama/ipc/JobSubmissionProtocol.java
===================================================================
--- src/java/org/apache/hama/ipc/JobSubmissionProtocol.java (리비전 934620)
+++ src/java/org/apache/hama/ipc/JobSubmissionProtocol.java (작업 사본)
@@ -21,8 +21,8 @@
import java.io.IOException;
-import org.apache.hama.graph.JobID;
-import org.apache.hama.graph.JobStatus;
+import org.apache.hama.bsp.JobID;
+import org.apache.hama.bsp.JobStatus;
/**
* Protocol that a Walker and the central Master use to communicate. This