Index: conf/hama-default.xml =================================================================== --- conf/hama-default.xml (리비전 943815) +++ conf/hama-default.xml (작업 사본) @@ -23,27 +23,27 @@ --> - hama.master.port + bsp.master.port 40000 The port master should bind to. - hama.groom.port + bsp.groom.port 40020 The port an groom server binds to. - bspd.system.dir + bsp.local.dir + ${hadoop.tmp.dir}/bsp/local + local directory for temporal store + + + + bsp.system.dir ${hadoop.tmp.dir}/bsp/system The shared directory where BSP stores control files. - - - bspd.groom.local.dir - ${hadoop.tmp.dir}/groomserver/local - local directory for temporal store - \ No newline at end of file Index: src/java/org/apache/hama/Constants.java =================================================================== --- src/java/org/apache/hama/Constants.java (리비전 943815) +++ src/java/org/apache/hama/Constants.java (작업 사본) @@ -25,8 +25,46 @@ /** * Some constants used in the Hama */ -public class Constants { +public interface Constants { + /** default host address */ + public static final String PEER_HOST = "bsp.peer.hostname"; + /** default host address */ + public static final String DEFAULT_PEER_HOST = "0.0.0.0"; + + public static final String PEER_PORT = "bsp.peer.port"; + /** Default port region server listens on. */ + public static final int DEFAULT_PEER_PORT = 61000; + + public static final long ATLEAST_WAIT_TIME = 100; + + /** zookeeper root */ + public static final String ZOOKEEPER_ROOT = "bsp.zookeeper.root"; + /** zookeeper default root */ + public static final String DEFAULT_ZOOKEEPER_ROOT = "/bsp"; + + /** zookeeper server address */ + public static final String ZOOKEEPER_SERVER_ADDRS = "zookeeper.server"; + /** zookeeper default server address */ + static final String DEFAULT_ZOOKEEPER_SERVER_ADDR = "localhost:21810"; + /** Parameter name for number of times to retry writes to ZooKeeper. 
*/ + public static final String ZOOKEEPER_RETRIES = "zookeeper.retries"; + /** Default number of times to retry writes to ZooKeeper. */ + public static final int DEFAULT_ZOOKEEPER_RETRIES = 5; + /** Parameter name for ZooKeeper pause between retries. In milliseconds. */ + public static final String ZOOKEEPER_PAUSE = "zookeeper.pause"; + /** Default ZooKeeper pause value. In milliseconds. */ + public static final int DEFAULT_ZOOKEEPER_PAUSE = 2 * 1000; + + /** + * An empty instance. + */ + public static final byte[] EMPTY_BYTE_ARRAY = new byte[0]; + + /** + * Hbase Structure for matrices + */ + /** Meta-columnFamily to store the matrix-info */ public final static String METADATA = "metadata"; Index: src/java/org/apache/hama/bsp/BSPConstants.java =================================================================== --- src/java/org/apache/hama/bsp/BSPConstants.java (리비전 943815) +++ src/java/org/apache/hama/bsp/BSPConstants.java (작업 사본) @@ -1,58 +0,0 @@ -/** - * Copyright 2007 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hama.bsp; - -public interface BSPConstants { - - /** default host address */ - static final String PEER_HOST = "bsp.peer.hostname"; - /** default host address */ - static final String DEFAULT_PEER_HOST = "0.0.0.0"; - - static final String PEER_PORT = "bsp.peer.port"; - /** Default port region server listens on. */ - static final int DEFAULT_PEER_PORT = 61000; - - static final long ATLEAST_WAIT_TIME = 100; - - /** zookeeper root */ - static final String ZOOKEEPER_ROOT = "bsp.zookeeper.root"; - /** zookeeper default root */ - static final String DEFAULT_ZOOKEEPER_ROOT = "/bsp"; - - /** zookeeper server address */ - static final String ZOOKEEPER_SERVER_ADDRS = "zookeeper.server"; - /** zookeeper default server address */ - static final String DEFAULT_ZOOKEEPER_SERVER_ADDR = "localhost:21810"; - /** Parameter name for number of times to retry writes to ZooKeeper. */ - static final String ZOOKEEPER_RETRIES = "zookeeper.retries"; - /** Default number of times to retry writes to ZooKeeper. */ - static final int DEFAULT_ZOOKEEPER_RETRIES = 5; - /** Parameter name for ZooKeeper pause between retries. In milliseconds. */ - static final String ZOOKEEPER_PAUSE = "zookeeper.pause"; - /** Default ZooKeeper pause value. In milliseconds. */ - static final int DEFAULT_ZOOKEEPER_PAUSE = 2 * 1000; - - /** - * An empty instance. 
- */ - static final byte[] EMPTY_BYTE_ARRAY = new byte[0]; -} Index: src/java/org/apache/hama/bsp/BSPInterface.java =================================================================== --- src/java/org/apache/hama/bsp/BSPInterface.java (리비전 0) +++ src/java/org/apache/hama/bsp/BSPInterface.java (리비전 0) @@ -0,0 +1,5 @@ +package org.apache.hama.bsp; + +public interface BSPInterface { + +} Index: src/java/org/apache/hama/bsp/BSPJobClient.java =================================================================== --- src/java/org/apache/hama/bsp/BSPJobClient.java (리비전 0) +++ src/java/org/apache/hama/bsp/BSPJobClient.java (리비전 0) @@ -0,0 +1,423 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hama.bsp; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.InetSocketAddress; + +import javax.security.auth.login.LoginException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UnixUserGroupInformation; +import org.apache.hadoop.util.StringUtils; +import org.apache.hama.bsp.BSPMaster; +import org.apache.hama.ipc.JobSubmissionProtocol; + +public class BSPJobClient extends Configured { + private static final Log LOG = LogFactory.getLog(BSPJobClient.class); + public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL } + private static final long MAX_JOBPROFILE_AGE = 1000 * 2; + + static { + Configuration.addDefaultResource("hama-default.xml"); + Configuration.addDefaultResource("hama-site.xml"); + } + + class NetworkedJob implements RunningJob { + JobProfile profile; + JobStatus status; + long statustime; + + public NetworkedJob(JobStatus job) throws IOException { + this.status = job; + this.profile = jobSubmitClient.getJobProfile(job.getJobID()); + this.statustime = System.currentTimeMillis(); + } + + /** + * Some methods rely on having a recent job profile object. Refresh + * it, if necessary + */ + synchronized void ensureFreshStatus() throws IOException { + if (System.currentTimeMillis() - statustime > MAX_JOBPROFILE_AGE) { + updateStatus(); + } + } + + /** Some methods need to update status immediately. 
So, refresh + * immediately + * @throws IOException + */ + synchronized void updateStatus() throws IOException { + this.status = jobSubmitClient.getJobStatus(profile.getJobID()); + this.statustime = System.currentTimeMillis(); + } + + /* (non-Javadoc) + * @see org.apache.hama.bsp.RunningJob#getID() + */ + @Override + public BSPJobID getID() { + return profile.getJobID(); + } + + /* (non-Javadoc) + * @see org.apache.hama.bsp.RunningJob#getJobName() + */ + @Override + public String getJobName() { + return profile.getJobName(); + } + + /* (non-Javadoc) + * @see org.apache.hama.bsp.RunningJob#getJobFile() + */ + @Override + public String getJobFile() { + return profile.getJobFile(); + } + + @Override + public float progress() throws IOException { + ensureFreshStatus(); + return status.progress(); + } + + /** + * Returns immediately whether the whole job is done yet or not. + */ + public synchronized boolean isComplete() throws IOException { + updateStatus(); + return (status.getRunState() == JobStatus.SUCCEEDED || + status.getRunState() == JobStatus.FAILED || + status.getRunState() == JobStatus.KILLED); + } + + /** + * True iff job completed successfully. + */ + public synchronized boolean isSuccessful() throws IOException { + updateStatus(); + return status.getRunState() == JobStatus.SUCCEEDED; + } + + /** + * Blocks until the job is finished + */ + public void waitForCompletion() throws IOException { + while (!isComplete()) { + try { + Thread.sleep(5000); + } catch (InterruptedException ie) { + } + } + } + + /** + * Tells the service to get the state of the current job. + */ + public synchronized int getJobState() throws IOException { + updateStatus(); + return status.getRunState(); + } + + /** + * Tells the service to terminate the current job. 
+ */ + public synchronized void killJob() throws IOException { + jobSubmitClient.killJob(getID()); + } + + @Override + public void killTask(TaskAttemptID taskId, boolean shouldFail) + throws IOException { + jobSubmitClient.killTask(taskId, shouldFail); + } + } + + private JobSubmissionProtocol jobSubmitClient = null; + private Path sysDir = null; + private FileSystem fs = null; + + // job files are world-wide readable and owner writable + final private static FsPermission JOB_FILE_PERMISSION = + FsPermission.createImmutable((short) 0644); // rw-r--r-- + + // job submission directory is world readable/writable/executable + final static FsPermission JOB_DIR_PERMISSION = + FsPermission.createImmutable((short) 0777); // rwx-rwx-rwx + + public BSPJobClient() { + } + + public BSPJobClient(Configuration conf) throws IOException { + setConf(conf); + init(conf); + } + + public void init(Configuration conf) throws IOException { + // it will be used to determine if the bspmaster is running on local or not. + //String tracker = conf.get("bsp.master.address", "local"); + this.jobSubmitClient = createRPCProxy(BSPMaster.getAddress(conf), conf); + } + + private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr, + Configuration conf) throws IOException { + return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class, + JobSubmissionProtocol.versionID, addr, conf, NetUtils.getSocketFactory( + conf, JobSubmissionProtocol.class)); + } + + /** + * Close the JobClient. + */ + public synchronized void close() throws IOException { + RPC.stopProxy(jobSubmitClient); + } + + /** + * Get a filesystem handle. We need this to prepare jobs + * for submission to the BSP system. + * + * @return the filesystem handle. 
+ */ + public synchronized FileSystem getFs() throws IOException { + if (this.fs == null) { + Path sysDir = getSystemDir(); + this.fs = sysDir.getFileSystem(getConf()); + } + return fs; + } + + /* see if two file systems are the same or not + * + */ /* + private boolean compareFs(FileSystem srcFs, FileSystem destFs) { + URI srcUri = srcFs.getUri(); + URI dstUri = destFs.getUri(); + if (srcUri.getScheme() == null) { + return false; + } + if (!srcUri.getScheme().equals(dstUri.getScheme())) { + return false; + } + String srcHost = srcUri.getHost(); + String dstHost = dstUri.getHost(); + if ((srcHost != null) && (dstHost != null)) { + try { + srcHost = InetAddress.getByName(srcHost).getCanonicalHostName(); + dstHost = InetAddress.getByName(dstHost).getCanonicalHostName(); + } catch(UnknownHostException ue) { + return false; + } + if (!srcHost.equals(dstHost)) { + return false; + } + } + else if (srcHost == null && dstHost != null) { + return false; + } + else if (srcHost != null && dstHost == null) { + return false; + } + //check for ports + if (srcUri.getPort() != dstUri.getPort()) { + return false; + } + return true; + } */ + + //copies a file to the bspmaster filesystem and returns the path where it + // was copied to + /* + private Path copyRemoteFiles(FileSystem jtFs, Path parentDir, Path originalPath, + BSPJobContext job, short replication) throws IOException { + //check if we do not need to copy the files + // is jt using the same file system. + // just checking for uri strings... doing no dns lookups + // to see if the filesystems are the same. This is not optimal. + // but avoids name resolution. + + FileSystem remoteFs = null; + remoteFs = originalPath.getFileSystem(job.getConf()); + if (compareFs(remoteFs, jtFs)) { + return originalPath; + } + // this might have name collisions. 
copy will throw an exception + //parse the original path to create new path + Path newPath = new Path(parentDir, originalPath.getName()); + FileUtil.copy(remoteFs, originalPath, jtFs, newPath, false, job.getConf()); + jtFs.setReplication(newPath, replication); + return newPath; + }*/ + + private UnixUserGroupInformation getUGI(Configuration conf) throws IOException { + UnixUserGroupInformation ugi = null; + try { + ugi = UnixUserGroupInformation.login(conf, true); + } catch (LoginException e) { + throw (IOException)(new IOException( + "Failed to get the current user's information.").initCause(e)); + } + return ugi; + } + + /** + * Submit a job to the BSP system. + * This returns a handle to the {@link RunningJob} which can be used to track + * the running-job. + * + * @param job the job configuration. + * @return a handle to the {@link RunningJob} which can be used to track the + * running-job. + * @throws FileNotFoundException + * @throws IOException + */ + public RunningJob submitJob(BSPJobConf job) throws FileNotFoundException, + IOException { + return submitJobInternal(job); + } + + public + RunningJob submitJobInternal(BSPJobConf job) throws IOException { + BSPJobID jobId = jobSubmitClient.getNewJobId(); + Path submitJobDir = new Path(getSystemDir(), jobId.toString()); + Path submitJarFile = new Path(submitJobDir, "job.jar"); + Path submitJobFile = new Path(submitJobDir, "job.xml"); + + /* + * set this user's id in job configuration, so later job files can be + * accessed using this user's id + */ + UnixUserGroupInformation ugi = getUGI(job.getConf()); + + // Create a number of filenames in the BSPMaster's fs namespace + FileSystem fs = getFs(); + LOG.debug("default FileSystem: " + fs.getUri()); + fs.delete(submitJobDir, true); + submitJobDir = fs.makeQualified(submitJobDir); + submitJobDir = new Path(submitJobDir.toUri().getPath()); + FsPermission bspSysPerms = new FsPermission(JOB_DIR_PERMISSION); + FileSystem.mkdirs(fs, submitJobDir, bspSysPerms); + short 
replication = (short)job.getInt("bsp.submit.replication", 10); + + String originalJarPath = job.getJar(); + + if (originalJarPath != null) { // copy jar to BSPMaster's fs + // use jar name if job is not named. + if ("".equals(job.getJobName())){ + job.setJobName(new Path(originalJarPath).getName()); + } + job.setJar(submitJarFile.toString()); + fs.copyFromLocalFile(new Path(originalJarPath), submitJarFile); + fs.setReplication(submitJarFile, replication); + fs.setPermission(submitJarFile, new FsPermission(JOB_FILE_PERMISSION)); + } else { + LOG.warn("No job jar file set. User classes may not be found. "+ + "See BSPJobConf#setJar(String) or check Your jar file."); + } + + // Set the user's name and working directory + job.setUser(ugi.getUserName()); + if (ugi.getGroupNames().length > 0) { + job.set("group.name", ugi.getGroupNames()[0]); + } + if (job.getWorkingDirectory() == null) { + job.setWorkingDirectory(fs.getWorkingDirectory()); + } + + // Write job file to BSPMaster's fs + FSDataOutputStream out = + FileSystem.create(fs, submitJobFile, + new FsPermission(JOB_FILE_PERMISSION)); + + try { + job.writeXml(out); + } finally { + out.close(); + } + + // + // Now, actually submit the job (using the submit name) + // + JobStatus status = jobSubmitClient.submitJob(jobId); + if (status != null) { + return new NetworkedJob(status); + } else { + throw new IOException("Could not launch job"); + } + } + + /** + * Monitor a job and print status in real-time as progress is made and tasks + * fail. 
+ * @param conf the job's configuration + * @param job the job to track + * @return true if the job succeeded + * @throws IOException + * @throws IOException if communication to the BSPMaster fails + * @throws InterruptedException + */ + public boolean monitorAndPrintJob (BSPJobConf job, RunningJob info) + throws IOException, InterruptedException { + + String lastReport = null; + BSPJobID jobId = job.getJobID(); + LOG.info("Running job: " + jobId); + + while (!job.isComplete()) { + Thread.sleep(1000); + String report = " bsp " + StringUtils.formatPercent(job.progress(), 0); + + if (!report.equals(lastReport)) { + LOG.info(report); + lastReport = report; + } + } + + LOG.info("Job complete: " + jobId); + return job.isSuccessful(); + } + + /** + * Grab the bspmaster system directory path where job-specific files are to be placed. + * + * @return the system directory where job-specific files are to be placed. + */ + public Path getSystemDir() { + if (sysDir == null) { + sysDir = new Path(jobSubmitClient.getSystemDir()); + } + return sysDir; + } + + public RunningJob runJob(BSPJobConf job) throws FileNotFoundException, + IOException { + return submitJobInternal(job); + } +} Index: src/java/org/apache/hama/bsp/BSPJobConf.java =================================================================== --- src/java/org/apache/hama/bsp/BSPJobConf.java (리비전 0) +++ src/java/org/apache/hama/bsp/BSPJobConf.java (리비전 0) @@ -0,0 +1,186 @@ +package org.apache.hama.bsp; + +import java.io.IOException; +import java.net.URL; +import java.net.URLDecoder; +import java.util.Enumeration; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hama.HamaConfiguration; + +public class BSPJobConf extends BSPJobContext { + public static enum JobState { + DEFINE, RUNNING + }; + + private JobState state = JobState.DEFINE; + private BSPJobClient jobClient; + private RunningJob info; + + public BSPJobConf() 
throws IOException { + this(new HamaConfiguration()); + } + + public BSPJobConf(HamaConfiguration conf, String jobName) throws IOException { + this(conf); + setJobName(jobName); + } + + public BSPJobConf(HamaConfiguration conf) throws IOException { + super(conf, null); + jobClient = new BSPJobClient(conf); + } + + private void ensureState(JobState state) throws IllegalStateException { + if (state != this.state) { + throw new IllegalStateException("Job in state " + this.state + + " instead of " + state); + } + } + + public void setWorkingDirectory(Path dir) throws IOException { + ensureState(JobState.DEFINE); + dir = new Path(getWorkingDirectory(), dir); + conf.set("bsp.working.dir", dir.toString()); + } + + public void setWorkClass(Class cls) + throws IllegalStateException { + ensureState(JobState.DEFINE); + conf.setClass(WORK_CLASS_ATTR, cls, Work.class); + } + + public void setInputPath(HamaConfiguration conf, Path iNPUTPATH) { + // TODO Auto-generated method stub + + } + + public void setOutputPath(HamaConfiguration conf, Path oUTPUTPATH) { + // TODO Auto-generated method stub + + } + + public void setInputFormat(Class class1) { + // TODO Auto-generated method stub + + } + + public void setOutputFormat(Class class1) { + // TODO Auto-generated method stub + + } + + public void setBSPCode(Class class1) { + // TODO Auto-generated method stub + + } + + public String getInputPath() { + // TODO Auto-generated method stub + return null; + } + + public String getOutputPath() { + // TODO Auto-generated method stub + return null; + } + + public void setJar(String jar) { + conf.set("bsp.jar", jar); + } + + public void setJarByClass(Class cls) { + String jar = findContainingJar(cls); + if (jar != null) { + conf.set("bsp.jar", jar); + } + } + + public void setJobName(String name) throws IllegalStateException { + ensureState(JobState.DEFINE); + conf.set("bsp.job.name", name); + } + + public void setUser(String user) { + conf.set("user.name", user); + } + + 
@SuppressWarnings("unchecked") + private static String findContainingJar(Class my_class) { + ClassLoader loader = my_class.getClassLoader(); + String class_file = my_class.getName().replaceAll("\\.", "/") + ".class"; + try { + for (Enumeration itr = loader.getResources(class_file); itr + .hasMoreElements();) { + URL url = (URL) itr.nextElement(); + if ("jar".equals(url.getProtocol())) { + String toReturn = url.getPath(); + if (toReturn.startsWith("file:")) { + toReturn = toReturn.substring("file:".length()); + } + toReturn = URLDecoder.decode(toReturn, "UTF-8"); + return toReturn.replaceAll("!.*$", ""); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + return null; + } + + public float progress() throws IOException { + ensureState(JobState.RUNNING); + return info.progress(); + } + + public boolean isComplete() throws IOException { + ensureState(JobState.RUNNING); + return info.isComplete(); + } + + public boolean isSuccessful() throws IOException { + ensureState(JobState.RUNNING); + return info.isSuccessful(); + } + + public void killJob() throws IOException { + ensureState(JobState.RUNNING); + info.killJob(); + } + + public void killTask(TaskAttemptID taskId) throws IOException { + ensureState(JobState.RUNNING); + info.killTask(taskId, false); + } + + public void failTask(TaskAttemptID taskId) throws IOException { + ensureState(JobState.RUNNING); + info.killTask(taskId, true); + } + + public void submit() throws IOException, InterruptedException { + System.out.println("Submitted"); + ensureState(JobState.DEFINE); + info = jobClient.submitJobInternal(this); + state = JobState.RUNNING; + } + + public boolean waitForCompletion(boolean verbose) throws IOException, + InterruptedException, ClassNotFoundException { + if (state == JobState.DEFINE) { + submit(); + } + if (verbose) { + jobClient.monitorAndPrintJob(this, info); + } else { + info.waitForCompletion(); + } + return isSuccessful(); + } + + public void set(String name, String value) { + 
conf.set(name, value); + } +} Index: src/java/org/apache/hama/bsp/BSPJobContext.java =================================================================== --- src/java/org/apache/hama/bsp/BSPJobContext.java (리비전 0) +++ src/java/org/apache/hama/bsp/BSPJobContext.java (리비전 0) @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp; + +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hama.HamaConfiguration; + +/** + * A read-only view of the bsp job that is provided to the tasks while they are + * running. + */ +public class BSPJobContext { + static{ + Configuration.addDefaultResource("hama-default.xml"); + Configuration.addDefaultResource("hama-site.xml"); + } + + // Put all of the attribute names in here so that BSPJob and JobContext are + // consistent. 
+ protected static final String WORK_CLASS_ATTR = "bsp.work.class"; + + protected final Configuration conf; + private final BSPJobID jobId; + + public BSPJobContext(Configuration conf, BSPJobID jobId) { + this.conf = conf; + this.jobId = jobId; + } + + public BSPJobContext(Path config, BSPJobID jobId) throws IOException { + this.conf = new HamaConfiguration(); + this.jobId = jobId; + this.conf.addResource(config); + } + + public BSPJobID getJobID() { + return jobId; + } + + public Path getWorkingDirectory() throws IOException { + String name = conf.get("bsp.working.dir"); + + if (name != null) { + return new Path(name); + } else { + try { + Path dir = FileSystem.get(conf).getWorkingDirectory(); + conf.set("bsp.working.dir", dir.toString()); + return dir; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + public String getJobName() { + return conf.get("bsp.job.name", ""); + } + + public String getJar() { + return conf.get("bsp.jar"); + } + + /** + * Constructs a local file name. Files are distributed among configured + * local directories. + */ + public Path getLocalPath(String pathString) throws IOException { + return conf.getLocalPath("bsp.local.dir", pathString); + } + + public String getUser() { + return conf.get("user.name"); + } + + public void writeXml(OutputStream out) throws IOException { + conf.writeXml(out); + } + + public Configuration getConf() { + return this.conf; + } + + public String get(String name) { + return conf.get(name); + } + + public int getInt(String name, int defaultValue) { + return conf.getInt(name, defaultValue); + } +} Index: src/java/org/apache/hama/bsp/BSPJobID.java =================================================================== --- src/java/org/apache/hama/bsp/BSPJobID.java (리비전 0) +++ src/java/org/apache/hama/bsp/BSPJobID.java (리비전 0) @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp; + +import java.io.DataInput; + +import java.io.DataOutput; +import java.io.IOException; +import java.text.NumberFormat; + +import org.apache.hadoop.io.Text; + +public class BSPJobID extends ID implements Comparable { + protected static final String JOB = "job"; + private final Text jtIdentifier; + + protected static final NumberFormat idFormat = NumberFormat.getInstance(); + static { + idFormat.setGroupingUsed(false); + idFormat.setMinimumIntegerDigits(4); + } + + public BSPJobID(String jtIdentifier, int id) { + super(id); + this.jtIdentifier = new Text(jtIdentifier); + } + + public BSPJobID() { + jtIdentifier = new Text(); + } + + public String getJtIdentifier() { + return jtIdentifier.toString(); + } + + @Override + public boolean equals(Object o) { + if (!super.equals(o)) + return false; + + BSPJobID that = (BSPJobID) o; + return this.jtIdentifier.equals(that.jtIdentifier); + } + + @Override + public int compareTo(ID o) { + BSPJobID that = (BSPJobID) o; + int jtComp = this.jtIdentifier.compareTo(that.jtIdentifier); + if (jtComp == 0) { + return this.id - that.id; + } else + return jtComp; + } + + public StringBuilder appendTo(StringBuilder builder) { + builder.append(SEPARATOR); + builder.append(jtIdentifier); + builder.append(SEPARATOR); + 
builder.append(idFormat.format(id)); + return builder; + } + + @Override + public int hashCode() { + return jtIdentifier.hashCode() + id; + } + + @Override + public String toString() { + return appendTo(new StringBuilder(JOB)).toString(); + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + this.jtIdentifier.readFields(in); + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + jtIdentifier.write(out); + } + + public static BSPJobID forName(String str) throws IllegalArgumentException { + if (str == null) + return null; + try { + String[] parts = str.split("_"); + if (parts.length == 3) { + if (parts[0].equals(JOB)) { + return new BSPJobID(parts[1], Integer.parseInt(parts[2])); + } + } + } catch (Exception ex) { + } + throw new IllegalArgumentException("JobId string : " + str + + " is not properly formed"); + } +} Index: src/java/org/apache/hama/bsp/BSPMaster.java =================================================================== --- src/java/org/apache/hama/bsp/BSPMaster.java (리비전 943815) +++ src/java/org/apache/hama/bsp/BSPMaster.java (작업 사본) @@ -1,6 +1,4 @@ /** - * Copyright 2009 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -20,9 +18,16 @@ package org.apache.hama.bsp; import java.io.IOException; +import java.lang.reflect.Constructor; import java.net.InetSocketAddress; import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collection; import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -34,77 +39,121 @@ import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hama.HamaConfiguration; -import org.apache.hama.ipc.HeartbeatResponse; import org.apache.hama.ipc.InterTrackerProtocol; import org.apache.hama.ipc.JobSubmissionProtocol; /** - * BSPMaster is responsible to control all the bsp peers and to manage bsp jobs. + * BSPMaster is responsible to control all the groom servers and to manage bsp + * jobs. 
*/ -public class BSPMaster implements JobSubmissionProtocol, InterTrackerProtocol { - static{ +public class BSPMaster extends Thread implements JobSubmissionProtocol, InterTrackerProtocol, + GroomServerManager { + + static { Configuration.addDefaultResource("hama-default.xml"); + Configuration.addDefaultResource("hama-site.xml"); } - + public static final Log LOG = LogFactory.getLog(BSPMaster.class); - - private HamaConfiguration conf; - public static enum State { INITIALIZING, RUNNING } + + private Configuration conf; + + // Constants + public static enum State { + INITIALIZING, RUNNING + } + + private static final int FS_ACCESS_RETRY_PERIOD = 10000; + + // States State state = State.INITIALIZING; - + + // Attributes String masterIdentifier; + private Server interTrackerServer; - private Server interTrackerServer; - + // Filesystem + static final String SUBDIR = "bspMaster"; FileSystem fs = null; Path systemDir = null; - // system directories are world-wide readable and owner readable - final static FsPermission SYSTEM_DIR_PERMISSION = - FsPermission.createImmutable((short) 0733); // rwx-wx-wx + final static FsPermission SYSTEM_DIR_PERMISSION = FsPermission + .createImmutable((short) 0733); // rwx-wx-wx + // system files should have 700 permission + final static FsPermission SYSTEM_FILE_PERMISSION = FsPermission + .createImmutable((short) 0700); // rwx------ - // system files should have 700 permission - final static FsPermission SYSTEM_FILE_PERMISSION = - FsPermission.createImmutable((short) 0700); // rwx------ - - private static final int FS_ACCESS_RETRY_PERIOD = 10000; - + // Groom Servers + // (groom name --> last sent HeartBeatResponse) + Map groomToHeartbeatResponseMap = new TreeMap(); + private HashMap groomServers = new HashMap(); + + // Jobs' Meta Data private int nextJobId = 1; - - public BSPMaster(HamaConfiguration conf, String identifier) throws IOException, InterruptedException { + // private long startTime; + private int totalSubmissions = 0; + 
private int totalTasks = 0; + private int totalTaskCapacity; + private Map jobs = new TreeMap(); + private TaskScheduler taskScheduler; + + /* + * private final List jobInProgressListeners = new + * CopyOnWriteArrayList(); + */ + + /** + * Start the BSPMaster process, listen on the indicated hostname/port + */ + public BSPMaster(Configuration conf) throws IOException, InterruptedException { + this(conf, generateNewIdentifier()); + } + + BSPMaster(Configuration conf, String identifier) throws IOException, + InterruptedException { this.conf = conf; - this.masterIdentifier = identifier; - - InetSocketAddress addr = getAddress(conf); - this.interTrackerServer = RPC.getServer(this, addr.getHostName(), addr.getPort(), conf); - + + // Create the scheduler + Class schedulerClass = conf.getClass( + "bsp.master.taskscheduler", SimpleTaskScheduler.class, + TaskScheduler.class); + this.taskScheduler = (TaskScheduler) ReflectionUtils.newInstance( + schedulerClass, conf); + + InetSocketAddress addr = getAddress(conf); + this.interTrackerServer = RPC.getServer(this, addr.getHostName(), addr + .getPort(), conf); + while (!Thread.currentThread().isInterrupted()) { try { if (fs == null) { fs = FileSystem.get(conf); } - // clean up the system dir, which will only work if hdfs is out of + // clean up the system dir, which will only work if hdfs is out of // safe mode - if(systemDir == null) { - systemDir = new Path(getSystemDir()); + if (systemDir == null) { + systemDir = new Path(getSystemDir()); } LOG.info("Cleaning up the system directory"); + LOG.info(systemDir); fs.delete(systemDir, true); - if (FileSystem.mkdirs(fs, systemDir, - new FsPermission(SYSTEM_DIR_PERMISSION))) { + if (FileSystem.mkdirs(fs, systemDir, new FsPermission( + SYSTEM_DIR_PERMISSION))) { break; } LOG.error("Mkdirs failed to create " + systemDir); + LOG.info(SUBDIR); } catch (AccessControlException ace) { - LOG.warn("Failed to operate on bspd.system.dir (" + systemDir + LOG.warn("Failed to operate on 
bsp.system.dir (" + systemDir + ") because of permissions."); - LOG.warn("Manually delete the bspd.system.dir (" + systemDir - + ") and then start the JobTracker."); + LOG.warn("Manually delete the bsp.system.dir (" + systemDir + + ") and then start the BSPMaster."); LOG.warn("Bailing out ... "); throw ace; } catch (IOException ie) { @@ -112,37 +161,100 @@ } Thread.sleep(FS_ACCESS_RETRY_PERIOD); } - - // deleteLocalFiles(SUBDIR); + + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException(); + } + + deleteLocalFiles(SUBDIR); } - - public static BSPMaster startMaster(HamaConfiguration conf) throws IOException, - InterruptedException { + + // ///////////////////////////////////////////////////// + // Accessors for objects that want info on jobs, tasks, + // grooms, etc. + // ///////////////////////////////////////////////////// + public GroomServerStatus getGroomServer(String groomID) { + synchronized (groomServers) { + return groomServers.get(groomID); + } + } + + public List groomServerNames() { + List activeGrooms = new ArrayList(); + synchronized (groomServers) { + for (GroomServerStatus status : groomServers.values()) { + activeGrooms.add(status.getGroomName()); + } + } + return activeGrooms; + } + + // ///////////////////////////////////////////////////////////// + // BSPMaster methods + // ///////////////////////////////////////////////////////////// + + // Get the job directory in system directory + Path getSystemDirectoryForJob(BSPJobID id) { + return new Path(getSystemDir(), id.toString()); + } + + String[] getLocalDirs() throws IOException { + return conf.getStrings("bsp.local.dir"); + } + + void deleteLocalFiles() throws IOException { + String[] localDirs = getLocalDirs(); + for (int i = 0; i < localDirs.length; i++) { + FileSystem.getLocal(conf).delete(new Path(localDirs[i]), true); + } + } + + void deleteLocalFiles(String subdir) throws IOException { + try { + String[] localDirs = getLocalDirs(); + for (int i = 0; i < 
localDirs.length; i++) { + FileSystem.getLocal(conf).delete(new Path(localDirs[i], subdir), true); + } + } catch (NullPointerException e) { + LOG.info(e); + } + } + + /** + * Constructs a local file name. Files are distributed among configured local + * directories. + */ + Path getLocalPath(String pathString) throws IOException { + return conf.getLocalPath("bsp.local.dir", pathString); + } + + public BSPMaster startMaster() throws IOException, + InterruptedException { return startTracker(conf, generateNewIdentifier()); } - - public static BSPMaster startTracker(HamaConfiguration conf, String identifier) - throws IOException, InterruptedException { - + + public static BSPMaster startTracker(Configuration conf, String identifier) + throws IOException, InterruptedException { + BSPMaster result = null; result = new BSPMaster(conf, identifier); - + return result; } - + public static InetSocketAddress getAddress(Configuration conf) { - String hamaMasterStr = conf.get("hama.master.address", "localhost:40000"); + String hamaMasterStr = conf.get("bsp.master.address", "localhost:40000"); return NetUtils.createSocketAddr(hamaMasterStr); } - + public int getPort() { - return this.conf.getInt("hama.master.port", 0); + return this.conf.getInt("bsp.master.port", 0); } - public HamaConfiguration getConfiguration() { + public Configuration getConf() { return this.conf; } - + private static SimpleDateFormat getDateFormat() { return new SimpleDateFormat("yyyyMMddHHmm"); } @@ -154,42 +266,115 @@ private static String generateNewIdentifier() { return getDateFormat().format(new Date()); } - + public void offerService() throws InterruptedException, IOException { this.interTrackerServer.start(); - + synchronized (this) { state = State.RUNNING; } LOG.info("Starting RUNNING"); - + this.interTrackerServer.join(); LOG.info("Stopped interTrackerServer"); } - - - public static void main(String [] args) { + + public static void main(String[] args) { 
StringUtils.startupShutdownMessage(BSPMaster.class, args, LOG); if (args.length != 0) { System.out.println("usage: HamaMaster"); System.exit(-1); } - + try { HamaConfiguration conf = new HamaConfiguration(); - BSPMaster master = startMaster(conf); - master.offerService(); + conf.set("bsp.master.port", "40000"); + conf.set("bsp.groom.port", "40020"); + conf.set("bsp.local.dir", conf.get("hadoop.tmp.dir") + "/bsp/local"); + conf.set("bsp.system.dir", conf.get("hadoop.tmp.dir") + "/bsp/system"); + + BSPMaster master = BSPMaster.constructMaster(BSPMaster.class, conf); + master.start(); } catch (Throwable e) { LOG.fatal(StringUtils.stringifyException(e)); System.exit(-1); } } + public void run() { + try { + offerService(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + // ////////////////////////////////////////////////// + // GroomServerManager + // ////////////////////////////////////////////////// @Override - public long getProtocolVersion(String protocol, long clientVersion) throws IOException { + public void addJobInProgressListener(JobInProgressListener listener) { + // jobInProgressListeners.add(listener); + } + + @Override + public void removeJobInProgressListener(JobInProgressListener listener) { + // jobInProgressListeners.remove(listener); + } + + @Override + public void failJob(JobInProgress job) { + // TODO Auto-generated method stub + } + + @Override + public JobInProgress getJob(BSPJobID jobid) { + // TODO Auto-generated method stub + return null; + } + + @Override + public int getNextHeartbeatInterval() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public int getNumberOfUniqueHosts() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public Collection grooms() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void 
initJob(JobInProgress job) { + if (null == job) { + LOG.info("Init on null job is not valid"); + return; + } + + // JobStatus prevStatus = (JobStatus)job.getStatus().clone(); + LOG.info("Initializing " + job.getJobID()); + } + + // ////////////////////////////////////////////////// + // InterTrackerProtocol + // ////////////////////////////////////////////////// + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { if (protocol.equals(InterTrackerProtocol.class.getName())) { return InterTrackerProtocol.versionID; - } else if (protocol.equals(JobSubmissionProtocol.class.getName())){ + } else if (protocol.equals(JobSubmissionProtocol.class.getName())) { return JobSubmissionProtocol.versionID; } else { throw new IOException("Unknown protocol to job tracker: " + protocol); @@ -198,34 +383,212 @@ /** * A RPC method for transmitting each peer status from peer to master. + * + * @throws IOException */ @Override - public HeartbeatResponse heartbeat(short responseId) { - LOG.debug(">>> return the heartbeat message."); - return new HeartbeatResponse((short)1); + public HeartbeatResponse heartbeat(GroomServerStatus status, + boolean restarted, boolean initialContact, boolean acceptNewTasks, + short responseId) throws IOException { + LOG.debug(">>> Received the heartbeat message from "); + LOG.debug(">>> " + status.groomName + "(" + status.getHost() + ")"); + LOG.debug(">>> restarted:" + restarted + ",first:" + initialContact); + LOG.debug(">>> maxTaskCapacity:" + status.getMaxTasks() + ",taskCapacity:" + + status.getTaskReports().size()); + + // First check if the last heartbeat response got through + String groomName = status.getGroomName(); + long now = System.currentTimeMillis(); + + HeartbeatResponse prevHeartbeatResponse = groomToHeartbeatResponseMap + .get(groomName); + + // Process this heartbeat + short newResponseId = (short) (responseId + 1); + status.setLastSeen(now); + if (!processHeartbeat(status, 
initialContact)) { + if (prevHeartbeatResponse != null) { + groomToHeartbeatResponseMap.remove(groomName); + } + return new HeartbeatResponse(newResponseId, + new GroomServerAction[] { new ReinitTrackerAction() }); + } + + HeartbeatResponse response = new HeartbeatResponse(newResponseId, null); + // List actions = new ArrayList(); + + // Check for new tasks to be executed on the groom server + if (acceptNewTasks) { + GroomServerStatus groomStatus = getGroomServer(groomName); + if (groomStatus == null) { + LOG.warn("Unknown task tracker polling; ignoring: " + groomName); + } else { + // TODO - assignTasks should be implemented + /* + * List tasks = taskScheduler.assignTasks(groomStatus); for(Task + * task : tasks) { if(tasks != null) { LOG.debug(groomName + + * "-> LaunchTask: " + task.getTaskID()); actions.add(new + * LaunchTaskAction(task)); } } + */ + } + } + + return response; } - + /** + * Process incoming heartbeat messages from the groom. + */ + private synchronized boolean processHeartbeat(GroomServerStatus groomStatus, + boolean initialContact) { + String groomName = groomStatus.getGroomName(); + + synchronized (groomServers) { + GroomServerStatus oldStatus = groomServers.get(groomName); + if (oldStatus == null) { + groomServers.put(groomName, groomStatus); + } else { // TODO - to be improved to update status. + } + } + + return true; + } + + // ////////////////////////////////////////////////// + // JobSubmissionProtocol + // ////////////////////////////////////////////////// + /** + * This method returns new job id. The returned job id increases sequentially. 
+ */ + @Override + public BSPJobID getNewJobId() throws IOException { + return new BSPJobID(this.masterIdentifier, nextJobId++); + } + + @Override + public JobStatus submitJob(BSPJobID jobId) throws IOException { + LOG.info("Submitted a job (" + jobId + ")"); + if (jobs.containsKey(jobId)) { + // job already running, don't start twice + LOG.info("The job (" + jobId + ") is already subbmitted"); + return jobs.get(jobId).getStatus(); + } + + JobInProgress job = new JobInProgress(jobId, this, this.conf); + + return addJob(jobId, job); + } + + @Override + public ClusterStatus getClusterStatus(boolean detailed) { + synchronized (groomServers) { + if (detailed) { + List groomNames = groomServerNames(); + return new ClusterStatus(groomNames, totalTasks, totalTaskCapacity, + state); + } else { + return new ClusterStatus(groomServers.size(), totalTasks, + totalTaskCapacity, state); + } + } + } + + /** + * Adds a job to the bsp master. Make sure that the checks are inplace before + * adding a job. This is the core job submission logic + * + * @param jobId The id for the job submitted which needs to be added + */ + private synchronized JobStatus addJob(BSPJobID jodId, JobInProgress job) { + totalSubmissions++; + synchronized (jobs) { + jobs.put(job.getProfile().getJobID(), job); + taskScheduler.addJob(job); + } + + return job.getStatus(); + } + + @Override + public JobStatus[] getAllJobs() throws IOException { + return null; + } + + @Override + public synchronized String getFilesystemName() throws IOException { + if (fs == null) { + throw new IllegalStateException("FileSystem object not available yet"); + } + return fs.getUri().toString(); + } + + /** * Return system directory to which BSP store control files. 
*/ @Override public String getSystemDir() { - Path sysDir = new Path(conf.get("bspd.system.dir", "/tmp/hadoop/bsp/system")); + Path sysDir = new Path(conf.get("bsp.system.dir", "/tmp/hadoop/bsp/system")); return fs.makeQualified(sysDir).toString(); } - - /** - * This method returns new job id. The returned job id increases sequentially. - */ @Override - public JobID getNewJobId() throws IOException { - return new JobID(this.masterIdentifier, nextJobId++); + public JobProfile getJobProfile(BSPJobID jobid) throws IOException { + synchronized (this) { + JobInProgress job = jobs.get(jobid); + if (job != null) { + return job.getProfile(); + } + } + return null; } @Override - public JobStatus submitJob(JobID jobName) throws IOException { - + public JobStatus getJobStatus(BSPJobID jobid) throws IOException { + synchronized (this) { + JobInProgress job = jobs.get(jobid); + if (job != null) { + return job.getStatus(); + } + } return null; } + + @Override + public void killJob(BSPJobID jobid) throws IOException { + JobInProgress job = jobs.get(jobid); + + if (null == job) { + LOG.info("killJob(): JobId " + jobid.toString() + " is not a valid job"); + return; + } + + killJob(job); + } + + private synchronized void killJob(JobInProgress job) { + LOG.info("Killing job " + job.getJobID()); + } + + @Override + public boolean killTask(TaskAttemptID taskId, boolean shouldFail) + throws IOException { + return false; + } + + public static BSPMaster constructMaster( + Class masterClass, final Configuration conf) { + try { + Constructor c = masterClass + .getConstructor(Configuration.class); + return c.newInstance(conf); + } catch (Exception e) { + throw new RuntimeException("Failed construction of " + "Master: " + + masterClass.toString() + + ((e.getCause() != null) ? 
e.getCause().getMessage() : ""), e); + } + } + + public void shutdown() { + this.interTrackerServer.stop(); + } } Index: src/java/org/apache/hama/bsp/BSPMessage.java =================================================================== --- src/java/org/apache/hama/bsp/BSPMessage.java (리비전 943815) +++ src/java/org/apache/hama/bsp/BSPMessage.java (작업 사본) @@ -1,6 +1,4 @@ /** - * Copyright 2007 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information Index: src/java/org/apache/hama/bsp/BSPPeer.java =================================================================== --- src/java/org/apache/hama/bsp/BSPPeer.java (리비전 943815) +++ src/java/org/apache/hama/bsp/BSPPeer.java (작업 사본) @@ -1,6 +1,4 @@ /** - * Copyright 2007 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -33,6 +31,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC.Server; +import org.apache.hama.Constants; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; @@ -66,20 +65,23 @@ public BSPPeer(Configuration conf) throws IOException { this.conf = conf; - serverName = conf.get(PEER_HOST,DEFAULT_PEER_HOST) +":"+ conf.getInt(PEER_PORT, DEFAULT_PEER_PORT); - bindAddress = conf.get(PEER_HOST, DEFAULT_PEER_HOST); - bindPort = conf.getInt(PEER_PORT, DEFAULT_PEER_PORT); - bspRoot = conf.get(ZOOKEEPER_ROOT, DEFAULT_ZOOKEEPER_ROOT); - zookeeperAddr = conf.get(ZOOKEEPER_SERVER_ADDRS,"localhost:21810"); + serverName = conf.get(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST) + + ":" + conf.getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT); + bindAddress = conf.get(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST); + bindPort = conf.getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT); + bspRoot = conf.get(Constants.ZOOKEEPER_ROOT, + Constants.DEFAULT_ZOOKEEPER_ROOT); + zookeeperAddr = conf.get(Constants.ZOOKEEPER_SERVER_ADDRS, + "localhost:21810"); reinitialize(); } public void reinitialize() { try { - System.out.println(bindAddress+":"+bindPort); + System.out.println(bindAddress + ":" + bindPort); server = RPC.getServer(this, bindAddress, bindPort, conf); - server.start(); + server.start(); } catch (IOException e) { e.printStackTrace(); } @@ -98,9 +100,8 @@ /* * (non-Javadoc) - * * @see org.apache.hama.bsp.BSPPeerInterface#send(java.net.InetSocketAddress, - * org.apache.hadoop.io.Writable, org.apache.hadoop.io.Writable) + * org.apache.hadoop.io.Writable, org.apache.hadoop.io.Writable) */ @Override public void send(InetSocketAddress hostname, BSPMessage msg) @@ -116,7 +117,6 @@ /* * (non-Javadoc) - * * @see org.apache.hama.bsp.BSPPeerInterface#sync() 
*/ @Override @@ -145,7 +145,7 @@ } } enterBarrier(); - Thread.sleep(ATLEAST_WAIT_TIME); // TODO - This is temporary work because + Thread.sleep(Constants.ATLEAST_WAIT_TIME); // TODO - This is temporary work because // it can be affected by network condition, // the number of peers, and the load of zookeeper. // It should fixed to some flawless way. Index: src/java/org/apache/hama/bsp/BSPPeerInterface.java =================================================================== --- src/java/org/apache/hama/bsp/BSPPeerInterface.java (리비전 943815) +++ src/java/org/apache/hama/bsp/BSPPeerInterface.java (작업 사본) @@ -1,6 +1,4 @@ /** - * Copyright 2007 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hama.bsp; import java.io.IOException; Index: src/java/org/apache/hama/bsp/ClusterStatus.java =================================================================== --- src/java/org/apache/hama/bsp/ClusterStatus.java (리비전 0) +++ src/java/org/apache/hama/bsp/ClusterStatus.java (리비전 0) @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + +/** + * Status information on the current state of the BSP cluster. + * + *

<p><code>ClusterStatus</code> provides clients with information such as:
+ * <ol>
+ *   <li>
+ *   Size of the cluster.
+ *   </li>
+ *   <li>
+ *   Name of the grooms.
+ *   </li>
+ *   <li>
+ *   Task capacity of the cluster.
+ *   </li>
+ *   <li>
+ *   The number of currently running bsp tasks.
+ *   </li>
+ *   <li>
+ *   State of the BSPMaster.
+ *   </li>
+ * </ol></p>
+ *
+ * <p>Clients can query for the latest <code>ClusterStatus</code>, via
+ * {@link BSPJobClient#getClusterStatus()}.</p>

+ * + * @see BSPMaster + */ +public class ClusterStatus implements Writable { + + private int numActiveGrooms; + private Collection activeGrooms = new ArrayList(); + private int tasks; + private int maxTasks; + private BSPMaster.State state; + + /** + * + */ + ClusterStatus() {} + + ClusterStatus(int grooms, int tasks, int maxTasks, BSPMaster.State state) { + this.numActiveGrooms = grooms; + this.tasks = tasks; + this.maxTasks = maxTasks; + this.state = state; + } + + ClusterStatus(Collection activeGrooms, int tasks, int maxTasks, + BSPMaster.State state) { + this(activeGrooms.size(), tasks, maxTasks, state); + this.activeGrooms = activeGrooms; + } + + /** + * Get the number of groom servers in the cluster. + * + * @return the number of groom servers in the cluster. + */ + public int getGroomServers() { + return tasks; + } + + /** + * Get the names of groom servers in the cluster. + * + * @return the active groom servers in the cluster. + */ + public Collection getActiveGroomNames() { + return activeGrooms; + } + + /** + * Get the number of currently running tasks in the cluster. + * + * @return the number of currently running tasks in the cluster. + */ + public int getTasks() { + return tasks; + } + + /** + * Get the maximum capacity for running tasks in the cluster. + * + * @return the maximum capacity for running tasks in the cluster. + */ + public int getMaxTasks() { + return maxTasks; + } + + /** + * Get the current state of the BSPMaster, + * as {@link BSPMaster.State} + * + * @return the current state of the BSPMaster. 
+ */ + public BSPMaster.State getBSPMasterState() { + return state; + } + + ////////////////////////////////////////////// + // Writable + ////////////////////////////////////////////// + @Override + public void write(DataOutput out) throws IOException { + if(activeGrooms.size() == 0) { + out.writeInt(numActiveGrooms); + out.writeInt(0); + } else { + out.writeInt(activeGrooms.size()); + out.writeInt(activeGrooms.size()); + for(String groom: activeGrooms) { + Text.writeString(out, groom); + } + } + out.writeInt(tasks); + out.writeInt(maxTasks); + WritableUtils.writeEnum(out, state); + } + + @Override + public void readFields(DataInput in) throws IOException { + numActiveGrooms = in.readInt(); + int numGroomNames = in.readInt(); + String name; + if (numGroomNames > 0) { + name = Text.readString(in); + activeGrooms.add(name); + } + tasks = in.readInt(); + maxTasks = in.readInt(); + state = WritableUtils.readEnum(in, BSPMaster.State.class); + } +} Index: src/java/org/apache/hama/bsp/CommitTaskAction.java =================================================================== --- src/java/org/apache/hama/bsp/CommitTaskAction.java (리비전 0) +++ src/java/org/apache/hama/bsp/CommitTaskAction.java (리비전 0) @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} + * to the {@link org.apache.hama.bsp.GroomServer} to commit the output + * of the task. + * + */ +class CommitTaskAction extends GroomServerAction { + private TaskAttemptID taskId; + + public CommitTaskAction() { + super(ActionType.COMMIT_TASK); + taskId = new TaskAttemptID(); + } + + public CommitTaskAction(TaskAttemptID taskId) { + super(ActionType.COMMIT_TASK); + this.taskId = taskId; + } + + public TaskAttemptID getTaskID() { + return taskId; + } + + public void write(DataOutput out) throws IOException { + taskId.write(out); + } + + public void readFields(DataInput in) throws IOException { + taskId.readFields(in); + } +} Index: src/java/org/apache/hama/bsp/DefaultBSPPeer.java =================================================================== --- src/java/org/apache/hama/bsp/DefaultBSPPeer.java (리비전 943815) +++ src/java/org/apache/hama/bsp/DefaultBSPPeer.java (작업 사본) @@ -1,6 +1,4 @@ /** - * Copyright 2007 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -21,15 +19,15 @@ import java.io.Closeable; import java.io.IOException; - import java.net.InetSocketAddress; +import org.apache.hama.Constants; import org.apache.zookeeper.KeeperException; /** * */ -public interface DefaultBSPPeer extends Closeable, BSPConstants { +public interface DefaultBSPPeer extends Closeable, Constants { /** * Send a data with a tag to another BSPSlave corresponding to hostname. 
Index: src/java/org/apache/hama/bsp/GroomServer.java =================================================================== --- src/java/org/apache/hama/bsp/GroomServer.java (리비전 943815) +++ src/java/org/apache/hama/bsp/GroomServer.java (작업 사본) @@ -19,14 +19,20 @@ import java.io.File; import java.io.IOException; +import java.lang.reflect.Constructor; import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; @@ -35,71 +41,87 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hama.HamaConfiguration; -import org.apache.hama.ipc.HeartbeatResponse; import org.apache.hama.ipc.InterTrackerProtocol; public class GroomServer implements Runnable { public static final Log LOG = LogFactory.getLog(GroomServer.class); - + static { Configuration.addDefaultResource("hama-default.xml"); + Configuration.addDefaultResource("hama-site.xml"); } + Configuration conf; + // Constants static enum State { NORMAL, COMPUTE, SYNC, BARRIER, STALE, INTERRUPTED, DENIED }; - - HamaConfiguration conf; - + + // Running States and its related things volatile boolean running = true; volatile boolean shuttingDown = false; + boolean justStarted = true; boolean justInited = true; - - String groomserverName; - String localHostname; - - InetSocketAddress masterAddr; - InterTrackerProtocol jobClient; - //BSPPeer bspPeer; - + GroomServerStatus status = null; short heartbeatResponseId = -1; private volatile int 
heartbeatInterval = 3 * 1000; - - private LocalDirAllocator localDirAllocator; + + // Attributes + String groomServerName; + String localHostname; + InetSocketAddress bspMasterAddr; + InterTrackerProtocol jobClient; + + // Filesystem + //private LocalDirAllocator localDirAllocator; Path systemDirectory = null; FileSystem systemFS = null; + + // Job + boolean acceptNewTasks = true; + private int failures; + private int maxCurrentTasks; + Map tasks = new HashMap(); + /** Map from taskId -> TaskInProgress. */ + Map runningTasks = null; + Map runningJobs = null; - public GroomServer(HamaConfiguration conf) throws IOException { + public GroomServer(Configuration conf) throws IOException { this.conf = conf; - masterAddr = BSPMaster.getAddress(conf); + bspMasterAddr = BSPMaster.getAddress(conf); - FileSystem local = FileSystem.getLocal(conf); - this.localDirAllocator = new LocalDirAllocator("bspd.groom.local.dir"); - - initialize(); + //FileSystem local = FileSystem.getLocal(conf); + //this.localDirAllocator = new LocalDirAllocator("bsp.local.dir"); } - synchronized void initialize() throws IOException { + public synchronized void initialize() throws IOException { if (this.conf.get("slave.host.name") != null) { this.localHostname = conf.get("slave.host.name"); } if (localHostname == null) { this.localHostname = DNS.getDefaultHost(conf.get( - "bspd.groom.dns.interface", "default"), conf.get( - "bspd.groom.dns.nameserver", "default")); + "bsp.dns.interface", "default"), conf.get( + "bsp.dns.nameserver", "default")); } - checkLocalDirs(conf.getStrings("bspd.groom.local.dir")); + //check local disk + checkLocalDirs(conf.getStrings("bsp.local.dir")); deleteLocalFiles("groomserver"); - this.groomserverName = "groomd_" + localHostname; - LOG.info("Starting tracker " + this.groomserverName); + // Clear out state tables + this.tasks.clear(); + this.runningJobs = new TreeMap(); + this.runningTasks = new LinkedHashMap(); + this.acceptNewTasks = true; + + this.groomServerName = 
"groomd_" + localHostname; + LOG.info("Starting tracker " + this.groomServerName); DistributedCache.purgeCache(this.conf); this.jobClient = (InterTrackerProtocol) RPC.waitForProxy( - InterTrackerProtocol.class, InterTrackerProtocol.versionID, masterAddr, + InterTrackerProtocol.class, InterTrackerProtocol.versionID, bspMasterAddr, conf); this.running = true; // this.bspPeer = new BSPPeer(this.conf); @@ -109,9 +131,12 @@ throws DiskErrorException { boolean writable = false; + LOG.info(localDirs); + if (localDirs != null) { for (int i = 0; i < localDirs.length; i++) { try { + LOG.info(localDirs[i]); DiskChecker.checkDir(new File(localDirs[i])); writable = true; } catch (DiskErrorException e) { @@ -120,26 +145,30 @@ } } - if (!writable) - throw new DiskErrorException("all local directories are not writable"); + // if (!writable) + //throw new DiskErrorException("all local directories are not writable"); } public String[] getLocalDirs() { - return conf.getStrings("bspd.groom.local.dir"); + return conf.getStrings("bsp.local.dir"); } public void deleteLocalFiles() throws IOException { String[] localDirs = getLocalDirs(); for (int i = 0; i < localDirs.length; i++) { - FileSystem.getLocal(this.conf).delete(new Path(localDirs[i])); + FileSystem.getLocal(this.conf).delete(new Path(localDirs[i]),true); } } public void deleteLocalFiles(String subdir) throws IOException { + try{ String[] localDirs = getLocalDirs(); for (int i = 0; i < localDirs.length; i++) { - FileSystem.getLocal(this.conf).delete(new Path(localDirs[i], subdir)); + FileSystem.getLocal(this.conf).delete(new Path(localDirs[i], subdir),true); } + } catch (NullPointerException e) { + LOG.info(e); + } } public void cleanupStorage() throws IOException { @@ -172,14 +201,19 @@ systemFS = systemDirectory.getFileSystem(conf); } - // Send the heartbeat and process the jobtracker's directives + // Send the heartbeat and process the bspmaster's directives HeartbeatResponse heartbeatResponse = transmitHeartBeat(now); - + // 
+ // The heartbeat got through successfully! + // + heartbeatResponseId = heartbeatResponse.getResponseId(); + // Note the time when the heartbeat returned, use this to decide when to // send the // next heartbeat lastHeartbeat = System.currentTimeMillis(); - + + justStarted = false; justInited = false; } catch (InterruptedException ie) { LOG.info("Interrupted. Closing down."); @@ -203,13 +237,40 @@ } private HeartbeatResponse transmitHeartBeat(long now) throws IOException { + + // + // Check if the last heartbeat got through... + // if so then build the heartbeat information for the BSPMaster; + // else resend the previous status information. + // + if (status == null) { + synchronized (this) { + status = new GroomServerStatus(groomServerName, localHostname, + cloneAndResetRunningTaskStatuses(), failures, maxCurrentTasks); + } + } else { + LOG.info("Resending 'status' to '" + bspMasterAddr.getHostName() + + "' with reponseId '" + heartbeatResponseId+"'"); + } + + // TODO - Later, acceptNewTask is to be set by the status of groom server. HeartbeatResponse heartbeatResponse = jobClient - .heartbeat(heartbeatResponseId); + .heartbeat(status,justStarted,justInited,true,heartbeatResponseId); return heartbeatResponse; } + + private synchronized List cloneAndResetRunningTaskStatuses() { + List result = new ArrayList(runningTasks.size()); + for(TaskInProgress tip: runningTasks.values()) { + TaskStatus status = tip.getStatus(); + result.add((TaskStatus)status.clone()); + } + return result; + } public void run() { try { + initialize(); startCleanupThreads(); boolean denied = false; while (running && !shuttingDown && !denied) { @@ -225,7 +286,7 @@ } } catch (Exception e) { if (!shuttingDown) { - LOG.info("Lost connection to GraphProcessor [" + masterAddr + LOG.info("Lost connection to GraphProcessor [" + bspMasterAddr + "]. 
Retrying...", e); try { Thread.sleep(5000); @@ -245,7 +306,7 @@ initialize(); } } catch (IOException ioe) { - LOG.error("Got fatal exception while reinitializing TaskTracker: " + LOG.error("Got fatal exception while reinitializing GroomServer: " + StringUtils.stringifyException(ioe)); return; } @@ -273,11 +334,95 @@ } try { - HamaConfiguration conf = new HamaConfiguration(); - new GroomServer(conf).run(); + Configuration conf = new HamaConfiguration(); + conf.set("bsp.master.port", "40000"); + conf.set("bsp.groom.port", "40020"); + conf.set("bsp.local.dir", conf.get("hadoop.tmp.dir") + "/bsp/local"); + conf.set("bsp.system.dir", conf.get("hadoop.tmp.dir") + "/bsp/system"); + GroomServer groom = GroomServer.constructGroomServer(GroomServer.class, conf); + startGroomServer(groom); } catch (Throwable e) { LOG.fatal(StringUtils.stringifyException(e)); System.exit(-1); } } + + public static Thread startGroomServer(final GroomServer hrs) { + return startGroomServer(hrs, + "regionserver" + hrs.groomServerName); + } + + public static Thread startGroomServer(final GroomServer hrs, + final String name) { + Thread t = new Thread(hrs); + t.setName(name); + t.start(); + return t; + } + + /////////////////////////////////////////////////////// + // TaskInProgress maintains all the info for a Task that + // lives at this GroomServer. It maintains the Task object, + // its TaskStatus, and the TaskRunner. 
+ /////////////////////////////////////////////////////// + class TaskInProgress { + Task task; + volatile boolean done = false; + volatile boolean wasKilled = false; + private TaskStatus taskStatus; + + public TaskInProgress(Task task, BSPJobContext job) { + this.task = task; + } + + /** + */ + public Task getTask() { + return task; + } + + /** + */ + public synchronized TaskStatus getStatus() { + return taskStatus; + } + + /** + */ + public TaskStatus.State getRunState() { + return taskStatus.getRunState(); + } + + public boolean wasKilled() { + return wasKilled; + } + + @Override + public boolean equals(Object obj) { + return (obj instanceof TaskInProgress) && + task.getTaskID().equals + (((TaskInProgress) obj).getTask().getTaskID()); + } + + @Override + public int hashCode() { + return task.getTaskID().hashCode(); + } + } + + public boolean isRunning() { + return running; + } + + public static GroomServer constructGroomServer(Class groomServerClass, + final Configuration conf2) { + try { + Constructor c = + groomServerClass.getConstructor(Configuration.class); + return c.newInstance(conf2); + } catch (Exception e) { + throw new RuntimeException("Failed construction of " + + "Master: " + groomServerClass.toString(), e); + } + } } Index: src/java/org/apache/hama/bsp/GroomServerAction.java =================================================================== --- src/java/org/apache/hama/bsp/GroomServerAction.java (리비전 0) +++ src/java/org/apache/hama/bsp/GroomServerAction.java (리비전 0) @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + +/** + * A generic directive from the {@link org.apache.hama.bsp.BSPMaster} + * to the {@link org.apache.hama.bsp.GroomServer} to take some 'action'. + * + */ +abstract class GroomServerAction implements Writable { + + /** + * Enumeration of various 'actions' that the {@link BSPMaster} + * directs the {@link GroomServer} to perform periodically. + * + */ + public static enum ActionType { + /** Launch a new task. */ + LAUNCH_TASK, + + /** Kill a task. */ + KILL_TASK, + + /** Kill any tasks of this job and cleanup. */ + KILL_JOB, + + /** Reinitialize the groomserver. */ + REINIT_GROOM, + + /** Ask a task to save its output. */ + COMMIT_TASK + }; + + /** + * A factory-method to create objects of given {@link ActionType}. + * @param actionType the {@link ActionType} of object to create. + * @return a {@link GroomServerAction} of the given {@link ActionType}. 
+ */ + public static GroomServerAction createAction(ActionType actionType) { + GroomServerAction action = null; + + switch (actionType) { + case LAUNCH_TASK: + { + action = new LaunchTaskAction(); + } + break; + case KILL_TASK: + { + action = new KillTaskAction(); + } + break; + case KILL_JOB: + { + action = new KillJobAction(); + } + break; + case REINIT_GROOM: + { + action = new ReinitTrackerAction(); + } + break; + case COMMIT_TASK: + { + action = new CommitTaskAction(); + } + break; + } + + return action; + } + + private ActionType actionType; + + protected GroomServerAction(ActionType actionType) { + this.actionType = actionType; + } + + /** + * Return the {@link ActionType}. + * @return the {@link ActionType}. + */ + ActionType getActionId() { + return actionType; + } + + public void write(DataOutput out) throws IOException { + WritableUtils.writeEnum(out, actionType); + } + + public void readFields(DataInput in) throws IOException { + actionType = WritableUtils.readEnum(in, ActionType.class); + } +} Index: src/java/org/apache/hama/bsp/GroomServerManager.java =================================================================== --- src/java/org/apache/hama/bsp/GroomServerManager.java (리비전 0) +++ src/java/org/apache/hama/bsp/GroomServerManager.java (리비전 0) @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp; + +import java.io.IOException; +import java.util.Collection; + +/** + * Manages information about the {@link GroomServer}s running on a cluster. + * This interface exists primarily to test the {@link BSPMaster}, and is not + * intended to be implemented by users. + */ +interface GroomServerManager { + + /** + * @return A collection of the {@link GroomServerStatus} for the grooms + * being managed. + */ + public Collection grooms(); + + /** + * @return The number of unique hosts running grooms. + */ + public int getNumberOfUniqueHosts(); + + /** + * Get the current status of the cluster + * @param detailed if true then report groom names as well + * @return summary of the state of the cluster + */ + public ClusterStatus getClusterStatus(boolean detailed); + + /** + * Registers a {@link JobInProgressListener} for updates from this + * {@link GroomServerManager}. + * @param listener the {@link JobInProgressListener} to add + */ + public void addJobInProgressListener(JobInProgressListener listener); + + /** + * Unregisters a {@link JobInProgressListener} from this + * {@link GroomServerManager}. + * @param listener the {@link JobInProgressListener} to remove + */ + public void removeJobInProgressListener(JobInProgressListener listener); + + /** + * Return the current heartbeat interval that's used by {@link GroomServer}s. 
+ * + * @return the heartbeat interval used by {@link GroomServer}s + */ + public int getNextHeartbeatInterval(); + + /** + * Kill the job identified by jobid + * + * @param jobid + * @throws IOException + */ + public void killJob(BSPJobID jobid) + throws IOException; + + /** + * Obtain the job object identified by jobid + * + * @param jobid + * @return jobInProgress object + */ + public JobInProgress getJob(BSPJobID jobid); + + /** + * Initialize the Job + * + * @param job JobInProgress object + */ + public void initJob(JobInProgress job); + + /** + * Fail a job. + * + * @param job JobInProgress object + */ + public void failJob(JobInProgress job); +} Index: src/java/org/apache/hama/bsp/GroomServerStatus.java =================================================================== --- src/java/org/apache/hama/bsp/GroomServerStatus.java (리비전 0) +++ src/java/org/apache/hama/bsp/GroomServerStatus.java (리비전 0) @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hama.bsp; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.io.WritableFactory; + +/** + * + */ +public class GroomServerStatus implements Writable { + + static { + WritableFactories.setFactory + (GroomServerStatus.class, + new WritableFactory() { + public Writable newInstance() { return new GroomServerStatus(); } + }); + } + + String groomName; + String host; + int failures; + List taskReports; + + volatile long lastSeen; + private int maxTasks; + + public GroomServerStatus() { + taskReports = new ArrayList(); + } + + public GroomServerStatus(String groomName, String host, + List taskReports, int failures, int maxTasks) { + this.groomName = groomName; + this.host = host; + + this.taskReports = new ArrayList(taskReports); + this.failures = failures; + this.maxTasks = maxTasks; + } + + public String getGroomName() { + return groomName; + } + + public String getHost() { + return host; + } + + /** + * Get the current tasks at the GroomServer. + * Tasks are tracked by a {@link TaskStatus} object. + * + * @return a list of {@link TaskStatus} representing + * the current tasks at the GroomServer. 
+ */ + public List getTaskReports() { + return taskReports; + } + + public int getFailures() { + return failures; + } + + public long getLastSeen() { + return lastSeen; + } + + public void setLastSeen(long lastSeen) { + this.lastSeen = lastSeen; + } + + public int getMaxTasks() { + return maxTasks; + } + + /** + * Return the current MapTask count + */ + public int countTasks() { + int taskCount = 0; + for (Iterator it = taskReports.iterator(); it.hasNext();) { + TaskStatus ts = it.next(); + TaskStatus.State state = ts.getRunState(); + if(state == TaskStatus.State.RUNNING || + state == TaskStatus.State.UNASSIGNED) { + taskCount++; + } + } + + return taskCount; + } + + /* (non-Javadoc) + * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput) + */ + @Override + public void readFields(DataInput in) throws IOException { + this.groomName = Text.readString(in); + this.host = Text.readString(in); + this.failures = in.readInt(); + this.maxTasks = in.readInt(); + taskReports.clear(); + int numTasks = in.readInt(); + + TaskStatus status; + for (int i = 0; i < numTasks; i++) { + status = new TaskStatus(); + status.readFields(in); + } + } + + /* (non-Javadoc) + * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput) + */ + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, groomName); + Text.writeString(out, host); + out.writeInt(failures); + out.writeInt(maxTasks); + out.writeInt(taskReports.size()); + for(TaskStatus taskStatus : taskReports) { + taskStatus.write(out); + } + } + +} Index: src/java/org/apache/hama/bsp/HeartbeatResponse.java =================================================================== --- src/java/org/apache/hama/bsp/HeartbeatResponse.java (리비전 0) +++ src/java/org/apache/hama/bsp/HeartbeatResponse.java (리비전 0) @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + +public class HeartbeatResponse implements Writable, Configurable { + private Configuration conf; + short responseId; + private GroomServerAction [] actions; + + public HeartbeatResponse() {} + + public HeartbeatResponse(short responseId, GroomServerAction [] actions) { + this.responseId = responseId; + this.actions = actions; + } + + public void setResponseId(short responseId) { + this.responseId = responseId; + } + + public short getResponseId() { + return responseId; + } + + public void setActions(GroomServerAction [] actions) { + this.actions = actions; + } + + public GroomServerAction [] getActions() { + return actions; + } + + @Override + public void readFields(DataInput in) throws IOException { + this.responseId = in.readShort(); + int length = WritableUtils.readVInt(in); + if (length > 0) { + actions = new GroomServerAction[length]; + for(int i=0; i< length; ++i) { + GroomServerAction.ActionType actionType = + WritableUtils.readEnum(in, GroomServerAction.ActionType.class); + 
actions[i] = GroomServerAction.createAction(actionType); + actions[i].readFields(in); + } + } else { + actions = null; + } + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeShort(this.responseId); + if(actions == null) { + WritableUtils.writeVInt(out, 0); + } else { + WritableUtils.writeVInt(out, actions.length); + for(GroomServerAction action: actions) { + WritableUtils.writeEnum(out, action.getActionId()); + action.write(out); + } + } + } + + @Override + public Configuration getConf() { + return this.conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } +} Index: src/java/org/apache/hama/bsp/JobChangeEvent.java =================================================================== --- src/java/org/apache/hama/bsp/JobChangeEvent.java (리비전 0) +++ src/java/org/apache/hama/bsp/JobChangeEvent.java (리비전 0) @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp; + +/** + * {@link JobChangeEvent} is used to capture state changes in a job. A job can + * change its state w.r.t priority, progress, run-state etc. 
+ */ +abstract class JobChangeEvent { + private JobInProgress jip; + + JobChangeEvent(JobInProgress jip) { + this.jip = jip; + } + + /** + * Get the job object for which the change is reported + */ + JobInProgress getJobInProgress() { + return jip; + } +} Index: src/java/org/apache/hama/bsp/JobClient.java =================================================================== --- src/java/org/apache/hama/bsp/JobClient.java (리비전 943815) +++ src/java/org/apache/hama/bsp/JobClient.java (작업 사본) @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hama.bsp; - -import java.io.IOException; -import java.net.InetSocketAddress; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.net.NetUtils; -import org.apache.hama.HamaConfiguration; -import org.apache.hama.bsp.BSPMaster; -import org.apache.hama.ipc.JobSubmissionProtocol; - -public class JobClient extends Configured { - private static final Log LOG = LogFactory.getLog(JobClient.class); - - public static enum TaskStatusFilter { - NONE, KILLED, FAILED, SUCCEEDED, ALL - } - - static { - Configuration.addDefaultResource("hama-default.xml"); - } - - @SuppressWarnings("unused") - private JobSubmissionProtocol jobSubmitClient = null; - - public JobClient() { - } - - public JobClient(HamaConfiguration conf) throws IOException { - setConf(conf); - init(conf); - } - - public void init(HamaConfiguration conf) throws IOException { - String tracker = conf.get("hama.master.address", "local"); - this.jobSubmitClient = createRPCProxy(BSPMaster.getAddress(conf), conf); - } - - private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr, - Configuration conf) throws IOException { - return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class, - JobSubmissionProtocol.versionID, addr, conf, NetUtils.getSocketFactory( - conf, JobSubmissionProtocol.class)); - } -} Index: src/java/org/apache/hama/bsp/JobContext.java =================================================================== --- src/java/org/apache/hama/bsp/JobContext.java (리비전 943815) +++ src/java/org/apache/hama/bsp/JobContext.java (작업 사본) @@ -1,113 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hama.bsp; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.RawComparator; -import org.apache.hadoop.io.Text; -import org.apache.hama.graph.InputFormat; -import org.apache.hama.graph.OutputFormat; - -/** - * A read-only view of the job that is provided to the tasks while they are - * running. - */ -public class JobContext { - // Put all of the attribute names in here so that Job and JobContext are - // consistent. 
- protected static final String INPUT_FORMAT_CLASS_ATTR = "angrapa.inputformat.class"; - protected static final String WALKER_CLASS_ATTR = "angrapa.walker.class"; - protected static final String OUTPUT_FORMAT_CLASS_ATTR = "angrapa.outputformat.class"; - - protected final Configuration conf; - private final JobID jobId; - - public JobContext(Configuration conf, JobID jobId) { - this.conf = conf; - this.jobId = jobId; - } - - public Configuration getConfiguration() { - return conf; - } - - public JobID getJobID() { - return jobId; - } - - public Path getWorkingDirectory() throws IOException { - String name = conf.get("angrapa.working.dir"); - - if (name != null) { - return new Path(name); - } else { - try { - Path dir = FileSystem.get(conf).getWorkingDirectory(); - conf.set("angrapa.working.dir", dir.toString()); - return dir; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } - - public Class getOutputKeyClass() { - return conf.getClass("angrapa.output.key.class", LongWritable.class, - Object.class); - } - - public Class getOutputValueClass() { - return conf - .getClass("angrapa.output.value.class", Text.class, Object.class); - } - - public String getJobName() { - return conf.get("angrapa.job.name", ""); - } - - @SuppressWarnings("unchecked") - public Class> getInputFormatClass() - throws ClassNotFoundException { - return (Class>) conf.getClass( - INPUT_FORMAT_CLASS_ATTR, InputFormat.class); // TODO: To be corrected - // to an implemented class - } - - @SuppressWarnings("unchecked") - public Class> getOutputFormatClass() - throws ClassNotFoundException { - return (Class>) conf.getClass( - OUTPUT_FORMAT_CLASS_ATTR, OutputFormat.class); // TODO: To be corrected - // to an implemented - // class - } - - public RawComparator getSortComparator() { - return null; - } - - public String getJar() { - return conf.get("walker.jar"); - } -} Index: src/java/org/apache/hama/bsp/JobID.java =================================================================== 
--- src/java/org/apache/hama/bsp/JobID.java (리비전 943815) +++ src/java/org/apache/hama/bsp/JobID.java (작업 사본) @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.bsp; - -import java.io.DataInput; - -import java.io.DataOutput; -import java.io.IOException; -import java.text.NumberFormat; - -import org.apache.hadoop.io.Text; - -public class JobID extends ID implements Comparable { - protected static final String JOB = "job"; - private final Text jtIdentifier; - - protected static final NumberFormat idFormat = NumberFormat.getInstance(); - static { - idFormat.setGroupingUsed(false); - idFormat.setMinimumIntegerDigits(4); - } - - public JobID(String jtIdentifier, int id) { - super(id); - this.jtIdentifier = new Text(jtIdentifier); - } - - public JobID() { - jtIdentifier = new Text(); - } - - public String getJtIdentifier() { - return jtIdentifier.toString(); - } - - @Override - public boolean equals(Object o) { - if (!super.equals(o)) - return false; - - JobID that = (JobID) o; - return this.jtIdentifier.equals(that.jtIdentifier); - } - - @Override - public int compareTo(ID o) { - JobID that = (JobID) o; - int jtComp = this.jtIdentifier.compareTo(that.jtIdentifier); - if (jtComp 
== 0) { - return this.id - that.id; - } else - return jtComp; - } - - public StringBuilder appendTo(StringBuilder builder) { - builder.append(SEPARATOR); - builder.append(jtIdentifier); - builder.append(SEPARATOR); - builder.append(idFormat.format(id)); - return builder; - } - - @Override - public int hashCode() { - return jtIdentifier.hashCode() + id; - } - - @Override - public String toString() { - return appendTo(new StringBuilder(JOB)).toString(); - } - - @Override - public void readFields(DataInput in) throws IOException { - super.readFields(in); - this.jtIdentifier.readFields(in); - } - - @Override - public void write(DataOutput out) throws IOException { - super.write(out); - jtIdentifier.write(out); - } - - public static JobID forName(String str) throws IllegalArgumentException { - if (str == null) - return null; - try { - String[] parts = str.split("_"); - if (parts.length == 3) { - if (parts[0].equals(JOB)) { - return new JobID(parts[1], Integer.parseInt(parts[2])); - } - } - } catch (Exception ex) { - } - throw new IllegalArgumentException("JobId string : " + str - + " is not properly formed"); - } -} Index: src/java/org/apache/hama/bsp/JobInProgress.java =================================================================== --- src/java/org/apache/hama/bsp/JobInProgress.java (리비전 0) +++ src/java/org/apache/hama/bsp/JobInProgress.java (리비전 0) @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +/************************************************************* + * JobInProgress maintains all the info for keeping a Job on the straight and + * narrow. It keeps its JobProfile and its latest JobStatus, plus a set of + * tables for doing bookkeeping of its Tasks. + * *********************************************************** + */ +class JobInProgress { + /** + * Used when a kill is issued to a job which is initializing. 
+ */ + static class KillInterruptedException extends InterruptedException { + private static final long serialVersionUID = 1L; + + public KillInterruptedException(String msg) { + super(msg); + } + } + + static final Log LOG = LogFactory.getLog(JobInProgress.class); + + JobProfile profile; + JobStatus status; + Path jobFile = null; + Path localJobFile = null; + Path localJarFile = null; + + long startTime; + long launchTime; + long finishTime; + + // private LocalFileSystem localFs; + private BSPJobID jobId; + + final BSPMaster master; + + public JobInProgress(BSPJobID jobId, BSPMaster master, Configuration conf) + throws IOException { + this.jobId = jobId; + + this.master = master; + this.status = new JobStatus(jobId, 0.0f, 0.0f, JobStatus.PREP); + this.startTime = System.currentTimeMillis(); + status.setStartTime(startTime); + // this.localFs = FileSystem.getLocal(conf); + + this.localJobFile = master.getLocalPath(BSPMaster.SUBDIR + "/" + jobId + + ".xml"); + this.localJarFile = master.getLocalPath(BSPMaster.SUBDIR + "/" + jobId + + ".jar"); + Path jobDir = master.getSystemDirectoryForJob(jobId); + FileSystem fs = jobDir.getFileSystem(conf); + jobFile = new Path(jobDir, "job.xml"); + fs.copyToLocalFile(jobFile, localJobFile); + BSPJobContext job = new BSPJobContext(localJobFile, jobId); + + System.out.println("user:" + job.getUser()); + System.out.println("jobId:" + jobId); + System.out.println("jobFile:" + jobFile.toString()); + System.out.println("jobName:" + job.getJobName()); + + this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(), job + .getJobName()); + + String jarFile = job.getJar(); + if (jarFile != null) { + fs.copyToLocalFile(new Path(jarFile), localJarFile); + } + } + + // /////////////////////////////////////////////////// + // Accessors for the JobInProgress + // /////////////////////////////////////////////////// + public JobProfile getProfile() { + return profile; + } + + public JobStatus getStatus() { + return status; + } + + 
public synchronized long getLaunchTime() { + return launchTime; + } + + public long getStartTime() { + return startTime; + } + + public long getFinishTime() { + return finishTime; + } + + /** + * @return The JobID of this JobInProgress. + */ + public BSPJobID getJobID() { + return jobId; + } + + public String toString() { + return "jobName:" + profile.getJobName() + "\n" + "submit user:" + + profile.getUser() + "\n" + "JobId:" + jobId + "\n" + "JobFile:" + + jobFile + "\n"; + } + + // /////////////////////////////////////////////////// + // Create/manage tasks + // /////////////////////////////////////////////////// + public synchronized Task obtainNewTask(GroomServerStatus status, + int clusterSize, int numUniqueHosts) { + + return null; + } +} Index: src/java/org/apache/hama/bsp/JobInProgressListener.java =================================================================== --- src/java/org/apache/hama/bsp/JobInProgressListener.java (리비전 0) +++ src/java/org/apache/hama/bsp/JobInProgressListener.java (리비전 0) @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hama.bsp; + +import java.io.IOException; + +/** + * A listener for changes in a {@link JobInProgress job}'s lifecycle in the + * {@link BSPMaster}. + */ +abstract class JobInProgressListener { + + /** + * Invoked when a new job has been added to the {@link BSPMaster}. + * + * @param job The added job. + * @throws IOException + */ + public abstract void jobAdded(JobInProgress job) throws IOException; + + /** + * Invoked when a job has been removed from the {@link BSPMaster}. + * + * @param job The removed job. + */ + public abstract void jobRemoved(JobInProgress job); + + /** + * Invoked when a job has been updated in the {@link BSPMaster}. This change + * in the job is tracker using {@link JobChangeEvent}. + * + * @param event the event that tracks the change + */ + public abstract void jobUpdated(JobChangeEvent event); +} Index: src/java/org/apache/hama/bsp/JobProfile.java =================================================================== --- src/java/org/apache/hama/bsp/JobProfile.java (리비전 0) +++ src/java/org/apache/hama/bsp/JobProfile.java (리비전 0) @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hama.bsp; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.io.WritableFactory; + +/************************************************** + * A JobProfile tracks job's status + * + **************************************************/ +public class JobProfile implements Writable { + + static { // register a ctor + WritableFactories.setFactory(JobProfile.class, new WritableFactory() { + public Writable newInstance() { + return new JobProfile(); + } + }); + } + + String user; + final BSPJobID jobid; + String jobFile; + String name; + + /** + * Construct an empty {@link JobProfile}. + */ + public JobProfile() { + jobid = new BSPJobID(); + } + + /** + * Construct a {@link JobProfile} the userid, jobid, job config-file, + * job-details url and job name. + * + * @param user userid of the person who submitted the job. + * @param jobid id of the job. + * @param jobFile job configuration file. + * @param url link to the web-ui for details of the job. + * @param name user-specified job name. + * @param queueName name of the queue to which the job is submitted + */ + public JobProfile(String user, BSPJobID jobid, String jobFile, String name) { + this.user = user; + this.jobid = jobid; + this.jobFile = jobFile; + this.name = name; + } + + /** + * Get the user id. + */ + public String getUser() { + return user; + } + + /** + * Get the job id. + */ + public BSPJobID getJobID() { + return jobid; + } + + /** + * Get the configuration file for the job. + */ + public String getJobFile() { + return jobFile; + } + + /** + * Get the user-specified job name. 
+ */ + public String getJobName() { + return name; + } + + // ///////////////////////////////////// + // Writable + // ///////////////////////////////////// + public void write(DataOutput out) throws IOException { + jobid.write(out); + Text.writeString(out, jobFile); + Text.writeString(out, user); + Text.writeString(out, name); + } + + public void readFields(DataInput in) throws IOException { + jobid.readFields(in); + this.jobFile = Text.readString(in); + this.user = Text.readString(in); + this.name = Text.readString(in); + } +} Index: src/java/org/apache/hama/bsp/JobStatus.java =================================================================== --- src/java/org/apache/hama/bsp/JobStatus.java (리비전 943815) +++ src/java/org/apache/hama/bsp/JobStatus.java (작업 사본) @@ -43,7 +43,7 @@ public static final int PREP = 4; public static final int KILLED = 5; - private JobID jobid; + private BSPJobID jobid; private float progress; private float cleanupProgress; private float setupProgress; @@ -54,16 +54,16 @@ public JobStatus() { } - public JobStatus(JobID jobid, float progress, int runState) { + public JobStatus(BSPJobID jobid, float progress, int runState) { this(jobid, progress, 0.0f, runState); } - public JobStatus(JobID jobid, float progress, float cleanupProgress, + public JobStatus(BSPJobID jobid, float progress, float cleanupProgress, int runState) { this(jobid, 0.0f, progress, cleanupProgress, runState); } - public JobStatus(JobID jobid, float setupProgress, float progress, + public JobStatus(BSPJobID jobid, float setupProgress, float progress, float cleanupProgress, int runState) { this.jobid = jobid; this.setupProgress = setupProgress; @@ -72,7 +72,7 @@ this.runState = runState; } - public JobID getJobID() { + public BSPJobID getJobID() { return jobid; } @@ -148,7 +148,7 @@ } public synchronized void readFields(DataInput in) throws IOException { - this.jobid = new JobID(); + this.jobid = new BSPJobID(); jobid.readFields(in); this.setupProgress = in.readFloat(); 
this.progress = in.readFloat(); Index: src/java/org/apache/hama/bsp/KillJobAction.java =================================================================== --- src/java/org/apache/hama/bsp/KillJobAction.java (리비전 0) +++ src/java/org/apache/hama/bsp/KillJobAction.java (리비전 0) @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} to the + * {@link org.apache.hama.bsp.GroomServer} to kill the task of a job and cleanup + * resources. 
+ * + */ +class KillJobAction extends GroomServerAction { + final BSPJobID jobId; + + public KillJobAction() { + super(ActionType.KILL_JOB); + jobId = new BSPJobID(); + } + + public KillJobAction(BSPJobID jobId) { + super(ActionType.KILL_JOB); + this.jobId = jobId; + } + + public BSPJobID getJobID() { + return jobId; + } + + @Override + public void write(DataOutput out) throws IOException { + jobId.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + jobId.readFields(in); + } + +} Index: src/java/org/apache/hama/bsp/KillTaskAction.java =================================================================== --- src/java/org/apache/hama/bsp/KillTaskAction.java (리비전 0) +++ src/java/org/apache/hama/bsp/KillTaskAction.java (리비전 0) @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + + +/** + * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} + * to the {@link org.apache.hama.bsp.GroomServer} to kill a task. 
+ * + */ +class KillTaskAction extends GroomServerAction { + final TaskAttemptID taskId; + + public KillTaskAction() { + super(ActionType.KILL_TASK); + taskId = new TaskAttemptID(); + } + + public KillTaskAction(TaskAttemptID taskId) { + super(ActionType.KILL_TASK); + this.taskId = taskId; + } + + public TaskAttemptID getTaskID() { + return taskId; + } + + @Override + public void write(DataOutput out) throws IOException { + taskId.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + taskId.readFields(in); + } +} Index: src/java/org/apache/hama/bsp/LaunchTaskAction.java =================================================================== --- src/java/org/apache/hama/bsp/LaunchTaskAction.java (리비전 0) +++ src/java/org/apache/hama/bsp/LaunchTaskAction.java (리비전 0) @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} to the + * {@link org.apache.hama.bsp.GroomServer} to launch a new task. 
+ * + */ +class LaunchTaskAction extends GroomServerAction { + private Task task; + + public LaunchTaskAction() { + super(ActionType.LAUNCH_TASK); + } + + public LaunchTaskAction(Task task) { + super(ActionType.LAUNCH_TASK); + this.task = task; + } + + public Task getTask() { + return task; + } + + public void write(DataOutput out) throws IOException { + task.write(out); + } + + public void readFields(DataInput in) throws IOException { + task = new Task(); + task.readFields(in); + } + +} Index: src/java/org/apache/hama/bsp/LocalBSPCluster.java =================================================================== --- src/java/org/apache/hama/bsp/LocalBSPCluster.java (리비전 0) +++ src/java/org/apache/hama/bsp/LocalBSPCluster.java (리비전 0) @@ -0,0 +1,97 @@ +package org.apache.hama.bsp; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.util.StringUtils; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.util.ClusterUtil; + +public class LocalBSPCluster { + public static final Log LOG = LogFactory.getLog(LocalBSPCluster.class); + + private final BSPMaster master; + private final List groomThreads; + private final Configuration conf; + private Class groomServerClass; + private final static int DEFAULT_NO = 1; + + public LocalBSPCluster(final Configuration conf) throws IOException, InterruptedException { + this(conf, DEFAULT_NO); + } + + public LocalBSPCluster(final Configuration conf, final int noGroomServers) + throws IOException, InterruptedException { + this(conf, noGroomServers, BSPMaster.class, + getGroomServerImplementation(conf)); + } + + @SuppressWarnings("unchecked") + public LocalBSPCluster(final Configuration conf, final int noGroomServers, + final Class masterClass, + final Class 
groomServerClass) throws IOException, InterruptedException { + conf.set("bsp.master.port", "40000"); + conf.set("bsp.groom.port", "40020"); + conf.set("bsp.local.dir", conf.get("hadoop.tmp.dir") + "/bsp/local"); + conf.set("bsp.system.dir", conf.get("hadoop.tmp.dir") + "/bsp/system"); + this.conf = conf; + + // Create the master + this.master = BSPMaster.constructMaster(masterClass, conf); + this.groomThreads = new CopyOnWriteArrayList(); + this.groomServerClass = (Class) conf.getClass( + HConstants.REGION_SERVER_IMPL, groomServerClass); + for (int i = 0; i < noGroomServers; i++) { + addGroomServer(i); + } + } + + @SuppressWarnings("unchecked") + private static Class getGroomServerImplementation( + final Configuration conf) { + return (Class) conf.getClass( + HConstants.REGION_SERVER_IMPL, GroomServer.class); + } + + public ClusterUtil.GroomServerThread addGroomServer(final int index) + throws IOException { + LOG.info("Adding Groom Server"); + ClusterUtil.GroomServerThread rst = ClusterUtil + .createGroomServerThread(this.conf, this.groomServerClass, index); + this.groomThreads.add(rst); + return rst; + } + + public void startup() { + try { + ClusterUtil.startup(this.master, this.groomThreads, conf); + } catch (IOException e) { + LOG.info(e); + } catch (InterruptedException e) { + LOG.info(e); + } + } + + public void shutdown() { + ClusterUtil.shutdown(this.master, this.groomThreads, conf); + } + + /** + * Test things basically work. 
+ * + * @param args + * @throws IOException + * @throws InterruptedException + */ + public static void main(String[] args) throws IOException, InterruptedException { + StringUtils.startupShutdownMessage(LocalBSPCluster.class, args, LOG); + HamaConfiguration conf = new HamaConfiguration(); + LocalBSPCluster cluster = new LocalBSPCluster(conf); + cluster.startup(); + } +} Index: src/java/org/apache/hama/bsp/ReinitTrackerAction.java =================================================================== --- src/java/org/apache/hama/bsp/ReinitTrackerAction.java (리비전 0) +++ src/java/org/apache/hama/bsp/ReinitTrackerAction.java (리비전 0) @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} to the + * {@link org.apache.hama.bsp.GroomServer} to reinitialize itself. 
+ * + */ +class ReinitTrackerAction extends GroomServerAction { + + public ReinitTrackerAction() { + super(ActionType.REINIT_GROOM); + } + + public void write(DataOutput out) throws IOException { + } + + public void readFields(DataInput in) throws IOException { + } + +} Index: src/java/org/apache/hama/bsp/RunningJob.java =================================================================== --- src/java/org/apache/hama/bsp/RunningJob.java (리비전 0) +++ src/java/org/apache/hama/bsp/RunningJob.java (리비전 0) @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp; + +import java.io.IOException; + +/** + * RunningJob is the user-interface to query for details on a + * running BSP job. + * + *

+ * Clients can get hold of RunningJob via the {@link BSPJobClient} + * and then query the running-job for details such as name, configuration, + * progress etc. + *

+ * + * @see BSPJobClient + */ +public interface RunningJob { + /** + * Get the job identifier. + * + * @return the job identifier. + */ + public BSPJobID getID(); + + /** + * Get the name of the job. + * + * @return the name of the job. + */ + public String getJobName(); + + /** + * Get the path of the submitted job configuration. + * + * @return the path of the submitted job configuration. + */ + public String getJobFile(); + + /** + * Get the progress of the job's tasks, as a float between 0.0 and 1.0. + * When all bsp tasks have completed, the function returns 1.0. + * + * @return the progress of the job's tasks. + * @throws IOException + */ + public float progress() throws IOException; + + /** + * Check if the job is finished or not. This is a non-blocking call. + * + * @return true if the job is complete, else false. + * @throws IOException + */ + public boolean isComplete() throws IOException; + + /** + * Check if the job completed successfully. + * + * @return true if the job succeeded, else false. + * @throws IOException + */ + public boolean isSuccessful() throws IOException; + + /** + * Blocks until the job is complete. + * + * @throws IOException + */ + public void waitForCompletion() throws IOException; + + /** + * Returns the current state of the Job. {@link JobStatus} + * + * @throws IOException + */ + public int getJobState() throws IOException; + + /** + * Kill the running job. Blocks until all job tasks have been killed as well. + * If the job is no longer running, it simply returns. + * + * @throws IOException + */ + public void killJob() throws IOException; + + /** + * Kill indicated task attempt. + * + * @param taskId the id of the task to be terminated. + * @param shouldFail if true the task is failed and added to failed tasks + * list, otherwise it is just killed, w/o affecting job failure + * status. 
+ * @throws IOException + */ + public void killTask(TaskAttemptID taskId, boolean shouldFail) + throws IOException; +} Index: src/java/org/apache/hama/bsp/SimpleTaskScheduler.java =================================================================== --- src/java/org/apache/hama/bsp/SimpleTaskScheduler.java (리비전 0) +++ src/java/org/apache/hama/bsp/SimpleTaskScheduler.java (리비전 0) @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hama.bsp; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +class SimpleTaskScheduler extends TaskScheduler { + private static final Log LOG = LogFactory.getLog(SimpleTaskScheduler.class); + List jobQueue; + + public SimpleTaskScheduler() { + jobQueue = new ArrayList(); + } + + @Override + public void addJob(JobInProgress job) { + LOG.debug(">> Added a job (" + job + ") to scheduler (remaining jobs: " + + (jobQueue.size() + 1) + ")"); + jobQueue.add(job); + } + + @Override + public Collection getJobs() { + return jobQueue; + } + + /* + * (non-Javadoc) + * @seeorg.apache.hama.bsp.TaskScheduler#assignTasks(org.apache.hama.bsp. + * GroomServerStatus) + */ + @Override + public List assignTasks(GroomServerStatus groomStatus) + throws IOException { + ClusterStatus clusterStatus = groomServerManager.getClusterStatus(false); + + final int numGroomServers = clusterStatus.getGroomServers(); + // final int clusterTaskCapacity = clusterStatus.getMaxTasks(); + + // + // Get task counts for the current groom. + // + // final int groomTaskCapacity = groom.getMaxTasks(); + final int groomRunningTasks = groomStatus.countTasks(); + + // Assigned tasks + List assignedTasks = new ArrayList(); + + // Task task = null; + if (groomRunningTasks == 0) { + // TODO - Each time a job is submitted in BSPMaster, add a JobInProgress + // instance to the scheduler. + synchronized (jobQueue) { + for (JobInProgress job : jobQueue) { + if (job.getStatus().getRunState() != JobStatus.RUNNING) { + continue; + } + + Task t = null; + + t = job.obtainNewTask(groomStatus, numGroomServers, + groomServerManager.getNumberOfUniqueHosts()); + if (t != null) { + assignedTasks.add(t); + break; // TODO - Now, simple scheduler assigns only one task to + // each groom. 
Later, it will be improved for scheduler to + // assign one or more tasks to each groom according to + // its capacity. + } + } + } + } + + return assignedTasks; + } +} Index: src/java/org/apache/hama/bsp/Task.java =================================================================== --- src/java/org/apache/hama/bsp/Task.java (리비전 0) +++ src/java/org/apache/hama/bsp/Task.java (리비전 0) @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hama.bsp; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +/** + * + */ +public class Task implements Writable { + //////////////////////////////////////////// + // Fields + //////////////////////////////////////////// + private String jobFile; + private TaskAttemptID taskId; + private int partition; + + protected LocalDirAllocator lDirAlloc; + /** + * + */ + public Task() { + taskId = new TaskAttemptID(); + } + + public Task(String jobFile, TaskAttemptID taskId, int partition) { + this.jobFile = jobFile; + this.taskId = taskId; + + this.partition = partition; + } + + //////////////////////////////////////////// + // Accessors + //////////////////////////////////////////// + public void setJobFile(String jobFile) { + this.jobFile = jobFile; + } + + public String getJobFile() { + return jobFile; + } + + public TaskAttemptID getTaskID() { + return taskId; + } + + /** + * Get the job name for this task. + * @return the job name + */ + public BSPJobID getJobID() { + return taskId.getJobID(); + } + + /** + * Get the index of this task within the job. 
+ * @return the integer part of the task id + */ + public int getPartition() { + return partition; + } + + @Override + public String toString() { + return taskId.toString(); + } + + //////////////////////////////////////////// + // Writable + //////////////////////////////////////////// + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, jobFile); + taskId.write(out); + out.writeInt(partition); + } + + @Override + public void readFields(DataInput in) throws IOException { + jobFile = Text.readString(in); + taskId.readFields(in); + partition = in.readInt(); + } +} Index: src/java/org/apache/hama/bsp/TaskAttemptContext.java =================================================================== --- src/java/org/apache/hama/bsp/TaskAttemptContext.java (리비전 943815) +++ src/java/org/apache/hama/bsp/TaskAttemptContext.java (작업 사본) @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hama.bsp; import java.io.IOException; @@ -26,7 +25,7 @@ /** * The context for task attempts. */ -public class TaskAttemptContext extends JobContext implements Progressable { +public class TaskAttemptContext extends BSPJobContext implements Progressable { private final TaskAttemptID taskId; private String status = ""; Index: src/java/org/apache/hama/bsp/TaskAttemptID.java =================================================================== --- src/java/org/apache/hama/bsp/TaskAttemptID.java (리비전 943815) +++ src/java/org/apache/hama/bsp/TaskAttemptID.java (작업 사본) @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.hama.bsp; import java.io.DataInput; @@ -43,7 +42,7 @@ taskId = new TaskID(); } - public JobID getJobID() { + public BSPJobID getJobID() { return taskId.getJobID(); } Index: src/java/org/apache/hama/bsp/TaskID.java =================================================================== --- src/java/org/apache/hama/bsp/TaskID.java (리비전 943815) +++ src/java/org/apache/hama/bsp/TaskID.java (작업 사본) @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hama.bsp; import java.io.DataInput; @@ -31,10 +30,10 @@ idFormat.setMinimumIntegerDigits(6); } - private JobID jobId; + private BSPJobID jobId; private boolean isMatrixTask; - public TaskID(JobID jobId, boolean isMatrixTask, int id) { + public TaskID(BSPJobID jobId, boolean isMatrixTask, int id) { super(id); if (jobId == null) { throw new IllegalArgumentException("jobId cannot be null"); @@ -44,15 +43,15 @@ } public TaskID(String jtIdentifier, int jobId, boolean isGraphTask, int id) { - this(new JobID(jtIdentifier, jobId), isGraphTask, id); + this(new BSPJobID(jtIdentifier, jobId), isGraphTask, id); } public TaskID() { - jobId = new JobID(); + jobId = new BSPJobID(); } - /** Returns the {@link JobID} object that this tip belongs to */ - public JobID getJobID() { + /** Returns the {@link BSPJobID} object that this tip belongs to */ + public BSPJobID getJobID() { return jobId; } Index: src/java/org/apache/hama/bsp/TaskInProgress.java =================================================================== --- src/java/org/apache/hama/bsp/TaskInProgress.java (리비전 0) +++ src/java/org/apache/hama/bsp/TaskInProgress.java (리비전 0) @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp; + +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * + */ +class TaskInProgress { + public static final Log LOG = LogFactory.getLog(TaskInProgress.class); + + private BSPJobContext context; + + // Constants + static final int MAX_TASK_EXECS = 1; + int maxTaskAttempts = 4; + + // Job Meta + private String jobFile = null; + private int partition; + private BSPMaster bspMaster; + private TaskID id; + private JobInProgress job; + private int completes = 0; + + // Status + // private double progress = 0; + // private String state = ""; + private long startTime = 0; + + // The 'next' usable taskid of this tip + int nextTaskId = 0; + + // The taskid that took this TIP to SUCCESS + // private TaskAttemptID successfulTaskId; + + // The first taskid of this tip + private TaskAttemptID firstTaskId; + + // Map from task Id -> GroomServer Id, contains tasks that are + // currently runnings + private TreeMap activeTasks = new TreeMap(); + // All attempt Ids of this TIP + // private TreeSet tasks = new TreeSet(); + /** + * Map from taskId -> TaskStatus + */ + private TreeMap taskStatuses = new TreeMap(); + + public TaskInProgress(BSPJobID jobId, String jobFile, BSPMaster master, + BSPJobContext context, JobInProgress job, int partition) { + this.jobFile = jobFile; + this.bspMaster = master; + 
this.job = job; + this.context = context; + this.partition = partition; + } + + // ////////////////////////////////// + // Accessors + // ////////////////////////////////// + /** + * Return the start time + */ + public long getStartTime() { + return startTime; + } + + /** + * Return the parent job + */ + public JobInProgress getJob() { + return job; + } + + public TaskID getTIPId() { + return this.id; + } + + /** + * Is the Task associated with taskid is the first attempt of the tip? + * + * @param taskId + * @return Returns true if the Task is the first attempt of the tip + */ + public boolean isFirstAttempt(TaskAttemptID taskId) { + return firstTaskId == null ? false : firstTaskId.equals(taskId); + } + + /** + * Is this tip currently running any tasks? + * + * @return true if any tasks are running + */ + public boolean isRunning() { + return !activeTasks.isEmpty(); + } + + /** + * Is this tip complete? + * + * @return true if the tip is complete, else false + */ + public synchronized boolean isComplete() { + return (completes > 0); + } +} Index: src/java/org/apache/hama/bsp/TaskScheduler.java =================================================================== --- src/java/org/apache/hama/bsp/TaskScheduler.java (리비전 0) +++ src/java/org/apache/hama/bsp/TaskScheduler.java (리비전 0) @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; + +/** + * Used by a {@link BSPMaster} to schedule {@link Task}s on {@link GroomServer} + * s. + */ +abstract class TaskScheduler implements Configurable { + + protected Configuration conf; + protected GroomServerManager groomServerManager; + + public Configuration getConf() { + return conf; + } + + public void setConf(Configuration conf) { + this.conf = conf; + } + + public synchronized void setGroomServerManager( + GroomServerManager groomServerManager) { + this.groomServerManager = groomServerManager; + } + + /** + * Lifecycle method to allow the scheduler to start any work in separate + * threads. + * + * @throws IOException + */ + public void start() throws IOException { + // do nothing + } + + /** + * Lifecycle method to allow the scheduler to stop any work it is doing. + * + * @throws IOException + */ + public void terminate() throws IOException { + // do nothing + } + + public abstract void addJob(JobInProgress job); + + /** + * Returns a collection of jobs in an order which is specific to the + * particular scheduler. + * + * @param queueName + * @return + */ + public abstract Collection getJobs(); + + /** + * Returns the tasks we'd like the GroomServer to execute right now. + * + * @param groomServer The GroomServer for which we're looking for tasks. + * @return A list of tasks to run on that GroomServer, possibly empty. 
+ */ + public abstract List assignTasks(GroomServerStatus groomStatus) + throws IOException; +} Index: src/java/org/apache/hama/bsp/TaskStatus.java =================================================================== --- src/java/org/apache/hama/bsp/TaskStatus.java (리비전 0) +++ src/java/org/apache/hama/bsp/TaskStatus.java (리비전 0) @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + +class TaskStatus implements Writable { + static final Log LOG = LogFactory.getLog(TaskStatus.class); + + // enumeration for reporting current phase of a task. + public static enum Phase { + STARTING, COMPUTE, BARRIER_SYNC, CLEANUP + } + + // what state is the task in? 
+ public static enum State { + RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN + } + + private final TaskAttemptID taskId; + private float progress; + private volatile State runState; + private String stateString; + private String groomServer; + + private long startTime; + private long finishTime; + + private volatile Phase phase = Phase.STARTING; + + /** + * + */ + public TaskStatus() { + taskId = new TaskAttemptID(); + } + + public TaskStatus(TaskAttemptID taskId, float progress, State runState, + String stateString, String groomServer, Phase phase) { + this.taskId = taskId; + this.progress = progress; + this.runState = runState; + this.stateString = stateString; + this.groomServer = groomServer; + this.phase = phase; + } + + // ////////////////////////////////////////////////// + // Accessors and Modifiers + // ////////////////////////////////////////////////// + + public TaskAttemptID getTaskId() { + return taskId; + } + + public float getProgress() { + return progress; + } + + public void setProgress(float progress) { + this.progress = progress; + } + + public State getRunState() { + return runState; + } + + public void setRunState(State state) { + this.runState = state; + } + + public String getStateString() { + return stateString; + } + + public void setStateString(String stateString) { + this.stateString = stateString; + } + + public String getGroomServer() { + return groomServer; + } + + public void setGroomServer(String groomServer) { + this.groomServer = groomServer; + } + + public long getFinishTime() { + return finishTime; + } + + void setFinishTime(long finishTime) { + this.finishTime = finishTime; + } + + /** + * Get start time of the task. + * + * @return 0 if start time is not set, else returns start time. + */ + public long getStartTime() { + return startTime; + } + + /** + * Set startTime of the task. 
+ * + * @param startTime start time + */ + void setStartTime(long startTime) { + this.startTime = startTime; + } + + /** + * Get current phase of this task. + * + * @return the current phase of this task + */ + public Phase getPhase() { + return this.phase; + } + + /** + * Set current phase of this task. + * + * @param phase phase of this task + */ + void setPhase(Phase phase) { + this.phase = phase; + } + + /** + * Update the status of the task. + * + * This update is done by ping thread before sending the status. + * + * @param progress + * @param state + * @param counters + */ + synchronized void statusUpdate(float progress, String state) { + setProgress(progress); + setStateString(state); + } + + /** + * Update the status of the task. + * + * @param status updated status + */ + synchronized void statusUpdate(TaskStatus status) { + this.progress = status.getProgress(); + this.runState = status.getRunState(); + this.stateString = status.getStateString(); + + if (status.getStartTime() != 0) { + this.startTime = status.getStartTime(); + } + if (status.getFinishTime() != 0) { + this.finishTime = status.getFinishTime(); + } + + this.phase = status.getPhase(); + } + + /** + * Update specific fields of task status + * + * This update is done in BSPMaster when a cleanup attempt of task reports its + * status. Then update only specific fields, not all. 
+ * + * @param runState + * @param progress + * @param state + * @param phase + * @param finishTime + */ + synchronized void statusUpdate(State runState, float progress, String state, + Phase phase, long finishTime) { + setRunState(runState); + setProgress(progress); + setStateString(state); + setPhase(phase); + if (finishTime != 0) { + this.finishTime = finishTime; + } + } + + @Override + public Object clone() { + try { + return super.clone(); + } catch (CloneNotSupportedException cnse) { + // FIXME(review): reachable as written; the class implements only Writable, not Cloneable + throw new InternalError(cnse.toString()); + } + } + + // //////////////////////////////////////////// + // Writable + // //////////////////////////////////////////// + + @Override + public void readFields(DataInput in) throws IOException { + this.taskId.readFields(in); + this.progress = in.readFloat(); + this.runState = WritableUtils.readEnum(in, State.class); + this.stateString = Text.readString(in); + this.phase = WritableUtils.readEnum(in, Phase.class); + this.startTime = in.readLong(); + this.finishTime = in.readLong(); + } + + @Override + public void write(DataOutput out) throws IOException { + taskId.write(out); + out.writeFloat(progress); + WritableUtils.writeEnum(out, runState); + Text.writeString(out, stateString); + WritableUtils.writeEnum(out, phase); + out.writeLong(startTime); + out.writeLong(finishTime); + } +} Index: src/java/org/apache/hama/bsp/Work.java =================================================================== --- src/java/org/apache/hama/bsp/Work.java (리비전 0) +++ src/java/org/apache/hama/bsp/Work.java (리비전 0) @@ -0,0 +1,5 @@ +package org.apache.hama.bsp; + +public class Work { + +} Index: src/java/org/apache/hama/ipc/HamaRPCProtocolVersion.java =================================================================== --- src/java/org/apache/hama/ipc/HamaRPCProtocolVersion.java (리비전 943815) +++ src/java/org/apache/hama/ipc/HamaRPCProtocolVersion.java (작업 사본) @@ -1,6 +1,4 @@ /** - * Copyright 2008 The 
Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information Index: src/java/org/apache/hama/ipc/HeartbeatResponse.java =================================================================== --- src/java/org/apache/hama/ipc/HeartbeatResponse.java (리비전 943815) +++ src/java/org/apache/hama/ipc/HeartbeatResponse.java (작업 사본) @@ -1,69 +0,0 @@ -/** - * Copyright 2008 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hama.ipc; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Writable; - -public class HeartbeatResponse implements Writable, Configurable { - private Configuration conf; - - private short responseId; - - public HeartbeatResponse() { - } - - public HeartbeatResponse(short responseId) { - this.responseId = responseId; - } - - public void setResponseId(short responseId) { - this.responseId = responseId; - } - - public short getResponseId() { - return responseId; - } - - @Override - public void readFields(DataInput in) throws IOException { - this.responseId = in.readShort(); - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeShort(this.responseId); - } - - @Override - public Configuration getConf() { - return this.conf; - } - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - } -} Index: src/java/org/apache/hama/ipc/InterTrackerProtocol.java =================================================================== --- src/java/org/apache/hama/ipc/InterTrackerProtocol.java (리비전 943815) +++ src/java/org/apache/hama/ipc/InterTrackerProtocol.java (작업 사본) @@ -1,6 +1,4 @@ /** - * Copyright 2008 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -19,8 +17,15 @@ */ package org.apache.hama.ipc; +import java.io.IOException; + +import org.apache.hama.bsp.GroomServerStatus; +import org.apache.hama.bsp.HeartbeatResponse; + public interface InterTrackerProtocol extends HamaRPCProtocolVersion { - public HeartbeatResponse heartbeat(short responseId); + public HeartbeatResponse heartbeat(GroomServerStatus status, + boolean restarted, boolean initialContact, boolean acceptNewTasks, + short responseId) throws IOException; public String getSystemDir(); } Index: src/java/org/apache/hama/ipc/JobSubmissionProtocol.java =================================================================== --- src/java/org/apache/hama/ipc/JobSubmissionProtocol.java (리비전 943815) +++ src/java/org/apache/hama/ipc/JobSubmissionProtocol.java (작업 사본) @@ -1,6 +1,4 @@ /** - * Copyright 2008 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -21,15 +19,89 @@ import java.io.IOException; -import org.apache.hama.bsp.JobID; +import org.apache.hama.bsp.ClusterStatus; +import org.apache.hama.bsp.BSPJobID; +import org.apache.hama.bsp.JobProfile; import org.apache.hama.bsp.JobStatus; +import org.apache.hama.bsp.TaskAttemptID; /** - * Protocol that a Walker and the central Master use to communicate. This + * Protocol that a groom server and the central BSP Master use to communicate. This * interface will contains several methods: submitJob, killJob, and killTask. */ public interface JobSubmissionProtocol extends HamaRPCProtocolVersion { - public JobID getNewJobId() throws IOException; + + /** + * Allocate a new id for the job. + * @return + * @throws IOException + */ + public BSPJobID getNewJobId() throws IOException; + + + /** + * Submit a Job for execution. Returns the latest profile for + * that job. 
+ * The job files should be submitted in system-dir/jobName. + * + * @param jobName + * @return + * @throws IOException + */ + public JobStatus submitJob(BSPJobID jobName) throws IOException; + + /** + * Get the current status of the cluster + * @param detailed if true then report groom names as well + * @return summary of the state of the cluster + */ + public ClusterStatus getClusterStatus(boolean detailed) throws IOException; + + /** + * Grab a handle to a job that is already known to the BSPMaster. + * @return Profile of the job, or null if not found. + */ + public JobProfile getJobProfile(BSPJobID jobid) throws IOException; + + /** + * Grab a handle to a job that is already known to the BSPMaster. + * @return Status of the job, or null if not found. + */ + public JobStatus getJobStatus(BSPJobID jobid) throws IOException; + + /** + * A BSP system always operates on a single filesystem. This + * function returns the fs name. ('local' if the localfs; 'addr:port' + * if dfs). The client can then copy files into the right locations + * prior to submitting the job. + */ + public String getFilesystemName() throws IOException; + + /** + * Get all the jobs submitted. + * @return array of JobStatus for the submitted jobs + */ + public JobStatus[] getAllJobs() throws IOException; - public JobStatus submitJob(JobID jobName) throws IOException; + /** + * Grab the bspmaster system directory path where job-specific files are to be placed. + * + * @return the system directory where job-specific files are to be placed. + */ + public String getSystemDir(); + + /** + * Kill the indicated job + */ + public void killJob(BSPJobID jobid) throws IOException; + + + /** + * Kill indicated task attempt. + * @param taskId the id of the task to kill. + * @param shouldFail if true the task is failed and added to failed tasks list, otherwise + * it is just killed, w/o affecting job failure status. 
+ */ + public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException; + } Index: src/java/org/apache/hama/util/ClusterUtil.java =================================================================== --- src/java/org/apache/hama/util/ClusterUtil.java (리비전 0) +++ src/java/org/apache/hama/util/ClusterUtil.java (리비전 0) @@ -0,0 +1,98 @@ +package org.apache.hama.util; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hama.bsp.BSPMaster; +import org.apache.hama.bsp.GroomServer; + +public class ClusterUtil { + public static final Log LOG = LogFactory.getLog(ClusterUtil.class); + + /** + * Data structure to hold GroomServer Thread and GroomServer instance + */ + public static class GroomServerThread extends Thread { + private final GroomServer groomServer; + + public GroomServerThread(final GroomServer r, final int index) { + super(r, "GroomServer:" + index); + this.groomServer = r; + } + + /** @return the groom server */ + public GroomServer getGroomServer() { + return this.groomServer; + } + + /** + * Block until the groom server has come online, indicating it is ready + * to be used. + */ + public void waitForServerOnline() { + while (!groomServer.isRunning()) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // continue waiting + } + } + } + } + + /** + * Creates a {@link GroomServerThread}. + * Call 'start' on the returned thread to make it run. + * @param c Configuration to use. + * @param hrsc Class to create. + * @param index Used to distinguish the object returned. + * @throws IOException + * @return Groom server thread created. 
+ */ + public static ClusterUtil.GroomServerThread createGroomServerThread(final Configuration c, + final Class hrsc, final int index) + throws IOException { + GroomServer server; + try { + server = hrsc.getConstructor(Configuration.class).newInstance(c); + } catch (Exception e) { + IOException ioe = new IOException(); + ioe.initCause(e); + throw ioe; + } + return new ClusterUtil.GroomServerThread(server, index); + } + + /** + * Start the cluster. + * @param m + * @param conf + * @param groomServers + * @return Address to use contacting master. + * @throws InterruptedException + * @throws IOException + */ + public static String startup(final BSPMaster m, + final List groomservers, Configuration conf) throws IOException, InterruptedException { + if (m != null) { + m.start(); + } + + if (groomservers != null) { + for (ClusterUtil.GroomServerThread t: groomservers) { + t.start(); + } + } + + return m == null? null: BSPMaster.getAddress(conf).getHostName(); + } + + public static void shutdown(BSPMaster master, + List groomThreads, Configuration conf) { + LOG.debug("Shutting down HAMA Cluster"); + // TODO: + } +} Index: src/test/org/apache/hama/HamaCluster.java =================================================================== --- src/test/org/apache/hama/HamaCluster.java (리비전 943815) +++ src/test/org/apache/hama/HamaCluster.java (작업 사본) @@ -19,18 +19,35 @@ */ package org.apache.hama; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HBaseClusterTestCase; +import org.apache.hadoop.util.StringUtils; +import org.apache.hama.bsp.LocalBSPCluster; /** * Forming up the miniDfs and miniHbase */ public abstract class HamaCluster extends HBaseClusterTestCase { + public static final Log LOG = LogFactory.getLog(HamaCluster.class); protected final static HamaConfiguration conf = new HamaConfiguration(); - public void setUp() throws Exception { + + protected void setUp() throws Exception { super.setUp(); + + 
String[] args = new String[0]; + StringUtils.startupShutdownMessage(LocalBSPCluster.class, args, LOG); + HamaConfiguration conf = new HamaConfiguration(); + LocalBSPCluster cluster = new LocalBSPCluster(conf); + cluster.startup(); } - - public static HamaConfiguration getConf() { + + protected static HamaConfiguration getConf() { return conf; } + + protected void setMiniBSPCluster() { + // TODO Auto-generated method stub + + } } Index: src/test/org/apache/hama/bsp/BSPPeerTest.java =================================================================== --- src/test/org/apache/hama/bsp/BSPPeerTest.java (리비전 943815) +++ src/test/org/apache/hama/bsp/BSPPeerTest.java (작업 사본) @@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hama.Constants; import org.apache.hama.HamaCluster; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -60,14 +61,14 @@ Stat s = null; if (zk != null) { try { - s = zk.exists(BSPConstants.DEFAULT_ZOOKEEPER_ROOT, false); + s = zk.exists(Constants.DEFAULT_ZOOKEEPER_ROOT, false); } catch (Exception e) { LOG.error(s); } if (s == null) { try { - zk.create(BSPConstants.DEFAULT_ZOOKEEPER_ROOT, new byte[0], + zk.create(Constants.DEFAULT_ZOOKEEPER_ROOT, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } catch (KeeperException e) { LOG.error(e); @@ -155,9 +156,9 @@ BSPPeerThread thread; for (int i = 0; i < NUM_PEER; i++) { conf.set("bsp.peers.num", String.valueOf(NUM_PEER)); - conf.set(BSPConstants.PEER_HOST, "localhost"); - conf.set(BSPConstants.PEER_PORT, String.valueOf(30000 + i)); - conf.set(BSPConstants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810"); + conf.set(Constants.PEER_HOST, "localhost"); + conf.set(Constants.PEER_PORT, String.valueOf(30000 + i)); + conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810"); thread = new BSPPeerThread(conf); list.add(thread); } @@ -178,25 +179,25 @@ 
Configuration conf = new Configuration(); BSPPeer peer = new BSPPeer(conf); - System.out.println(peer.bindAddress+" = "+BSPConstants.DEFAULT_PEER_HOST); - System.out.println(peer.bindPort+" = "+BSPConstants.DEFAULT_PEER_PORT); - assertEquals(peer.bindAddress,BSPConstants.DEFAULT_PEER_HOST); - assertEquals(peer.bindPort,BSPConstants.DEFAULT_PEER_PORT); - assertEquals(peer.zookeeperAddr,BSPConstants.DEFAULT_ZOOKEEPER_SERVER_ADDR); + System.out.println(peer.bindAddress+" = "+Constants.DEFAULT_PEER_HOST); + System.out.println(peer.bindPort+" = "+Constants.DEFAULT_PEER_PORT); + assertEquals(peer.bindAddress,Constants.DEFAULT_PEER_HOST); + assertEquals(peer.bindPort,Constants.DEFAULT_PEER_PORT); + assertEquals(peer.zookeeperAddr,Constants.DEFAULT_ZOOKEEPER_SERVER_ADDR); int peerPort; int zkPort; conf = new Configuration(); - conf.set(BSPConstants.PEER_HOST, "localhost"); + conf.set(Constants.PEER_HOST, "localhost"); do{ peerPort = r.nextInt(Short.MAX_VALUE); } while(peerPort == 0); - conf.setInt(BSPConstants.PEER_PORT, peerPort); + conf.setInt(Constants.PEER_PORT, peerPort); do{ zkPort = r.nextInt(Short.MAX_VALUE); } while(zkPort == peerPort || zkPort == 0); - conf.set(BSPConstants.ZOOKEEPER_SERVER_ADDRS, "localhost:"+zkPort); + conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:"+zkPort); peer = new BSPPeer(conf); assertEquals(peer.bindAddress,"localhost"); assertEquals(peer.bindPort,peerPort); Index: src/test/org/apache/hama/bsp/MiniBSPCluster.java =================================================================== --- src/test/org/apache/hama/bsp/MiniBSPCluster.java (리비전 0) +++ src/test/org/apache/hama/bsp/MiniBSPCluster.java (리비전 0) @@ -0,0 +1,5 @@ +package org.apache.hama.bsp; + +public class MiniBSPCluster { + +} Index: src/test/org/apache/hama/bsp/SerializePrinting.java =================================================================== --- src/test/org/apache/hama/bsp/SerializePrinting.java (리비전 943815) +++ src/test/org/apache/hama/bsp/SerializePrinting.java 
(작업 사본) @@ -7,6 +7,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hama.Constants; import org.apache.hama.HamaCluster; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -37,14 +38,14 @@ Stat s = null; if (zk != null) { try { - s = zk.exists(BSPConstants.DEFAULT_ZOOKEEPER_ROOT, false); + s = zk.exists(Constants.DEFAULT_ZOOKEEPER_ROOT, false); } catch (Exception e) { LOG.error(s); } if (s == null) { try { - zk.create(BSPConstants.DEFAULT_ZOOKEEPER_ROOT, new byte[0], + zk.create(Constants.DEFAULT_ZOOKEEPER_ROOT, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } catch (KeeperException e) { LOG.error(e); @@ -60,10 +61,10 @@ int[] randomSequence = new int[] { 2, 3, 4, 5, 0, 1, 6, 7, 8, 9 }; for (int i = 0; i < NUM_PEER; i++) { conf.set("bsp.peers.num", String.valueOf(NUM_PEER)); - conf.set(BSPConstants.PEER_HOST, "localhost"); - conf.set(BSPConstants.PEER_PORT, String + conf.set(Constants.PEER_HOST, "localhost"); + conf.set(Constants.PEER_PORT, String .valueOf(30000 + randomSequence[i])); - conf.set(BSPConstants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810"); + conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810"); thread = new BSPPeerThread(conf, randomSequence[i]); System.out.println(randomSequence[i] + ", " + thread.getName()); list.add(thread); Index: src/test/org/apache/hama/bsp/UserInterface.java =================================================================== --- src/test/org/apache/hama/bsp/UserInterface.java (리비전 0) +++ src/test/org/apache/hama/bsp/UserInterface.java (리비전 0) @@ -0,0 +1,105 @@ +package org.apache.hama.bsp; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import 
org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hama.HamaCluster; +import org.apache.hama.HamaConfiguration; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; + +public class UserInterface extends HamaCluster implements Watcher { + private HamaConfiguration conf; + private String JOBNAME = "hama.test.bsp"; + private Path INPUTPATH = new Path("/tmp/input"); + private Path OUTPUTPATH = new Path("/tmp/output"); + + public UserInterface() { + this.conf = getConf(); + } + + public void testScenario() throws InterruptedException, IOException { + // BSP job configuration + BSPJobConf bsp = new BSPJobConf(conf); + // Set the job name + bsp.setJobName(JOBNAME); + + // Set in/output path and formatter + bsp.setInputPath(conf, INPUTPATH); + bsp.setOutputPath(conf, OUTPUTPATH); + bsp.setInputFormat(MyInputFormat.class); + bsp.setOutputFormat(MyOutputFormat.class); + + // Set the BSP code + bsp.setBSPCode(MyBSP.class); + bsp.submit(); + + //******************* + // assertion checking + assertEquals(bsp.getJobName(), JOBNAME); + //assertEquals(bsp.getInputPath(), INPUTPATH); + //assertEquals(bsp.getOutputPath(), OUTPUTPATH); + } + + class MyBSP implements BSPInterface { + // TODO: implement some BSP example + } + + class MyInputFormat extends InputFormat { + + @Override + public RecordReader createRecordReader(InputSplit arg0, + TaskAttemptContext arg1) throws IOException, InterruptedException { + // TODO Auto-generated method stub + return null; + } + + @Override + public List getSplits(JobContext arg0) throws IOException, + InterruptedException { + // TODO Auto-generated method stub + return null; + } + // TODO: implement some input Formatter + } + + class MyOutputFormat extends OutputFormat { + + @Override + public void 
checkOutputSpecs(JobContext arg0) throws IOException, + InterruptedException { + // TODO Auto-generated method stub + + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext arg0) + throws IOException, InterruptedException { + // TODO Auto-generated method stub + return null; + } + + @Override + public RecordWriter getRecordWriter(TaskAttemptContext arg0) + throws IOException, InterruptedException { + // TODO Auto-generated method stub + return null; + } + // TODO: implement some output Formatter + } + + @Override + public void process(WatchedEvent event) { + // TODO Auto-generated method stub + + } +}