Index: bin/hama =================================================================== --- bin/hama (리비전 1005310) +++ bin/hama (작업 사본) @@ -58,6 +58,7 @@ echo " bspmaster run the BSP Master node" echo " groom run the Groom node" echo " zookeeper run a Zookeeper server" + echo " job manipulate BSP jobs" echo " jar run a jar file" echo " or" echo " CLASSNAME run the class named CLASSNAME" @@ -162,6 +163,8 @@ BSP_OPTS="$BSP_OPTS $BSP_GROOMSERVER_OPTS" elif [ "$COMMAND" = "zookeeper" ] ; then CLASS='org.apache.hama.ZooKeeperRunner' +elif [ "$COMMAND" = "job" ] ; then + CLASS='org.apache.hama.bsp.BSPJobClient' elif [ "$COMMAND" = "jar" ] ; then CLASS=org.apache.hama.util.RunJar BSP_OPTS="$BSP_OPTS" Index: src/java/org/apache/hama/bsp/BSPJobClient.java =================================================================== --- src/java/org/apache/hama/bsp/BSPJobClient.java (리비전 1005362) +++ src/java/org/apache/hama/bsp/BSPJobClient.java (작업 사본) @@ -19,6 +19,7 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.util.Collection; import javax.security.auth.login.LoginException; @@ -34,9 +35,12 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UnixUserGroupInformation; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.hama.HamaConfiguration; import org.apache.hama.ipc.JobSubmissionProtocol; -public class BSPJobClient extends Configured { +public class BSPJobClient extends Configured implements Tool { private static final Log LOG = LogFactory.getLog(BSPJobClient.class); public static enum TaskStatusFilter { @@ -177,6 +181,9 @@ init(conf); } + public BSPJobClient() { + } + public void init(Configuration conf) throws IOException { // it will be used to determine if the bspmaster is running on local or not. String master = conf.get("bsp.master.address", "local"); @@ -211,6 +218,26 @@ return fs; } + /** + * Gets the jobs that are submitted. + * + * @return array of {@link JobStatus} for the submitted jobs. + * @throws IOException + */ + public JobStatus[] getAllJobs() throws IOException { + return jobSubmitClient.getAllJobs(); + } + + /** + * Gets the jobs that are not completed and not failed. + * + * @return array of {@link JobStatus} for the running/to-be-run jobs. + * @throws IOException + */ + public JobStatus[] jobsToComplete() throws IOException { + return jobSubmitClient.jobsToComplete(); + } + private UnixUserGroupInformation getUGI(Configuration conf) throws IOException { UnixUserGroupInformation ugi = null; @@ -369,18 +396,171 @@ } // TODO if error found, kill job - //running.killJob(); + // running.killJob(); jc.close(); } /** * Get status information about the BSP cluster * + * @param detailed if true then get a detailed status including the + * groomserver names + * + * @return the status information about the BSP cluster as an object of + * {@link ClusterStatus}. + * * @throws IOException */ - public ClusterStatus getClusterStatus() throws IOException { - // TODO: + public ClusterStatus getClusterStatus(boolean detailed) throws IOException { + return jobSubmitClient.getClusterStatus(detailed); + } - return null; + @Override + public int run(String[] args) throws Exception { + int exitCode = -1; + if (args.length < 1) { + displayUsage(""); + return exitCode; + } + + // process arguments + String cmd = args[0]; + boolean listJobs = false; + boolean listAllJobs = false; + boolean listActiveTrackers = false; + + HamaConfiguration conf = new HamaConfiguration(getConf()); + init(conf); + + if ("-list".equals(cmd)) { + if (args.length != 1 && !(args.length == 2 && "all".equals(args[1]))) { + displayUsage(cmd); + return exitCode; + } + if (args.length == 2 && "all".equals(args[1])) { + listAllJobs = true; + } else { + listJobs = true; + } + } else if ("-list-active-grooms".equals(cmd)) { + if (args.length != 1) { + displayUsage(cmd); + return exitCode; + } + listActiveTrackers = true; + } + + if (listJobs) { + listJobs(); + exitCode = 0; + } else if (listAllJobs) { + listAllJobs(); + exitCode = 0; + } else if (listActiveTrackers) { + listActiveTrackers(); + exitCode = 0; + } + + return 0; } + + /** + * Display usage of the command-line tool and terminate execution + */ + private void displayUsage(String cmd) { + String prefix = "Usage: JobClient "; + String taskStates = "running, completed"; + if ("-submit".equals(cmd)) { + System.err.println(prefix + "[" + cmd + " ]"); + } else if ("-status".equals(cmd) || "-kill".equals(cmd)) { + System.err.println(prefix + "[" + cmd + " ]"); + } else if ("-counter".equals(cmd)) { + System.err.println(prefix + "[" + cmd + + " ]"); + } else if ("-events".equals(cmd)) { + System.err.println(prefix + "[" + cmd + + " <#-of-events>]"); + } else if ("-history".equals(cmd)) { + System.err.println(prefix + "[" + cmd + " ]"); + } else if ("-list".equals(cmd)) { + System.err.println(prefix + "[" + cmd + " [all]]"); + } else if ("-kill-task".equals(cmd) || "-fail-task".equals(cmd)) { + System.err.println(prefix + "[" + cmd + " ]"); + } else if ("-list-active-grooms".equals(cmd)) { + System.err.println(prefix + "[" + cmd + "]"); + } else if ("-list-attempt-ids".equals(cmd)) { + System.err.println(prefix + "[" + cmd + + " ]. " + + "Valid values for are " + taskStates); + } else { + System.err.printf(prefix + " \n"); + System.err.printf("\t[-submit ]\n"); + System.err.printf("\t[-status ]\n"); + System.err.printf("\t[-counter ]\n"); + System.err.printf("\t[-kill ]\n"); + System.err.printf("\t[-events <#-of-events>]\n"); + System.err.printf("\t[-history ]\n"); + System.err.printf("\t[-list [all]]\n"); + System.err.printf("\t[-list-active-grooms]\n"); + System.err.println("\t[-list-attempt-ids " + + "]\n"); + System.err.printf("\t[-kill-task ]\n"); + System.err.printf("\t[-fail-task ]\n\n"); + } + } + + /** + * Dump a list of currently running jobs + * + * @throws IOException + */ + private void listJobs() throws IOException { + JobStatus[] jobs = jobsToComplete(); + if (jobs == null) + jobs = new JobStatus[0]; + + System.out.printf("%d jobs currently running\n", jobs.length); + displayJobList(jobs); + } + + /** + * Dump a list of all jobs submitted. + * + * @throws IOException + */ + private void listAllJobs() throws IOException { + JobStatus[] jobs = getAllJobs(); + if (jobs == null) + jobs = new JobStatus[0]; + System.out.printf("%d jobs submitted\n", jobs.length); + System.out.printf("States are:\n\tRunning : 1\tSucceded : 2" + + "\tFailed : 3\tPrep : 4\n"); + displayJobList(jobs); + } + + void displayJobList(JobStatus[] jobs) { + System.out.printf("JobId\tState\tStartTime\tUserName\n"); + for (JobStatus job : jobs) { + System.out.printf("%s\t%d\t%d\t%s\n", job.getJobID(), job.getRunState(), + job.getStartTime(), job.getUsername()); + } + } + + /** + * Display the list of active trackers + */ + private void listActiveTrackers() throws IOException { + ClusterStatus c = jobSubmitClient.getClusterStatus(true); + Collection trackers = c.getActiveGroomNames(); + for (String trackerName : trackers) { + System.out.println(trackerName); + } + } + + /** + */ + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(new BSPJobClient(), args); + System.exit(res); + } } Index: src/java/org/apache/hama/bsp/BSPMaster.java =================================================================== --- src/java/org/apache/hama/bsp/BSPMaster.java (리비전 1005362) +++ src/java/org/apache/hama/bsp/BSPMaster.java (작업 사본) @@ -571,6 +571,7 @@ @Override public JobStatus[] getAllJobs() throws IOException { + LOG.debug("returns all jobs: " + jobs.size()); return getJobStatus(jobs.values(), false); } Index: src/java/org/apache/hama/bsp/ClusterStatus.java =================================================================== --- src/java/org/apache/hama/bsp/ClusterStatus.java (리비전 1005362) +++ src/java/org/apache/hama/bsp/ClusterStatus.java (작업 사본) @@ -50,7 +50,7 @@ *

* *

Clients can query for the latest ClusterStatus, via - * {@link BSPJobClient#getClusterStatus()}.

+ * {@link BSPJobClient#getClusterStatus(boolean)}.

* * @see BSPMaster */ Index: src/java/org/apache/hama/bsp/JobStatus.java =================================================================== --- src/java/org/apache/hama/bsp/JobStatus.java (리비전 1005362) +++ src/java/org/apache/hama/bsp/JobStatus.java (작업 사본) @@ -50,6 +50,7 @@ private int runState; private long startTime; private String schedulingInfo = "NA"; + private String user; public JobStatus() { } @@ -116,6 +117,20 @@ return startTime; } + /** + * @param user The username of the job + */ + synchronized void setUsername(String userName) { + this.user = userName; + } + + /** + * @return the username of the job + */ + public synchronized String getUsername() { + return this.user; + } + @Override public Object clone() { try {