Index: bin/hama
===================================================================
--- bin/hama (리비전 1005310)
+++ bin/hama (작업 사본)
@@ -58,6 +58,7 @@
echo " bspmaster run the BSP Master node"
echo " groom run the Groom node"
echo " zookeeper run a Zookeeper server"
+ echo " job manipulate BSP jobs"
echo " jar run a jar file"
echo " or"
echo " CLASSNAME run the class named CLASSNAME"
@@ -162,6 +163,8 @@
BSP_OPTS="$BSP_OPTS $BSP_GROOMSERVER_OPTS"
elif [ "$COMMAND" = "zookeeper" ] ; then
CLASS='org.apache.hama.ZooKeeperRunner'
+elif [ "$COMMAND" = "job" ] ; then
+ CLASS='org.apache.hama.bsp.BSPJobClient'
elif [ "$COMMAND" = "jar" ] ; then
CLASS=org.apache.hama.util.RunJar
BSP_OPTS="$BSP_OPTS"
Index: src/java/org/apache/hama/bsp/BSPJobClient.java
===================================================================
--- src/java/org/apache/hama/bsp/BSPJobClient.java (리비전 1005362)
+++ src/java/org/apache/hama/bsp/BSPJobClient.java (작업 사본)
@@ -19,6 +19,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.Collection;
import javax.security.auth.login.LoginException;
@@ -34,9 +35,12 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hama.HamaConfiguration;
import org.apache.hama.ipc.JobSubmissionProtocol;
-public class BSPJobClient extends Configured {
+public class BSPJobClient extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(BSPJobClient.class);
public static enum TaskStatusFilter {
@@ -177,6 +181,9 @@
init(conf);
}
+ public BSPJobClient() {
+ }
+
public void init(Configuration conf) throws IOException {
// it will be used to determine if the bspmaster is running on local or not.
String master = conf.get("bsp.master.address", "local");
@@ -211,6 +218,26 @@
return fs;
}
+ /**
+ * Gets the jobs that are submitted.
+ *
+ * @return array of {@link JobStatus} for the submitted jobs.
+ * @throws IOException
+ */
+ public JobStatus[] getAllJobs() throws IOException {
+ return jobSubmitClient.getAllJobs();
+ }
+
+ /**
+ * Gets the jobs that are not completed and not failed.
+ *
+ * @return array of {@link JobStatus} for the running/to-be-run jobs.
+ * @throws IOException
+ */
+ public JobStatus[] jobsToComplete() throws IOException {
+ return jobSubmitClient.jobsToComplete();
+ }
+
private UnixUserGroupInformation getUGI(Configuration conf)
throws IOException {
UnixUserGroupInformation ugi = null;
@@ -369,18 +396,171 @@
}
// TODO if error found, kill job
- //running.killJob();
+ // running.killJob();
jc.close();
}
/**
* Get status information about the BSP cluster
*
+ * @param detailed if true then get a detailed status including the
+ * groomserver names
+ *
+ * @return the status information about the BSP cluster as an object of
+ * {@link ClusterStatus}.
+ *
* @throws IOException
*/
- public ClusterStatus getClusterStatus() throws IOException {
- // TODO:
+ public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
+ return jobSubmitClient.getClusterStatus(detailed);
+ }
- return null;
+ @Override
+ public int run(String[] args) throws Exception {
+ int exitCode = -1;
+ if (args.length < 1) {
+ displayUsage("");
+ return exitCode;
+ }
+
+ // process arguments
+ String cmd = args[0];
+ boolean listJobs = false;
+ boolean listAllJobs = false;
+ boolean listActiveTrackers = false;
+
+ HamaConfiguration conf = new HamaConfiguration(getConf());
+ init(conf);
+
+ if ("-list".equals(cmd)) {
+ if (args.length != 1 && !(args.length == 2 && "all".equals(args[1]))) {
+ displayUsage(cmd);
+ return exitCode;
+ }
+ if (args.length == 2 && "all".equals(args[1])) {
+ listAllJobs = true;
+ } else {
+ listJobs = true;
+ }
+ } else if ("-list-active-grooms".equals(cmd)) {
+ if (args.length != 1) {
+ displayUsage(cmd);
+ return exitCode;
+ }
+ listActiveTrackers = true;
+ }
+
+ if (listJobs) {
+ listJobs();
+ exitCode = 0;
+ } else if (listAllJobs) {
+ listAllJobs();
+ exitCode = 0;
+ } else if (listActiveTrackers) {
+ listActiveTrackers();
+ exitCode = 0;
+ }
+
+ return 0;
}
+
+ /**
+ * Display usage of the command-line tool and terminate execution
+ */
+ private void displayUsage(String cmd) {
+ String prefix = "Usage: JobClient ";
+ String taskStates = "running, completed";
+ if ("-submit".equals(cmd)) {
+ System.err.println(prefix + "[" + cmd + " ]");
+ } else if ("-status".equals(cmd) || "-kill".equals(cmd)) {
+ System.err.println(prefix + "[" + cmd + " ]");
+ } else if ("-counter".equals(cmd)) {
+ System.err.println(prefix + "[" + cmd
+ + " ]");
+ } else if ("-events".equals(cmd)) {
+ System.err.println(prefix + "[" + cmd
+ + " <#-of-events>]");
+ } else if ("-history".equals(cmd)) {
+ System.err.println(prefix + "[" + cmd + " ]");
+ } else if ("-list".equals(cmd)) {
+ System.err.println(prefix + "[" + cmd + " [all]]");
+ } else if ("-kill-task".equals(cmd) || "-fail-task".equals(cmd)) {
+ System.err.println(prefix + "[" + cmd + " ]");
+ } else if ("-list-active-grooms".equals(cmd)) {
+ System.err.println(prefix + "[" + cmd + "]");
+ } else if ("-list-attempt-ids".equals(cmd)) {
+ System.err.println(prefix + "[" + cmd
+ + " ]. "
+ + "Valid values for are " + taskStates);
+ } else {
+ System.err.printf(prefix + " \n");
+ System.err.printf("\t[-submit ]\n");
+ System.err.printf("\t[-status ]\n");
+ System.err.printf("\t[-counter ]\n");
+ System.err.printf("\t[-kill ]\n");
+ System.err.printf("\t[-events <#-of-events>]\n");
+ System.err.printf("\t[-history ]\n");
+ System.err.printf("\t[-list [all]]\n");
+ System.err.printf("\t[-list-active-grooms]\n");
+ System.err.println("\t[-list-attempt-ids "
+ + "]\n");
+ System.err.printf("\t[-kill-task ]\n");
+ System.err.printf("\t[-fail-task ]\n\n");
+ }
+ }
+
+ /**
+ * Dump a list of currently running jobs
+ *
+ * @throws IOException
+ */
+ private void listJobs() throws IOException {
+ JobStatus[] jobs = jobsToComplete();
+ if (jobs == null)
+ jobs = new JobStatus[0];
+
+ System.out.printf("%d jobs currently running\n", jobs.length);
+ displayJobList(jobs);
+ }
+
+ /**
+ * Dump a list of all jobs submitted.
+ *
+ * @throws IOException
+ */
+ private void listAllJobs() throws IOException {
+ JobStatus[] jobs = getAllJobs();
+ if (jobs == null)
+ jobs = new JobStatus[0];
+ System.out.printf("%d jobs submitted\n", jobs.length);
+ System.out.printf("States are:\n\tRunning : 1\tSucceded : 2"
+ + "\tFailed : 3\tPrep : 4\n");
+ displayJobList(jobs);
+ }
+
+ void displayJobList(JobStatus[] jobs) {
+ System.out.printf("JobId\tState\tStartTime\tUserName\n");
+ for (JobStatus job : jobs) {
+ System.out.printf("%s\t%d\t%d\t%s\n", job.getJobID(), job.getRunState(),
+ job.getStartTime(), job.getUsername());
+ }
+ }
+
+ /**
+ * Display the list of active trackers
+ */
+ private void listActiveTrackers() throws IOException {
+ ClusterStatus c = jobSubmitClient.getClusterStatus(true);
+ Collection trackers = c.getActiveGroomNames();
+ for (String trackerName : trackers) {
+ System.out.println(trackerName);
+ }
+ }
+
+ /**
+ */
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(new BSPJobClient(), args);
+ System.exit(res);
+ }
}
Index: src/java/org/apache/hama/bsp/BSPMaster.java
===================================================================
--- src/java/org/apache/hama/bsp/BSPMaster.java (리비전 1005362)
+++ src/java/org/apache/hama/bsp/BSPMaster.java (작업 사본)
@@ -571,6 +571,7 @@
@Override
public JobStatus[] getAllJobs() throws IOException {
+ LOG.debug("returns all jobs: " + jobs.size());
return getJobStatus(jobs.values(), false);
}
Index: src/java/org/apache/hama/bsp/ClusterStatus.java
===================================================================
--- src/java/org/apache/hama/bsp/ClusterStatus.java (리비전 1005362)
+++ src/java/org/apache/hama/bsp/ClusterStatus.java (작업 사본)
@@ -50,7 +50,7 @@
*
*
* Clients can query for the latest ClusterStatus, via
- * {@link BSPJobClient#getClusterStatus()}.
+ * {@link BSPJobClient#getClusterStatus(boolean)}.
*
* @see BSPMaster
*/
Index: src/java/org/apache/hama/bsp/JobStatus.java
===================================================================
--- src/java/org/apache/hama/bsp/JobStatus.java (리비전 1005362)
+++ src/java/org/apache/hama/bsp/JobStatus.java (작업 사본)
@@ -50,6 +50,7 @@
private int runState;
private long startTime;
private String schedulingInfo = "NA";
+ private String user;
public JobStatus() {
}
@@ -116,6 +117,20 @@
return startTime;
}
+ /**
+ * @param user The username of the job
+ */
+ synchronized void setUsername(String userName) {
+ this.user = userName;
+ }
+
+ /**
+ * @return the username of the job
+ */
+ public synchronized String getUsername() {
+ return this.user;
+ }
+
@Override
public Object clone() {
try {