Index: bin/hama
===================================================================
--- bin/hama (revision 943815)
+++ bin/hama (working copy)
@@ -168,7 +168,7 @@
CLASS='org.apache.hama.bsp.GroomServer'
BSP_OPTS="$BSP_OPTS $BSP_GROOMSERVER_OPTS"
elif [ "$COMMAND" = "version" ] ; then
- CLASS=org.apache.bsp.util.VersionInfo
+ CLASS=org.apache.hama.util.VersionInfo
BSP_OPTS="$BSP_OPTS $BSP_CLIENT_OPTS"
else
CLASS=$COMMAND
Index: conf/hama-default.xml
===================================================================
--- conf/hama-default.xml (revision 943815)
+++ conf/hama-default.xml (working copy)
@@ -23,27 +23,27 @@
-->
   <property>
-    <name>hama.master.port</name>
+    <name>bsp.master.port</name>
     <value>40000</value>
     <description>The port master should bind to.</description>
   </property>
 
   <property>
-    <name>hama.groom.port</name>
+    <name>bsp.groom.port</name>
     <value>40020</value>
     <description>The port an groom server binds to.</description>
   </property>
 
   <property>
-    <name>bspd.system.dir</name>
+    <name>bsp.local.dir</name>
+    <value>${hadoop.tmp.dir}/bsp/local</value>
+    <description>local directory for temporal store</description>
+  </property>
+
+  <property>
+    <name>bsp.system.dir</name>
     <value>${hadoop.tmp.dir}/bsp/system</value>
     <description>The shared directory where BSP stores control files.</description>
   </property>
-
-  <property>
-    <name>bspd.groom.local.dir</name>
-    <value>${hadoop.tmp.dir}/groomserver/local</value>
-    <description>local directory for temporal store</description>
-  </property>
\ No newline at end of file
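
For context, the renamed keys above are ordinary Hadoop configuration properties. The snippet below is a minimal sketch (not part of this patch) of how they would typically be read through the Configuration API; the class name ConfKeysSketch and its main() wrapper are illustrative only, and the defaults just mirror the values in hama-default.xml. It assumes hama-default.xml / hama-site.xml are on the classpath and registered as default resources, as the static blocks elsewhere in this patch do.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hama.HamaConfiguration;

    public class ConfKeysSketch {
      public static void main(String[] args) {
        Configuration conf = new HamaConfiguration();
        int masterPort = conf.getInt("bsp.master.port", 40000);   // was hama.master.port
        int groomPort = conf.getInt("bsp.groom.port", 40020);     // was hama.groom.port
        String localDir = conf.get("bsp.local.dir");              // was bspd.groom.local.dir
        String systemDir = conf.get("bsp.system.dir");            // was bspd.system.dir
        System.out.println(masterPort + " " + groomPort + " " + localDir + " " + systemDir);
      }
    }
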
Index: src/java/org/apache/hama/Constants.java
===================================================================
--- src/java/org/apache/hama/Constants.java (revision 943815)
+++ src/java/org/apache/hama/Constants.java (working copy)
@@ -25,8 +25,46 @@
/**
* Some constants used in the Hama
*/
-public class Constants {
+public interface Constants {
+ /** default host address */
+ public static final String PEER_HOST = "bsp.peer.hostname";
+ /** default host address */
+ public static final String DEFAULT_PEER_HOST = "0.0.0.0";
+
+ public static final String PEER_PORT = "bsp.peer.port";
+ /** Default port region server listens on. */
+ public static final int DEFAULT_PEER_PORT = 61000;
+
+ public static final long ATLEAST_WAIT_TIME = 100;
+
+ /** zookeeper root */
+ public static final String ZOOKEEPER_ROOT = "bsp.zookeeper.root";
+ /** zookeeper default root */
+ public static final String DEFAULT_ZOOKEEPER_ROOT = "/bsp";
+
+ /** zookeeper server address */
+ public static final String ZOOKEEPER_SERVER_ADDRS = "zookeeper.server";
+ /** zookeeper default server address */
+ static final String DEFAULT_ZOOKEEPER_SERVER_ADDR = "localhost:21810";
+ /** Parameter name for number of times to retry writes to ZooKeeper. */
+ public static final String ZOOKEEPER_RETRIES = "zookeeper.retries";
+ /** Default number of times to retry writes to ZooKeeper. */
+ public static final int DEFAULT_ZOOKEEPER_RETRIES = 5;
+ /** Parameter name for ZooKeeper pause between retries. In milliseconds. */
+ public static final String ZOOKEEPER_PAUSE = "zookeeper.pause";
+ /** Default ZooKeeper pause value. In milliseconds. */
+ public static final int DEFAULT_ZOOKEEPER_PAUSE = 2 * 1000;
+
+ /**
+ * An empty instance.
+ */
+ public static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+
+ /**
+ * HBase structure for matrices
+ */
+
/** Meta-columnFamily to store the matrix-info */
public final static String METADATA = "metadata";
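
Since the BSP-related keys now live in the Constants interface, callers read them through an org.apache.hadoop.conf.Configuration, exactly as the BSPPeer changes later in this patch do. A minimal sketch follows; the class name PeerConfSketch is illustrative only and not part of the patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hama.Constants;
    import org.apache.hama.HamaConfiguration;

    public class PeerConfSketch {
      public static void main(String[] args) {
        Configuration conf = new HamaConfiguration();
        // Same lookup pattern BSPPeer uses in its constructor.
        String host = conf.get(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST);
        int port = conf.getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
        String zkRoot = conf.get(Constants.ZOOKEEPER_ROOT, Constants.DEFAULT_ZOOKEEPER_ROOT);
        System.out.println(host + ":" + port + " zk root=" + zkRoot);
      }
    }
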
Index: src/java/org/apache/hama/bsp/BSPConstants.java
===================================================================
--- src/java/org/apache/hama/bsp/BSPConstants.java (revision 943815)
+++ src/java/org/apache/hama/bsp/BSPConstants.java (working copy)
@@ -1,58 +0,0 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hama.bsp;
-
-public interface BSPConstants {
-
- /** default host address */
- static final String PEER_HOST = "bsp.peer.hostname";
- /** default host address */
- static final String DEFAULT_PEER_HOST = "0.0.0.0";
-
- static final String PEER_PORT = "bsp.peer.port";
- /** Default port region server listens on. */
- static final int DEFAULT_PEER_PORT = 61000;
-
- static final long ATLEAST_WAIT_TIME = 100;
-
- /** zookeeper root */
- static final String ZOOKEEPER_ROOT = "bsp.zookeeper.root";
- /** zookeeper default root */
- static final String DEFAULT_ZOOKEEPER_ROOT = "/bsp";
-
- /** zookeeper server address */
- static final String ZOOKEEPER_SERVER_ADDRS = "zookeeper.server";
- /** zookeeper default server address */
- static final String DEFAULT_ZOOKEEPER_SERVER_ADDR = "localhost:21810";
- /** Parameter name for number of times to retry writes to ZooKeeper. */
- static final String ZOOKEEPER_RETRIES = "zookeeper.retries";
- /** Default number of times to retry writes to ZooKeeper. */
- static final int DEFAULT_ZOOKEEPER_RETRIES = 5;
- /** Parameter name for ZooKeeper pause between retries. In milliseconds. */
- static final String ZOOKEEPER_PAUSE = "zookeeper.pause";
- /** Default ZooKeeper pause value. In milliseconds. */
- static final int DEFAULT_ZOOKEEPER_PAUSE = 2 * 1000;
-
- /**
- * An empty instance.
- */
- static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
-}
Index: src/java/org/apache/hama/bsp/BSPInterface.java
===================================================================
--- src/java/org/apache/hama/bsp/BSPInterface.java (revision 0)
+++ src/java/org/apache/hama/bsp/BSPInterface.java (revision 0)
@@ -0,0 +1,5 @@
+package org.apache.hama.bsp;
+
+public interface BSPInterface {
+
+}
Index: src/java/org/apache/hama/bsp/BSPJobClient.java
===================================================================
--- src/java/org/apache/hama/bsp/BSPJobClient.java (revision 0)
+++ src/java/org/apache/hama/bsp/BSPJobClient.java (revision 0)
@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.bsp.BSPMaster;
+import org.apache.hama.ipc.JobSubmissionProtocol;
+
+public class BSPJobClient extends Configured {
+ private static final Log LOG = LogFactory.getLog(BSPJobClient.class);
+ public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
+ private static final long MAX_JOBPROFILE_AGE = 1000 * 2;
+
+ static {
+ Configuration.addDefaultResource("hama-default.xml");
+ Configuration.addDefaultResource("hama-site.xml");
+ }
+
+ class NetworkedJob implements RunningJob {
+ JobProfile profile;
+ JobStatus status;
+ long statustime;
+
+ public NetworkedJob(JobStatus job) throws IOException {
+ this.status = job;
+ this.profile = jobSubmitClient.getJobProfile(job.getJobID());
+ this.statustime = System.currentTimeMillis();
+ }
+
+ /**
+ * Some methods rely on having a recent job profile object. Refresh
+ * it, if necessary
+ */
+ synchronized void ensureFreshStatus() throws IOException {
+ if (System.currentTimeMillis() - statustime > MAX_JOBPROFILE_AGE) {
+ updateStatus();
+ }
+ }
+
+ /** Some methods need to update status immediately. So, refresh
+ * immediately
+ * @throws IOException
+ */
+ synchronized void updateStatus() throws IOException {
+ this.status = jobSubmitClient.getJobStatus(profile.getJobID());
+ this.statustime = System.currentTimeMillis();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hama.bsp.RunningJob#getID()
+ */
+ @Override
+ public BSPJobID getID() {
+ return profile.getJobID();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hama.bsp.RunningJob#getJobName()
+ */
+ @Override
+ public String getJobName() {
+ return profile.getJobName();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hama.bsp.RunningJob#getJobFile()
+ */
+ @Override
+ public String getJobFile() {
+ return profile.getJobFile();
+ }
+
+ @Override
+ public float progress() throws IOException {
+ ensureFreshStatus();
+ return status.progress();
+ }
+
+ /**
+ * Returns immediately whether the whole job is done yet or not.
+ */
+ public synchronized boolean isComplete() throws IOException {
+ updateStatus();
+ return (status.getRunState() == JobStatus.SUCCEEDED ||
+ status.getRunState() == JobStatus.FAILED ||
+ status.getRunState() == JobStatus.KILLED);
+ }
+
+ /**
+ * True iff job completed successfully.
+ */
+ public synchronized boolean isSuccessful() throws IOException {
+ updateStatus();
+ return status.getRunState() == JobStatus.SUCCEEDED;
+ }
+
+ /**
+ * Blocks until the job is finished
+ */
+ public void waitForCompletion() throws IOException {
+ while (!isComplete()) {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException ie) {
+ }
+ }
+ }
+
+ /**
+ * Tells the service to get the state of the current job.
+ */
+ public synchronized int getJobState() throws IOException {
+ updateStatus();
+ return status.getRunState();
+ }
+
+ /**
+ * Tells the service to terminate the current job.
+ */
+ public synchronized void killJob() throws IOException {
+ jobSubmitClient.killJob(getID());
+ }
+
+ @Override
+ public void killTask(TaskAttemptID taskId, boolean shouldFail)
+ throws IOException {
+ jobSubmitClient.killTask(taskId, shouldFail);
+ }
+ }
+
+ private JobSubmissionProtocol jobSubmitClient = null;
+ private Path sysDir = null;
+ private FileSystem fs = null;
+
+ // job files are world-wide readable and owner writable
+ final private static FsPermission JOB_FILE_PERMISSION =
+ FsPermission.createImmutable((short) 0644); // rw-r--r--
+
+ // job submission directory is world readable/writable/executable
+ final static FsPermission JOB_DIR_PERMISSION =
+ FsPermission.createImmutable((short) 0777); // rwx-rwx-rwx
+
+ public BSPJobClient() {
+ }
+
+ public BSPJobClient(Configuration conf) throws IOException {
+ setConf(conf);
+ init(conf);
+ }
+
+ public void init(Configuration conf) throws IOException {
+ // it will be used to determine if the bspmaster is running on local or not.
+ //String tracker = conf.get("bsp.master.address", "local");
+ this.jobSubmitClient = createRPCProxy(BSPMaster.getAddress(conf), conf);
+ }
+
+ private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
+ Configuration conf) throws IOException {
+ return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
+ JobSubmissionProtocol.versionID, addr, conf, NetUtils.getSocketFactory(
+ conf, JobSubmissionProtocol.class));
+ }
+
+ /**
+ * Close the JobClient.
+ */
+ public synchronized void close() throws IOException {
+ RPC.stopProxy(jobSubmitClient);
+ }
+
+ /**
+ * Get a filesystem handle. We need this to prepare jobs
+ * for submission to the BSP system.
+ *
+ * @return the filesystem handle.
+ */
+ public synchronized FileSystem getFs() throws IOException {
+ if (this.fs == null) {
+ Path sysDir = getSystemDir();
+ this.fs = sysDir.getFileSystem(getConf());
+ }
+ return fs;
+ }
+
+ /* see if two file systems are the same or not
+ *
+ */ /*
+ private boolean compareFs(FileSystem srcFs, FileSystem destFs) {
+ URI srcUri = srcFs.getUri();
+ URI dstUri = destFs.getUri();
+ if (srcUri.getScheme() == null) {
+ return false;
+ }
+ if (!srcUri.getScheme().equals(dstUri.getScheme())) {
+ return false;
+ }
+ String srcHost = srcUri.getHost();
+ String dstHost = dstUri.getHost();
+ if ((srcHost != null) && (dstHost != null)) {
+ try {
+ srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
+ dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
+ } catch(UnknownHostException ue) {
+ return false;
+ }
+ if (!srcHost.equals(dstHost)) {
+ return false;
+ }
+ }
+ else if (srcHost == null && dstHost != null) {
+ return false;
+ }
+ else if (srcHost != null && dstHost == null) {
+ return false;
+ }
+ //check for ports
+ if (srcUri.getPort() != dstUri.getPort()) {
+ return false;
+ }
+ return true;
+ } */
+
+ //copies a file to the bspmaster filesystem and returns the path where it
+ // was copied to
+ /*
+ private Path copyRemoteFiles(FileSystem jtFs, Path parentDir, Path originalPath,
+ BSPJobContext job, short replication) throws IOException {
+ //check if we do not need to copy the files
+ // is jt using the same file system.
+ // just checking for uri strings... doing no dns lookups
+ // to see if the filesystems are the same. This is not optimal.
+ // but avoids name resolution.
+
+ FileSystem remoteFs = null;
+ remoteFs = originalPath.getFileSystem(job.getConf());
+ if (compareFs(remoteFs, jtFs)) {
+ return originalPath;
+ }
+ // this might have name collisions. copy will throw an exception
+ //parse the original path to create new path
+ Path newPath = new Path(parentDir, originalPath.getName());
+ FileUtil.copy(remoteFs, originalPath, jtFs, newPath, false, job.getConf());
+ jtFs.setReplication(newPath, replication);
+ return newPath;
+ }*/
+
+ private UnixUserGroupInformation getUGI(Configuration conf) throws IOException {
+ UnixUserGroupInformation ugi = null;
+ try {
+ ugi = UnixUserGroupInformation.login(conf, true);
+ } catch (LoginException e) {
+ throw (IOException)(new IOException(
+ "Failed to get the current user's information.").initCause(e));
+ }
+ return ugi;
+ }
+
+ /**
+ * Submit a job to the BSP system.
+ * This returns a handle to the {@link RunningJob} which can be used to track
+ * the running-job.
+ *
+ * @param job the job configuration.
+ * @return a handle to the {@link RunningJob} which can be used to track the
+ * running-job.
+ * @throws FileNotFoundException
+ * @throws IOException
+ */
+ public RunningJob submitJob(BSPJobConf job) throws FileNotFoundException,
+ IOException {
+ return submitJobInternal(job);
+ }
+
+ public
+ RunningJob submitJobInternal(BSPJobConf job) throws IOException {
+ BSPJobID jobId = jobSubmitClient.getNewJobId();
+ Path submitJobDir = new Path(getSystemDir(), jobId.toString());
+ Path submitJarFile = new Path(submitJobDir, "job.jar");
+ Path submitJobFile = new Path(submitJobDir, "job.xml");
+
+ /*
+ * set this user's id in job configuration, so later job files can be
+ * accessed using this user's id
+ */
+ UnixUserGroupInformation ugi = getUGI(job.getConf());
+
+ // Create a number of filenames in the BSPMaster's fs namespace
+ FileSystem fs = getFs();
+ LOG.debug("default FileSystem: " + fs.getUri());
+ fs.delete(submitJobDir, true);
+ submitJobDir = fs.makeQualified(submitJobDir);
+ submitJobDir = new Path(submitJobDir.toUri().getPath());
+ FsPermission bspSysPerms = new FsPermission(JOB_DIR_PERMISSION);
+ FileSystem.mkdirs(fs, submitJobDir, bspSysPerms);
+ short replication = (short)job.getInt("bsp.submit.replication", 10);
+
+ String originalJarPath = job.getJar();
+
+ if (originalJarPath != null) { // copy jar to BSPMaster's fs
+ // use jar name if job is not named.
+ if ("".equals(job.getJobName())){
+ job.setJobName(new Path(originalJarPath).getName());
+ }
+ job.setJar(submitJarFile.toString());
+ fs.copyFromLocalFile(new Path(originalJarPath), submitJarFile);
+ fs.setReplication(submitJarFile, replication);
+ fs.setPermission(submitJarFile, new FsPermission(JOB_FILE_PERMISSION));
+ } else {
+ LOG.warn("No job jar file set. User classes may not be found. "+
+ "See BSPJobConf#setJar(String) or check Your jar file.");
+ }
+
+ // Set the user's name and working directory
+ job.setUser(ugi.getUserName());
+ if (ugi.getGroupNames().length > 0) {
+ job.set("group.name", ugi.getGroupNames()[0]);
+ }
+ if (job.getWorkingDirectory() == null) {
+ job.setWorkingDirectory(fs.getWorkingDirectory());
+ }
+
+ // Write job file to BSPMaster's fs
+ FSDataOutputStream out =
+ FileSystem.create(fs, submitJobFile,
+ new FsPermission(JOB_FILE_PERMISSION));
+
+ try {
+ job.writeXml(out);
+ } finally {
+ out.close();
+ }
+
+ //
+ // Now, actually submit the job (using the submit name)
+ //
+ JobStatus status = jobSubmitClient.submitJob(jobId);
+ if (status != null) {
+ return new NetworkedJob(status);
+ } else {
+ throw new IOException("Could not launch job");
+ }
+ }
+
+ /**
+ * Monitor a job and print status in real-time as progress is made and tasks
+ * fail.
+ * @param job the job's configuration
+ * @param info the job to track
+ * @return true if the job succeeded
+ * @throws IOException if communication to the BSPMaster fails
+ * @throws InterruptedException
+ */
+ public boolean monitorAndPrintJob (BSPJobConf job, RunningJob info)
+ throws IOException, InterruptedException {
+
+ String lastReport = null;
+ BSPJobID jobId = job.getJobID();
+ LOG.info("Running job: " + jobId);
+
+ while (!job.isComplete()) {
+ Thread.sleep(1000);
+ String report = " bsp " + StringUtils.formatPercent(job.progress(), 0);
+
+ if (!report.equals(lastReport)) {
+ LOG.info(report);
+ lastReport = report;
+ }
+ }
+
+ LOG.info("Job complete: " + jobId);
+ return job.isSuccessful();
+ }
+
+ /**
+ * Grab the bspmaster system directory path where job-specific files are to be placed.
+ *
+ * @return the system directory where job-specific files are to be placed.
+ */
+ public Path getSystemDir() {
+ if (sysDir == null) {
+ sysDir = new Path(jobSubmitClient.getSystemDir());
+ }
+ return sysDir;
+ }
+
+ public RunningJob runJob(BSPJobConf job) throws FileNotFoundException,
+ IOException {
+ return submitJobInternal(job);
+ }
+}
Index: src/java/org/apache/hama/bsp/BSPJobConf.java
===================================================================
--- src/java/org/apache/hama/bsp/BSPJobConf.java (revision 0)
+++ src/java/org/apache/hama/bsp/BSPJobConf.java (revision 0)
@@ -0,0 +1,186 @@
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.Enumeration;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hama.HamaConfiguration;
+
+public class BSPJobConf extends BSPJobContext {
+ public static enum JobState {
+ DEFINE, RUNNING
+ };
+
+ private JobState state = JobState.DEFINE;
+ private BSPJobClient jobClient;
+ private RunningJob info;
+
+ public BSPJobConf() throws IOException {
+ this(new HamaConfiguration());
+ }
+
+ public BSPJobConf(HamaConfiguration conf, String jobName) throws IOException {
+ this(conf);
+ setJobName(jobName);
+ }
+
+ public BSPJobConf(HamaConfiguration conf) throws IOException {
+ super(conf, null);
+ jobClient = new BSPJobClient(conf);
+ }
+
+ private void ensureState(JobState state) throws IllegalStateException {
+ if (state != this.state) {
+ throw new IllegalStateException("Job in state " + this.state
+ + " instead of " + state);
+ }
+ }
+
+ public void setWorkingDirectory(Path dir) throws IOException {
+ ensureState(JobState.DEFINE);
+ dir = new Path(getWorkingDirectory(), dir);
+ conf.set("bsp.working.dir", dir.toString());
+ }
+
+ public void setWorkClass(Class<? extends Work> cls)
+ throws IllegalStateException {
+ ensureState(JobState.DEFINE);
+ conf.setClass(WORK_CLASS_ATTR, cls, Work.class);
+ }
+
+ public void setInputPath(HamaConfiguration conf, Path iNPUTPATH) {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void setOutputPath(HamaConfiguration conf, Path oUTPUTPATH) {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void setInputFormat(Class<? extends InputFormat> class1) {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void setOutputFormat(Class<? extends OutputFormat> class1) {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void setBSPCode(Class<? extends BSPInterface> class1) {
+ // TODO Auto-generated method stub
+
+ }
+
+ public String getInputPath() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public String getOutputPath() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public void setJar(String jar) {
+ conf.set("bsp.jar", jar);
+ }
+
+ public void setJarByClass(Class<?> cls) {
+ String jar = findContainingJar(cls);
+ if (jar != null) {
+ conf.set("bsp.jar", jar);
+ }
+ }
+
+ public void setJobName(String name) throws IllegalStateException {
+ ensureState(JobState.DEFINE);
+ conf.set("bsp.job.name", name);
+ }
+
+ public void setUser(String user) {
+ conf.set("user.name", user);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static String findContainingJar(Class my_class) {
+ ClassLoader loader = my_class.getClassLoader();
+ String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
+ try {
+ for (Enumeration itr = loader.getResources(class_file); itr
+ .hasMoreElements();) {
+ URL url = (URL) itr.nextElement();
+ if ("jar".equals(url.getProtocol())) {
+ String toReturn = url.getPath();
+ if (toReturn.startsWith("file:")) {
+ toReturn = toReturn.substring("file:".length());
+ }
+ toReturn = URLDecoder.decode(toReturn, "UTF-8");
+ return toReturn.replaceAll("!.*$", "");
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ }
+
+ public float progress() throws IOException {
+ ensureState(JobState.RUNNING);
+ return info.progress();
+ }
+
+ public boolean isComplete() throws IOException {
+ ensureState(JobState.RUNNING);
+ return info.isComplete();
+ }
+
+ public boolean isSuccessful() throws IOException {
+ ensureState(JobState.RUNNING);
+ return info.isSuccessful();
+ }
+
+ public void killJob() throws IOException {
+ ensureState(JobState.RUNNING);
+ info.killJob();
+ }
+
+ public void killTask(TaskAttemptID taskId) throws IOException {
+ ensureState(JobState.RUNNING);
+ info.killTask(taskId, false);
+ }
+
+ public void failTask(TaskAttemptID taskId) throws IOException {
+ ensureState(JobState.RUNNING);
+ info.killTask(taskId, true);
+ }
+
+ public void submit() throws IOException, InterruptedException {
+ System.out.println("Submitted");
+ ensureState(JobState.DEFINE);
+ info = jobClient.submitJobInternal(this);
+ state = JobState.RUNNING;
+ }
+
+ public boolean waitForCompletion(boolean verbose) throws IOException,
+ InterruptedException, ClassNotFoundException {
+ if (state == JobState.DEFINE) {
+ submit();
+ }
+ if (verbose) {
+ jobClient.monitorAndPrintJob(this, info);
+ } else {
+ info.waitForCompletion();
+ }
+ return isSuccessful();
+ }
+
+ public void set(String name, String value) {
+ conf.set(name, value);
+ }
+}
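
Taken together, BSPJobClient and BSPJobConf give user code a submit-and-wait workflow. The sketch below shows how a client might drive the API added above; the class name SubmitSketch is hypothetical and exists only to give setJarByClass() a class to locate, and nothing here is prescribed by the patch itself.

    import org.apache.hama.HamaConfiguration;
    import org.apache.hama.bsp.BSPJobConf;

    public class SubmitSketch {
      public static void main(String[] args) throws Exception {
        HamaConfiguration conf = new HamaConfiguration();
        // BSPJobConf(conf, name) also creates the BSPJobClient used for submission.
        BSPJobConf job = new BSPJobConf(conf, "example-job");
        job.setJarByClass(SubmitSketch.class);   // locates the jar containing user classes
        // waitForCompletion() submits if the job is still in DEFINE state; true = log progress.
        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
      }
    }
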
Index: src/java/org/apache/hama/bsp/BSPJobContext.java
===================================================================
--- src/java/org/apache/hama/bsp/BSPJobContext.java (revision 0)
+++ src/java/org/apache/hama/bsp/BSPJobContext.java (revision 0)
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hama.HamaConfiguration;
+
+/**
+ * A read-only view of the bsp job that is provided to the tasks while they are
+ * running.
+ */
+public class BSPJobContext {
+ static{
+ Configuration.addDefaultResource("hama-default.xml");
+ Configuration.addDefaultResource("hama-site.xml");
+ }
+
+ // Put all of the attribute names in here so that BSPJob and JobContext are
+ // consistent.
+ protected static final String WORK_CLASS_ATTR = "bsp.work.class";
+
+ protected final Configuration conf;
+ private final BSPJobID jobId;
+
+ public BSPJobContext(Configuration conf, BSPJobID jobId) {
+ this.conf = conf;
+ this.jobId = jobId;
+ }
+
+ public BSPJobContext(Path config, BSPJobID jobId) throws IOException {
+ this.conf = new HamaConfiguration();
+ this.jobId = jobId;
+ this.conf.addResource(config);
+ }
+
+ public BSPJobID getJobID() {
+ return jobId;
+ }
+
+ public Path getWorkingDirectory() throws IOException {
+ String name = conf.get("bsp.working.dir");
+
+ if (name != null) {
+ return new Path(name);
+ } else {
+ try {
+ Path dir = FileSystem.get(conf).getWorkingDirectory();
+ conf.set("bsp.working.dir", dir.toString());
+ return dir;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public String getJobName() {
+ return conf.get("bsp.job.name", "");
+ }
+
+ public String getJar() {
+ return conf.get("bsp.jar");
+ }
+
+ /**
+ * Constructs a local file name. Files are distributed among configured
+ * local directories.
+ */
+ public Path getLocalPath(String pathString) throws IOException {
+ return conf.getLocalPath("bsp.local.dir", pathString);
+ }
+
+ public String getUser() {
+ return conf.get("user.name");
+ }
+
+ public void writeXml(OutputStream out) throws IOException {
+ conf.writeXml(out);
+ }
+
+ public Configuration getConf() {
+ return this.conf;
+ }
+
+ public String get(String name) {
+ return conf.get(name);
+ }
+
+ public int getInt(String name, int defaultValue) {
+ return conf.getInt(name, defaultValue);
+ }
+}
Index: src/java/org/apache/hama/bsp/BSPJobID.java
===================================================================
--- src/java/org/apache/hama/bsp/BSPJobID.java (revision 0)
+++ src/java/org/apache/hama/bsp/BSPJobID.java (revision 0)
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.text.NumberFormat;
+
+import org.apache.hadoop.io.Text;
+
+public class BSPJobID extends ID implements Comparable<ID> {
+ protected static final String JOB = "job";
+ private final Text jtIdentifier;
+
+ protected static final NumberFormat idFormat = NumberFormat.getInstance();
+ static {
+ idFormat.setGroupingUsed(false);
+ idFormat.setMinimumIntegerDigits(4);
+ }
+
+ public BSPJobID(String jtIdentifier, int id) {
+ super(id);
+ this.jtIdentifier = new Text(jtIdentifier);
+ }
+
+ public BSPJobID() {
+ jtIdentifier = new Text();
+ }
+
+ public String getJtIdentifier() {
+ return jtIdentifier.toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!super.equals(o))
+ return false;
+
+ BSPJobID that = (BSPJobID) o;
+ return this.jtIdentifier.equals(that.jtIdentifier);
+ }
+
+ @Override
+ public int compareTo(ID o) {
+ BSPJobID that = (BSPJobID) o;
+ int jtComp = this.jtIdentifier.compareTo(that.jtIdentifier);
+ if (jtComp == 0) {
+ return this.id - that.id;
+ } else
+ return jtComp;
+ }
+
+ public StringBuilder appendTo(StringBuilder builder) {
+ builder.append(SEPARATOR);
+ builder.append(jtIdentifier);
+ builder.append(SEPARATOR);
+ builder.append(idFormat.format(id));
+ return builder;
+ }
+
+ @Override
+ public int hashCode() {
+ return jtIdentifier.hashCode() + id;
+ }
+
+ @Override
+ public String toString() {
+ return appendTo(new StringBuilder(JOB)).toString();
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ this.jtIdentifier.readFields(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ jtIdentifier.write(out);
+ }
+
+ public static BSPJobID forName(String str) throws IllegalArgumentException {
+ if (str == null)
+ return null;
+ try {
+ String[] parts = str.split("_");
+ if (parts.length == 3) {
+ if (parts[0].equals(JOB)) {
+ return new BSPJobID(parts[1], Integer.parseInt(parts[2]));
+ }
+ }
+ } catch (Exception ex) {
+ }
+ throw new IllegalArgumentException("JobId string : " + str
+ + " is not properly formed");
+ }
+}
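
The string form produced by appendTo() and consumed by forName() is "job" + SEPARATOR + jtIdentifier + SEPARATOR + a zero-padded id; forName() splits on '_', so SEPARATOR is assumed here to be the underscore inherited from ID. A small illustrative round trip, not part of the patch:

    import org.apache.hama.bsp.BSPJobID;

    public class JobIdSketch {
      public static void main(String[] args) {
        BSPJobID id = new BSPJobID("201005141200", 1);
        String text = id.toString();              // expected form: job_201005141200_0001
        BSPJobID parsed = BSPJobID.forName(text); // splits on '_' and rebuilds the id
        System.out.println(text + " round-trips: " + parsed.equals(id));
      }
    }
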
Index: src/java/org/apache/hama/bsp/BSPMaster.java
===================================================================
--- src/java/org/apache/hama/bsp/BSPMaster.java (revision 943815)
+++ src/java/org/apache/hama/bsp/BSPMaster.java (working copy)
@@ -1,6 +1,4 @@
/**
- * Copyright 2009 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,9 +18,16 @@
package org.apache.hama.bsp;
import java.io.IOException;
+import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,77 +39,121 @@
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hama.HamaConfiguration;
-import org.apache.hama.ipc.HeartbeatResponse;
import org.apache.hama.ipc.InterTrackerProtocol;
import org.apache.hama.ipc.JobSubmissionProtocol;
/**
- * BSPMaster is responsible to control all the bsp peers and to manage bsp jobs.
+ * BSPMaster is responsible for controlling all the groom servers and for
+ * managing bsp jobs.
*/
-public class BSPMaster implements JobSubmissionProtocol, InterTrackerProtocol {
- static{
+public class BSPMaster extends Thread implements JobSubmissionProtocol, InterTrackerProtocol,
+ GroomServerManager {
+
+ static {
Configuration.addDefaultResource("hama-default.xml");
+ Configuration.addDefaultResource("hama-site.xml");
}
-
+
public static final Log LOG = LogFactory.getLog(BSPMaster.class);
-
- private HamaConfiguration conf;
- public static enum State { INITIALIZING, RUNNING }
+
+ private Configuration conf;
+
+ // Constants
+ public static enum State {
+ INITIALIZING, RUNNING
+ }
+
+ private static final int FS_ACCESS_RETRY_PERIOD = 10000;
+
+ // States
State state = State.INITIALIZING;
-
+
+ // Attributes
String masterIdentifier;
+ private Server interTrackerServer;
- private Server interTrackerServer;
-
+ // Filesystem
+ static final String SUBDIR = "bspMaster";
FileSystem fs = null;
Path systemDir = null;
-
// system directories are world-wide readable and owner readable
- final static FsPermission SYSTEM_DIR_PERMISSION =
- FsPermission.createImmutable((short) 0733); // rwx-wx-wx
+ final static FsPermission SYSTEM_DIR_PERMISSION = FsPermission
+ .createImmutable((short) 0733); // rwx-wx-wx
+ // system files should have 700 permission
+ final static FsPermission SYSTEM_FILE_PERMISSION = FsPermission
+ .createImmutable((short) 0700); // rwx------
- // system files should have 700 permission
- final static FsPermission SYSTEM_FILE_PERMISSION =
- FsPermission.createImmutable((short) 0700); // rwx------
-
- private static final int FS_ACCESS_RETRY_PERIOD = 10000;
-
+ // Groom Servers
+ // (groom name --> last sent HeartBeatResponse)
+ Map<String, HeartbeatResponse> groomToHeartbeatResponseMap = new TreeMap<String, HeartbeatResponse>();
+ private HashMap<String, GroomServerStatus> groomServers = new HashMap<String, GroomServerStatus>();
+
+ // Jobs' Meta Data
private int nextJobId = 1;
-
- public BSPMaster(HamaConfiguration conf, String identifier) throws IOException, InterruptedException {
+ // private long startTime;
+ private int totalSubmissions = 0;
+ private int totalTasks = 0;
+ private int totalTaskCapacity;
+ private Map<BSPJobID, JobInProgress> jobs = new TreeMap<BSPJobID, JobInProgress>();
+ private TaskScheduler taskScheduler;
+
+ /*
+ * private final List<JobInProgressListener> jobInProgressListeners = new
+ * CopyOnWriteArrayList<JobInProgressListener>();
+ */
+
+ /**
+ * Start the BSPMaster process, listen on the indicated hostname/port
+ */
+ public BSPMaster(Configuration conf) throws IOException, InterruptedException {
+ this(conf, generateNewIdentifier());
+ }
+
+ BSPMaster(Configuration conf, String identifier) throws IOException,
+ InterruptedException {
this.conf = conf;
-
this.masterIdentifier = identifier;
-
- InetSocketAddress addr = getAddress(conf);
- this.interTrackerServer = RPC.getServer(this, addr.getHostName(), addr.getPort(), conf);
-
+
+ // Create the scheduler
+ Class<? extends TaskScheduler> schedulerClass = conf.getClass(
+ "bsp.master.taskscheduler", SimpleTaskScheduler.class,
+ TaskScheduler.class);
+ this.taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(
+ schedulerClass, conf);
+
+ InetSocketAddress addr = getAddress(conf);
+ this.interTrackerServer = RPC.getServer(this, addr.getHostName(), addr
+ .getPort(), conf);
+
while (!Thread.currentThread().isInterrupted()) {
try {
if (fs == null) {
fs = FileSystem.get(conf);
}
- // clean up the system dir, which will only work if hdfs is out of
+ // clean up the system dir, which will only work if hdfs is out of
// safe mode
- if(systemDir == null) {
- systemDir = new Path(getSystemDir());
+ if (systemDir == null) {
+ systemDir = new Path(getSystemDir());
}
LOG.info("Cleaning up the system directory");
+ LOG.info(systemDir);
fs.delete(systemDir, true);
- if (FileSystem.mkdirs(fs, systemDir,
- new FsPermission(SYSTEM_DIR_PERMISSION))) {
+ if (FileSystem.mkdirs(fs, systemDir, new FsPermission(
+ SYSTEM_DIR_PERMISSION))) {
break;
}
LOG.error("Mkdirs failed to create " + systemDir);
+ LOG.info(SUBDIR);
} catch (AccessControlException ace) {
- LOG.warn("Failed to operate on bspd.system.dir (" + systemDir
+ LOG.warn("Failed to operate on bsp.system.dir (" + systemDir
+ ") because of permissions.");
- LOG.warn("Manually delete the bspd.system.dir (" + systemDir
- + ") and then start the JobTracker.");
+ LOG.warn("Manually delete the bsp.system.dir (" + systemDir
+ + ") and then start the BSPMaster.");
LOG.warn("Bailing out ... ");
throw ace;
} catch (IOException ie) {
@@ -112,37 +161,100 @@
}
Thread.sleep(FS_ACCESS_RETRY_PERIOD);
}
-
- // deleteLocalFiles(SUBDIR);
+
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException();
+ }
+
+ deleteLocalFiles(SUBDIR);
}
-
- public static BSPMaster startMaster(HamaConfiguration conf) throws IOException,
- InterruptedException {
+
+ // /////////////////////////////////////////////////////
+ // Accessors for objects that want info on jobs, tasks,
+ // grooms, etc.
+ // /////////////////////////////////////////////////////
+ public GroomServerStatus getGroomServer(String groomID) {
+ synchronized (groomServers) {
+ return groomServers.get(groomID);
+ }
+ }
+
+ public List<String> groomServerNames() {
+ List<String> activeGrooms = new ArrayList<String>();
+ synchronized (groomServers) {
+ for (GroomServerStatus status : groomServers.values()) {
+ activeGrooms.add(status.getGroomName());
+ }
+ }
+ return activeGrooms;
+ }
+
+ // /////////////////////////////////////////////////////////////
+ // BSPMaster methods
+ // /////////////////////////////////////////////////////////////
+
+ // Get the job directory in system directory
+ Path getSystemDirectoryForJob(BSPJobID id) {
+ return new Path(getSystemDir(), id.toString());
+ }
+
+ String[] getLocalDirs() throws IOException {
+ return conf.getStrings("bsp.local.dir");
+ }
+
+ void deleteLocalFiles() throws IOException {
+ String[] localDirs = getLocalDirs();
+ for (int i = 0; i < localDirs.length; i++) {
+ FileSystem.getLocal(conf).delete(new Path(localDirs[i]), true);
+ }
+ }
+
+ void deleteLocalFiles(String subdir) throws IOException {
+ try {
+ String[] localDirs = getLocalDirs();
+ for (int i = 0; i < localDirs.length; i++) {
+ FileSystem.getLocal(conf).delete(new Path(localDirs[i], subdir), true);
+ }
+ } catch (NullPointerException e) {
+ LOG.info(e);
+ }
+ }
+
+ /**
+ * Constructs a local file name. Files are distributed among configured local
+ * directories.
+ */
+ Path getLocalPath(String pathString) throws IOException {
+ return conf.getLocalPath("bsp.local.dir", pathString);
+ }
+
+ public BSPMaster startMaster() throws IOException,
+ InterruptedException {
return startTracker(conf, generateNewIdentifier());
}
-
- public static BSPMaster startTracker(HamaConfiguration conf, String identifier)
- throws IOException, InterruptedException {
-
+
+ public static BSPMaster startTracker(Configuration conf, String identifier)
+ throws IOException, InterruptedException {
+
BSPMaster result = null;
result = new BSPMaster(conf, identifier);
-
+
return result;
}
-
+
public static InetSocketAddress getAddress(Configuration conf) {
- String hamaMasterStr = conf.get("hama.master.address", "localhost:40000");
+ String hamaMasterStr = conf.get("bsp.master.address", "localhost:40000");
return NetUtils.createSocketAddr(hamaMasterStr);
}
-
+
public int getPort() {
- return this.conf.getInt("hama.master.port", 0);
+ return this.conf.getInt("bsp.master.port", 0);
}
- public HamaConfiguration getConfiguration() {
+ public Configuration getConf() {
return this.conf;
}
-
+
private static SimpleDateFormat getDateFormat() {
return new SimpleDateFormat("yyyyMMddHHmm");
}
@@ -154,42 +266,115 @@
private static String generateNewIdentifier() {
return getDateFormat().format(new Date());
}
-
+
public void offerService() throws InterruptedException, IOException {
this.interTrackerServer.start();
-
+
synchronized (this) {
state = State.RUNNING;
}
LOG.info("Starting RUNNING");
-
+
this.interTrackerServer.join();
LOG.info("Stopped interTrackerServer");
}
-
-
- public static void main(String [] args) {
+
+ public static void main(String[] args) {
StringUtils.startupShutdownMessage(BSPMaster.class, args, LOG);
if (args.length != 0) {
System.out.println("usage: HamaMaster");
System.exit(-1);
}
-
+
try {
HamaConfiguration conf = new HamaConfiguration();
- BSPMaster master = startMaster(conf);
- master.offerService();
+ conf.set("bsp.master.port", "40000");
+ conf.set("bsp.groom.port", "40020");
+ conf.set("bsp.local.dir", conf.get("hadoop.tmp.dir") + "/bsp/local");
+ conf.set("bsp.system.dir", conf.get("hadoop.tmp.dir") + "/bsp/system");
+
+ BSPMaster master = BSPMaster.constructMaster(BSPMaster.class, conf);
+ master.start();
} catch (Throwable e) {
LOG.fatal(StringUtils.stringifyException(e));
System.exit(-1);
}
}
+ public void run() {
+ try {
+ offerService();
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ // //////////////////////////////////////////////////
+ // GroomServerManager
+ // //////////////////////////////////////////////////
@Override
- public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+ public void addJobInProgressListener(JobInProgressListener listener) {
+ // jobInProgressListeners.add(listener);
+ }
+
+ @Override
+ public void removeJobInProgressListener(JobInProgressListener listener) {
+ // jobInProgressListeners.remove(listener);
+ }
+
+ @Override
+ public void failJob(JobInProgress job) {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public JobInProgress getJob(BSPJobID jobid) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public int getNextHeartbeatInterval() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public int getNumberOfUniqueHosts() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public Collection<GroomServerStatus> grooms() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void initJob(JobInProgress job) {
+ if (null == job) {
+ LOG.info("Init on null job is not valid");
+ return;
+ }
+
+ // JobStatus prevStatus = (JobStatus)job.getStatus().clone();
+ LOG.info("Initializing " + job.getJobID());
+ }
+
+ // //////////////////////////////////////////////////
+ // InterTrackerProtocol
+ // //////////////////////////////////////////////////
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
if (protocol.equals(InterTrackerProtocol.class.getName())) {
return InterTrackerProtocol.versionID;
- } else if (protocol.equals(JobSubmissionProtocol.class.getName())){
+ } else if (protocol.equals(JobSubmissionProtocol.class.getName())) {
return JobSubmissionProtocol.versionID;
} else {
throw new IOException("Unknown protocol to job tracker: " + protocol);
@@ -198,34 +383,212 @@
/**
* A RPC method for transmitting each peer status from peer to master.
+ *
+ * @throws IOException
*/
@Override
- public HeartbeatResponse heartbeat(short responseId) {
- LOG.debug(">>> return the heartbeat message.");
- return new HeartbeatResponse((short)1);
+ public HeartbeatResponse heartbeat(GroomServerStatus status,
+ boolean restarted, boolean initialContact, boolean acceptNewTasks,
+ short responseId) throws IOException {
+ LOG.debug(">>> Received the heartbeat message from ");
+ LOG.debug(">>> " + status.groomName + "(" + status.getHost() + ")");
+ LOG.debug(">>> restarted:" + restarted + ",first:" + initialContact);
+ LOG.debug(">>> maxTaskCapacity:" + status.getMaxTasks() + ",taskCapacity:"
+ + status.getTaskReports().size());
+
+ // First check if the last heartbeat response got through
+ String groomName = status.getGroomName();
+ long now = System.currentTimeMillis();
+
+ HeartbeatResponse prevHeartbeatResponse = groomToHeartbeatResponseMap
+ .get(groomName);
+
+ // Process this heartbeat
+ short newResponseId = (short) (responseId + 1);
+ status.setLastSeen(now);
+ if (!processHeartbeat(status, initialContact)) {
+ if (prevHeartbeatResponse != null) {
+ groomToHeartbeatResponseMap.remove(groomName);
+ }
+ return new HeartbeatResponse(newResponseId,
+ new GroomServerAction[] { new ReinitTrackerAction() });
+ }
+
+ HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
+ // List<GroomServerAction> actions = new ArrayList<GroomServerAction>();
+
+ // Check for new tasks to be executed on the groom server
+ if (acceptNewTasks) {
+ GroomServerStatus groomStatus = getGroomServer(groomName);
+ if (groomStatus == null) {
+ LOG.warn("Unknown task tracker polling; ignoring: " + groomName);
+ } else {
+ // TODO - assignTasks should be implemented
+ /*
+ * List<Task> tasks = taskScheduler.assignTasks(groomStatus); for(Task
+ * task : tasks) { if(tasks != null) { LOG.debug(groomName +
+ * "-> LaunchTask: " + task.getTaskID()); actions.add(new
+ * LaunchTaskAction(task)); } }
+ */
+ }
+ }
+
+ return response;
}
-
+
/**
+ * Process incoming heartbeat messages from the groom.
+ */
+ private synchronized boolean processHeartbeat(GroomServerStatus groomStatus,
+ boolean initialContact) {
+ String groomName = groomStatus.getGroomName();
+
+ synchronized (groomServers) {
+ GroomServerStatus oldStatus = groomServers.get(groomName);
+ if (oldStatus == null) {
+ groomServers.put(groomName, groomStatus);
+ } else { // TODO - to be improved to update status.
+ }
+ }
+
+ return true;
+ }
+
+ // //////////////////////////////////////////////////
+ // JobSubmissionProtocol
+ // //////////////////////////////////////////////////
+ /**
+ * This method returns new job id. The returned job id increases sequentially.
+ */
+ @Override
+ public BSPJobID getNewJobId() throws IOException {
+ return new BSPJobID(this.masterIdentifier, nextJobId++);
+ }
+
+ @Override
+ public JobStatus submitJob(BSPJobID jobId) throws IOException {
+ LOG.info("Submitted a job (" + jobId + ")");
+ if (jobs.containsKey(jobId)) {
+ // job already running, don't start twice
+ LOG.info("The job (" + jobId + ") is already subbmitted");
+ return jobs.get(jobId).getStatus();
+ }
+
+ JobInProgress job = new JobInProgress(jobId, this, this.conf);
+
+ return addJob(jobId, job);
+ }
+
+ @Override
+ public ClusterStatus getClusterStatus(boolean detailed) {
+ synchronized (groomServers) {
+ if (detailed) {
+ List<String> groomNames = groomServerNames();
+ return new ClusterStatus(groomNames, totalTasks, totalTaskCapacity,
+ state);
+ } else {
+ return new ClusterStatus(groomServers.size(), totalTasks,
+ totalTaskCapacity, state);
+ }
+ }
+ }
+
+ /**
+ * Adds a job to the bsp master. Make sure that the checks are in place before
+ * adding a job. This is the core job submission logic
+ *
+ * @param jobId The id for the job submitted which needs to be added
+ */
+ private synchronized JobStatus addJob(BSPJobID jobId, JobInProgress job) {
+ totalSubmissions++;
+ synchronized (jobs) {
+ jobs.put(job.getProfile().getJobID(), job);
+ taskScheduler.addJob(job);
+ }
+
+ return job.getStatus();
+ }
+
+ @Override
+ public JobStatus[] getAllJobs() throws IOException {
+ return null;
+ }
+
+ @Override
+ public synchronized String getFilesystemName() throws IOException {
+ if (fs == null) {
+ throw new IllegalStateException("FileSystem object not available yet");
+ }
+ return fs.getUri().toString();
+ }
+
+ /**
* Return system directory to which BSP store control files.
*/
@Override
public String getSystemDir() {
- Path sysDir = new Path(conf.get("bspd.system.dir", "/tmp/hadoop/bsp/system"));
+ Path sysDir = new Path(conf.get("bsp.system.dir", "/tmp/hadoop/bsp/system"));
return fs.makeQualified(sysDir).toString();
}
-
- /**
- * This method returns new job id. The returned job id increases sequentially.
- */
@Override
- public JobID getNewJobId() throws IOException {
- return new JobID(this.masterIdentifier, nextJobId++);
+ public JobProfile getJobProfile(BSPJobID jobid) throws IOException {
+ synchronized (this) {
+ JobInProgress job = jobs.get(jobid);
+ if (job != null) {
+ return job.getProfile();
+ }
+ }
+ return null;
}
@Override
- public JobStatus submitJob(JobID jobName) throws IOException {
-
+ public JobStatus getJobStatus(BSPJobID jobid) throws IOException {
+ synchronized (this) {
+ JobInProgress job = jobs.get(jobid);
+ if (job != null) {
+ return job.getStatus();
+ }
+ }
return null;
}
+
+ @Override
+ public void killJob(BSPJobID jobid) throws IOException {
+ JobInProgress job = jobs.get(jobid);
+
+ if (null == job) {
+ LOG.info("killJob(): JobId " + jobid.toString() + " is not a valid job");
+ return;
+ }
+
+ killJob(job);
+ }
+
+ private synchronized void killJob(JobInProgress job) {
+ LOG.info("Killing job " + job.getJobID());
+ }
+
+ @Override
+ public boolean killTask(TaskAttemptID taskId, boolean shouldFail)
+ throws IOException {
+ return false;
+ }
+
+ public static BSPMaster constructMaster(
+ Class<? extends BSPMaster> masterClass, final Configuration conf) {
+ try {
+ Constructor<? extends BSPMaster> c = masterClass
+ .getConstructor(Configuration.class);
+ return c.newInstance(conf);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed construction of " + "Master: "
+ + masterClass.toString()
+ + ((e.getCause() != null) ? e.getCause().getMessage() : ""), e);
+ }
+ }
+
+ public void shutdown() {
+ this.interTrackerServer.stop();
+ }
}
Index: src/java/org/apache/hama/bsp/BSPMessage.java
===================================================================
--- src/java/org/apache/hama/bsp/BSPMessage.java (revision 943815)
+++ src/java/org/apache/hama/bsp/BSPMessage.java (working copy)
@@ -1,6 +1,4 @@
/**
- * Copyright 2007 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Index: src/java/org/apache/hama/bsp/BSPPeer.java
===================================================================
--- src/java/org/apache/hama/bsp/BSPPeer.java (revision 943815)
+++ src/java/org/apache/hama/bsp/BSPPeer.java (working copy)
@@ -1,6 +1,4 @@
/**
- * Copyright 2007 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -33,6 +31,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hama.Constants;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
@@ -66,20 +65,23 @@
public BSPPeer(Configuration conf) throws IOException {
this.conf = conf;
- serverName = conf.get(PEER_HOST,DEFAULT_PEER_HOST) +":"+ conf.getInt(PEER_PORT, DEFAULT_PEER_PORT);
- bindAddress = conf.get(PEER_HOST, DEFAULT_PEER_HOST);
- bindPort = conf.getInt(PEER_PORT, DEFAULT_PEER_PORT);
- bspRoot = conf.get(ZOOKEEPER_ROOT, DEFAULT_ZOOKEEPER_ROOT);
- zookeeperAddr = conf.get(ZOOKEEPER_SERVER_ADDRS,"localhost:21810");
+ serverName = conf.get(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST)
+ + ":" + conf.getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
+ bindAddress = conf.get(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST);
+ bindPort = conf.getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
+ bspRoot = conf.get(Constants.ZOOKEEPER_ROOT,
+ Constants.DEFAULT_ZOOKEEPER_ROOT);
+ zookeeperAddr = conf.get(Constants.ZOOKEEPER_SERVER_ADDRS,
+ "localhost:21810");
reinitialize();
}
public void reinitialize() {
try {
- System.out.println(bindAddress+":"+bindPort);
+ System.out.println(bindAddress + ":" + bindPort);
server = RPC.getServer(this, bindAddress, bindPort, conf);
- server.start();
+ server.start();
} catch (IOException e) {
e.printStackTrace();
}
@@ -98,9 +100,8 @@
/*
* (non-Javadoc)
- *
* @see org.apache.hama.bsp.BSPPeerInterface#send(java.net.InetSocketAddress,
- * org.apache.hadoop.io.Writable, org.apache.hadoop.io.Writable)
+ * org.apache.hadoop.io.Writable, org.apache.hadoop.io.Writable)
*/
@Override
public void send(InetSocketAddress hostname, BSPMessage msg)
@@ -116,7 +117,6 @@
/*
* (non-Javadoc)
- *
* @see org.apache.hama.bsp.BSPPeerInterface#sync()
*/
@Override
@@ -145,7 +145,7 @@
}
}
enterBarrier();
- Thread.sleep(ATLEAST_WAIT_TIME); // TODO - This is temporary work because
+ Thread.sleep(Constants.ATLEAST_WAIT_TIME); // TODO - This is temporary work because
// it can be affected by network condition,
// the number of peers, and the load of zookeeper.
// It should fixed to some flawless way.
Index: src/java/org/apache/hama/bsp/BSPPeerInterface.java
===================================================================
--- src/java/org/apache/hama/bsp/BSPPeerInterface.java (revision 943815)
+++ src/java/org/apache/hama/bsp/BSPPeerInterface.java (working copy)
@@ -1,6 +1,4 @@
/**
- * Copyright 2007 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hama.bsp;
import java.io.IOException;
Index: src/java/org/apache/hama/bsp/ClusterStatus.java
===================================================================
--- src/java/org/apache/hama/bsp/ClusterStatus.java (revision 0)
+++ src/java/org/apache/hama/bsp/ClusterStatus.java (revision 0)
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Status information on the current state of the BSP cluster.
+ *
+ * <p><code>ClusterStatus</code> provides clients with information such as:
+ * <ol>
+ *   <li>Size of the cluster.</li>
+ *   <li>Name of the grooms.</li>
+ *   <li>Task capacity of the cluster.</li>
+ *   <li>The number of currently running bsp tasks.</li>
+ *   <li>State of the <code>BSPMaster</code>.</li>
+ * </ol>
+ *
+ * {@link BSPJobClient#getClusterStatus()}.
+ *
+ * @see BSPMaster
+ */
+public class ClusterStatus implements Writable {
+
+ private int numActiveGrooms;
+ private Collection<String> activeGrooms = new ArrayList<String>();
+ private int tasks;
+ private int maxTasks;
+ private BSPMaster.State state;
+
+ /**
+ *
+ */
+ ClusterStatus() {}
+
+ ClusterStatus(int grooms, int tasks, int maxTasks, BSPMaster.State state) {
+ this.numActiveGrooms = grooms;
+ this.tasks = tasks;
+ this.maxTasks = maxTasks;
+ this.state = state;
+ }
+
+ ClusterStatus(Collection<String> activeGrooms, int tasks, int maxTasks,
+ BSPMaster.State state) {
+ this(activeGrooms.size(), tasks, maxTasks, state);
+ this.activeGrooms = activeGrooms;
+ }
+
+ /**
+ * Get the number of groom servers in the cluster.
+ *
+ * @return the number of groom servers in the cluster.
+ */
+ public int getGroomServers() {
+ return numActiveGrooms;
+ }
+
+ /**
+ * Get the names of groom servers in the cluster.
+ *
+ * @return the active groom servers in the cluster.
+ */
+ public Collection<String> getActiveGroomNames() {
+ return activeGrooms;
+ }
+
+ /**
+ * Get the number of currently running tasks in the cluster.
+ *
+ * @return the number of currently running tasks in the cluster.
+ */
+ public int getTasks() {
+ return tasks;
+ }
+
+ /**
+ * Get the maximum capacity for running tasks in the cluster.
+ *
+ * @return the maximum capacity for running tasks in the cluster.
+ */
+ public int getMaxTasks() {
+ return maxTasks;
+ }
+
+ /**
+ * Get the current state of the BSPMaster,
+ * as {@link BSPMaster.State}
+ *
+ * @return the current state of the BSPMaster.
+ */
+ public BSPMaster.State getBSPMasterState() {
+ return state;
+ }
+
+ //////////////////////////////////////////////
+ // Writable
+ //////////////////////////////////////////////
+ @Override
+ public void write(DataOutput out) throws IOException {
+ if(activeGrooms.size() == 0) {
+ out.writeInt(numActiveGrooms);
+ out.writeInt(0);
+ } else {
+ out.writeInt(activeGrooms.size());
+ out.writeInt(activeGrooms.size());
+ for(String groom: activeGrooms) {
+ Text.writeString(out, groom);
+ }
+ }
+ out.writeInt(tasks);
+ out.writeInt(maxTasks);
+ WritableUtils.writeEnum(out, state);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ numActiveGrooms = in.readInt();
+ int numGroomNames = in.readInt();
+    String name;
+    for (int i = 0; i < numGroomNames; i++) {
+      name = Text.readString(in);
+      activeGrooms.add(name);
+    }
+ tasks = in.readInt();
+ maxTasks = in.readInt();
+ state = WritableUtils.readEnum(in, BSPMaster.State.class);
+ }
+}
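
For reference, a minimal round-trip sketch of the ClusterStatus Writable above (illustration, not part of the patch). It assumes BSPMaster.State defines a RUNNING constant (the enum's values are not shown here) and uses Hadoop's DataOutputBuffer/DataInputBuffer; the class sits in org.apache.hama.bsp so it can reach the package-private constructors.

package org.apache.hama.bsp;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class ClusterStatusRoundTrip {
  public static void main(String[] args) throws Exception {
    // 3 grooms, 5 running tasks, capacity of 12 (illustrative numbers);
    // RUNNING is an assumed enum constant.
    ClusterStatus before =
        new ClusterStatus(3, 5, 12, BSPMaster.State.RUNNING);

    DataOutputBuffer out = new DataOutputBuffer();
    before.write(out);                       // serialize

    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    ClusterStatus after = new ClusterStatus();
    after.readFields(in);                    // deserialize

    System.out.println(after.getGroomServers() + " grooms, "
        + after.getTasks() + "/" + after.getMaxTasks() + " tasks");
  }
}
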
Index: src/java/org/apache/hama/bsp/CommitTaskAction.java
===================================================================
--- src/java/org/apache/hama/bsp/CommitTaskAction.java (리비전 0)
+++ src/java/org/apache/hama/bsp/CommitTaskAction.java (리비전 0)
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster}
+ * to the {@link org.apache.hama.bsp.GroomServer} to commit the output
+ * of the task.
+ *
+ */
+class CommitTaskAction extends GroomServerAction {
+ private TaskAttemptID taskId;
+
+ public CommitTaskAction() {
+ super(ActionType.COMMIT_TASK);
+ taskId = new TaskAttemptID();
+ }
+
+ public CommitTaskAction(TaskAttemptID taskId) {
+ super(ActionType.COMMIT_TASK);
+ this.taskId = taskId;
+ }
+
+ public TaskAttemptID getTaskID() {
+ return taskId;
+ }
+
+ public void write(DataOutput out) throws IOException {
+ taskId.write(out);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ taskId.readFields(in);
+ }
+}
Index: src/java/org/apache/hama/bsp/DefaultBSPPeer.java
===================================================================
--- src/java/org/apache/hama/bsp/DefaultBSPPeer.java (리비전 943815)
+++ src/java/org/apache/hama/bsp/DefaultBSPPeer.java (작업 사본)
@@ -1,6 +1,4 @@
/**
- * Copyright 2007 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -21,15 +19,15 @@
import java.io.Closeable;
import java.io.IOException;
-
import java.net.InetSocketAddress;
+import org.apache.hama.Constants;
import org.apache.zookeeper.KeeperException;
/**
*
*/
-public interface DefaultBSPPeer extends Closeable, BSPConstants {
+public interface DefaultBSPPeer extends Closeable, Constants {
/**
* Send a data with a tag to another BSPSlave corresponding to hostname.
Index: src/java/org/apache/hama/bsp/GroomServer.java
===================================================================
--- src/java/org/apache/hama/bsp/GroomServer.java (리비전 943815)
+++ src/java/org/apache/hama/bsp/GroomServer.java (작업 사본)
@@ -19,14 +19,20 @@
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
@@ -35,71 +41,87 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hama.HamaConfiguration;
-import org.apache.hama.ipc.HeartbeatResponse;
import org.apache.hama.ipc.InterTrackerProtocol;
public class GroomServer implements Runnable {
public static final Log LOG = LogFactory.getLog(GroomServer.class);
-
+
static {
Configuration.addDefaultResource("hama-default.xml");
+ Configuration.addDefaultResource("hama-site.xml");
}
+ Configuration conf;
+ // Constants
static enum State {
NORMAL, COMPUTE, SYNC, BARRIER, STALE, INTERRUPTED, DENIED
};
-
- HamaConfiguration conf;
-
+
+ // Running States and its related things
volatile boolean running = true;
volatile boolean shuttingDown = false;
+ boolean justStarted = true;
boolean justInited = true;
-
- String groomserverName;
- String localHostname;
-
- InetSocketAddress masterAddr;
- InterTrackerProtocol jobClient;
- //BSPPeer bspPeer;
-
+ GroomServerStatus status = null;
short heartbeatResponseId = -1;
private volatile int heartbeatInterval = 3 * 1000;
-
- private LocalDirAllocator localDirAllocator;
+
+ // Attributes
+ String groomServerName;
+ String localHostname;
+ InetSocketAddress bspMasterAddr;
+ InterTrackerProtocol jobClient;
+
+ // Filesystem
+ //private LocalDirAllocator localDirAllocator;
Path systemDirectory = null;
FileSystem systemFS = null;
+
+ // Job
+ boolean acceptNewTasks = true;
+ private int failures;
+ private int maxCurrentTasks;
+  Map<TaskAttemptID, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>();
+  /** Map from taskId -> TaskInProgress. */
+  Map<TaskAttemptID, TaskInProgress> runningTasks = null;
+ Map runningJobs = null;
- public GroomServer(HamaConfiguration conf) throws IOException {
+ public GroomServer(Configuration conf) throws IOException {
this.conf = conf;
- masterAddr = BSPMaster.getAddress(conf);
+ bspMasterAddr = BSPMaster.getAddress(conf);
- FileSystem local = FileSystem.getLocal(conf);
- this.localDirAllocator = new LocalDirAllocator("bspd.groom.local.dir");
-
- initialize();
+ //FileSystem local = FileSystem.getLocal(conf);
+ //this.localDirAllocator = new LocalDirAllocator("bsp.local.dir");
}
- synchronized void initialize() throws IOException {
+ public synchronized void initialize() throws IOException {
if (this.conf.get("slave.host.name") != null) {
this.localHostname = conf.get("slave.host.name");
}
if (localHostname == null) {
this.localHostname = DNS.getDefaultHost(conf.get(
- "bspd.groom.dns.interface", "default"), conf.get(
- "bspd.groom.dns.nameserver", "default"));
+ "bsp.dns.interface", "default"), conf.get(
+ "bsp.dns.nameserver", "default"));
}
- checkLocalDirs(conf.getStrings("bspd.groom.local.dir"));
+ //check local disk
+ checkLocalDirs(conf.getStrings("bsp.local.dir"));
deleteLocalFiles("groomserver");
- this.groomserverName = "groomd_" + localHostname;
- LOG.info("Starting tracker " + this.groomserverName);
+ // Clear out state tables
+ this.tasks.clear();
+ this.runningJobs = new TreeMap();
+ this.runningTasks = new LinkedHashMap();
+ this.acceptNewTasks = true;
+
+ this.groomServerName = "groomd_" + localHostname;
+ LOG.info("Starting tracker " + this.groomServerName);
DistributedCache.purgeCache(this.conf);
this.jobClient = (InterTrackerProtocol) RPC.waitForProxy(
- InterTrackerProtocol.class, InterTrackerProtocol.versionID, masterAddr,
+ InterTrackerProtocol.class, InterTrackerProtocol.versionID, bspMasterAddr,
conf);
this.running = true;
// this.bspPeer = new BSPPeer(this.conf);
@@ -109,9 +131,12 @@
throws DiskErrorException {
boolean writable = false;
+ LOG.info(localDirs);
+
if (localDirs != null) {
for (int i = 0; i < localDirs.length; i++) {
try {
+ LOG.info(localDirs[i]);
DiskChecker.checkDir(new File(localDirs[i]));
writable = true;
} catch (DiskErrorException e) {
@@ -120,26 +145,30 @@
}
}
- if (!writable)
- throw new DiskErrorException("all local directories are not writable");
+ // if (!writable)
+ //throw new DiskErrorException("all local directories are not writable");
}
public String[] getLocalDirs() {
- return conf.getStrings("bspd.groom.local.dir");
+ return conf.getStrings("bsp.local.dir");
}
public void deleteLocalFiles() throws IOException {
String[] localDirs = getLocalDirs();
for (int i = 0; i < localDirs.length; i++) {
- FileSystem.getLocal(this.conf).delete(new Path(localDirs[i]));
+ FileSystem.getLocal(this.conf).delete(new Path(localDirs[i]),true);
}
}
public void deleteLocalFiles(String subdir) throws IOException {
+ try{
String[] localDirs = getLocalDirs();
for (int i = 0; i < localDirs.length; i++) {
- FileSystem.getLocal(this.conf).delete(new Path(localDirs[i], subdir));
+ FileSystem.getLocal(this.conf).delete(new Path(localDirs[i], subdir),true);
}
+ } catch (NullPointerException e) {
+ LOG.info(e);
+ }
}
public void cleanupStorage() throws IOException {
@@ -172,14 +201,19 @@
systemFS = systemDirectory.getFileSystem(conf);
}
- // Send the heartbeat and process the jobtracker's directives
+ // Send the heartbeat and process the bspmaster's directives
HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
-
+ //
+ // The heartbeat got through successfully!
+ //
+ heartbeatResponseId = heartbeatResponse.getResponseId();
+
// Note the time when the heartbeat returned, use this to decide when to
// send the
// next heartbeat
lastHeartbeat = System.currentTimeMillis();
-
+
+ justStarted = false;
justInited = false;
} catch (InterruptedException ie) {
LOG.info("Interrupted. Closing down.");
@@ -203,13 +237,40 @@
}
private HeartbeatResponse transmitHeartBeat(long now) throws IOException {
+
+ //
+ // Check if the last heartbeat got through...
+ // if so then build the heartbeat information for the BSPMaster;
+ // else resend the previous status information.
+ //
+ if (status == null) {
+ synchronized (this) {
+ status = new GroomServerStatus(groomServerName, localHostname,
+ cloneAndResetRunningTaskStatuses(), failures, maxCurrentTasks);
+ }
+ } else {
+ LOG.info("Resending 'status' to '" + bspMasterAddr.getHostName() +
+            "' with responseId '" + heartbeatResponseId + "'");
+ }
+
+ // TODO - Later, acceptNewTask is to be set by the status of groom server.
HeartbeatResponse heartbeatResponse = jobClient
- .heartbeat(heartbeatResponseId);
+        .heartbeat(status, justStarted, justInited, true, heartbeatResponseId);
return heartbeatResponse;
}
+
+  private synchronized List<TaskStatus> cloneAndResetRunningTaskStatuses() {
+    List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size());
+ for(TaskInProgress tip: runningTasks.values()) {
+ TaskStatus status = tip.getStatus();
+ result.add((TaskStatus)status.clone());
+ }
+ return result;
+ }
public void run() {
try {
+ initialize();
startCleanupThreads();
boolean denied = false;
while (running && !shuttingDown && !denied) {
@@ -225,7 +286,7 @@
}
} catch (Exception e) {
if (!shuttingDown) {
- LOG.info("Lost connection to GraphProcessor [" + masterAddr
+        LOG.info("Lost connection to BSPMaster [" + bspMasterAddr
+ "]. Retrying...", e);
try {
Thread.sleep(5000);
@@ -245,7 +306,7 @@
initialize();
}
} catch (IOException ioe) {
- LOG.error("Got fatal exception while reinitializing TaskTracker: "
+ LOG.error("Got fatal exception while reinitializing GroomServer: "
+ StringUtils.stringifyException(ioe));
return;
}
@@ -273,11 +334,95 @@
}
try {
- HamaConfiguration conf = new HamaConfiguration();
- new GroomServer(conf).run();
+ Configuration conf = new HamaConfiguration();
+ conf.set("bsp.master.port", "40000");
+ conf.set("bsp.groom.port", "40020");
+ conf.set("bsp.local.dir", conf.get("hadoop.tmp.dir") + "/bsp/local");
+ conf.set("bsp.system.dir", conf.get("hadoop.tmp.dir") + "/bsp/system");
+ GroomServer groom = GroomServer.constructGroomServer(GroomServer.class, conf);
+ startGroomServer(groom);
} catch (Throwable e) {
LOG.fatal(StringUtils.stringifyException(e));
System.exit(-1);
}
}
+
+ public static Thread startGroomServer(final GroomServer hrs) {
+ return startGroomServer(hrs,
+      "groomServer:" + hrs.groomServerName);
+ }
+
+ public static Thread startGroomServer(final GroomServer hrs,
+ final String name) {
+ Thread t = new Thread(hrs);
+ t.setName(name);
+ t.start();
+ return t;
+ }
+
+ ///////////////////////////////////////////////////////
+ // TaskInProgress maintains all the info for a Task that
+ // lives at this GroomServer. It maintains the Task object,
+ // its TaskStatus, and the TaskRunner.
+ ///////////////////////////////////////////////////////
+ class TaskInProgress {
+ Task task;
+ volatile boolean done = false;
+ volatile boolean wasKilled = false;
+ private TaskStatus taskStatus;
+
+ public TaskInProgress(Task task, BSPJobContext job) {
+ this.task = task;
+ }
+
+ /**
+ */
+ public Task getTask() {
+ return task;
+ }
+
+ /**
+ */
+ public synchronized TaskStatus getStatus() {
+ return taskStatus;
+ }
+
+ /**
+ */
+ public TaskStatus.State getRunState() {
+ return taskStatus.getRunState();
+ }
+
+ public boolean wasKilled() {
+ return wasKilled;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return (obj instanceof TaskInProgress) &&
+ task.getTaskID().equals
+ (((TaskInProgress) obj).getTask().getTaskID());
+ }
+
+ @Override
+ public int hashCode() {
+ return task.getTaskID().hashCode();
+ }
+ }
+
+ public boolean isRunning() {
+ return running;
+ }
+
+  public static GroomServer constructGroomServer(Class<? extends GroomServer> groomServerClass,
+      final Configuration conf2) {
+    try {
+      Constructor<? extends GroomServer> c =
+          groomServerClass.getConstructor(Configuration.class);
+      return c.newInstance(conf2);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed construction of " +
+        "GroomServer: " + groomServerClass.toString(), e);
+ }
+ }
}
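
The main() above shows the intended startup path; the same thing can be done programmatically. A sketch, assuming a reachable BSPMaster at the configured address and a writable hadoop.tmp.dir (the thread name is illustrative):

package org.apache.hama.bsp;

import org.apache.hadoop.conf.Configuration;
import org.apache.hama.HamaConfiguration;

public class GroomServerLauncher {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HamaConfiguration();
    conf.set("bsp.local.dir", conf.get("hadoop.tmp.dir") + "/bsp/local");

    // Reflection-based construction, mirroring constructGroomServer(...) above.
    GroomServer groom = GroomServer.constructGroomServer(GroomServer.class, conf);

    // run() calls initialize() and then enters the heartbeat loop.
    Thread t = GroomServer.startGroomServer(groom, "groomServer:test");
    t.join();
  }
}
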
Index: src/java/org/apache/hama/bsp/GroomServerAction.java
===================================================================
--- src/java/org/apache/hama/bsp/GroomServerAction.java (리비전 0)
+++ src/java/org/apache/hama/bsp/GroomServerAction.java (리비전 0)
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * A generic directive from the {@link org.apache.hama.bsp.BSPMaster}
+ * to the {@link org.apache.hama.bsp.GroomServer} to take some 'action'.
+ *
+ */
+abstract class GroomServerAction implements Writable {
+
+ /**
+   * Enumeration of various 'actions' that the {@link BSPMaster}
+ * directs the {@link GroomServer} to perform periodically.
+ *
+ */
+ public static enum ActionType {
+ /** Launch a new task. */
+ LAUNCH_TASK,
+
+ /** Kill a task. */
+ KILL_TASK,
+
+ /** Kill any tasks of this job and cleanup. */
+ KILL_JOB,
+
+ /** Reinitialize the groomserver. */
+ REINIT_GROOM,
+
+ /** Ask a task to save its output. */
+ COMMIT_TASK
+ };
+
+ /**
+ * A factory-method to create objects of given {@link ActionType}.
+ * @param actionType the {@link ActionType} of object to create.
+ * @return an object of {@link ActionType}.
+ */
+ public static GroomServerAction createAction(ActionType actionType) {
+ GroomServerAction action = null;
+
+ switch (actionType) {
+ case LAUNCH_TASK:
+ {
+ action = new LaunchTaskAction();
+ }
+ break;
+ case KILL_TASK:
+ {
+ action = new KillTaskAction();
+ }
+ break;
+ case KILL_JOB:
+ {
+ action = new KillJobAction();
+ }
+ break;
+ case REINIT_GROOM:
+ {
+ action = new ReinitTrackerAction();
+ }
+ break;
+ case COMMIT_TASK:
+ {
+ action = new CommitTaskAction();
+ }
+ break;
+ }
+
+ return action;
+ }
+
+ private ActionType actionType;
+
+ protected GroomServerAction(ActionType actionType) {
+ this.actionType = actionType;
+ }
+
+ /**
+ * Return the {@link ActionType}.
+ * @return the {@link ActionType}.
+ */
+ ActionType getActionId() {
+ return actionType;
+ }
+
+ public void write(DataOutput out) throws IOException {
+ WritableUtils.writeEnum(out, actionType);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ actionType = WritableUtils.readEnum(in, ActionType.class);
+ }
+}
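
A sketch of the type-tag/factory contract this class defines and that HeartbeatResponse (below) relies on: the ActionType is written first so the reader can createAction() the right subclass before calling readFields(). ReinitTrackerAction, added later in this patch, is used because it carries no payload.

package org.apache.hama.bsp;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.WritableUtils;

public class ActionFactoryExample {
  public static void main(String[] args) throws Exception {
    GroomServerAction sent = new ReinitTrackerAction();

    DataOutputBuffer out = new DataOutputBuffer();
    WritableUtils.writeEnum(out, sent.getActionId()); // type tag first
    sent.write(out);                                  // then the payload (empty here)

    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    GroomServerAction.ActionType type =
        WritableUtils.readEnum(in, GroomServerAction.ActionType.class);
    GroomServerAction received = GroomServerAction.createAction(type);
    received.readFields(in);

    System.out.println(received.getClass().getSimpleName()); // ReinitTrackerAction
  }
}
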
Index: src/java/org/apache/hama/bsp/GroomServerManager.java
===================================================================
--- src/java/org/apache/hama/bsp/GroomServerManager.java (리비전 0)
+++ src/java/org/apache/hama/bsp/GroomServerManager.java (리비전 0)
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Manages information about the {@link GroomServer}s running on a cluster.
+ * This interface exists primarily to test the {@link BSPMaster}, and is not
+ * intended to be implemented by users.
+ */
+interface GroomServerManager {
+
+ /**
+ * @return A collection of the {@link GroomServerStatus} for the grooms
+ * being managed.
+ */
+  public Collection<GroomServerStatus> grooms();
+
+ /**
+ * @return The number of unique hosts running grooms.
+ */
+ public int getNumberOfUniqueHosts();
+
+ /**
+ * Get the current status of the cluster
+ * @param detailed if true then report groom names as well
+ * @return summary of the state of the cluster
+ */
+ public ClusterStatus getClusterStatus(boolean detailed);
+
+ /**
+ * Registers a {@link JobInProgressListener} for updates from this
+ * {@link GroomServerManager}.
+   * @param listener the {@link JobInProgressListener} to add
+ */
+ public void addJobInProgressListener(JobInProgressListener listener);
+
+ /**
+ * Unregisters a {@link JobInProgressListener} from this
+ * {@link GroomServerManager}.
+   * @param listener the {@link JobInProgressListener} to remove
+ */
+ public void removeJobInProgressListener(JobInProgressListener listener);
+
+ /**
+ * Return the current heartbeat interval that's used by {@link GroomServer}s.
+ *
+ * @return the heartbeat interval used by {@link GroomServer}s
+ */
+ public int getNextHeartbeatInterval();
+
+ /**
+ * Kill the job identified by jobid
+ *
+ * @param jobid
+ * @throws IOException
+ */
+ public void killJob(BSPJobID jobid)
+ throws IOException;
+
+ /**
+ * Obtain the job object identified by jobid
+ *
+ * @param jobid
+ * @return jobInProgress object
+ */
+ public JobInProgress getJob(BSPJobID jobid);
+
+ /**
+ * Initialize the Job
+ *
+ * @param job JobInProgress object
+ */
+ public void initJob(JobInProgress job);
+
+ /**
+ * Fail a job.
+ *
+ * @param job JobInProgress object
+ */
+ public void failJob(JobInProgress job);
+}
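
A consumer-side sketch of how a scheduler or test might use this interface once BSPMaster implements it; only methods declared above are touched.

package org.apache.hama.bsp;

public class ClusterReport {
  /** Print a one-line summary of the cluster managed by the given manager. */
  static void report(GroomServerManager manager) {
    ClusterStatus cs = manager.getClusterStatus(true);
    System.out.println("grooms=" + cs.getGroomServers()
        + " uniqueHosts=" + manager.getNumberOfUniqueHosts()
        + " tasks=" + cs.getTasks() + "/" + cs.getMaxTasks()
        + " heartbeat=" + manager.getNextHeartbeatInterval() + "ms"
        + " names=" + cs.getActiveGroomNames());
  }
}
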
Index: src/java/org/apache/hama/bsp/GroomServerStatus.java
===================================================================
--- src/java/org/apache/hama/bsp/GroomServerStatus.java (리비전 0)
+++ src/java/org/apache/hama/bsp/GroomServerStatus.java (리비전 0)
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ * The status of a GroomServer, as reported to the BSPMaster in each heartbeat.
+ */
+public class GroomServerStatus implements Writable {
+
+ static {
+ WritableFactories.setFactory
+ (GroomServerStatus.class,
+ new WritableFactory() {
+ public Writable newInstance() { return new GroomServerStatus(); }
+ });
+ }
+
+ String groomName;
+ String host;
+ int failures;
+  List<TaskStatus> taskReports;
+
+ volatile long lastSeen;
+ private int maxTasks;
+
+ public GroomServerStatus() {
+    taskReports = new ArrayList<TaskStatus>();
+ }
+
+ public GroomServerStatus(String groomName, String host,
+      List<TaskStatus> taskReports, int failures, int maxTasks) {
+ this.groomName = groomName;
+ this.host = host;
+
+    this.taskReports = new ArrayList<TaskStatus>(taskReports);
+ this.failures = failures;
+ this.maxTasks = maxTasks;
+ }
+
+ public String getGroomName() {
+ return groomName;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ /**
+ * Get the current tasks at the GroomServer.
+ * Tasks are tracked by a {@link TaskStatus} object.
+ *
+ * @return a list of {@link TaskStatus} representing
+ * the current tasks at the GroomServer.
+ */
+  public List<TaskStatus> getTaskReports() {
+ return taskReports;
+ }
+
+ public int getFailures() {
+ return failures;
+ }
+
+ public long getLastSeen() {
+ return lastSeen;
+ }
+
+ public void setLastSeen(long lastSeen) {
+ this.lastSeen = lastSeen;
+ }
+
+ public int getMaxTasks() {
+ return maxTasks;
+ }
+
+ /**
+   * Return the number of tasks currently running or unassigned on this groom.
+ */
+ public int countTasks() {
+ int taskCount = 0;
+    for (Iterator<TaskStatus> it = taskReports.iterator(); it.hasNext();) {
+ TaskStatus ts = it.next();
+ TaskStatus.State state = ts.getRunState();
+ if(state == TaskStatus.State.RUNNING ||
+ state == TaskStatus.State.UNASSIGNED) {
+ taskCount++;
+ }
+ }
+
+ return taskCount;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
+ */
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.groomName = Text.readString(in);
+ this.host = Text.readString(in);
+ this.failures = in.readInt();
+ this.maxTasks = in.readInt();
+ taskReports.clear();
+ int numTasks = in.readInt();
+
+    for (int i = 0; i < numTasks; i++) {
+      TaskStatus status = new TaskStatus();
+      status.readFields(in);
+      taskReports.add(status);
+    }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
+ */
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, groomName);
+ Text.writeString(out, host);
+ out.writeInt(failures);
+ out.writeInt(maxTasks);
+ out.writeInt(taskReports.size());
+ for(TaskStatus taskStatus : taskReports) {
+ taskStatus.write(out);
+ }
+ }
+
+}
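
A write/read sketch for GroomServerStatus with an empty task list (TaskStatus itself is not shown in this patch excerpt), again using Hadoop's buffer helpers; the names and counts are made up.

package org.apache.hama.bsp;

import java.util.ArrayList;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class GroomServerStatusRoundTrip {
  public static void main(String[] args) throws Exception {
    GroomServerStatus before = new GroomServerStatus("groomd_host1", "host1",
        new ArrayList<TaskStatus>(), /* failures */ 0, /* maxTasks */ 3);

    DataOutputBuffer out = new DataOutputBuffer();
    before.write(out);

    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    GroomServerStatus after = new GroomServerStatus();
    after.readFields(in);

    System.out.println(after.getGroomName() + "@" + after.getHost()
        + " maxTasks=" + after.getMaxTasks()
        + " running=" + after.countTasks());
  }
}
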
Index: src/java/org/apache/hama/bsp/HeartbeatResponse.java
===================================================================
--- src/java/org/apache/hama/bsp/HeartbeatResponse.java (리비전 0)
+++ src/java/org/apache/hama/bsp/HeartbeatResponse.java (리비전 0)
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+public class HeartbeatResponse implements Writable, Configurable {
+ private Configuration conf;
+ short responseId;
+ private GroomServerAction [] actions;
+
+ public HeartbeatResponse() {}
+
+ public HeartbeatResponse(short responseId, GroomServerAction [] actions) {
+ this.responseId = responseId;
+ this.actions = actions;
+ }
+
+ public void setResponseId(short responseId) {
+ this.responseId = responseId;
+ }
+
+ public short getResponseId() {
+ return responseId;
+ }
+
+ public void setActions(GroomServerAction [] actions) {
+ this.actions = actions;
+ }
+
+ public GroomServerAction [] getActions() {
+ return actions;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.responseId = in.readShort();
+ int length = WritableUtils.readVInt(in);
+ if (length > 0) {
+ actions = new GroomServerAction[length];
+ for(int i=0; i< length; ++i) {
+ GroomServerAction.ActionType actionType =
+ WritableUtils.readEnum(in, GroomServerAction.ActionType.class);
+ actions[i] = GroomServerAction.createAction(actionType);
+ actions[i].readFields(in);
+ }
+ } else {
+ actions = null;
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeShort(this.responseId);
+ if(actions == null) {
+ WritableUtils.writeVInt(out, 0);
+ } else {
+ WritableUtils.writeVInt(out, actions.length);
+ for(GroomServerAction action: actions) {
+ WritableUtils.writeEnum(out, action.getActionId());
+ action.write(out);
+ }
+ }
+ }
+
+ @Override
+ public Configuration getConf() {
+ return this.conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+}
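
A sketch of the master-side usage: wrap directives in a response, serialize it, and read it back the way the GroomServer would. Only the payload-free ReinitTrackerAction is used so the example stays self-contained.

package org.apache.hama.bsp;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class HeartbeatResponseExample {
  public static void main(String[] args) throws Exception {
    GroomServerAction[] directives = { new ReinitTrackerAction() };
    HeartbeatResponse sent = new HeartbeatResponse((short) 7, directives);

    DataOutputBuffer out = new DataOutputBuffer();
    sent.write(out);   // responseId, action count, then (type, body) pairs

    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    HeartbeatResponse received = new HeartbeatResponse();
    received.readFields(in);

    System.out.println("responseId=" + received.getResponseId()
        + " actions=" + received.getActions().length);
  }
}
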
Index: src/java/org/apache/hama/bsp/JobChangeEvent.java
===================================================================
--- src/java/org/apache/hama/bsp/JobChangeEvent.java (리비전 0)
+++ src/java/org/apache/hama/bsp/JobChangeEvent.java (리비전 0)
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+/**
+ * {@link JobChangeEvent} is used to capture state changes in a job. A job can
+ * change its state w.r.t. priority, progress, run-state, etc.
+ */
+abstract class JobChangeEvent {
+ private JobInProgress jip;
+
+ JobChangeEvent(JobInProgress jip) {
+ this.jip = jip;
+ }
+
+ /**
+ * Get the job object for which the change is reported
+ */
+ JobInProgress getJobInProgress() {
+ return jip;
+ }
+}
Index: src/java/org/apache/hama/bsp/JobClient.java
===================================================================
--- src/java/org/apache/hama/bsp/JobClient.java (리비전 943815)
+++ src/java/org/apache/hama/bsp/JobClient.java (작업 사본)
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.bsp;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.BSPMaster;
-import org.apache.hama.ipc.JobSubmissionProtocol;
-
-public class JobClient extends Configured {
- private static final Log LOG = LogFactory.getLog(JobClient.class);
-
- public static enum TaskStatusFilter {
- NONE, KILLED, FAILED, SUCCEEDED, ALL
- }
-
- static {
- Configuration.addDefaultResource("hama-default.xml");
- }
-
- @SuppressWarnings("unused")
- private JobSubmissionProtocol jobSubmitClient = null;
-
- public JobClient() {
- }
-
- public JobClient(HamaConfiguration conf) throws IOException {
- setConf(conf);
- init(conf);
- }
-
- public void init(HamaConfiguration conf) throws IOException {
- String tracker = conf.get("hama.master.address", "local");
- this.jobSubmitClient = createRPCProxy(BSPMaster.getAddress(conf), conf);
- }
-
- private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
- Configuration conf) throws IOException {
- return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
- JobSubmissionProtocol.versionID, addr, conf, NetUtils.getSocketFactory(
- conf, JobSubmissionProtocol.class));
- }
-}
Index: src/java/org/apache/hama/bsp/JobContext.java
===================================================================
--- src/java/org/apache/hama/bsp/JobContext.java (리비전 943815)
+++ src/java/org/apache/hama/bsp/JobContext.java (작업 사본)
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hama.bsp;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Text;
-import org.apache.hama.graph.InputFormat;
-import org.apache.hama.graph.OutputFormat;
-
-/**
- * A read-only view of the job that is provided to the tasks while they are
- * running.
- */
-public class JobContext {
- // Put all of the attribute names in here so that Job and JobContext are
- // consistent.
- protected static final String INPUT_FORMAT_CLASS_ATTR = "angrapa.inputformat.class";
- protected static final String WALKER_CLASS_ATTR = "angrapa.walker.class";
- protected static final String OUTPUT_FORMAT_CLASS_ATTR = "angrapa.outputformat.class";
-
- protected final Configuration conf;
- private final JobID jobId;
-
- public JobContext(Configuration conf, JobID jobId) {
- this.conf = conf;
- this.jobId = jobId;
- }
-
- public Configuration getConfiguration() {
- return conf;
- }
-
- public JobID getJobID() {
- return jobId;
- }
-
- public Path getWorkingDirectory() throws IOException {
- String name = conf.get("angrapa.working.dir");
-
- if (name != null) {
- return new Path(name);
- } else {
- try {
- Path dir = FileSystem.get(conf).getWorkingDirectory();
- conf.set("angrapa.working.dir", dir.toString());
- return dir;
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
-  public Class<?> getOutputKeyClass() {
- return conf.getClass("angrapa.output.key.class", LongWritable.class,
- Object.class);
- }
-
-  public Class<?> getOutputValueClass() {
- return conf
- .getClass("angrapa.output.value.class", Text.class, Object.class);
- }
-
- public String getJobName() {
- return conf.get("angrapa.job.name", "");
- }
-
- @SuppressWarnings("unchecked")
-  public Class<? extends InputFormat<?, ?>> getInputFormatClass()
-      throws ClassNotFoundException {
-    return (Class<? extends InputFormat<?, ?>>) conf.getClass(
- INPUT_FORMAT_CLASS_ATTR, InputFormat.class); // TODO: To be corrected
- // to an implemented class
- }
-
- @SuppressWarnings("unchecked")
-  public Class<? extends OutputFormat<?, ?>> getOutputFormatClass()
-      throws ClassNotFoundException {
-    return (Class<? extends OutputFormat<?, ?>>) conf.getClass(
- OUTPUT_FORMAT_CLASS_ATTR, OutputFormat.class); // TODO: To be corrected
- // to an implemented
- // class
- }
-
-  public RawComparator<?> getSortComparator() {
- return null;
- }
-
- public String getJar() {
- return conf.get("walker.jar");
- }
-}
Index: src/java/org/apache/hama/bsp/JobID.java
===================================================================
--- src/java/org/apache/hama/bsp/JobID.java (리비전 943815)
+++ src/java/org/apache/hama/bsp/JobID.java (작업 사본)
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.bsp;
-
-import java.io.DataInput;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.text.NumberFormat;
-
-import org.apache.hadoop.io.Text;
-
-public class JobID extends ID implements Comparable<ID> {
- protected static final String JOB = "job";
- private final Text jtIdentifier;
-
- protected static final NumberFormat idFormat = NumberFormat.getInstance();
- static {
- idFormat.setGroupingUsed(false);
- idFormat.setMinimumIntegerDigits(4);
- }
-
- public JobID(String jtIdentifier, int id) {
- super(id);
- this.jtIdentifier = new Text(jtIdentifier);
- }
-
- public JobID() {
- jtIdentifier = new Text();
- }
-
- public String getJtIdentifier() {
- return jtIdentifier.toString();
- }
-
- @Override
- public boolean equals(Object o) {
- if (!super.equals(o))
- return false;
-
- JobID that = (JobID) o;
- return this.jtIdentifier.equals(that.jtIdentifier);
- }
-
- @Override
- public int compareTo(ID o) {
- JobID that = (JobID) o;
- int jtComp = this.jtIdentifier.compareTo(that.jtIdentifier);
- if (jtComp == 0) {
- return this.id - that.id;
- } else
- return jtComp;
- }
-
- public StringBuilder appendTo(StringBuilder builder) {
- builder.append(SEPARATOR);
- builder.append(jtIdentifier);
- builder.append(SEPARATOR);
- builder.append(idFormat.format(id));
- return builder;
- }
-
- @Override
- public int hashCode() {
- return jtIdentifier.hashCode() + id;
- }
-
- @Override
- public String toString() {
- return appendTo(new StringBuilder(JOB)).toString();
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- super.readFields(in);
- this.jtIdentifier.readFields(in);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
- jtIdentifier.write(out);
- }
-
- public static JobID forName(String str) throws IllegalArgumentException {
- if (str == null)
- return null;
- try {
- String[] parts = str.split("_");
- if (parts.length == 3) {
- if (parts[0].equals(JOB)) {
- return new JobID(parts[1], Integer.parseInt(parts[2]));
- }
- }
- } catch (Exception ex) {
- }
- throw new IllegalArgumentException("JobId string : " + str
- + " is not properly formed");
- }
-}
Index: src/java/org/apache/hama/bsp/JobInProgress.java
===================================================================
--- src/java/org/apache/hama/bsp/JobInProgress.java (리비전 0)
+++ src/java/org/apache/hama/bsp/JobInProgress.java (리비전 0)
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/*************************************************************
+ * JobInProgress maintains all the info for keeping a Job on the straight and
+ * narrow. It keeps its JobProfile and its latest JobStatus, plus a set of
+ * tables for doing bookkeeping of its Tasks.
+ * ***********************************************************
+ */
+class JobInProgress {
+   * Used when a kill is issued to a job which is initializing.
+ * Used when the a kill is issued to a job which is initializing.
+ */
+ static class KillInterruptedException extends InterruptedException {
+ private static final long serialVersionUID = 1L;
+
+ public KillInterruptedException(String msg) {
+ super(msg);
+ }
+ }
+
+ static final Log LOG = LogFactory.getLog(JobInProgress.class);
+
+ JobProfile profile;
+ JobStatus status;
+ Path jobFile = null;
+ Path localJobFile = null;
+ Path localJarFile = null;
+
+ long startTime;
+ long launchTime;
+ long finishTime;
+
+ // private LocalFileSystem localFs;
+ private BSPJobID jobId;
+
+ final BSPMaster master;
+
+ public JobInProgress(BSPJobID jobId, BSPMaster master, Configuration conf)
+ throws IOException {
+ this.jobId = jobId;
+
+ this.master = master;
+ this.status = new JobStatus(jobId, 0.0f, 0.0f, JobStatus.PREP);
+ this.startTime = System.currentTimeMillis();
+ status.setStartTime(startTime);
+ // this.localFs = FileSystem.getLocal(conf);
+
+ this.localJobFile = master.getLocalPath(BSPMaster.SUBDIR + "/" + jobId
+ + ".xml");
+ this.localJarFile = master.getLocalPath(BSPMaster.SUBDIR + "/" + jobId
+ + ".jar");
+ Path jobDir = master.getSystemDirectoryForJob(jobId);
+ FileSystem fs = jobDir.getFileSystem(conf);
+ jobFile = new Path(jobDir, "job.xml");
+ fs.copyToLocalFile(jobFile, localJobFile);
+ BSPJobContext job = new BSPJobContext(localJobFile, jobId);
+
+ System.out.println("user:" + job.getUser());
+ System.out.println("jobId:" + jobId);
+ System.out.println("jobFile:" + jobFile.toString());
+ System.out.println("jobName:" + job.getJobName());
+
+ this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(), job
+ .getJobName());
+
+ String jarFile = job.getJar();
+ if (jarFile != null) {
+ fs.copyToLocalFile(new Path(jarFile), localJarFile);
+ }
+ }
+
+ // ///////////////////////////////////////////////////
+ // Accessors for the JobInProgress
+ // ///////////////////////////////////////////////////
+ public JobProfile getProfile() {
+ return profile;
+ }
+
+ public JobStatus getStatus() {
+ return status;
+ }
+
+ public synchronized long getLaunchTime() {
+ return launchTime;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public long getFinishTime() {
+ return finishTime;
+ }
+
+ /**
+ * @return The JobID of this JobInProgress.
+ */
+ public BSPJobID getJobID() {
+ return jobId;
+ }
+
+ public String toString() {
+ return "jobName:" + profile.getJobName() + "\n" + "submit user:"
+ + profile.getUser() + "\n" + "JobId:" + jobId + "\n" + "JobFile:"
+ + jobFile + "\n";
+ }
+
+ // ///////////////////////////////////////////////////
+ // Create/manage tasks
+ // ///////////////////////////////////////////////////
+ public synchronized Task obtainNewTask(GroomServerStatus status,
+ int clusterSize, int numUniqueHosts) {
+
+ return null;
+ }
+}
Index: src/java/org/apache/hama/bsp/JobInProgressListener.java
===================================================================
--- src/java/org/apache/hama/bsp/JobInProgressListener.java (리비전 0)
+++ src/java/org/apache/hama/bsp/JobInProgressListener.java (리비전 0)
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+/**
+ * A listener for changes in a {@link JobInProgress job}'s lifecycle in the
+ * {@link BSPMaster}.
+ */
+abstract class JobInProgressListener {
+
+ /**
+ * Invoked when a new job has been added to the {@link BSPMaster}.
+ *
+ * @param job The added job.
+ * @throws IOException
+ */
+ public abstract void jobAdded(JobInProgress job) throws IOException;
+
+ /**
+ * Invoked when a job has been removed from the {@link BSPMaster}.
+ *
+ * @param job The removed job.
+ */
+ public abstract void jobRemoved(JobInProgress job);
+
+ /**
+ * Invoked when a job has been updated in the {@link BSPMaster}. This change
+   * in the job is tracked using {@link JobChangeEvent}.
+ *
+ * @param event the event that tracks the change
+ */
+ public abstract void jobUpdated(JobChangeEvent event);
+}
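
A minimal concrete listener, purely illustrative; a scheduler inside BSPMaster would register something like this through GroomServerManager.addJobInProgressListener().

package org.apache.hama.bsp;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

class LoggingJobListener extends JobInProgressListener {
  private static final Log LOG = LogFactory.getLog(LoggingJobListener.class);

  @Override
  public void jobAdded(JobInProgress job) throws IOException {
    LOG.info("Job added: " + job.getJobID());
  }

  @Override
  public void jobRemoved(JobInProgress job) {
    LOG.info("Job removed: " + job.getJobID());
  }

  @Override
  public void jobUpdated(JobChangeEvent event) {
    LOG.info("Job updated: " + event.getJobInProgress().getJobID());
  }
}
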
Index: src/java/org/apache/hama/bsp/JobProfile.java
===================================================================
--- src/java/org/apache/hama/bsp/JobProfile.java (리비전 0)
+++ src/java/org/apache/hama/bsp/JobProfile.java (리비전 0)
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**************************************************
+ * A JobProfile tracks a job's status.
+ *
+ **************************************************/
+public class JobProfile implements Writable {
+
+ static { // register a ctor
+ WritableFactories.setFactory(JobProfile.class, new WritableFactory() {
+ public Writable newInstance() {
+ return new JobProfile();
+ }
+ });
+ }
+
+ String user;
+ final BSPJobID jobid;
+ String jobFile;
+ String name;
+
+ /**
+ * Construct an empty {@link JobProfile}.
+ */
+ public JobProfile() {
+ jobid = new BSPJobID();
+ }
+
+ /**
+   * Construct a {@link JobProfile} from the user id, job id,
+   * job configuration file, and job name.
+   *
+   * @param user userid of the person who submitted the job.
+   * @param jobid id of the job.
+   * @param jobFile job configuration file.
+   * @param name user-specified job name.
+ */
+ public JobProfile(String user, BSPJobID jobid, String jobFile, String name) {
+ this.user = user;
+ this.jobid = jobid;
+ this.jobFile = jobFile;
+ this.name = name;
+ }
+
+ /**
+ * Get the user id.
+ */
+ public String getUser() {
+ return user;
+ }
+
+ /**
+ * Get the job id.
+ */
+ public BSPJobID getJobID() {
+ return jobid;
+ }
+
+ /**
+ * Get the configuration file for the job.
+ */
+ public String getJobFile() {
+ return jobFile;
+ }
+
+ /**
+ * Get the user-specified job name.
+ */
+ public String getJobName() {
+ return name;
+ }
+
+ // /////////////////////////////////////
+ // Writable
+ // /////////////////////////////////////
+ public void write(DataOutput out) throws IOException {
+ jobid.write(out);
+ Text.writeString(out, jobFile);
+ Text.writeString(out, user);
+ Text.writeString(out, name);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ jobid.readFields(in);
+ this.jobFile = Text.readString(in);
+ this.user = Text.readString(in);
+ this.name = Text.readString(in);
+ }
+}
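
A write/read sketch for JobProfile; the user, file path, and job name are invented, and a default-constructed BSPJobID is assumed to serialize cleanly.

package org.apache.hama.bsp;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class JobProfileRoundTrip {
  public static void main(String[] args) throws Exception {
    JobProfile before = new JobProfile("alice", new BSPJobID(),
        "/tmp/bsp/system/job_0001/job.xml", "example-job"); // illustrative values

    DataOutputBuffer out = new DataOutputBuffer();
    before.write(out);   // jobid, jobFile, user, name

    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    JobProfile after = new JobProfile();
    after.readFields(in);

    System.out.println(after.getUser() + " submitted '" + after.getJobName() + "'");
  }
}
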
Index: src/java/org/apache/hama/bsp/JobStatus.java
===================================================================
--- src/java/org/apache/hama/bsp/JobStatus.java (리비전 943815)
+++ src/java/org/apache/hama/bsp/JobStatus.java (작업 사본)
@@ -43,7 +43,7 @@
public static final int PREP = 4;
public static final int KILLED = 5;
- private JobID jobid;
+ private BSPJobID jobid;
private float progress;
private float cleanupProgress;
private float setupProgress;
@@ -54,16 +54,16 @@
public JobStatus() {
}
- public JobStatus(JobID jobid, float progress, int runState) {
+ public JobStatus(BSPJobID jobid, float progress, int runState) {
this(jobid, progress, 0.0f, runState);
}
- public JobStatus(JobID jobid, float progress, float cleanupProgress,
+ public JobStatus(BSPJobID jobid, float progress, float cleanupProgress,
int runState) {
this(jobid, 0.0f, progress, cleanupProgress, runState);
}
- public JobStatus(JobID jobid, float setupProgress, float progress,
+ public JobStatus(BSPJobID jobid, float setupProgress, float progress,
float cleanupProgress, int runState) {
this.jobid = jobid;
this.setupProgress = setupProgress;
@@ -72,7 +72,7 @@
this.runState = runState;
}
- public JobID getJobID() {
+ public BSPJobID getJobID() {
return jobid;
}
@@ -148,7 +148,7 @@
}
public synchronized void readFields(DataInput in) throws IOException {
- this.jobid = new JobID();
+ this.jobid = new BSPJobID();
jobid.readFields(in);
this.setupProgress = in.readFloat();
this.progress = in.readFloat();
Index: src/java/org/apache/hama/bsp/KillJobAction.java
===================================================================
--- src/java/org/apache/hama/bsp/KillJobAction.java (리비전 0)
+++ src/java/org/apache/hama/bsp/KillJobAction.java (리비전 0)
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} to the
+ * {@link org.apache.hama.bsp.GroomServer} to kill the task of a job and cleanup
+ * resources.
+ *
+ */
+class KillJobAction extends GroomServerAction {
+ final BSPJobID jobId;
+
+ public KillJobAction() {
+ super(ActionType.KILL_JOB);
+ jobId = new BSPJobID();
+ }
+
+ public KillJobAction(BSPJobID jobId) {
+ super(ActionType.KILL_JOB);
+ this.jobId = jobId;
+ }
+
+ public BSPJobID getJobID() {
+ return jobId;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ jobId.write(out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ jobId.readFields(in);
+ }
+
+}
Index: src/java/org/apache/hama/bsp/KillTaskAction.java
===================================================================
--- src/java/org/apache/hama/bsp/KillTaskAction.java (리비전 0)
+++ src/java/org/apache/hama/bsp/KillTaskAction.java (리비전 0)
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+
+/**
+ * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster}
+ * to the {@link org.apache.hama.bsp.GroomServer} to kill a task.
+ *
+ */
+class KillTaskAction extends GroomServerAction {
+ final TaskAttemptID taskId;
+
+ public KillTaskAction() {
+ super(ActionType.KILL_TASK);
+ taskId = new TaskAttemptID();
+ }
+
+ public KillTaskAction(TaskAttemptID taskId) {
+ super(ActionType.KILL_TASK);
+ this.taskId = taskId;
+ }
+
+ public TaskAttemptID getTaskID() {
+ return taskId;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ taskId.write(out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ taskId.readFields(in);
+ }
+}
Index: src/java/org/apache/hama/bsp/LaunchTaskAction.java
===================================================================
--- src/java/org/apache/hama/bsp/LaunchTaskAction.java (리비전 0)
+++ src/java/org/apache/hama/bsp/LaunchTaskAction.java (리비전 0)
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} to the
+ * {@link org.apache.hama.bsp.GroomServer} to launch a new task.
+ *
+ */
+class LaunchTaskAction extends GroomServerAction {
+ private Task task;
+
+ public LaunchTaskAction() {
+ super(ActionType.LAUNCH_TASK);
+ }
+
+ public LaunchTaskAction(Task task) {
+ super(ActionType.LAUNCH_TASK);
+ this.task = task;
+ }
+
+ public Task getTask() {
+ return task;
+ }
+
+ public void write(DataOutput out) throws IOException {
+ task.write(out);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ task = new Task();
+ task.readFields(in);
+ }
+
+}
Index: src/java/org/apache/hama/bsp/LocalBSPCluster.java
===================================================================
--- src/java/org/apache/hama/bsp/LocalBSPCluster.java (리비전 0)
+++ src/java/org/apache/hama/bsp/LocalBSPCluster.java (리비전 0)
@@ -0,0 +1,97 @@
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.util.ClusterUtil;
+
+public class LocalBSPCluster {
+ public static final Log LOG = LogFactory.getLog(LocalBSPCluster.class);
+
+ private final BSPMaster master;
+  private final List<ClusterUtil.GroomServerThread> groomThreads;
+ private final Configuration conf;
+  private Class<? extends GroomServer> groomServerClass;
+ private final static int DEFAULT_NO = 1;
+
+ public LocalBSPCluster(final Configuration conf) throws IOException, InterruptedException {
+ this(conf, DEFAULT_NO);
+ }
+
+ public LocalBSPCluster(final Configuration conf, final int noGroomServers)
+ throws IOException, InterruptedException {
+ this(conf, noGroomServers, BSPMaster.class,
+ getGroomServerImplementation(conf));
+ }
+
+ @SuppressWarnings("unchecked")
+ public LocalBSPCluster(final Configuration conf, final int noGroomServers,
+      final Class<? extends BSPMaster> masterClass,
+      final Class<? extends GroomServer> groomServerClass) throws IOException, InterruptedException {
+ conf.set("bsp.master.port", "40000");
+ conf.set("bsp.groom.port", "40020");
+ conf.set("bsp.local.dir", conf.get("hadoop.tmp.dir") + "/bsp/local");
+ conf.set("bsp.system.dir", conf.get("hadoop.tmp.dir") + "/bsp/system");
+ this.conf = conf;
+
+ // Create the master
+ this.master = BSPMaster.constructMaster(masterClass, conf);
+ this.groomThreads = new CopyOnWriteArrayList<ClusterUtil.GroomServerThread>();
+ this.groomServerClass = (Class<? extends GroomServer>) conf.getClass(
+ HConstants.REGION_SERVER_IMPL, groomServerClass);
+ for (int i = 0; i < noGroomServers; i++) {
+ addGroomServer(i);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static Class<? extends GroomServer> getGroomServerImplementation(
+ final Configuration conf) {
+ return (Class<? extends GroomServer>) conf.getClass(
+ HConstants.REGION_SERVER_IMPL, GroomServer.class);
+ }
+
+ public ClusterUtil.GroomServerThread addGroomServer(final int index)
+ throws IOException {
+ LOG.info("Adding Groom Server");
+ ClusterUtil.GroomServerThread rst = ClusterUtil
+ .createGroomServerThread(this.conf, this.groomServerClass, index);
+ this.groomThreads.add(rst);
+ return rst;
+ }
+
+ public void startup() {
+ try {
+ ClusterUtil.startup(this.master, this.groomThreads, conf);
+ } catch (IOException e) {
+ LOG.info(e);
+ } catch (InterruptedException e) {
+ LOG.info(e);
+ }
+ }
+
+ public void shutdown() {
+ ClusterUtil.shutdown(this.master, this.groomThreads, conf);
+ }
+
+ /**
+ * Test things basically work.
+ *
+ * @param args
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public static void main(String[] args) throws IOException, InterruptedException {
+ StringUtils.startupShutdownMessage(LocalBSPCluster.class, args, LOG);
+ HamaConfiguration conf = new HamaConfiguration();
+ LocalBSPCluster cluster = new LocalBSPCluster(conf);
+ cluster.startup();
+ }
+}
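
For reference, a minimal sketch of booting this cluster from user code with the constructor that takes a groom-server count; it relies only on classes added in this patch and on hadoop.tmp.dir having its usual default (the demo class name is illustrative only):

import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.LocalBSPCluster;

public class LocalClusterDemo {
  public static void main(String[] args) throws Exception {
    HamaConfiguration conf = new HamaConfiguration();
    // Boot a BSPMaster plus two GroomServer threads in this JVM.
    LocalBSPCluster cluster = new LocalBSPCluster(conf, 2);
    cluster.startup();
    // ... submit and monitor jobs here ...
    cluster.shutdown();   // currently a stub, see ClusterUtil.shutdown()
  }
}
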
Index: src/java/org/apache/hama/bsp/ReinitTrackerAction.java
===================================================================
--- src/java/org/apache/hama/bsp/ReinitTrackerAction.java (revision 0)
+++ src/java/org/apache/hama/bsp/ReinitTrackerAction.java (revision 0)
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} to the
+ * {@link org.apache.hama.bsp.GroomServer} to reinitialize itself.
+ *
+ */
+class ReinitTrackerAction extends GroomServerAction {
+
+ public ReinitTrackerAction() {
+ super(ActionType.REINIT_GROOM);
+ }
+
+ public void write(DataOutput out) throws IOException {
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ }
+
+}
Index: src/java/org/apache/hama/bsp/RunningJob.java
===================================================================
--- src/java/org/apache/hama/bsp/RunningJob.java (revision 0)
+++ src/java/org/apache/hama/bsp/RunningJob.java (revision 0)
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+/**
+ * RunningJob is the user-interface to query for details on a
+ * running BSP job.
+ *
+ *
+ * Clients can get hold of RunningJob via the {@link BSPJobClient}
+ * and then query the running-job for details such as name, configuration,
+ * progress etc.
+ *
+ *
+ * @see BSPJobClient
+ */
+public interface RunningJob {
+ /**
+ * Get the job identifier.
+ *
+ * @return the job identifier.
+ */
+ public BSPJobID getID();
+
+ /**
+ * Get the name of the job.
+ *
+ * @return the name of the job.
+ */
+ public String getJobName();
+
+ /**
+ * Get the path of the submitted job configuration.
+ *
+ * @return the path of the submitted job configuration.
+ */
+ public String getJobFile();
+
+ /**
+ * Get the progress of the job's tasks, as a float between 0.0 and 1.0.
+ * When all bsp tasks have completed, the function returns 1.0.
+ *
+ * @return the progress of the job's tasks.
+ * @throws IOException
+ */
+ public float progress() throws IOException;
+
+ /**
+ * Check if the job is finished or not. This is a non-blocking call.
+ *
+ * @return true if the job is complete, else false.
+ * @throws IOException
+ */
+ public boolean isComplete() throws IOException;
+
+ /**
+ * Check if the job completed successfully.
+ *
+ * @return true if the job succeeded, else false.
+ * @throws IOException
+ */
+ public boolean isSuccessful() throws IOException;
+
+ /**
+ * Blocks until the job is complete.
+ *
+ * @throws IOException
+ */
+ public void waitForCompletion() throws IOException;
+
+ /**
+ * Returns the current state of the Job. {@link JobStatus}
+ *
+ * @throws IOException
+ */
+ public int getJobState() throws IOException;
+
+ /**
+ * Kill the running job. Blocks until all job tasks have been killed as well.
+ * If the job is no longer running, it simply returns.
+ *
+ * @throws IOException
+ */
+ public void killJob() throws IOException;
+
+ /**
+ * Kill indicated task attempt.
+ *
+ * @param taskId the id of the task to be terminated.
+ * @param shouldFail if true the task is failed and added to failed tasks
+ * list, otherwise it is just killed, w/o affecting job failure
+ * status.
+ * @throws IOException
+ */
+ public void killTask(TaskAttemptID taskId, boolean shouldFail)
+ throws IOException;
+}
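
A minimal sketch of polling a RunningJob handle until it finishes; how the handle is obtained from BSPJobClient is outside this interface, so only the methods declared above are used (the helper class name is illustrative only):

import java.io.IOException;
import org.apache.hama.bsp.RunningJob;

public class JobMonitor {
  /** Poll the job once a second and report whether it succeeded. */
  public static boolean waitAndReport(RunningJob job)
      throws IOException, InterruptedException {
    while (!job.isComplete()) {
      System.out.printf("%s: %.1f%%%n", job.getJobName(), job.progress() * 100);
      Thread.sleep(1000);
    }
    return job.isSuccessful();
  }
}
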
Index: src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
===================================================================
--- src/java/org/apache/hama/bsp/SimpleTaskScheduler.java (revision 0)
+++ src/java/org/apache/hama/bsp/SimpleTaskScheduler.java (revision 0)
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+class SimpleTaskScheduler extends TaskScheduler {
+ private static final Log LOG = LogFactory.getLog(SimpleTaskScheduler.class);
+ List<JobInProgress> jobQueue;
+
+ public SimpleTaskScheduler() {
+ jobQueue = new ArrayList<JobInProgress>();
+ }
+
+ @Override
+ public void addJob(JobInProgress job) {
+ LOG.debug(">> Added a job (" + job + ") to scheduler (remaining jobs: "
+ + (jobQueue.size() + 1) + ")");
+ jobQueue.add(job);
+ }
+
+ @Override
+ public Collection<JobInProgress> getJobs() {
+ return jobQueue;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hama.bsp.TaskScheduler#assignTasks(org.apache.hama.bsp.
+ * GroomServerStatus)
+ */
+ @Override
+ public List<Task> assignTasks(GroomServerStatus groomStatus)
+ throws IOException {
+ ClusterStatus clusterStatus = groomServerManager.getClusterStatus(false);
+
+ final int numGroomServers = clusterStatus.getGroomServers();
+ // final int clusterTaskCapacity = clusterStatus.getMaxTasks();
+
+ //
+ // Get task counts for the current groom.
+ //
+ // final int groomTaskCapacity = groom.getMaxTasks();
+ final int groomRunningTasks = groomStatus.countTasks();
+
+ // Assigned tasks
+ List<Task> assignedTasks = new ArrayList<Task>();
+
+ // Task task = null;
+ if (groomRunningTasks == 0) {
+ // TODO - Each time a job is submitted in BSPMaster, add a JobInProgress
+ // instance to the scheduler.
+ synchronized (jobQueue) {
+ for (JobInProgress job : jobQueue) {
+ if (job.getStatus().getRunState() != JobStatus.RUNNING) {
+ continue;
+ }
+
+ Task t = null;
+
+ t = job.obtainNewTask(groomStatus, numGroomServers,
+ groomServerManager.getNumberOfUniqueHosts());
+ if (t != null) {
+ assignedTasks.add(t);
+ break; // TODO - Now, simple scheduler assigns only one task to
+ // each groom. Later, it will be improved for scheduler to
+ // assign one or more tasks to each groom according to
+ // its capacity.
+ }
+ }
+ }
+ }
+
+ return assignedTasks;
+ }
+}
Index: src/java/org/apache/hama/bsp/Task.java
===================================================================
--- src/java/org/apache/hama/bsp/Task.java (revision 0)
+++ src/java/org/apache/hama/bsp/Task.java (revision 0)
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ *
+ */
+public class Task implements Writable {
+ ////////////////////////////////////////////
+ // Fields
+ ////////////////////////////////////////////
+ private String jobFile;
+ private TaskAttemptID taskId;
+ private int partition;
+
+ protected LocalDirAllocator lDirAlloc;
+ /**
+ *
+ */
+ public Task() {
+ taskId = new TaskAttemptID();
+ }
+
+ public Task(String jobFile, TaskAttemptID taskId, int partition) {
+ this.jobFile = jobFile;
+ this.taskId = taskId;
+
+ this.partition = partition;
+ }
+
+ ////////////////////////////////////////////
+ // Accessors
+ ////////////////////////////////////////////
+ public void setJobFile(String jobFile) {
+ this.jobFile = jobFile;
+ }
+
+ public String getJobFile() {
+ return jobFile;
+ }
+
+ public TaskAttemptID getTaskID() {
+ return taskId;
+ }
+
+ /**
+ * Get the job name for this task.
+ * @return the job name
+ */
+ public BSPJobID getJobID() {
+ return taskId.getJobID();
+ }
+
+ /**
+ * Get the index of this task within the job.
+ * @return the integer part of the task id
+ */
+ public int getPartition() {
+ return partition;
+ }
+
+ @Override
+ public String toString() {
+ return taskId.toString();
+ }
+
+ ////////////////////////////////////////////
+ // Writable
+ ////////////////////////////////////////////
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, jobFile);
+ taskId.write(out);
+ out.writeInt(partition);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ jobFile = Text.readString(in);
+ taskId.readFields(in);
+ partition = in.readInt();
+ }
+}
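
A minimal sketch of the Writable round trip implemented above, assuming a default-constructed TaskAttemptID serializes cleanly (the demo class name is illustrative only):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hama.bsp.Task;
import org.apache.hama.bsp.TaskAttemptID;

public class TaskRoundTrip {
  public static void main(String[] args) throws IOException {
    Task original = new Task("file:///tmp/job.xml", new TaskAttemptID(), 3);

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    original.write(new DataOutputStream(bytes));           // Writable#write

    Task copy = new Task();
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray())));   // Writable#readFields

    System.out.println(copy.getJobFile() + " / partition " + copy.getPartition());
  }
}
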
Index: src/java/org/apache/hama/bsp/TaskAttemptContext.java
===================================================================
--- src/java/org/apache/hama/bsp/TaskAttemptContext.java (revision 943815)
+++ src/java/org/apache/hama/bsp/TaskAttemptContext.java (working copy)
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hama.bsp;
import java.io.IOException;
@@ -26,7 +25,7 @@
/**
* The context for task attempts.
*/
-public class TaskAttemptContext extends JobContext implements Progressable {
+public class TaskAttemptContext extends BSPJobContext implements Progressable {
private final TaskAttemptID taskId;
private String status = "";
Index: src/java/org/apache/hama/bsp/TaskAttemptID.java
===================================================================
--- src/java/org/apache/hama/bsp/TaskAttemptID.java (revision 943815)
+++ src/java/org/apache/hama/bsp/TaskAttemptID.java (working copy)
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hama.bsp;
import java.io.DataInput;
@@ -43,7 +42,7 @@
taskId = new TaskID();
}
- public JobID getJobID() {
+ public BSPJobID getJobID() {
return taskId.getJobID();
}
Index: src/java/org/apache/hama/bsp/TaskID.java
===================================================================
--- src/java/org/apache/hama/bsp/TaskID.java (revision 943815)
+++ src/java/org/apache/hama/bsp/TaskID.java (working copy)
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hama.bsp;
import java.io.DataInput;
@@ -31,10 +30,10 @@
idFormat.setMinimumIntegerDigits(6);
}
- private JobID jobId;
+ private BSPJobID jobId;
private boolean isMatrixTask;
- public TaskID(JobID jobId, boolean isMatrixTask, int id) {
+ public TaskID(BSPJobID jobId, boolean isMatrixTask, int id) {
super(id);
if (jobId == null) {
throw new IllegalArgumentException("jobId cannot be null");
@@ -44,15 +43,15 @@
}
public TaskID(String jtIdentifier, int jobId, boolean isGraphTask, int id) {
- this(new JobID(jtIdentifier, jobId), isGraphTask, id);
+ this(new BSPJobID(jtIdentifier, jobId), isGraphTask, id);
}
public TaskID() {
- jobId = new JobID();
+ jobId = new BSPJobID();
}
- /** Returns the {@link JobID} object that this tip belongs to */
- public JobID getJobID() {
+ /** Returns the {@link BSPJobID} object that this tip belongs to */
+ public BSPJobID getJobID() {
return jobId;
}
Index: src/java/org/apache/hama/bsp/TaskInProgress.java
===================================================================
--- src/java/org/apache/hama/bsp/TaskInProgress.java (revision 0)
+++ src/java/org/apache/hama/bsp/TaskInProgress.java (revision 0)
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ *
+ */
+class TaskInProgress {
+ public static final Log LOG = LogFactory.getLog(TaskInProgress.class);
+
+ private BSPJobContext context;
+
+ // Constants
+ static final int MAX_TASK_EXECS = 1;
+ int maxTaskAttempts = 4;
+
+ // Job Meta
+ private String jobFile = null;
+ private int partition;
+ private BSPMaster bspMaster;
+ private TaskID id;
+ private JobInProgress job;
+ private int completes = 0;
+
+ // Status
+ // private double progress = 0;
+ // private String state = "";
+ private long startTime = 0;
+
+ // The 'next' usable taskid of this tip
+ int nextTaskId = 0;
+
+ // The taskid that took this TIP to SUCCESS
+ // private TaskAttemptID successfulTaskId;
+
+ // The first taskid of this tip
+ private TaskAttemptID firstTaskId;
+
+ // Map from task Id -> GroomServer Id, contains tasks that are
+ // currently running
+ private TreeMap<TaskAttemptID, String> activeTasks = new TreeMap<TaskAttemptID, String>();
+ // All attempt Ids of this TIP
+ // private TreeSet tasks = new TreeSet();
+ /**
+ * Map from taskId -> TaskStatus
+ */
+ private TreeMap<TaskAttemptID, TaskStatus> taskStatuses = new TreeMap<TaskAttemptID, TaskStatus>();
+
+ public TaskInProgress(BSPJobID jobId, String jobFile, BSPMaster master,
+ BSPJobContext context, JobInProgress job, int partition) {
+ this.jobFile = jobFile;
+ this.bspMaster = master;
+ this.job = job;
+ this.context = context;
+ this.partition = partition;
+ }
+
+ // //////////////////////////////////
+ // Accessors
+ // //////////////////////////////////
+ /**
+ * Return the start time
+ */
+ public long getStartTime() {
+ return startTime;
+ }
+
+ /**
+ * Return the parent job
+ */
+ public JobInProgress getJob() {
+ return job;
+ }
+
+ public TaskID getTIPId() {
+ return this.id;
+ }
+
+ /**
+ * Is the Task associated with taskId the first attempt of the tip?
+ *
+ * @param taskId
+ * @return Returns true if the Task is the first attempt of the tip
+ */
+ public boolean isFirstAttempt(TaskAttemptID taskId) {
+ return firstTaskId == null ? false : firstTaskId.equals(taskId);
+ }
+
+ /**
+ * Is this tip currently running any tasks?
+ *
+ * @return true if any tasks are running
+ */
+ public boolean isRunning() {
+ return !activeTasks.isEmpty();
+ }
+
+ /**
+ * Is this tip complete?
+ *
+ * @return true if the tip is complete, else false
+ */
+ public synchronized boolean isComplete() {
+ return (completes > 0);
+ }
+}
Index: src/java/org/apache/hama/bsp/TaskScheduler.java
===================================================================
--- src/java/org/apache/hama/bsp/TaskScheduler.java (revision 0)
+++ src/java/org/apache/hama/bsp/TaskScheduler.java (revision 0)
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Used by a {@link BSPMaster} to schedule {@link Task}s on {@link GroomServer}
+ * s.
+ */
+abstract class TaskScheduler implements Configurable {
+
+ protected Configuration conf;
+ protected GroomServerManager groomServerManager;
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public synchronized void setGroomServerManager(
+ GroomServerManager groomServerManager) {
+ this.groomServerManager = groomServerManager;
+ }
+
+ /**
+ * Lifecycle method to allow the scheduler to start any work in separate
+ * threads.
+ *
+ * @throws IOException
+ */
+ public void start() throws IOException {
+ // do nothing
+ }
+
+ /**
+ * Lifecycle method to allow the scheduler to stop any work it is doing.
+ *
+ * @throws IOException
+ */
+ public void terminate() throws IOException {
+ // do nothing
+ }
+
+ public abstract void addJob(JobInProgress job);
+
+ /**
+ * Returns a collection of jobs in an order which is specific to the
+ * particular scheduler.
+ *
+ *
+ * @return the jobs known to this scheduler.
+ */
+ public abstract Collection<JobInProgress> getJobs();
+
+ /**
+ * Returns the tasks we'd like the GroomServer to execute right now.
+ *
+ * @param groomStatus The status of the GroomServer for which we're looking for tasks.
+ * @return A list of tasks to run on that GroomServer, possibly empty.
+ */
+ public abstract List<Task> assignTasks(GroomServerStatus groomStatus)
+ throws IOException;
+}
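
A minimal sketch of a scheduler subclass, kept in org.apache.hama.bsp because TaskScheduler is package-private; it accepts jobs but never hands out work, which is just enough to show the extension points (the class name is illustrative only):

package org.apache.hama.bsp;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

class NoopTaskScheduler extends TaskScheduler {
  private final List<JobInProgress> jobs = new ArrayList<JobInProgress>();

  @Override
  public void addJob(JobInProgress job) {
    jobs.add(job);                  // remember the job, never schedule it
  }

  @Override
  public Collection<JobInProgress> getJobs() {
    return jobs;
  }

  @Override
  public List<Task> assignTasks(GroomServerStatus groomStatus) throws IOException {
    return new ArrayList<Task>();   // hand the groom server nothing to do
  }
}
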
Index: src/java/org/apache/hama/bsp/TaskStatus.java
===================================================================
--- src/java/org/apache/hama/bsp/TaskStatus.java (revision 0)
+++ src/java/org/apache/hama/bsp/TaskStatus.java (revision 0)
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+class TaskStatus implements Writable, Cloneable {
+ static final Log LOG = LogFactory.getLog(TaskStatus.class);
+
+ // enumeration for reporting current phase of a task.
+ public static enum Phase {
+ STARTING, COMPUTE, BARRIER_SYNC, CLEANUP
+ }
+
+ // what state is the task in?
+ public static enum State {
+ RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN
+ }
+
+ private final TaskAttemptID taskId;
+ private float progress;
+ private volatile State runState;
+ private String stateString;
+ private String groomServer;
+
+ private long startTime;
+ private long finishTime;
+
+ private volatile Phase phase = Phase.STARTING;
+
+ /**
+ *
+ */
+ public TaskStatus() {
+ taskId = new TaskAttemptID();
+ }
+
+ public TaskStatus(TaskAttemptID taskId, float progress, State runState,
+ String stateString, String groomServer, Phase phase) {
+ this.taskId = taskId;
+ this.progress = progress;
+ this.runState = runState;
+ this.stateString = stateString;
+ this.groomServer = groomServer;
+ this.phase = phase;
+ }
+
+ // //////////////////////////////////////////////////
+ // Accessors and Modifiers
+ // //////////////////////////////////////////////////
+
+ public TaskAttemptID getTaskId() {
+ return taskId;
+ }
+
+ public float getProgress() {
+ return progress;
+ }
+
+ public void setProgress(float progress) {
+ this.progress = progress;
+ }
+
+ public State getRunState() {
+ return runState;
+ }
+
+ public void setRunState(State state) {
+ this.runState = state;
+ }
+
+ public String getStateString() {
+ return stateString;
+ }
+
+ public void setStateString(String stateString) {
+ this.stateString = stateString;
+ }
+
+ public String getGroomServer() {
+ return groomServer;
+ }
+
+ public void setGroomServer(String groomServer) {
+ this.groomServer = groomServer;
+ }
+
+ public long getFinishTime() {
+ return finishTime;
+ }
+
+ void setFinishTime(long finishTime) {
+ this.finishTime = finishTime;
+ }
+
+ /**
+ * Get start time of the task.
+ *
+ * @return 0 if start time is not set, else returns start time.
+ */
+ public long getStartTime() {
+ return startTime;
+ }
+
+ /**
+ * Set startTime of the task.
+ *
+ * @param startTime start time
+ */
+ void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+
+ /**
+ * Get current phase of this task.
+ *
+ * @return the current phase of this task.
+ */
+ public Phase getPhase() {
+ return this.phase;
+ }
+
+ /**
+ * Set current phase of this task.
+ *
+ * @param phase phase of this task
+ */
+ void setPhase(Phase phase) {
+ this.phase = phase;
+ }
+
+ /**
+ * Update the status of the task.
+ *
+ * This update is done by ping thread before sending the status.
+ *
+ * @param progress
+ * @param state
+ */
+ synchronized void statusUpdate(float progress, String state) {
+ setProgress(progress);
+ setStateString(state);
+ }
+
+ /**
+ * Update the status of the task.
+ *
+ * @param status updated status
+ */
+ synchronized void statusUpdate(TaskStatus status) {
+ this.progress = status.getProgress();
+ this.runState = status.getRunState();
+ this.stateString = status.getStateString();
+
+ if (status.getStartTime() != 0) {
+ this.startTime = status.getStartTime();
+ }
+ if (status.getFinishTime() != 0) {
+ this.finishTime = status.getFinishTime();
+ }
+
+ this.phase = status.getPhase();
+ }
+
+ /**
+ * Update specific fields of task status
+ *
+ * This update is done in BSPMaster when a cleanup attempt of task reports its
+ * status. Then update only specific fields, not all.
+ *
+ * @param runState
+ * @param progress
+ * @param state
+ * @param phase
+ * @param finishTime
+ */
+ synchronized void statusUpdate(State runState, float progress, String state,
+ Phase phase, long finishTime) {
+ setRunState(runState);
+ setProgress(progress);
+ setStateString(state);
+ setPhase(phase);
+ if (finishTime != 0) {
+ this.finishTime = finishTime;
+ }
+ }
+
+ @Override
+ public Object clone() {
+ try {
+ return super.clone();
+ } catch (CloneNotSupportedException cnse) {
+ // Shouldn't happen since we do implement Cloneable
+ throw new InternalError(cnse.toString());
+ }
+ }
+
+ // ////////////////////////////////////////////
+ // Writable
+ // ////////////////////////////////////////////
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.taskId.readFields(in);
+ this.progress = in.readFloat();
+ this.runState = WritableUtils.readEnum(in, State.class);
+ this.stateString = Text.readString(in);
+ this.phase = WritableUtils.readEnum(in, Phase.class);
+ this.startTime = in.readLong();
+ this.finishTime = in.readLong();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ taskId.write(out);
+ out.writeFloat(progress);
+ WritableUtils.writeEnum(out, runState);
+ Text.writeString(out, stateString);
+ WritableUtils.writeEnum(out, phase);
+ out.writeLong(startTime);
+ out.writeLong(finishTime);
+ }
+}
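
A minimal sketch of one attempt's lifecycle as seen through this status object; it sits in org.apache.hama.bsp because TaskStatus and its mutators are package-private, and the groom server name used is made up (the demo class name is illustrative only):

package org.apache.hama.bsp;

public class TaskStatusDemo {
  public static void main(String[] args) {
    TaskStatus status = new TaskStatus(new TaskAttemptID(), 0.0f,
        TaskStatus.State.UNASSIGNED, "initializing",
        "groomserver_localhost_40020", TaskStatus.Phase.STARTING);

    // The attempt starts running and reports halfway progress.
    status.setStartTime(System.currentTimeMillis());
    status.setRunState(TaskStatus.State.RUNNING);
    status.statusUpdate(0.5f, "half way through the superstep");

    // The cleanup attempt reports the final, field-specific update.
    status.statusUpdate(TaskStatus.State.SUCCEEDED, 1.0f, "done",
        TaskStatus.Phase.CLEANUP, System.currentTimeMillis());

    System.out.println(status.getTaskId() + " -> " + status.getRunState());
  }
}
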
Index: src/java/org/apache/hama/bsp/Work.java
===================================================================
--- src/java/org/apache/hama/bsp/Work.java (revision 0)
+++ src/java/org/apache/hama/bsp/Work.java (revision 0)
@@ -0,0 +1,5 @@
+package org.apache.hama.bsp;
+
+public class Work {
+
+}
Index: src/java/org/apache/hama/graph/InputFormat.java
===================================================================
--- src/java/org/apache/hama/graph/InputFormat.java (revision 943815)
+++ src/java/org/apache/hama/graph/InputFormat.java (working copy)
@@ -23,7 +23,7 @@
import java.util.List;
import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hama.bsp.JobContext;
+import org.apache.hama.bsp.BSPJobContext;
public abstract class InputFormat {
@@ -44,7 +44,7 @@
* @param context job configuration.
* @return an array of {@link InputSplit}s for the job.
*/
- public abstract List<InputSplit> getSplits(JobContext context)
+ public abstract List<InputSplit> getSplits(BSPJobContext context)
throws IOException, InterruptedException;
/**
Index: src/java/org/apache/hama/graph/InputSplit.java
===================================================================
--- src/java/org/apache/hama/graph/InputSplit.java (revision 943815)
+++ src/java/org/apache/hama/graph/InputSplit.java (working copy)
@@ -24,8 +24,8 @@
import org.apache.hadoop.mapreduce.RecordReader;
/**
- * InputSplit represents the data to be processed by an
- * individual {@link Walker}.
+ * InputSplit represents the data to be processed by an individual
+ * {@link Walker}.
*
*
* Typically, it presents a byte-oriented view on the input and is the
Index: src/java/org/apache/hama/graph/OutputCommitter.java
===================================================================
--- src/java/org/apache/hama/graph/OutputCommitter.java (revision 943815)
+++ src/java/org/apache/hama/graph/OutputCommitter.java (working copy)
@@ -20,12 +20,12 @@
import java.io.IOException;
-import org.apache.hama.bsp.JobContext;
+import org.apache.hama.bsp.BSPJobContext;
import org.apache.hama.bsp.TaskAttemptContext;
/**
*
- * @see JobContext
+ * @see BSPJobContext
* @see TaskAttemptContext
*
*/
@@ -36,7 +36,7 @@
* @param jobContext Context of the job whose output is being written.
* @throws IOException if temporary output could not be created
*/
- public abstract void setupJob(JobContext jobContext) throws IOException;
+ public abstract void setupJob(BSPJobContext jobContext) throws IOException;
/**
* For cleaning up the job's output after job completion
@@ -44,7 +44,7 @@
* @param jobContext Context of the job whose output is being written.
* @throws IOException
*/
- public abstract void cleanupJob(JobContext jobContext) throws IOException;
+ public abstract void cleanupJob(BSPJobContext jobContext) throws IOException;
/**
* Sets up output for the task.
Index: src/java/org/apache/hama/graph/OutputFormat.java
===================================================================
--- src/java/org/apache/hama/graph/OutputFormat.java (revision 943815)
+++ src/java/org/apache/hama/graph/OutputFormat.java (working copy)
@@ -21,7 +21,7 @@
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hama.bsp.JobContext;
+import org.apache.hama.bsp.BSPJobContext;
import org.apache.hama.bsp.TaskAttemptContext;
/**
@@ -66,7 +66,7 @@
* @param context information about the job
* @throws IOException when output should not be attempted
*/
- public abstract void checkOutputSpecs(JobContext context) throws IOException,
+ public abstract void checkOutputSpecs(BSPJobContext context) throws IOException,
InterruptedException;
/**
Index: src/java/org/apache/hama/ipc/HamaRPCProtocolVersion.java
===================================================================
--- src/java/org/apache/hama/ipc/HamaRPCProtocolVersion.java (revision 943815)
+++ src/java/org/apache/hama/ipc/HamaRPCProtocolVersion.java (working copy)
@@ -1,6 +1,4 @@
/**
- * Copyright 2008 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Index: src/java/org/apache/hama/ipc/HeartbeatResponse.java
===================================================================
--- src/java/org/apache/hama/ipc/HeartbeatResponse.java (revision 943815)
+++ src/java/org/apache/hama/ipc/HeartbeatResponse.java (working copy)
@@ -1,69 +0,0 @@
-/**
- * Copyright 2008 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.ipc;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-
-public class HeartbeatResponse implements Writable, Configurable {
- private Configuration conf;
-
- private short responseId;
-
- public HeartbeatResponse() {
- }
-
- public HeartbeatResponse(short responseId) {
- this.responseId = responseId;
- }
-
- public void setResponseId(short responseId) {
- this.responseId = responseId;
- }
-
- public short getResponseId() {
- return responseId;
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- this.responseId = in.readShort();
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeShort(this.responseId);
- }
-
- @Override
- public Configuration getConf() {
- return this.conf;
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-}
Index: src/java/org/apache/hama/ipc/InterTrackerProtocol.java
===================================================================
--- src/java/org/apache/hama/ipc/InterTrackerProtocol.java (revision 943815)
+++ src/java/org/apache/hama/ipc/InterTrackerProtocol.java (working copy)
@@ -1,6 +1,4 @@
/**
- * Copyright 2008 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,8 +17,15 @@
*/
package org.apache.hama.ipc;
+import java.io.IOException;
+
+import org.apache.hama.bsp.GroomServerStatus;
+import org.apache.hama.bsp.HeartbeatResponse;
+
public interface InterTrackerProtocol extends HamaRPCProtocolVersion {
- public HeartbeatResponse heartbeat(short responseId);
+ public HeartbeatResponse heartbeat(GroomServerStatus status,
+ boolean restarted, boolean initialContact, boolean acceptNewTasks,
+ short responseId) throws IOException;
public String getSystemDir();
}
Index: src/java/org/apache/hama/ipc/JobSubmissionProtocol.java
===================================================================
--- src/java/org/apache/hama/ipc/JobSubmissionProtocol.java (revision 943815)
+++ src/java/org/apache/hama/ipc/JobSubmissionProtocol.java (working copy)
@@ -1,6 +1,4 @@
/**
- * Copyright 2008 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -21,15 +19,89 @@
import java.io.IOException;
-import org.apache.hama.bsp.JobID;
+import org.apache.hama.bsp.ClusterStatus;
+import org.apache.hama.bsp.BSPJobID;
+import org.apache.hama.bsp.JobProfile;
import org.apache.hama.bsp.JobStatus;
+import org.apache.hama.bsp.TaskAttemptID;
/**
- * Protocol that a Walker and the central Master use to communicate. This
+ * Protocol that a groom server and the central BSP Master use to communicate. This
* interface contains several methods: submitJob, killJob, and killTask.
*/
public interface JobSubmissionProtocol extends HamaRPCProtocolVersion {
- public JobID getNewJobId() throws IOException;
+
+ /**
+ * Allocate a new id for the job.
+ * @return a new job id.
+ * @throws IOException
+ */
+ public BSPJobID getNewJobId() throws IOException;
+
+
+ /**
+ * Submit a job for execution. Returns the latest status for
+ * that job.
+ * The job files should be submitted in system-dir/jobName.
+ *
+ * @param jobName the id of the job whose files are in system-dir/jobName.
+ * @return the status of the submitted job.
+ * @throws IOException
+ */
+ public JobStatus submitJob(BSPJobID jobName) throws IOException;
+
+ /**
+ * Get the current status of the cluster
+ * @param detailed if true then report groom names as well
+ * @return summary of the state of the cluster
+ */
+ public ClusterStatus getClusterStatus(boolean detailed) throws IOException;
+
+ /**
+ * Grab a handle to a job that is already known to the BSPMaster.
+ * @return Profile of the job, or null if not found.
+ */
+ public JobProfile getJobProfile(BSPJobID jobid) throws IOException;
+
+ /**
+ * Grab a handle to a job that is already known to the BSPMaster.
+ * @return Status of the job, or null if not found.
+ */
+ public JobStatus getJobStatus(BSPJobID jobid) throws IOException;
+
+ /**
+ * A BSP system always operates on a single filesystem. This
+ * function returns the fs name. ('local' if the localfs; 'addr:port'
+ * if dfs). The client can then copy files into the right locations
+ * prior to submitting the job.
+ */
+ public String getFilesystemName() throws IOException;
+
+ /**
+ * Get all the jobs submitted.
+ * @return array of JobStatus for the submitted jobs
+ */
+ public JobStatus[] getAllJobs() throws IOException;
- public JobStatus submitJob(JobID jobName) throws IOException;
+ /**
+ * Grab the bspmaster system directory path where job-specific files are to be placed.
+ *
+ * @return the system directory where job-specific files are to be placed.
+ */
+ public String getSystemDir();
+
+ /**
+ * Kill the indicated job
+ */
+ public void killJob(BSPJobID jobid) throws IOException;
+
+
+ /**
+ * Kill indicated task attempt.
+ * @param taskId the id of the task to kill.
+ * @param shouldFail if true the task is failed and added to failed tasks list, otherwise
+ * it is just killed, w/o affecting job failure status.
+ */
+ public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException;
+
}
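
A minimal sketch of reaching this protocol over Hadoop RPC. It assumes HamaRPCProtocolVersion exposes the usual static versionID constant and that BSPMaster.getAddress(conf) resolves the master endpoint, as ClusterUtil below already relies on (the demo class name is illustrative only):

import java.net.InetSocketAddress;
import org.apache.hadoop.ipc.RPC;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPMaster;
import org.apache.hama.bsp.ClusterStatus;
import org.apache.hama.ipc.JobSubmissionProtocol;

public class ClusterStatusProbe {
  public static void main(String[] args) throws Exception {
    HamaConfiguration conf = new HamaConfiguration();
    InetSocketAddress master = BSPMaster.getAddress(conf);
    // Obtain an RPC proxy for the submission protocol on the master.
    JobSubmissionProtocol client = (JobSubmissionProtocol) RPC.getProxy(
        JobSubmissionProtocol.class, JobSubmissionProtocol.versionID, master, conf);
    ClusterStatus status = client.getClusterStatus(false);
    System.out.println("groom servers: " + status.getGroomServers());
  }
}
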
Index: src/java/org/apache/hama/util/ClusterUtil.java
===================================================================
--- src/java/org/apache/hama/util/ClusterUtil.java (revision 0)
+++ src/java/org/apache/hama/util/ClusterUtil.java (revision 0)
@@ -0,0 +1,98 @@
+package org.apache.hama.util;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.BSPMaster;
+import org.apache.hama.bsp.GroomServer;
+
+public class ClusterUtil {
+ public static final Log LOG = LogFactory.getLog(ClusterUtil.class);
+
+ /**
+ * Datastructure to hold GroomServer Thread and GroomServer instance
+ */
+ public static class GroomServerThread extends Thread {
+ private final GroomServer groomServer;
+
+ public GroomServerThread(final GroomServer r, final int index) {
+ super(r, "GroomServer:" + index);
+ this.groomServer = r;
+ }
+
+ /** @return the groom server */
+ public GroomServer getGroomServer() {
+ return this.groomServer;
+ }
+
+ /**
+ * Block until the groom server has come online, indicating it is ready
+ * to be used.
+ */
+ public void waitForServerOnline() {
+ while (!groomServer.isRunning()) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // continue waiting
+ }
+ }
+ }
+ }
+
+ /**
+ * Creates a {@link GroomServerThread}.
+ * Call 'start' on the returned thread to make it run.
+ * @param c Configuration to use.
+ * @param hrsc Class to create.
+ * @param index Used to distinguish the returned object.
+ * @throws IOException
+ * @return Groom server added.
+ */
+ public static ClusterUtil.GroomServerThread createGroomServerThread(final Configuration c,
+ final Class<? extends GroomServer> hrsc, final int index)
+ throws IOException {
+ GroomServer server;
+ try {
+ server = hrsc.getConstructor(Configuration.class).newInstance(c);
+ } catch (Exception e) {
+ IOException ioe = new IOException();
+ ioe.initCause(e);
+ throw ioe;
+ }
+ return new ClusterUtil.GroomServerThread(server, index);
+ }
+
+ /**
+ * Start the cluster.
+ * @param m
+ * @param groomservers
+ * @param conf
+ * @return Address to use contacting master.
+ * @throws InterruptedException
+ * @throws IOException
+ */
+ public static String startup(final BSPMaster m,
+ final List<ClusterUtil.GroomServerThread> groomservers, Configuration conf) throws IOException, InterruptedException {
+ if (m != null) {
+ m.start();
+ }
+
+ if (groomservers != null) {
+ for (ClusterUtil.GroomServerThread t: groomservers) {
+ t.start();
+ }
+ }
+
+ return m == null? null: BSPMaster.getAddress(conf).getHostName();
+ }
+
+ public static void shutdown(BSPMaster master,
+ List<ClusterUtil.GroomServerThread> groomThreads, Configuration conf) {
+ LOG.debug("Shutting down HAMA Cluster");
+ // TODO:
+ }
+}
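
A minimal sketch of using the thread factory directly, which is what LocalBSPCluster does internally; it assumes GroomServer has the Configuration constructor and isRunning() method that the code above already calls (the demo class name is illustrative only):

import org.apache.hadoop.conf.Configuration;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.GroomServer;
import org.apache.hama.util.ClusterUtil;

public class SingleGroomDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HamaConfiguration();
    ClusterUtil.GroomServerThread groom =
        ClusterUtil.createGroomServerThread(conf, GroomServer.class, 0);
    groom.start();                // runs the GroomServer on its own thread
    groom.waitForServerOnline();  // blocks until GroomServer#isRunning() is true
    System.out.println("groom server online: " + groom.getGroomServer());
  }
}
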
Index: src/java/org/apache/hama/util/VersionInfo.java
===================================================================
--- src/java/org/apache/hama/util/VersionInfo.java (revision 0)
+++ src/java/org/apache/hama/util/VersionInfo.java (revision 0)
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+public class VersionInfo {
+
+ public static void main(String[] args) {
+ System.out.println("Apache Hama - 0.2");
+ }
+}
Index: src/test/org/apache/hama/HamaCluster.java
===================================================================
--- src/test/org/apache/hama/HamaCluster.java (revision 943815)
+++ src/test/org/apache/hama/HamaCluster.java (working copy)
@@ -19,18 +19,35 @@
*/
package org.apache.hama;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseClusterTestCase;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.bsp.LocalBSPCluster;
/**
* Forming up the miniDfs and miniHbase
*/
public abstract class HamaCluster extends HBaseClusterTestCase {
+ public static final Log LOG = LogFactory.getLog(HamaCluster.class);
protected final static HamaConfiguration conf = new HamaConfiguration();
- public void setUp() throws Exception {
+
+ protected void setUp() throws Exception {
super.setUp();
+
+ String[] args = new String[0];
+ StringUtils.startupShutdownMessage(LocalBSPCluster.class, args, LOG);
+ HamaConfiguration conf = new HamaConfiguration();
+ LocalBSPCluster cluster = new LocalBSPCluster(conf);
+ cluster.startup();
}
-
- public static HamaConfiguration getConf() {
+
+ protected static HamaConfiguration getConf() {
return conf;
}
+
+ protected void setMiniBSPCluster() {
+ // TODO Auto-generated method stub
+
+ }
}
Index: src/test/org/apache/hama/MiniBSPCluster.java
===================================================================
--- src/test/org/apache/hama/MiniBSPCluster.java (revision 0)
+++ src/test/org/apache/hama/MiniBSPCluster.java (revision 0)
@@ -0,0 +1,5 @@
+package org.apache.hama;
+
+public class MiniBSPCluster {
+
+}
Index: src/test/org/apache/hama/bsp/BSPPeerTest.java
===================================================================
--- src/test/org/apache/hama/bsp/BSPPeerTest.java (revision 943815)
+++ src/test/org/apache/hama/bsp/BSPPeerTest.java (working copy)
@@ -30,6 +30,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hama.Constants;
import org.apache.hama.HamaCluster;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -60,14 +61,14 @@
Stat s = null;
if (zk != null) {
try {
- s = zk.exists(BSPConstants.DEFAULT_ZOOKEEPER_ROOT, false);
+ s = zk.exists(Constants.DEFAULT_ZOOKEEPER_ROOT, false);
} catch (Exception e) {
LOG.error(s);
}
if (s == null) {
try {
- zk.create(BSPConstants.DEFAULT_ZOOKEEPER_ROOT, new byte[0],
+ zk.create(Constants.DEFAULT_ZOOKEEPER_ROOT, new byte[0],
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException e) {
LOG.error(e);
@@ -155,9 +156,9 @@
BSPPeerThread thread;
for (int i = 0; i < NUM_PEER; i++) {
conf.set("bsp.peers.num", String.valueOf(NUM_PEER));
- conf.set(BSPConstants.PEER_HOST, "localhost");
- conf.set(BSPConstants.PEER_PORT, String.valueOf(30000 + i));
- conf.set(BSPConstants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810");
+ conf.set(Constants.PEER_HOST, "localhost");
+ conf.set(Constants.PEER_PORT, String.valueOf(30000 + i));
+ conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810");
thread = new BSPPeerThread(conf);
list.add(thread);
}
@@ -178,25 +179,25 @@
Configuration conf = new Configuration();
BSPPeer peer = new BSPPeer(conf);
- System.out.println(peer.bindAddress+" = "+BSPConstants.DEFAULT_PEER_HOST);
- System.out.println(peer.bindPort+" = "+BSPConstants.DEFAULT_PEER_PORT);
- assertEquals(peer.bindAddress,BSPConstants.DEFAULT_PEER_HOST);
- assertEquals(peer.bindPort,BSPConstants.DEFAULT_PEER_PORT);
- assertEquals(peer.zookeeperAddr,BSPConstants.DEFAULT_ZOOKEEPER_SERVER_ADDR);
+ System.out.println(peer.bindAddress+" = "+Constants.DEFAULT_PEER_HOST);
+ System.out.println(peer.bindPort+" = "+Constants.DEFAULT_PEER_PORT);
+ assertEquals(peer.bindAddress,Constants.DEFAULT_PEER_HOST);
+ assertEquals(peer.bindPort,Constants.DEFAULT_PEER_PORT);
+ assertEquals(peer.zookeeperAddr,Constants.DEFAULT_ZOOKEEPER_SERVER_ADDR);
int peerPort;
int zkPort;
conf = new Configuration();
- conf.set(BSPConstants.PEER_HOST, "localhost");
+ conf.set(Constants.PEER_HOST, "localhost");
do{
peerPort = r.nextInt(Short.MAX_VALUE);
} while(peerPort == 0);
- conf.setInt(BSPConstants.PEER_PORT, peerPort);
+ conf.setInt(Constants.PEER_PORT, peerPort);
do{
zkPort = r.nextInt(Short.MAX_VALUE);
} while(zkPort == peerPort || zkPort == 0);
- conf.set(BSPConstants.ZOOKEEPER_SERVER_ADDRS, "localhost:"+zkPort);
+ conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:"+zkPort);
peer = new BSPPeer(conf);
assertEquals(peer.bindAddress,"localhost");
assertEquals(peer.bindPort,peerPort);
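The same configuration keys outside the test harness, as a minimal sketch; it lives in org.apache.hama.bsp because the test reads the peer's fields directly, and it assumes a ZooKeeper quorum is reachable at the configured address (the demo class name is illustrative only):

package org.apache.hama.bsp;

import org.apache.hadoop.conf.Configuration;
import org.apache.hama.Constants;

public class PeerConfigDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("bsp.peers.num", "1");
    conf.set(Constants.PEER_HOST, "localhost");
    conf.setInt(Constants.PEER_PORT, 30000);
    conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810");

    BSPPeer peer = new BSPPeer(conf);    // binds as configured above
    System.out.println(peer.bindAddress + ":" + peer.bindPort
        + " -> zk " + peer.zookeeperAddr);
  }
}
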
Index: src/test/org/apache/hama/bsp/SerializePrinting.java
===================================================================
--- src/test/org/apache/hama/bsp/SerializePrinting.java (revision 943815)
+++ src/test/org/apache/hama/bsp/SerializePrinting.java (working copy)
@@ -7,6 +7,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.Constants;
import org.apache.hama.HamaCluster;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -37,14 +38,14 @@
Stat s = null;
if (zk != null) {
try {
- s = zk.exists(BSPConstants.DEFAULT_ZOOKEEPER_ROOT, false);
+ s = zk.exists(Constants.DEFAULT_ZOOKEEPER_ROOT, false);
} catch (Exception e) {
LOG.error(s);
}
if (s == null) {
try {
- zk.create(BSPConstants.DEFAULT_ZOOKEEPER_ROOT, new byte[0],
+ zk.create(Constants.DEFAULT_ZOOKEEPER_ROOT, new byte[0],
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException e) {
LOG.error(e);
@@ -60,10 +61,10 @@
int[] randomSequence = new int[] { 2, 3, 4, 5, 0, 1, 6, 7, 8, 9 };
for (int i = 0; i < NUM_PEER; i++) {
conf.set("bsp.peers.num", String.valueOf(NUM_PEER));
- conf.set(BSPConstants.PEER_HOST, "localhost");
- conf.set(BSPConstants.PEER_PORT, String
+ conf.set(Constants.PEER_HOST, "localhost");
+ conf.set(Constants.PEER_PORT, String
.valueOf(30000 + randomSequence[i]));
- conf.set(BSPConstants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810");
+ conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810");
thread = new BSPPeerThread(conf, randomSequence[i]);
System.out.println(randomSequence[i] + ", " + thread.getName());
list.add(thread);
Index: src/test/org/apache/hama/bsp/UserInterface.java
===================================================================
--- src/test/org/apache/hama/bsp/UserInterface.java (revision 0)
+++ src/test/org/apache/hama/bsp/UserInterface.java (revision 0)
@@ -0,0 +1,105 @@
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hama.HamaCluster;
+import org.apache.hama.HamaConfiguration;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+public class UserInterface extends HamaCluster implements Watcher {
+ private HamaConfiguration conf;
+ private String JOBNAME = "hama.test.bsp";
+ private Path INPUTPATH = new Path("/tmp/input");
+ private Path OUTPUTPATH = new Path("/tmp/output");
+
+ public UserInterface() {
+ this.conf = getConf();
+ }
+
+ public void testScenario() throws InterruptedException, IOException {
+ // BSP job configuration
+ BSPJobConf bsp = new BSPJobConf(conf);
+ // Set the job name
+ bsp.setJobName(JOBNAME);
+
+ // Set in/output path and formatter
+ bsp.setInputPath(conf, INPUTPATH);
+ bsp.setOutputPath(conf, OUTPUTPATH);
+ bsp.setInputFormat(MyInputFormat.class);
+ bsp.setOutputFormat(MyOutputFormat.class);
+
+ // Set the BSP code
+ bsp.setBSPCode(MyBSP.class);
+ bsp.submit();
+
+ //*******************
+ // assertion checking
+ assertEquals(bsp.getJobName(), JOBNAME);
+ //assertEquals(bsp.getInputPath(), INPUTPATH);
+ //assertEquals(bsp.getOutputPath(), OUTPUTPATH);
+ }
+
+ class MyBSP implements BSPInterface {
+ // TODO: implement some BSP example
+ }
+
+ class MyInputFormat extends InputFormat {
+
+ @Override
+ public RecordReader createRecordReader(InputSplit arg0,
+ TaskAttemptContext arg1) throws IOException, InterruptedException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public List getSplits(JobContext arg0) throws IOException,
+ InterruptedException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+ // TODO: implement some input Formatter
+ }
+
+ class MyOutputFormat extends OutputFormat {
+
+ @Override
+ public void checkOutputSpecs(JobContext arg0) throws IOException,
+ InterruptedException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext arg0)
+ throws IOException, InterruptedException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public RecordWriter getRecordWriter(TaskAttemptContext arg0)
+ throws IOException, InterruptedException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+ // TODO: implement some input Formatter
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ // TODO Auto-generated method stub
+
+ }
+}
Index: src/test/org/apache/hama/examples/TestBlockMatrixMapReduce.java
===================================================================
--- src/test/org/apache/hama/examples/TestBlockMatrixMapReduce.java (revision 0)
+++ src/test/org/apache/hama/examples/TestBlockMatrixMapReduce.java (revision 0)
@@ -0,0 +1,61 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.IOException;
+
+import org.apache.hama.HamaCluster;
+import org.apache.hama.matrix.DenseMatrix;
+import org.apache.log4j.Logger;
+import org.apache.hama.examples.MatrixMultiplication;
+
+public class TestBlockMatrixMapReduce extends HamaCluster {
+ static final Logger LOG = Logger.getLogger(TestBlockMatrixMapReduce.class);
+ static final int SIZE = 32;
+
+ /** constructor */
+ public TestBlockMatrixMapReduce() {
+ super();
+ }
+
+ public void testBlockMatrixMapReduce() throws IOException,
+ ClassNotFoundException {
+ DenseMatrix m1 = DenseMatrix.random(conf, SIZE, SIZE);
+ DenseMatrix m2 = DenseMatrix.random(conf, SIZE, SIZE);
+
+ DenseMatrix c = MatrixMultiplication.mult(m1, m2, 16);
+
+ double[][] mem = new double[SIZE][SIZE];
+ for (int i = 0; i < SIZE; i++) {
+ for (int j = 0; j < SIZE; j++) {
+ for (int k = 0; k < SIZE; k++) {
+ mem[i][k] += m1.get(i, j) * m2.get(j, k);
+ }
+ }
+ }
+
+ for (int i = 0; i < SIZE; i++) {
+ for (int j = 0; j < SIZE; j++) {
+ double gap = (mem[i][j] - c.get(i, j));
+ assertTrue(gap < 0.000001 && gap > -0.000001);
+ }
+ }
+ }
+}
Index: src/test/org/apache/hama/examples/TestRandomMatrixMapReduce.java
===================================================================
--- src/test/org/apache/hama/examples/TestRandomMatrixMapReduce.java (revision 0)
+++ src/test/org/apache/hama/examples/TestRandomMatrixMapReduce.java (revision 0)
@@ -0,0 +1,59 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.IOException;
+
+import org.apache.hama.HamaCluster;
+import org.apache.hama.examples.RandomMatrix;
+import org.apache.hama.matrix.DenseMatrix;
+import org.apache.hama.matrix.SparseMatrix;
+import org.apache.log4j.Logger;
+
+public class TestRandomMatrixMapReduce extends HamaCluster {
+ static final Logger LOG = Logger.getLogger(TestRandomMatrixMapReduce.class);
+
+ public void testRandomMatrixMapReduce() throws IOException {
+ DenseMatrix rand = RandomMatrix.random_mapred(conf, 20, 20);
+ assertEquals(20, rand.getRows());
+ assertEquals(20, rand.getColumns());
+
+ for(int i = 0; i < 20; i++) {
+ for(int j = 0; j < 20; j++) {
+ assertTrue(rand.get(i, j) > 0);
+ }
+ }
+
+ rand.close();
+
+ SparseMatrix rand2 = RandomMatrix.random_mapred(conf, 20, 20, 30);
+ assertEquals(20, rand2.getRows());
+ assertEquals(20, rand2.getColumns());
+ boolean zeroAppear = false;
+ for(int i = 0; i < 20; i++) {
+ for(int j = 0; j < 20; j++) {
+ if(rand2.get(i, j) == 0.0)
+ zeroAppear = true;
+ }
+ }
+ assertTrue(zeroAppear);
+ rand2.close();
+ }
+}
Index: src/test/org/apache/hama/mapreduce/TestBlockMatrixMapReduce.java
===================================================================
--- src/test/org/apache/hama/mapreduce/TestBlockMatrixMapReduce.java (revision 943815)
+++ src/test/org/apache/hama/mapreduce/TestBlockMatrixMapReduce.java (working copy)
@@ -1,61 +0,0 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hama.HamaCluster;
-import org.apache.hama.matrix.DenseMatrix;
-import org.apache.log4j.Logger;
-import org.apache.hama.examples.MatrixMultiplication;
-
-public class TestBlockMatrixMapReduce extends HamaCluster {
- static final Logger LOG = Logger.getLogger(TestBlockMatrixMapReduce.class);
- static final int SIZE = 32;
-
- /** constructor */
- public TestBlockMatrixMapReduce() {
- super();
- }
-
- public void testBlockMatrixMapReduce() throws IOException,
- ClassNotFoundException {
- DenseMatrix m1 = DenseMatrix.random(conf, SIZE, SIZE);
- DenseMatrix m2 = DenseMatrix.random(conf, SIZE, SIZE);
-
- DenseMatrix c = MatrixMultiplication.mult(m1, m2, 16);
-
- double[][] mem = new double[SIZE][SIZE];
- for (int i = 0; i < SIZE; i++) {
- for (int j = 0; j < SIZE; j++) {
- for (int k = 0; k < SIZE; k++) {
- mem[i][k] += m1.get(i, j) * m2.get(j, k);
- }
- }
- }
-
- for (int i = 0; i < SIZE; i++) {
- for (int j = 0; j < SIZE; j++) {
- double gap = (mem[i][j] - c.get(i, j));
- assertTrue(gap < 0.000001 || gap < -0.000001);
- }
- }
- }
-}
Index: src/test/org/apache/hama/mapreduce/TestRandomMatrixMapReduce.java
===================================================================
--- src/test/org/apache/hama/mapreduce/TestRandomMatrixMapReduce.java (revision 943815)
+++ src/test/org/apache/hama/mapreduce/TestRandomMatrixMapReduce.java (working copy)
@@ -1,59 +0,0 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hama.HamaCluster;
-import org.apache.hama.examples.RandomMatrix;
-import org.apache.hama.matrix.DenseMatrix;
-import org.apache.hama.matrix.SparseMatrix;
-import org.apache.log4j.Logger;
-
-public class TestRandomMatrixMapReduce extends HamaCluster {
- static final Logger LOG = Logger.getLogger(TestRandomMatrixMapReduce.class);
-
- public void testRandomMatrixMapReduce() throws IOException {
- DenseMatrix rand = RandomMatrix.random_mapred(conf, 20, 20);
- assertEquals(20, rand.getRows());
- assertEquals(20, rand.getColumns());
-
- for(int i = 0; i < 20; i++) {
- for(int j = 0; j < 20; j++) {
- assertTrue(rand.get(i, j) > 0);
- }
- }
-
- rand.close();
-
- SparseMatrix rand2 = RandomMatrix.random_mapred(conf, 20, 20, 30);
- assertEquals(20, rand2.getRows());
- assertEquals(20, rand2.getColumns());
- boolean zeroAppear = false;
- for(int i = 0; i < 20; i++) {
- for(int j = 0; j < 20; j++) {
- if(rand2.get(i, j) == 0.0)
- zeroAppear = true;
- }
- }
- assertTrue(zeroAppear);
- rand2.close();
- }
-}