Index: bin/hama =================================================================== --- bin/hama (리비전 934620) +++ bin/hama (작업 사본) @@ -162,10 +162,10 @@ # figure out which class to run if [ "$COMMAND" = "bspmaster" ] ; then - CLASS='org.apache.hama.HamaMaster' + CLASS='org.apache.hama.bsp.BSPMaster' BSP_OPTS="$BSP_OPTS $BSP_BSPMASTER_OPTS" elif [ "$COMMAND" = "groom" ] ; then - CLASS='org.apache.hama.graph.GroomServer' + CLASS='org.apache.hama.bsp.GroomServer' BSP_OPTS="$BSP_OPTS $BSP_GROOMSERVER_OPTS" elif [ "$COMMAND" = "version" ] ; then CLASS=org.apache.bsp.util.VersionInfo Index: conf/hama-default.xml =================================================================== --- conf/hama-default.xml (리비전 934620) +++ conf/hama-default.xml (작업 사본) @@ -31,7 +31,19 @@ hama.groom.port 40020 - The port an groom server binds to. + The port an groom server binds to. + + + + bspd.system.dir + ${hadoop.tmp.dir}/bsp/system + The shared directory where BSP stores control files. + + + bspd.groom.local.dir + ${hadoop.tmp.dir}/groomserver/local + local directory for temporal store + \ No newline at end of file Index: src/java/groomserver-default.xml =================================================================== --- src/java/groomserver-default.xml (리비전 934620) +++ src/java/groomserver-default.xml (작업 사본) @@ -1,13 +0,0 @@ - - - - - - - - - - hama.groomserver.local.dir - ${hadoop.tmp.dir}/groomserver/local - - Index: src/java/org/apache/hama/HamaMaster.java =================================================================== --- src/java/org/apache/hama/HamaMaster.java (리비전 934620) +++ src/java/org/apache/hama/HamaMaster.java (작업 사본) @@ -1,215 +0,0 @@ -/** - * Copyright 2009 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.text.SimpleDateFormat; -import java.util.Date; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.Server; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.AccessControlException; -import org.apache.hadoop.util.StringUtils; -import org.apache.hama.graph.JobID; -import org.apache.hama.graph.JobStatus; -import org.apache.hama.ipc.HeartbeatResponse; -import org.apache.hama.ipc.InterTrackerProtocol; -import org.apache.hama.ipc.JobSubmissionProtocol; - -public class HamaMaster implements JobSubmissionProtocol, InterTrackerProtocol { - static{ - Configuration.addDefaultResource("groomserver-default.xml"); - } - - public static final Log LOG = LogFactory.getLog(HamaMaster.class); - - private HamaConfiguration conf; - public static enum State { INITIALIZING, RUNNING } - State state = State.INITIALIZING; - - String masterIdentifier; - - private Server interTrackerServer; - - FileSystem fs = null; - Path systemDir = null; - - // system directories are world-wide readable and owner readable - final static FsPermission SYSTEM_DIR_PERMISSION = - FsPermission.createImmutable((short) 0733); // rwx-wx-wx - - // system files should have 700 permission - final static FsPermission SYSTEM_FILE_PERMISSION = - FsPermission.createImmutable((short) 0700); // rwx------ - - private static final int FS_ACCESS_RETRY_PERIOD = 10000; - - private int nextJobId = 1; - - public HamaMaster(HamaConfiguration conf, String identifier) throws IOException, InterruptedException { - this.conf = conf; - - this.masterIdentifier = identifier; - - InetSocketAddress addr = getAddress(conf); - this.interTrackerServer = RPC.getServer(this, addr.getHostName(), addr.getPort(), conf); - - while (!Thread.currentThread().isInterrupted()) { - try { - if (fs == null) { - fs = FileSystem.get(conf); - } - // clean up the system dir, which will only work if hdfs is out of - // safe mode - if(systemDir == null) { - systemDir = new Path(getSystemDir()); - } - - LOG.info("Cleaning up the system directory"); - fs.delete(systemDir, true); - if (FileSystem.mkdirs(fs, systemDir, - new FsPermission(SYSTEM_DIR_PERMISSION))) { - break; - } - LOG.error("Mkdirs failed to create " + systemDir); - - } catch (AccessControlException ace) { - LOG.warn("Failed to operate on mapred.system.dir (" + systemDir - + ") because of permissions."); - LOG.warn("Manually delete the mapred.system.dir (" + systemDir - + ") and then start the JobTracker."); - LOG.warn("Bailing out ... "); - throw ace; - } catch (IOException ie) { - LOG.info("problem cleaning system directory: " + systemDir, ie); - } - Thread.sleep(FS_ACCESS_RETRY_PERIOD); - } - - // deleteLocalFiles(SUBDIR); - } - - public static HamaMaster startMaster(HamaConfiguration conf) throws IOException, - InterruptedException { - return startTracker(conf, generateNewIdentifier()); - } - - public static HamaMaster startTracker(HamaConfiguration conf, String identifier) - throws IOException, InterruptedException { - - HamaMaster result = null; - result = new HamaMaster(conf, identifier); - - return result; - } - - public static InetSocketAddress getAddress(Configuration conf) { - String hamaMasterStr = conf.get("hama.master.address", "localhost:40000"); - return NetUtils.createSocketAddr(hamaMasterStr); - } - - public int getPort() { - return this.conf.getInt("hama.master.port", 0); - } - - public HamaConfiguration getConfiguration() { - return this.conf; - } - - private static SimpleDateFormat getDateFormat() { - return new SimpleDateFormat("yyyyMMddHHmm"); - } - - private static String generateNewIdentifier() { - return getDateFormat().format(new Date()); - } - - public void offerService() throws InterruptedException, IOException { - this.interTrackerServer.start(); - - synchronized (this) { - state = State.RUNNING; - } - LOG.info("Starting RUNNING"); - - this.interTrackerServer.join(); - LOG.info("Stopped interTrackerServer"); - } - - - public static void main(String [] args) { - StringUtils.startupShutdownMessage(HamaMaster.class, args, LOG); - if (args.length != 0) { - System.out.println("usage: HamaMaster"); - System.exit(-1); - } - - try { - HamaConfiguration conf = new HamaConfiguration(); - HamaMaster master = startMaster(conf); - master.offerService(); - } catch (Throwable e) { - LOG.fatal(StringUtils.stringifyException(e)); - System.exit(-1); - } - } - - @Override - public long getProtocolVersion(String protocol, long clientVersion) throws IOException { - if (protocol.equals(InterTrackerProtocol.class.getName())) { - return InterTrackerProtocol.versionID; - } else if (protocol.equals(JobSubmissionProtocol.class.getName())){ - return JobSubmissionProtocol.versionID; - } else { - throw new IOException("Unknown protocol to job tracker: " + protocol); - } - } - - @Override - public HeartbeatResponse heartbeat(short responseId) { - LOG.debug(">>> return the heartbeat message."); - return new HeartbeatResponse((short)1); - } - - @Override - public String getSystemDir() { - Path sysDir = new Path(conf.get("mapred.system.dir", "/tmp/hadoop/mapred/system")); - return fs.makeQualified(sysDir).toString(); - } - - @Override - public JobID getNewJobId() throws IOException { - return new JobID(this.masterIdentifier, nextJobId++); - } - - @Override - public JobStatus submitJob(JobID jobName) throws IOException { - - return null; - } -} Index: src/java/org/apache/hama/bsp/BSPMaster.java =================================================================== --- src/java/org/apache/hama/bsp/BSPMaster.java (리비전 0) +++ src/java/org/apache/hama/bsp/BSPMaster.java (리비전 0) @@ -0,0 +1,235 @@ +/** + * Copyright 2009 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.text.SimpleDateFormat; +import java.util.Date; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.util.StringUtils; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.ipc.HeartbeatResponse; +import org.apache.hama.ipc.InterTrackerProtocol; +import org.apache.hama.ipc.JobSubmissionProtocol; + +/** + * BSPMaster is responsible to control all the bsp peers and to manage bsp jobs. + */ +public class BSPMaster implements JobSubmissionProtocol, InterTrackerProtocol { + static { + Configuration.addDefaultResource("hama-default.xml"); + } + + public static final Log LOG = LogFactory.getLog(BSPMaster.class); + + private HamaConfiguration conf; + + public static enum State { + INITIALIZING, RUNNING + } + + State state = State.INITIALIZING; + + String masterIdentifier; + + private Server interTrackerServer; + + FileSystem fs = null; + Path systemDir = null; + + // system directories are world-wide readable and owner readable + final static FsPermission SYSTEM_DIR_PERMISSION = FsPermission + .createImmutable((short) 0733); // rwx-wx-wx + + // system files should have 700 permission + final static FsPermission SYSTEM_FILE_PERMISSION = FsPermission + .createImmutable((short) 0700); // rwx------ + + private static final int FS_ACCESS_RETRY_PERIOD = 10000; + + private int nextJobId = 1; + + public BSPMaster(HamaConfiguration conf, String identifier) + throws IOException, InterruptedException { + this.conf = conf; + + this.masterIdentifier = identifier; + + InetSocketAddress addr = getAddress(conf); + this.interTrackerServer = RPC.getServer(this, addr.getHostName(), addr + .getPort(), conf); + + while (!Thread.currentThread().isInterrupted()) { + try { + if (fs == null) { + fs = FileSystem.get(conf); + } + // clean up the system dir, which will only work if hdfs is out of + // safe mode + if (systemDir == null) { + systemDir = new Path(getSystemDir()); + } + + LOG.info("Cleaning up the system directory"); + fs.delete(systemDir, true); + if (FileSystem.mkdirs(fs, systemDir, new FsPermission( + SYSTEM_DIR_PERMISSION))) { + break; + } + LOG.error("Mkdirs failed to create " + systemDir); + + } catch (AccessControlException ace) { + LOG.warn("Failed to operate on bspd.system.dir (" + systemDir + + ") because of permissions."); + LOG.warn("Manually delete the bspd.system.dir (" + systemDir + + ") and then start the JobTracker."); + LOG.warn("Bailing out ... "); + throw ace; + } catch (IOException ie) { + LOG.info("problem cleaning system directory: " + systemDir, ie); + } + Thread.sleep(FS_ACCESS_RETRY_PERIOD); + } + + // deleteLocalFiles(SUBDIR); + } + + public static BSPMaster startMaster(HamaConfiguration conf) + throws IOException, InterruptedException { + return startTracker(conf, generateNewIdentifier()); + } + + public static BSPMaster startTracker(HamaConfiguration conf, String identifier) + throws IOException, InterruptedException { + + BSPMaster result = null; + result = new BSPMaster(conf, identifier); + + return result; + } + + public static InetSocketAddress getAddress(Configuration conf) { + String hamaMasterStr = conf.get("hama.master.address", "localhost:40000"); + return NetUtils.createSocketAddr(hamaMasterStr); + } + + public int getPort() { + return this.conf.getInt("hama.master.port", 0); + } + + public HamaConfiguration getConfiguration() { + return this.conf; + } + + private static SimpleDateFormat getDateFormat() { + return new SimpleDateFormat("yyyyMMddHHmm"); + } + + /** + * + * @return + */ + private static String generateNewIdentifier() { + return getDateFormat().format(new Date()); + } + + public void offerService() throws InterruptedException, IOException { + this.interTrackerServer.start(); + + synchronized (this) { + state = State.RUNNING; + } + LOG.info("Starting RUNNING"); + + this.interTrackerServer.join(); + LOG.info("Stopped interTrackerServer"); + } + + public static void main(String[] args) { + StringUtils.startupShutdownMessage(BSPMaster.class, args, LOG); + if (args.length != 0) { + System.out.println("usage: HamaMaster"); + System.exit(-1); + } + + try { + HamaConfiguration conf = new HamaConfiguration(); + BSPMaster master = startMaster(conf); + master.offerService(); + } catch (Throwable e) { + LOG.fatal(StringUtils.stringifyException(e)); + System.exit(-1); + } + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + if (protocol.equals(InterTrackerProtocol.class.getName())) { + return InterTrackerProtocol.versionID; + } else if (protocol.equals(JobSubmissionProtocol.class.getName())) { + return JobSubmissionProtocol.versionID; + } else { + throw new IOException("Unknown protocol to job tracker: " + protocol); + } + } + + /** + * A RPC method for transmitting each peer status from peer to master. + */ + @Override + public HeartbeatResponse heartbeat(short responseId) { + LOG.debug(">>> return the heartbeat message."); + return new HeartbeatResponse((short) 1); + } + + /** + * Return system directory to which BSP store control files. + */ + @Override + public String getSystemDir() { + Path sysDir = new Path(conf + .get("bspd.system.dir", "/tmp/hadoop/bsp/system")); + return fs.makeQualified(sysDir).toString(); + } + + /** + * This method returns new job id. The returned job id increases sequentially. + */ + public JobID getNewJobId() throws IOException { + return new JobID(this.masterIdentifier, nextJobId++); + } + + public JobStatus submitJob(JobID jobName) throws IOException { + + return null; + } +} Index: src/java/org/apache/hama/bsp/GroomServer.java =================================================================== --- src/java/org/apache/hama/bsp/GroomServer.java (리비전 0) +++ src/java/org/apache/hama/bsp/GroomServer.java (리비전 0) @@ -0,0 +1,287 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.net.DNS; +import org.apache.hadoop.util.DiskChecker; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.DiskChecker.DiskErrorException; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.ipc.HeartbeatResponse; +import org.apache.hama.ipc.InterTrackerProtocol; + +public class GroomServer implements Runnable { + public static final Log LOG = LogFactory.getLog(GroomServer.class); + + static { + Configuration.addDefaultResource("hama-default.xml"); + } + + static enum State { + NORMAL, COMPUTE, SYNC, BARRIER, STALE, INTERRUPTED, DENIED + }; + + HamaConfiguration conf; + + volatile boolean running = true; + volatile boolean shuttingDown = false; + boolean justInited = true; + + String groomserverName; + String localHostname; + + InetSocketAddress masterAddr; + InterTrackerProtocol jobClient; + BSPPeer bspPeer; + + short heartbeatResponseId = -1; + private volatile int heartbeatInterval = 3 * 1000; + + private LocalDirAllocator localDirAllocator; + Path systemDirectory = null; + FileSystem systemFS = null; + + public GroomServer(HamaConfiguration conf) throws IOException { + this.conf = conf; + masterAddr = BSPMaster.getAddress(conf); + + FileSystem local = FileSystem.getLocal(conf); + this.localDirAllocator = new LocalDirAllocator("bspd.groom.local.dir"); + + initialize(); + } + + synchronized void initialize() throws IOException { + if (this.conf.get("slave.host.name") != null) { + this.localHostname = conf.get("slave.host.name"); + } + + if (localHostname == null) { + this.localHostname = DNS.getDefaultHost(conf.get( + "bspd.groom.dns.interface", "default"), conf.get( + "bspd.groom.dns.nameserver", "default")); + } + + checkLocalDirs(conf.getStrings("bspd.groom.local.dir")); + deleteLocalFiles("groomserver"); + + this.groomserverName = "groomd_" + localHostname; + LOG.info("Starting tracker " + this.groomserverName); + + DistributedCache.purgeCache(this.conf); + + this.jobClient = (InterTrackerProtocol) RPC.waitForProxy( + InterTrackerProtocol.class, InterTrackerProtocol.versionID, masterAddr, + conf); + this.running = true; + // this.bspPeer = new BSPPeer(this.conf); + } + + private static void checkLocalDirs(String[] localDirs) + throws DiskErrorException { + boolean writable = false; + + if (localDirs != null) { + for (int i = 0; i < localDirs.length; i++) { + try { + DiskChecker.checkDir(new File(localDirs[i])); + writable = true; + } catch (DiskErrorException e) { + LOG.warn("Graph Processor local " + e.getMessage()); + } + } + } + + if (!writable) + throw new DiskErrorException("all local directories are not writable"); + } + + public String[] getLocalDirs() { + return conf.getStrings("bspd.groom.local.dir"); + } + + public void deleteLocalFiles() throws IOException { + String[] localDirs = getLocalDirs(); + for (int i = 0; i < localDirs.length; i++) { + FileSystem.getLocal(this.conf).delete(new Path(localDirs[i]), true); + } + } + + public void deleteLocalFiles(String subdir) throws IOException { + String[] localDirs = getLocalDirs(); + for (int i = 0; i < localDirs.length; i++) { + FileSystem.getLocal(this.conf).delete(new Path(localDirs[i], subdir), true); + } + } + + public void cleanupStorage() throws IOException { + deleteLocalFiles(); + } + + private void startCleanupThreads() throws IOException { + + } + + public State offerService() throws Exception { + long lastHeartbeat = 0; + + while (running && !shuttingDown) { + try { + long now = System.currentTimeMillis(); + + long waitTime = heartbeatInterval - (now - lastHeartbeat); + if (waitTime > 0) { + // sleeps for the wait time + Thread.sleep(waitTime); + } + + if (justInited) { + String dir = jobClient.getSystemDir(); + if (dir == null) { + throw new IOException("Failed to get system directory"); + } + systemDirectory = new Path(dir); + systemFS = systemDirectory.getFileSystem(conf); + } + + // Send the heartbeat and process the jobtracker's directives + HeartbeatResponse heartbeatResponse = transmitHeartBeat(now); + + // Note the time when the heartbeat returned, use this to decide when to + // send the + // next heartbeat + lastHeartbeat = System.currentTimeMillis(); + + justInited = false; + } catch (InterruptedException ie) { + LOG.info("Interrupted. Closing down."); + return State.INTERRUPTED; + } catch (DiskErrorException de) { + String msg = "Exiting task tracker for disk error:\n" + + StringUtils.stringifyException(de); + LOG.error(msg); + + return State.STALE; + } catch (RemoteException re) { + return State.DENIED; + } catch (Exception except) { + String msg = "Caught exception: " + + StringUtils.stringifyException(except); + LOG.error(msg); + } + } + + return State.NORMAL; + } + + private class WalkerLauncher extends Thread { + // TODO: + } + + private HeartbeatResponse transmitHeartBeat(long now) throws IOException { + HeartbeatResponse heartbeatResponse = jobClient + .heartbeat(heartbeatResponseId); + return heartbeatResponse; + } + + public void run() { + try { + startCleanupThreads(); + boolean denied = false; + while (running && !shuttingDown && !denied) { + boolean staleState = false; + try { + while (running && !staleState && !shuttingDown && !denied) { + try { + State osState = offerService(); + if (osState == State.STALE) { + staleState = true; + } else if (osState == State.DENIED) { + denied = true; + } + } catch (Exception e) { + if (!shuttingDown) { + LOG.info("Lost connection to GraphProcessor [" + masterAddr + + "]. Retrying...", e); + try { + Thread.sleep(5000); + } catch (InterruptedException ie) { + } + } + } + } + } finally { + // close(); + } + + if (shuttingDown) { + return; + } + LOG.warn("Reinitializing local state"); + initialize(); + } + } catch (IOException ioe) { + LOG.error("Got fatal exception while reinitializing TaskTracker: " + + StringUtils.stringifyException(ioe)); + return; + } + } + + public synchronized void shutdown() throws IOException { + shuttingDown = true; + close(); + } + + public synchronized void close() throws IOException { + this.running = false; + + cleanupStorage(); + + // shutdown RPC connections + RPC.stopProxy(jobClient); + } + + public static void main(String[] args) { + StringUtils.startupShutdownMessage(GroomServer.class, args, LOG); + if (args.length != 0) { + System.out.println("usage: GroomServer"); + System.exit(-1); + } + + try { + HamaConfiguration conf = new HamaConfiguration(); + new GroomServer(conf).run(); + } catch (Throwable e) { + LOG.fatal(StringUtils.stringifyException(e)); + System.exit(-1); + } + } +} Index: src/java/org/apache/hama/bsp/ID.java =================================================================== --- src/java/org/apache/hama/bsp/ID.java (리비전 934620) +++ src/java/org/apache/hama/bsp/ID.java (작업 사본) @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hama.graph; +package org.apache.hama.bsp; import java.io.DataInput; import java.io.DataOutput; Index: src/java/org/apache/hama/bsp/JobID.java =================================================================== --- src/java/org/apache/hama/bsp/JobID.java (리비전 934620) +++ src/java/org/apache/hama/bsp/JobID.java (작업 사본) @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hama.graph; +package org.apache.hama.bsp; import java.io.DataInput; Index: src/java/org/apache/hama/bsp/JobStatus.java =================================================================== --- src/java/org/apache/hama/bsp/JobStatus.java (리비전 934620) +++ src/java/org/apache/hama/bsp/JobStatus.java (작업 사본) @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hama.graph; +package org.apache.hama.bsp; import java.io.DataInput; Index: src/java/org/apache/hama/bsp/TaskAttemptID.java =================================================================== --- src/java/org/apache/hama/bsp/TaskAttemptID.java (리비전 934620) +++ src/java/org/apache/hama/bsp/TaskAttemptID.java (작업 사본) @@ -16,12 +16,14 @@ * limitations under the License. */ -package org.apache.hama.graph; +package org.apache.hama.bsp; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import org.apache.hama.bsp.ID; + public class TaskAttemptID extends ID { protected static final String ATTEMPT = "attempt"; private TaskID taskId; Index: src/java/org/apache/hama/bsp/TaskID.java =================================================================== --- src/java/org/apache/hama/bsp/TaskID.java (리비전 934620) +++ src/java/org/apache/hama/bsp/TaskID.java (작업 사본) @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hama.graph; +package org.apache.hama.bsp; import java.io.DataInput; import java.io.DataOutput; Index: src/java/org/apache/hama/graph/GroomServer.java =================================================================== --- src/java/org/apache/hama/graph/GroomServer.java (리비전 934620) +++ src/java/org/apache/hama/graph/GroomServer.java (작업 사본) @@ -1,289 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.graph; - -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.filecache.DistributedCache; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalDirAllocator; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.net.DNS; -import org.apache.hadoop.util.DiskChecker; -import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.util.DiskChecker.DiskErrorException; -import org.apache.hama.HamaConfiguration; -import org.apache.hama.HamaMaster; -import org.apache.hama.bsp.BSPPeer; -import org.apache.hama.ipc.HeartbeatResponse; -import org.apache.hama.ipc.InterTrackerProtocol; - -public class GroomServer implements Runnable { - public static final Log LOG = LogFactory.getLog(GroomServer.class); - - static { - Configuration.addDefaultResource("groomserver-default.xml"); - } - - static enum State { - NORMAL, COMPUTE, SYNC, BARRIER, STALE, INTERRUPTED, DENIED - }; - - HamaConfiguration conf; - - volatile boolean running = true; - volatile boolean shuttingDown = false; - boolean justInited = true; - - String groomserverName; - String localHostname; - - InetSocketAddress masterAddr; - InterTrackerProtocol jobClient; - BSPPeer bspPeer; - - short heartbeatResponseId = -1; - private volatile int heartbeatInterval = 3 * 1000; - - private LocalDirAllocator localDirAllocator; - Path systemDirectory = null; - FileSystem systemFS = null; - - public GroomServer(HamaConfiguration conf) throws IOException { - this.conf = conf; - masterAddr = HamaMaster.getAddress(conf); - - FileSystem local = FileSystem.getLocal(conf); - this.localDirAllocator = new LocalDirAllocator("hama.groomserver.local.dir"); - - initialize(); - } - - synchronized void initialize() throws IOException { - if (this.conf.get("slave.host.name") != null) { - this.localHostname = conf.get("slave.host.name"); - } - - if (localHostname == null) { - this.localHostname = DNS.getDefaultHost(conf.get( - "hama.groomserver.dns.interface", "default"), conf.get( - "hama.groomserver.dns.nameserver", "default")); - } - - checkLocalDirs(conf.getStrings("hama.groomserver.local.dir")); - deleteLocalFiles("groomserver"); - - this.groomserverName = "groomserver_" + localHostname; - LOG.info("Starting tracker " + this.groomserverName); - - DistributedCache.purgeCache(this.conf); - - this.jobClient = (InterTrackerProtocol) RPC.waitForProxy( - InterTrackerProtocol.class, InterTrackerProtocol.versionID, masterAddr, - conf); - this.running = true; - // this.bspPeer = new BSPPeer(this.conf); - } - - private static void checkLocalDirs(String[] localDirs) - throws DiskErrorException { - boolean writable = false; - - if (localDirs != null) { - for (int i = 0; i < localDirs.length; i++) { - try { - DiskChecker.checkDir(new File(localDirs[i])); - writable = true; - } catch (DiskErrorException e) { - LOG.warn("Graph Processor local " + e.getMessage()); - } - } - } - - if (!writable) - throw new DiskErrorException("all local directories are not writable"); - } - - public String[] getLocalDirs() { - return conf.getStrings("hama.groomserver.local.dir"); - } - - public void deleteLocalFiles() throws IOException { - String[] localDirs = getLocalDirs(); - for (int i = 0; i < localDirs.length; i++) { - FileSystem.getLocal(this.conf).delete(new Path(localDirs[i])); - } - } - - public void deleteLocalFiles(String subdir) throws IOException { - String[] localDirs = getLocalDirs(); - for (int i = 0; i < localDirs.length; i++) { - FileSystem.getLocal(this.conf).delete(new Path(localDirs[i], subdir)); - } - } - - public void cleanupStorage() throws IOException { - deleteLocalFiles(); - } - - private void startCleanupThreads() throws IOException { - - } - - public State offerService() throws Exception { - long lastHeartbeat = 0; - - while (running && !shuttingDown) { - try { - long now = System.currentTimeMillis(); - - long waitTime = heartbeatInterval - (now - lastHeartbeat); - if (waitTime > 0) { - // sleeps for the wait time - Thread.sleep(waitTime); - } - - if (justInited) { - String dir = jobClient.getSystemDir(); - if (dir == null) { - throw new IOException("Failed to get system directory"); - } - systemDirectory = new Path(dir); - systemFS = systemDirectory.getFileSystem(conf); - } - - // Send the heartbeat and process the jobtracker's directives - HeartbeatResponse heartbeatResponse = transmitHeartBeat(now); - - // Note the time when the heartbeat returned, use this to decide when to - // send the - // next heartbeat - lastHeartbeat = System.currentTimeMillis(); - - justInited = false; - } catch (InterruptedException ie) { - LOG.info("Interrupted. Closing down."); - return State.INTERRUPTED; - } catch (DiskErrorException de) { - String msg = "Exiting task tracker for disk error:\n" - + StringUtils.stringifyException(de); - LOG.error(msg); - - return State.STALE; - } catch (RemoteException re) { - return State.DENIED; - } catch (Exception except) { - String msg = "Caught exception: " - + StringUtils.stringifyException(except); - LOG.error(msg); - } - } - - return State.NORMAL; - } - - private class WalkerLauncher extends Thread { - // TODO: - } - - private HeartbeatResponse transmitHeartBeat(long now) throws IOException { - HeartbeatResponse heartbeatResponse = jobClient - .heartbeat(heartbeatResponseId); - return heartbeatResponse; - } - - public void run() { - try { - startCleanupThreads(); - boolean denied = false; - while (running && !shuttingDown && !denied) { - boolean staleState = false; - try { - while (running && !staleState && !shuttingDown && !denied) { - try { - State osState = offerService(); - if (osState == State.STALE) { - staleState = true; - } else if (osState == State.DENIED) { - denied = true; - } - } catch (Exception e) { - if (!shuttingDown) { - LOG.info("Lost connection to GraphProcessor [" + masterAddr - + "]. Retrying...", e); - try { - Thread.sleep(5000); - } catch (InterruptedException ie) { - } - } - } - } - } finally { - // close(); - } - - if (shuttingDown) { - return; - } - LOG.warn("Reinitializing local state"); - initialize(); - } - } catch (IOException ioe) { - LOG.error("Got fatal exception while reinitializing TaskTracker: " - + StringUtils.stringifyException(ioe)); - return; - } - } - - public synchronized void shutdown() throws IOException { - shuttingDown = true; - close(); - } - - public synchronized void close() throws IOException { - this.running = false; - - cleanupStorage(); - - // shutdown RPC connections - RPC.stopProxy(jobClient); - } - - public static void main(String[] args) { - StringUtils.startupShutdownMessage(GroomServer.class, args, LOG); - if (args.length != 0) { - System.out.println("usage: GroomServer"); - System.exit(-1); - } - - try { - HamaConfiguration conf = new HamaConfiguration(); - new GroomServer(conf).run(); - } catch (Throwable e) { - LOG.fatal(StringUtils.stringifyException(e)); - System.exit(-1); - } - } -} Index: src/java/org/apache/hama/graph/ID.java =================================================================== --- src/java/org/apache/hama/graph/ID.java (리비전 934620) +++ src/java/org/apache/hama/graph/ID.java (작업 사본) @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.graph; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hadoop.io.WritableComparable; - -public abstract class ID implements WritableComparable { - protected static final char SEPARATOR = '_'; - protected int id; - - public ID(int id) { - this.id = id; - } - - protected ID() { - } - - public int getId() { - return id; - } - - @Override - public String toString() { - return String.valueOf(id); - } - - @Override - public int hashCode() { - return Integer.valueOf(id).hashCode(); - } - - @Override - public boolean equals(Object o) { - if (this == o) - return true; - if (o == null) - return false; - if (o.getClass() == this.getClass()) { - ID that = (ID) o; - return this.id == that.id; - } else - return false; - } - - public int compareTo(ID that) { - return this.id - that.id; - } - - public void readFields(DataInput in) throws IOException { - this.id = in.readInt(); - } - - public void write(DataOutput out) throws IOException { - out.writeInt(id); - } -} Index: src/java/org/apache/hama/graph/JobClient.java =================================================================== --- src/java/org/apache/hama/graph/JobClient.java (리비전 934620) +++ src/java/org/apache/hama/graph/JobClient.java (작업 사본) @@ -27,7 +27,7 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; import org.apache.hama.HamaConfiguration; -import org.apache.hama.HamaMaster; +import org.apache.hama.bsp.BSPMaster; import org.apache.hama.ipc.JobSubmissionProtocol; public class JobClient extends Configured { @@ -38,7 +38,7 @@ } static { - Configuration.addDefaultResource("groomserver-default.xml"); + Configuration.addDefaultResource("hama-default.xml"); } private JobSubmissionProtocol jobSubmitClient; @@ -53,7 +53,7 @@ public void init(HamaConfiguration conf) throws IOException { String tracker = conf.get("hama.master.address", "local"); - this.jobSubmitClient = createRPCProxy(HamaMaster.getAddress(conf), conf); + this.jobSubmitClient = createRPCProxy(BSPMaster.getAddress(conf), conf); } private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr, Index: src/java/org/apache/hama/graph/JobContext.java =================================================================== --- src/java/org/apache/hama/graph/JobContext.java (리비전 934620) +++ src/java/org/apache/hama/graph/JobContext.java (작업 사본) @@ -26,6 +26,7 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.Text; +import org.apache.hama.bsp.JobID; /** * A read-only view of the job that is provided to the tasks while they are Index: src/java/org/apache/hama/graph/JobID.java =================================================================== --- src/java/org/apache/hama/graph/JobID.java (리비전 934620) +++ src/java/org/apache/hama/graph/JobID.java (작업 사본) @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.graph; - -import java.io.DataInput; - -import java.io.DataOutput; -import java.io.IOException; -import java.text.NumberFormat; - -import org.apache.hadoop.io.Text; - -public class JobID extends ID implements Comparable { - protected static final String JOB = "job"; - private final Text jtIdentifier; - - protected static final NumberFormat idFormat = NumberFormat.getInstance(); - static { - idFormat.setGroupingUsed(false); - idFormat.setMinimumIntegerDigits(4); - } - - public JobID(String jtIdentifier, int id) { - super(id); - this.jtIdentifier = new Text(jtIdentifier); - } - - public JobID() { - jtIdentifier = new Text(); - } - - public String getJtIdentifier() { - return jtIdentifier.toString(); - } - - @Override - public boolean equals(Object o) { - if (!super.equals(o)) - return false; - - JobID that = (JobID) o; - return this.jtIdentifier.equals(that.jtIdentifier); - } - - @Override - public int compareTo(ID o) { - JobID that = (JobID) o; - int jtComp = this.jtIdentifier.compareTo(that.jtIdentifier); - if (jtComp == 0) { - return this.id - that.id; - } else - return jtComp; - } - - public StringBuilder appendTo(StringBuilder builder) { - builder.append(SEPARATOR); - builder.append(jtIdentifier); - builder.append(SEPARATOR); - builder.append(idFormat.format(id)); - return builder; - } - - @Override - public int hashCode() { - return jtIdentifier.hashCode() + id; - } - - @Override - public String toString() { - return appendTo(new StringBuilder(JOB)).toString(); - } - - @Override - public void readFields(DataInput in) throws IOException { - super.readFields(in); - this.jtIdentifier.readFields(in); - } - - @Override - public void write(DataOutput out) throws IOException { - super.write(out); - jtIdentifier.write(out); - } - - public static JobID forName(String str) throws IllegalArgumentException { - if (str == null) - return null; - try { - String[] parts = str.split("_"); - if (parts.length == 3) { - if (parts[0].equals(JOB)) { - return new JobID(parts[1], Integer.parseInt(parts[2])); - } - } - } catch (Exception ex) { - } - throw new IllegalArgumentException("JobId string : " + str - + " is not properly formed"); - } -} Index: src/java/org/apache/hama/graph/JobStatus.java =================================================================== --- src/java/org/apache/hama/graph/JobStatus.java (리비전 934620) +++ src/java/org/apache/hama/graph/JobStatus.java (작업 사본) @@ -1,160 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.graph; - -import java.io.DataInput; - -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableFactories; -import org.apache.hadoop.io.WritableFactory; - -public class JobStatus implements Writable, Cloneable { - - static { - WritableFactories.setFactory(JobStatus.class, new WritableFactory() { - public Writable newInstance() { - return new JobStatus(); - } - }); - } - - public static final int RUNNING = 1; - public static final int SUCCEEDED = 2; - public static final int FAILED = 3; - public static final int PREP = 4; - public static final int KILLED = 5; - - private JobID jobid; - private float progress; - private float cleanupProgress; - private float setupProgress; - private int runState; - private long startTime; - private String schedulingInfo = "NA"; - - public JobStatus() { - } - - public JobStatus(JobID jobid, float progress, int runState) { - this(jobid, progress, 0.0f, runState); - } - - public JobStatus(JobID jobid, float progress, float cleanupProgress, - int runState) { - this(jobid, 0.0f, progress, cleanupProgress, runState); - } - - public JobStatus(JobID jobid, float setupProgress, float progress, - float cleanupProgress, int runState) { - this.jobid = jobid; - this.setupProgress = setupProgress; - this.progress = progress; - this.cleanupProgress = cleanupProgress; - this.runState = runState; - } - - public JobID getJobID() { - return jobid; - } - - public synchronized float progress() { - return progress; - } - - synchronized void setprogress(float p) { - this.progress = (float) Math.min(1.0, Math.max(0.0, p)); - } - - public synchronized float cleanupProgress() { - return cleanupProgress; - } - - synchronized void setCleanupProgress(float p) { - this.cleanupProgress = (float) Math.min(1.0, Math.max(0.0, p)); - } - - public synchronized float setupProgress() { - return setupProgress; - } - - synchronized void setSetupProgress(float p) { - this.setupProgress = (float) Math.min(1.0, Math.max(0.0, p)); - } - - public synchronized int getRunState() { - return runState; - } - - public synchronized void setRunState(int state) { - this.runState = state; - } - - synchronized void setStartTime(long startTime) { - this.startTime = startTime; - } - - synchronized public long getStartTime() { - return startTime; - } - - @Override - public Object clone() { - try { - return super.clone(); - } catch (CloneNotSupportedException cnse) { - throw new InternalError(cnse.toString()); - } - } - - public synchronized String getSchedulingInfo() { - return schedulingInfo; - } - - public synchronized void setSchedulingInfo(String schedulingInfo) { - this.schedulingInfo = schedulingInfo; - } - - public synchronized boolean isJobComplete() { - return (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED || runState == JobStatus.KILLED); - } - - public synchronized void write(DataOutput out) throws IOException { - jobid.write(out); - out.writeFloat(setupProgress); - out.writeFloat(progress); - out.writeFloat(cleanupProgress); - out.writeInt(runState); - out.writeLong(startTime); - Text.writeString(out, schedulingInfo); - } - - public synchronized void readFields(DataInput in) throws IOException { - this.jobid = new JobID(); - jobid.readFields(in); - this.setupProgress = in.readFloat(); - this.progress = in.readFloat(); - this.cleanupProgress = in.readFloat(); - this.runState = in.readInt(); - this.startTime = in.readLong(); - this.schedulingInfo = Text.readString(in); - } -} Index: src/java/org/apache/hama/graph/TaskAttemptContext.java =================================================================== --- src/java/org/apache/hama/graph/TaskAttemptContext.java (리비전 934620) +++ src/java/org/apache/hama/graph/TaskAttemptContext.java (작업 사본) @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.Progressable; +import org.apache.hama.bsp.TaskAttemptID; /** * The context for task attempts. Index: src/java/org/apache/hama/graph/TaskAttemptID.java =================================================================== --- src/java/org/apache/hama/graph/TaskAttemptID.java (리비전 934620) +++ src/java/org/apache/hama/graph/TaskAttemptID.java (작업 사본) @@ -1,126 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hama.graph; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -public class TaskAttemptID extends ID { - protected static final String ATTEMPT = "attempt"; - private TaskID taskId; - - public TaskAttemptID(TaskID taskId, int id) { - super(id); - if (taskId == null) { - throw new IllegalArgumentException("taskId cannot be null"); - } - this.taskId = taskId; - } - - public TaskAttemptID(String jtIdentifier, int jobId, boolean isMatrixTask, - int taskId, int id) { - this(new TaskID(jtIdentifier, jobId, isMatrixTask, taskId), id); - } - - public TaskAttemptID() { - taskId = new TaskID(); - } - - public JobID getJobID() { - return taskId.getJobID(); - } - - public TaskID getTaskID() { - return taskId; - } - - @Override - public boolean equals(Object o) { - if (!super.equals(o)) - return false; - - TaskAttemptID that = (TaskAttemptID) o; - return this.taskId.equals(that.taskId); - } - - protected StringBuilder appendTo(StringBuilder builder) { - return taskId.appendTo(builder).append(SEPARATOR).append(id); - } - - @Override - public void readFields(DataInput in) throws IOException { - super.readFields(in); - taskId.readFields(in); - } - - @Override - public void write(DataOutput out) throws IOException { - super.write(out); - taskId.write(out); - } - - @Override - public int hashCode() { - return taskId.hashCode() * 5 + id; - } - - @Override - public int compareTo(ID o) { - TaskAttemptID that = (TaskAttemptID) o; - int tipComp = this.taskId.compareTo(that.taskId); - if (tipComp == 0) { - return this.id - that.id; - } else - return tipComp; - } - - @Override - public String toString() { - return appendTo(new StringBuilder(ATTEMPT)).toString(); - } - - public static TaskAttemptID forName(String str) - throws IllegalArgumentException { - if (str == null) - return null; - try { - String[] parts = str.split(Character.toString(SEPARATOR)); - if (parts.length == 6) { - if (parts[0].equals(ATTEMPT)) { - boolean isMatrixTask = false; - if (parts[3].equals("m")) - isMatrixTask = true; - else if (parts[3].equals("g")) - isMatrixTask = false; - else - throw new Exception(); - - return new TaskAttemptID(parts[1], Integer.parseInt(parts[2]), - isMatrixTask, Integer.parseInt(parts[4]), Integer - .parseInt(parts[5])); - } - } - } catch (Exception ex) { - // fall below - } - throw new IllegalArgumentException("TaskAttemptId string : " + str - + " is not properly formed"); - } -} Index: src/java/org/apache/hama/graph/TaskID.java =================================================================== --- src/java/org/apache/hama/graph/TaskID.java (리비전 934620) +++ src/java/org/apache/hama/graph/TaskID.java (작업 사본) @@ -1,139 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hama.graph; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.text.NumberFormat; - -public class TaskID extends ID { - protected static final String TASK = "task"; - protected static final NumberFormat idFormat = NumberFormat.getInstance(); - static { - idFormat.setGroupingUsed(false); - idFormat.setMinimumIntegerDigits(6); - } - - private JobID jobId; - private boolean isMatrixTask; - - public TaskID(JobID jobId, boolean isMatrixTask, int id) { - super(id); - if (jobId == null) { - throw new IllegalArgumentException("jobId cannot be null"); - } - this.jobId = jobId; - this.isMatrixTask = isMatrixTask; - } - - public TaskID(String jtIdentifier, int jobId, boolean isGraphTask, int id) { - this(new JobID(jtIdentifier, jobId), isGraphTask, id); - } - - public TaskID() { - jobId = new JobID(); - } - - /** Returns the {@link JobID} object that this tip belongs to */ - public JobID getJobID() { - return jobId; - } - - public boolean isGraphTask() { - return isMatrixTask; - } - - @Override - public boolean equals(Object o) { - if (!super.equals(o)) - return false; - - TaskID that = (TaskID) o; - return this.isMatrixTask == that.isMatrixTask - && this.jobId.equals(that.jobId); - } - - @Override - public int compareTo(ID o) { - TaskID that = (TaskID) o; - int jobComp = this.jobId.compareTo(that.jobId); - if (jobComp == 0) { - if (this.isMatrixTask == that.isMatrixTask) { - return this.id - that.id; - } else - return this.isMatrixTask ? -1 : 1; - } else { - return jobComp; - } - } - - @Override - public String toString() { - return appendTo(new StringBuilder(TASK)).toString(); - } - - protected StringBuilder appendTo(StringBuilder builder) { - return jobId.appendTo(builder).append(SEPARATOR).append( - isMatrixTask ? 'm' : 'g').append(SEPARATOR).append(idFormat.format(id)); - } - - @Override - public int hashCode() { - return jobId.hashCode() * 524287 + id; - } - - @Override - public void readFields(DataInput in) throws IOException { - super.readFields(in); - jobId.readFields(in); - isMatrixTask = in.readBoolean(); - } - - @Override - public void write(DataOutput out) throws IOException { - super.write(out); - jobId.write(out); - out.writeBoolean(isMatrixTask); - } - - public static TaskID forName(String str) throws IllegalArgumentException { - if (str == null) - return null; - try { - String[] parts = str.split("_"); - if (parts.length == 5) { - if (parts[0].equals(TASK)) { - boolean isMatrixTask = false; - if (parts[3].equals("m")) - isMatrixTask = true; - else if (parts[3].equals("g")) - isMatrixTask = false; - else - throw new Exception(); - return new TaskID(parts[1], Integer.parseInt(parts[2]), isMatrixTask, - Integer.parseInt(parts[4])); - } - } - } catch (Exception ex) { - } - throw new IllegalArgumentException("TaskId string : " + str - + " is not properly formed"); - } -} Index: src/java/org/apache/hama/ipc/JobSubmissionProtocol.java =================================================================== --- src/java/org/apache/hama/ipc/JobSubmissionProtocol.java (리비전 934620) +++ src/java/org/apache/hama/ipc/JobSubmissionProtocol.java (작업 사본) @@ -21,8 +21,8 @@ import java.io.IOException; -import org.apache.hama.graph.JobID; -import org.apache.hama.graph.JobStatus; +import org.apache.hama.bsp.JobID; +import org.apache.hama.bsp.JobStatus; /** * Protocol that a Walker and the central Master use to communicate. This