Index: core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (revision 1665130)
+++ core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (working copy)
@@ -304,7 +304,8 @@
job.setJobID(jobId);
int maxTasks;
- if (job.getConfiguration().getBoolean("hama.yarn.application", false)) {
+
+ if (job.getConfiguration().get("bsp.framework.name").equals("yarn")) {
int maxMem = job.getConfiguration().getInt("yarn.nodemanager.resource.memory-mb", 0);
int minAllocationMem = job.getConfiguration().getInt("yarn.scheduler.minimum-allocation-mb", 1024);
maxTasks = maxMem / minAllocationMem;
Index: pom.xml
===================================================================
--- pom.xml (revision 1665130)
+++ pom.xml (working copy)
@@ -212,6 +212,11 @@
commons-io
${commons-io.version}
+
+ org.apache.hadoop
+ hadoop-yarn-client
+ ${hadoop.version}
+
Index: yarn/pom.xml
===================================================================
--- yarn/pom.xml (revision 1665130)
+++ yarn/pom.xml (working copy)
@@ -70,7 +70,6 @@
hadoop-yarn-client
${hadoop.version}
-
org.apache.zookeeper
zookeeper
@@ -80,11 +79,6 @@
hadoop-common
${hadoop.version}
-
- org.apache.hadoop
- hadoop-yarn-client
- ${hadoop.version}
-
Index: yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (revision 1665130)
+++ yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (working copy)
@@ -59,7 +59,7 @@
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.Job.JobState;
-import org.apache.hama.bsp.sync.SyncServerRunner;
+import org.apache.hama.bsp.sync.SyncServer;
import org.apache.hama.bsp.sync.SyncServiceFactory;
import org.apache.hama.ipc.BSPPeerProtocol;
import org.apache.hama.ipc.RPC;
@@ -101,7 +101,8 @@
private Server taskServer;
private volatile long superstep;
- private SyncServerRunner syncServer;
+ //private SyncServerRunner syncServer;
+ private SyncServer syncServer;
private Counters globalCounter = new Counters();
@@ -114,8 +115,11 @@
}
this.jobFile = args[0];
+
+ this.jobConf = getSubmitConfiguration(jobFile);
+
this.localConf = new YarnConfiguration();
- this.jobConf = getSubmitConfiguration(jobFile);
+ localConf.addResource(localConf);
fs = FileSystem.get(jobConf);
this.applicationName = jobConf.get("bsp.job.name",
@@ -192,11 +196,30 @@
* @throws IOException
*/
private void startSyncServer() throws Exception {
- syncServer = SyncServiceFactory.getSyncServerRunner(jobConf);
- jobConf = syncServer.init(jobConf);
- threadPool.submit(syncServer);
+ syncServer = SyncServiceFactory.getSyncServer(jobConf);
+ syncServer.init(jobConf);
+
+ ZKServerThread serverThread = new ZKServerThread(syncServer);
+ threadPool.submit(serverThread);
}
+ private static class ZKServerThread implements Runnable {
+ SyncServer server;
+
+ ZKServerThread(SyncServer s) {
+ server = s;
+ }
+
+ @Override
+ public void run() {
+ try {
+ server.start();
+ } catch (Exception e) {
+ LOG.error("Error running SyncServer.", e);
+ }
+ }
+ }
+
/**
* Connects to the Resource Manager.
*
@@ -282,11 +305,14 @@
.getApplicationAttemptId();
}
- private void start() throws Exception {
+ private void start() throws IOException, YarnException /*throws Exception*/ {
JobState finalState = null;
try {
job = new JobImpl(appAttemptId, jobConf, yarnRPC, amrmRPC, jobFile, jobId);
finalState = job.startJob();
+ } catch (Exception e) {
+ LOG.error("error was occured. cleaning up");
+ e.printStackTrace();
} finally {
if (finalState != null) {
LOG.info("Job \"" + applicationName + "\"'s state after completion: "
@@ -294,12 +320,14 @@
LOG.info("Job took " + ((clock.getTime() - startTime) / 1000L)
+ "s to finish!");
}
+ LOG.info("job is cleaning up");
job.cleanup();
}
}
private void cleanup() throws YarnException, IOException {
- syncServer.stop();
+ //syncServer.stop();
+ syncServer.stopServer();
if (threadPool != null && !threadPool.isShutdown()) {
threadPool.shutdownNow();
Index: yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (revision 1665130)
+++ yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (working copy)
@@ -26,7 +26,9 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.*;
@@ -101,11 +103,15 @@
ContainerStatus lastStatus = null;
GetContainerStatusesResponse getContainerStatusesResponse = cm.getContainerStatuses(statusRequest);
List containerStatuses = getContainerStatusesResponse.getContainerStatuses();
+ if (containerStatuses.size() <= 0) {
+ LOG.info("container Statuses size is zero");
+ return null;
+ }
+
for (ContainerStatus containerStatus : containerStatuses) {
- LOG.info("Got container status for containerID="
- + containerStatus.getContainerId() + ", state="
- + containerStatus.getState() + ", exitStatus="
- + containerStatus.getExitStatus() + ", diagnostics="
+ LOG.info("Got container status for containerID=" + containerStatus
+ .getContainerId() + ", state=" + containerStatus.getState()
+ + ", exitStatus=" + containerStatus.getExitStatus() + ", diagnostics="
+ containerStatus.getDiagnostics());
if (containerStatus.getContainerId().equals(allocatedContainer.getId())) {
@@ -113,12 +119,14 @@
break;
}
}
+
if (lastStatus.getState() != ContainerState.COMPLETE) {
+ LOG.info("Not completed...");
return null;
}
- LOG.info(this.id + " Last report comes with exitstatus of "
- + lastStatus.getExitStatus() + " and diagnose string of "
- + lastStatus.getDiagnostics());
+ LOG.info(this.id + " Last report comes with exitstatus of " + lastStatus
+ .getExitStatus() + " and diagnose string of " + lastStatus
+ .getDiagnostics());
return new BSPTaskStatus(id, lastStatus.getExitStatus());
}
@@ -154,19 +162,22 @@
localResources.put(YARNBSPConstants.APP_MASTER_JAR_PATH, packageResource);
- Path hamaReleaseFile = new Path(System.getenv(YARNBSPConstants.HAMA_RELEASE_LOCATION));
+ Path hamaReleaseFile = new Path(System.getenv(YARNBSPConstants.HAMA_LOCATION));
URL hamaReleaseUrl = ConverterUtils.getYarnUrlFromPath(hamaReleaseFile
.makeQualified(fs.getUri(), fs.getWorkingDirectory()));
LOG.info("Hama release URL has been composed to " + hamaReleaseUrl.toString());
- LocalResource hamaReleaseRsrc = Records.newRecord(LocalResource.class);
- hamaReleaseRsrc.setResource(hamaReleaseUrl);
- hamaReleaseRsrc.setSize(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_RELEASE_SIZE)));
- hamaReleaseRsrc.setTimestamp(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_RELEASE_TIMESTAMP)));
- hamaReleaseRsrc.setType(LocalResourceType.ARCHIVE);
- hamaReleaseRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+ RemoteIterator fileStatusListIterator = fs.listFiles(
+ hamaReleaseFile, true);
- localResources.put(YARNBSPConstants.HAMA_SYMLINK, hamaReleaseRsrc);
+ while(fileStatusListIterator.hasNext()) {
+ LocatedFileStatus lfs = fileStatusListIterator.next();
+ LocalResource localRsrc = LocalResource.newInstance(
+ ConverterUtils.getYarnUrlFromPath(lfs.getPath()),
+ LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
+ lfs.getLen(), lfs.getModificationTime());
+ localResources.put(lfs.getPath().getName(), localRsrc);
+ }
ctx.setLocalResources(localResources);
@@ -187,13 +198,6 @@
classPathEnv.append(c.trim());
}
- classPathEnv.append(File.pathSeparator);
- classPathEnv.append("./" + YARNBSPConstants.HAMA_SYMLINK +
- "/" + YARNBSPConstants.HAMA_RELEASE_VERSION + "/*");
- classPathEnv.append(File.pathSeparator);
- classPathEnv.append("./" + YARNBSPConstants.HAMA_SYMLINK +
- "/" + YARNBSPConstants.HAMA_RELEASE_VERSION + "/lib/*");
-
Vector vargs = new Vector();
vargs.add("${JAVA_HOME}/bin/java");
vargs.add("-cp " + classPathEnv + "");
Index: yarn/src/main/java/org/apache/hama/bsp/JobImpl.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/JobImpl.java (revision 1665130)
+++ yarn/src/main/java/org/apache/hama/bsp/JobImpl.java (working copy)
@@ -196,34 +196,27 @@
state = JobState.RUNNING;
int completed = 0;
- List cleanupTasks = new ArrayList();
while (completed != numBSPTasks) {
for (BSPTaskLauncher task : completionQueue) {
BSPTaskStatus returnedTask = task.poll();
- // if our task returned with a finished state
- if (returnedTask != null) {
- if (returnedTask.getExitStatus() != 0) {
- LOG.error("Task with id \"" + returnedTask.getId() + "\" failed!");
- cleanupTask(returnedTask.getId());
- state = JobState.FAILED;
- return state;
- } else {
- LOG.info("Task \"" + returnedTask.getId()
- + "\" sucessfully finished!");
- completed++;
- LOG.info("Waiting for " + (numBSPTasks - completed)
- + " tasks to finish!");
- }
- cleanupTasks.add(returnedTask.getId());
+ if(returnedTask != null && returnedTask.getExitStatus() == 0) {
+ LOG.info("Task \"" + returnedTask.getId()
+ + "\" sucessfully finished!");
+ completed++;
+ LOG.info("Waiting for " + (numBSPTasks - completed)
+ + " tasks to finish!");
}
+
+ if(returnedTask != null && returnedTask.getExitStatus() != 0) {
+ LOG.error("Task with id \"" + returnedTask.getId() + "\" failed!");
+ completionQueue.add(task);
+ state = JobState.FAILED;
+ return state;
+ }
}
Thread.sleep(1000L);
}
- for (Integer stopId : cleanupTasks) {
- cleanupTask(stopId);
- }
-
state = JobState.SUCCESS;
return state;
}
@@ -308,6 +301,7 @@
@Override
public void cleanup() throws YarnException, IOException {
for (BSPTaskLauncher launcher : completionQueue) {
+ LOG.info("cleanup tasks !!!");
launcher.stopAndCleanup();
}
}
Index: yarn/src/main/java/org/apache/hama/bsp/YARNBSPConstants.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/YARNBSPConstants.java (revision 1665130)
+++ yarn/src/main/java/org/apache/hama/bsp/YARNBSPConstants.java (working copy)
@@ -42,7 +42,7 @@
/**
* Environment key name pointing to the hama release's location
*/
- public static final String HAMA_RELEASE_LOCATION = "HAMARELEASELOCATION";
+ public static final String HAMA_LOCATION = "HAMALOCATION";
/**
* Environment key name denoting the file content length for the hama release.
@@ -61,23 +61,4 @@
*/
public static final String APP_MASTER_JAR_PATH = "AppMaster.jar";
- /**
- * Symbolic link name for hama release archive in container local resource
- */
- public static final String HAMA_SYMLINK = "hama";
-
- /**
- * Hama release file name
- */
- public static final String HAMA_RELEASE_FILE = "hama-0.6.4.tar.gz";
-
- /**
- * Hama release version
- */
- public static final String HAMA_RELEASE_VERSION = "hama-0.6.4";
-
- /**
- * Hama release file source location
- */
- public static final String HAMA_SRC_PATH = "/home/hadoop";
}
Index: yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java (revision 1665130)
+++ yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java (working copy)
@@ -17,23 +17,18 @@
*/
package org.apache.hama.bsp;
-import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
-import org.apache.commons.beanutils.Converter;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@@ -44,10 +39,8 @@
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
-import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
public class YARNBSPJobClient extends BSPJobClient {
@@ -233,24 +226,14 @@
// this creates a symlink in the working directory
localResources.put(YARNBSPConstants.APP_MASTER_JAR_PATH, amJarRsrc);
- // Copy from hama-${version}.tar.gz to HDFS
- Path hamaDstPath = new Path(getSystemDir(), YARNBSPConstants.HAMA_RELEASE_FILE);
- hamaDstPath = fs.makeQualified(hamaDstPath);
- fs.copyFromLocalFile(false, true,
- new Path(YARNBSPConstants.HAMA_SRC_PATH, YARNBSPConstants.HAMA_RELEASE_FILE),
- hamaDstPath);
- FileStatus hamaStatus = fs.getFileStatus(hamaDstPath);
- URL hamaReleaseUrl = ConverterUtils.getYarnUrlFromPath(hamaDstPath
- .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
- LocalResource hamaReleaseRsrc = Records.newRecord(LocalResource.class);
- hamaReleaseRsrc.setResource(hamaReleaseUrl);
- hamaReleaseRsrc.setSize(hamaStatus.getLen());
- hamaReleaseRsrc.setTimestamp(hamaStatus.getModificationTime());
- hamaReleaseRsrc.setType(LocalResourceType.ARCHIVE);
- hamaReleaseRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+ // add hama related jar files to localresources for container
+ List hamaJars = localJarfromPath(System.getProperty("hama.home.dir"));
+ String hamaPath = getSystemDir() + "/hama";
+ for (File fileEntry : hamaJars) {
+ addToLocalResources(fs, fileEntry.getCanonicalPath(),
+ hamaPath, fileEntry.getName(), localResources);
+ }
- localResources.put(YARNBSPConstants.HAMA_SYMLINK, hamaReleaseRsrc);
-
// Set the local resources into the launch context
amContainer.setLocalResources(localResources);
@@ -270,16 +253,12 @@
classPathEnv.append(File.pathSeparatorChar);
classPathEnv.append(c.trim());
}
- classPathEnv.append(File.pathSeparator);
- classPathEnv.append("./" + YARNBSPConstants.HAMA_SYMLINK + "/hama-0.6.4/*");
env.put(YARNBSPConstants.HAMA_YARN_LOCATION, jarPath.toUri().toString());
env.put(YARNBSPConstants.HAMA_YARN_SIZE, Long.toString(jarStatus.getLen()));
env.put(YARNBSPConstants.HAMA_YARN_TIMESTAMP, Long.toString(jarStatus.getModificationTime()));
- env.put(YARNBSPConstants.HAMA_RELEASE_LOCATION, hamaDstPath.toUri().toString());
- env.put(YARNBSPConstants.HAMA_RELEASE_SIZE, Long.toString(hamaStatus.getLen()));
- env.put(YARNBSPConstants.HAMA_RELEASE_TIMESTAMP, Long.toString(hamaStatus.getModificationTime()));
+ env.put(YARNBSPConstants.HAMA_LOCATION, hamaPath);
env.put("CLASSPATH", classPathEnv.toString());
amContainer.setEnvironment(env);
@@ -436,4 +415,28 @@
// throws an exception in case of failures
yarnClient.killApplication(appId);
}
+
+ private List localJarfromPath(String path) throws IOException {
+ File hamaHome = new File(path);
+ String[] extensions = new String[]{"jar"};
+ List files = (List)FileUtils.listFiles(hamaHome, extensions, true);
+
+ return files;
+ }
+
+
+ private void addToLocalResources(FileSystem fs, String fileSrcPath,
+ String fileDstPath, String fileName, Map localResources)
+ throws IOException {
+ Path dstPath = new Path(fileDstPath, fileName);
+ dstPath = fs.makeQualified(dstPath);
+ fs.copyFromLocalFile(false, true, new Path(fileSrcPath), dstPath);
+ FileStatus fileStatus = fs.getFileStatus(dstPath);
+ LocalResource localRsrc =
+ LocalResource.newInstance(
+ ConverterUtils.getYarnUrlFromURI(dstPath.toUri()),
+ LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
+ fileStatus.getLen(), fileStatus.getModificationTime());
+ localResources.put(fileName, localRsrc);
+ }
}
Index: yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java (revision 1665130)
+++ yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java (working copy)
@@ -62,7 +62,7 @@
conf.setInt(Constants.MAX_TASKS, 10);
YARNBSPJob job = new YARNBSPJob(conf);
- job.setBoolean("hama.yarn.application", true);
+ System.out.println(conf.get("bsp.user.name"));
job.setBspClass(HelloBSP.class);
job.setJarByClass(HelloBSP.class);
job.setJobName("Serialize Printing");
Index: .
===================================================================
--- . (revision 1665130)
+++ . (working copy)
Property changes on: .
___________________________________________________________________
Modified: svn:ignore
## -1,3 +1,4 ##
+.*
.project
.classpath
build