Index: core/src/main/java/org/apache/hama/bsp/BSPJobClient.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (revision 1665130) +++ core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (working copy) @@ -304,7 +304,8 @@ job.setJobID(jobId); int maxTasks; - if (job.getConfiguration().getBoolean("hama.yarn.application", false)) { + + if (job.getConfiguration().get("bsp.framework.name").equals("yarn")) { int maxMem = job.getConfiguration().getInt("yarn.nodemanager.resource.memory-mb", 0); int minAllocationMem = job.getConfiguration().getInt("yarn.scheduler.minimum-allocation-mb", 1024); maxTasks = maxMem / minAllocationMem; Index: pom.xml =================================================================== --- pom.xml (revision 1665130) +++ pom.xml (working copy) @@ -212,6 +212,11 @@ commons-io ${commons-io.version} + + org.apache.hadoop + hadoop-yarn-client + ${hadoop.version} + Index: yarn/pom.xml =================================================================== --- yarn/pom.xml (revision 1665130) +++ yarn/pom.xml (working copy) @@ -70,7 +70,6 @@ hadoop-yarn-client ${hadoop.version} - org.apache.zookeeper zookeeper @@ -80,11 +79,6 @@ hadoop-common ${hadoop.version} - - org.apache.hadoop - hadoop-yarn-client - ${hadoop.version} - Index: yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (revision 1665130) +++ yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (working copy) @@ -59,7 +59,7 @@ import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.Job.JobState; -import org.apache.hama.bsp.sync.SyncServerRunner; +import org.apache.hama.bsp.sync.SyncServer; import org.apache.hama.bsp.sync.SyncServiceFactory; import org.apache.hama.ipc.BSPPeerProtocol; import org.apache.hama.ipc.RPC; @@ -101,7 +101,8 @@ private Server taskServer; private volatile long superstep; - private SyncServerRunner syncServer; + //private SyncServerRunner syncServer; + private SyncServer syncServer; private Counters globalCounter = new Counters(); @@ -114,8 +115,11 @@ } this.jobFile = args[0]; + + this.jobConf = getSubmitConfiguration(jobFile); + this.localConf = new YarnConfiguration(); - this.jobConf = getSubmitConfiguration(jobFile); + localConf.addResource(localConf); fs = FileSystem.get(jobConf); this.applicationName = jobConf.get("bsp.job.name", @@ -192,11 +196,30 @@ * @throws IOException */ private void startSyncServer() throws Exception { - syncServer = SyncServiceFactory.getSyncServerRunner(jobConf); - jobConf = syncServer.init(jobConf); - threadPool.submit(syncServer); + syncServer = SyncServiceFactory.getSyncServer(jobConf); + syncServer.init(jobConf); + + ZKServerThread serverThread = new ZKServerThread(syncServer); + threadPool.submit(serverThread); } + private static class ZKServerThread implements Runnable { + SyncServer server; + + ZKServerThread(SyncServer s) { + server = s; + } + + @Override + public void run() { + try { + server.start(); + } catch (Exception e) { + LOG.error("Error running SyncServer.", e); + } + } + } + /** * Connects to the Resource Manager. * @@ -282,11 +305,14 @@ .getApplicationAttemptId(); } - private void start() throws Exception { + private void start() throws IOException, YarnException /*throws Exception*/ { JobState finalState = null; try { job = new JobImpl(appAttemptId, jobConf, yarnRPC, amrmRPC, jobFile, jobId); finalState = job.startJob(); + } catch (Exception e) { + LOG.error("error was occured. cleaning up"); + e.printStackTrace(); } finally { if (finalState != null) { LOG.info("Job \"" + applicationName + "\"'s state after completion: " @@ -294,12 +320,14 @@ LOG.info("Job took " + ((clock.getTime() - startTime) / 1000L) + "s to finish!"); } + LOG.info("job is cleaning up"); job.cleanup(); } } private void cleanup() throws YarnException, IOException { - syncServer.stop(); + //syncServer.stop(); + syncServer.stopServer(); if (threadPool != null && !threadPool.isShutdown()) { threadPool.shutdownNow(); Index: yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (revision 1665130) +++ yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (working copy) @@ -26,7 +26,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.*; @@ -101,11 +103,15 @@ ContainerStatus lastStatus = null; GetContainerStatusesResponse getContainerStatusesResponse = cm.getContainerStatuses(statusRequest); List containerStatuses = getContainerStatusesResponse.getContainerStatuses(); + if (containerStatuses.size() <= 0) { + LOG.info("container Statuses size is zero"); + return null; + } + for (ContainerStatus containerStatus : containerStatuses) { - LOG.info("Got container status for containerID=" - + containerStatus.getContainerId() + ", state=" - + containerStatus.getState() + ", exitStatus=" - + containerStatus.getExitStatus() + ", diagnostics=" + LOG.info("Got container status for containerID=" + containerStatus + .getContainerId() + ", state=" + containerStatus.getState() + + ", exitStatus=" + containerStatus.getExitStatus() + ", diagnostics=" + containerStatus.getDiagnostics()); if (containerStatus.getContainerId().equals(allocatedContainer.getId())) { @@ -113,12 +119,14 @@ break; } } + if (lastStatus.getState() != ContainerState.COMPLETE) { + LOG.info("Not completed..."); return null; } - LOG.info(this.id + " Last report comes with exitstatus of " - + lastStatus.getExitStatus() + " and diagnose string of " - + lastStatus.getDiagnostics()); + LOG.info(this.id + " Last report comes with exitstatus of " + lastStatus + .getExitStatus() + " and diagnose string of " + lastStatus + .getDiagnostics()); return new BSPTaskStatus(id, lastStatus.getExitStatus()); } @@ -154,19 +162,22 @@ localResources.put(YARNBSPConstants.APP_MASTER_JAR_PATH, packageResource); - Path hamaReleaseFile = new Path(System.getenv(YARNBSPConstants.HAMA_RELEASE_LOCATION)); + Path hamaReleaseFile = new Path(System.getenv(YARNBSPConstants.HAMA_LOCATION)); URL hamaReleaseUrl = ConverterUtils.getYarnUrlFromPath(hamaReleaseFile .makeQualified(fs.getUri(), fs.getWorkingDirectory())); LOG.info("Hama release URL has been composed to " + hamaReleaseUrl.toString()); - LocalResource hamaReleaseRsrc = Records.newRecord(LocalResource.class); - hamaReleaseRsrc.setResource(hamaReleaseUrl); - hamaReleaseRsrc.setSize(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_RELEASE_SIZE))); - hamaReleaseRsrc.setTimestamp(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_RELEASE_TIMESTAMP))); - hamaReleaseRsrc.setType(LocalResourceType.ARCHIVE); - hamaReleaseRsrc.setVisibility(LocalResourceVisibility.APPLICATION); + RemoteIterator fileStatusListIterator = fs.listFiles( + hamaReleaseFile, true); - localResources.put(YARNBSPConstants.HAMA_SYMLINK, hamaReleaseRsrc); + while(fileStatusListIterator.hasNext()) { + LocatedFileStatus lfs = fileStatusListIterator.next(); + LocalResource localRsrc = LocalResource.newInstance( + ConverterUtils.getYarnUrlFromPath(lfs.getPath()), + LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, + lfs.getLen(), lfs.getModificationTime()); + localResources.put(lfs.getPath().getName(), localRsrc); + } ctx.setLocalResources(localResources); @@ -187,13 +198,6 @@ classPathEnv.append(c.trim()); } - classPathEnv.append(File.pathSeparator); - classPathEnv.append("./" + YARNBSPConstants.HAMA_SYMLINK + - "/" + YARNBSPConstants.HAMA_RELEASE_VERSION + "/*"); - classPathEnv.append(File.pathSeparator); - classPathEnv.append("./" + YARNBSPConstants.HAMA_SYMLINK + - "/" + YARNBSPConstants.HAMA_RELEASE_VERSION + "/lib/*"); - Vector vargs = new Vector(); vargs.add("${JAVA_HOME}/bin/java"); vargs.add("-cp " + classPathEnv + ""); Index: yarn/src/main/java/org/apache/hama/bsp/JobImpl.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/JobImpl.java (revision 1665130) +++ yarn/src/main/java/org/apache/hama/bsp/JobImpl.java (working copy) @@ -196,34 +196,27 @@ state = JobState.RUNNING; int completed = 0; - List cleanupTasks = new ArrayList(); while (completed != numBSPTasks) { for (BSPTaskLauncher task : completionQueue) { BSPTaskStatus returnedTask = task.poll(); - // if our task returned with a finished state - if (returnedTask != null) { - if (returnedTask.getExitStatus() != 0) { - LOG.error("Task with id \"" + returnedTask.getId() + "\" failed!"); - cleanupTask(returnedTask.getId()); - state = JobState.FAILED; - return state; - } else { - LOG.info("Task \"" + returnedTask.getId() - + "\" sucessfully finished!"); - completed++; - LOG.info("Waiting for " + (numBSPTasks - completed) - + " tasks to finish!"); - } - cleanupTasks.add(returnedTask.getId()); + if(returnedTask != null && returnedTask.getExitStatus() == 0) { + LOG.info("Task \"" + returnedTask.getId() + + "\" sucessfully finished!"); + completed++; + LOG.info("Waiting for " + (numBSPTasks - completed) + + " tasks to finish!"); } + + if(returnedTask != null && returnedTask.getExitStatus() != 0) { + LOG.error("Task with id \"" + returnedTask.getId() + "\" failed!"); + completionQueue.add(task); + state = JobState.FAILED; + return state; + } } Thread.sleep(1000L); } - for (Integer stopId : cleanupTasks) { - cleanupTask(stopId); - } - state = JobState.SUCCESS; return state; } @@ -308,6 +301,7 @@ @Override public void cleanup() throws YarnException, IOException { for (BSPTaskLauncher launcher : completionQueue) { + LOG.info("cleanup tasks !!!"); launcher.stopAndCleanup(); } } Index: yarn/src/main/java/org/apache/hama/bsp/YARNBSPConstants.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/YARNBSPConstants.java (revision 1665130) +++ yarn/src/main/java/org/apache/hama/bsp/YARNBSPConstants.java (working copy) @@ -42,7 +42,7 @@ /** * Environment key name pointing to the hama release's location */ - public static final String HAMA_RELEASE_LOCATION = "HAMARELEASELOCATION"; + public static final String HAMA_LOCATION = "HAMALOCATION"; /** * Environment key name denoting the file content length for the hama release. @@ -61,23 +61,4 @@ */ public static final String APP_MASTER_JAR_PATH = "AppMaster.jar"; - /** - * Symbolic link name for hama release archive in container local resource - */ - public static final String HAMA_SYMLINK = "hama"; - - /** - * Hama release file name - */ - public static final String HAMA_RELEASE_FILE = "hama-0.6.4.tar.gz"; - - /** - * Hama release version - */ - public static final String HAMA_RELEASE_VERSION = "hama-0.6.4"; - - /** - * Hama release file source location - */ - public static final String HAMA_SRC_PATH = "/home/hadoop"; } Index: yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java (revision 1665130) +++ yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java (working copy) @@ -17,23 +17,18 @@ */ package org.apache.hama.bsp; -import java.io.DataOutputStream; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; -import org.apache.commons.beanutils.Converter; +import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -44,10 +39,8 @@ import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; -import org.apache.hama.Constants; import org.apache.hama.HamaConfiguration; public class YARNBSPJobClient extends BSPJobClient { @@ -233,24 +226,14 @@ // this creates a symlink in the working directory localResources.put(YARNBSPConstants.APP_MASTER_JAR_PATH, amJarRsrc); - // Copy from hama-${version}.tar.gz to HDFS - Path hamaDstPath = new Path(getSystemDir(), YARNBSPConstants.HAMA_RELEASE_FILE); - hamaDstPath = fs.makeQualified(hamaDstPath); - fs.copyFromLocalFile(false, true, - new Path(YARNBSPConstants.HAMA_SRC_PATH, YARNBSPConstants.HAMA_RELEASE_FILE), - hamaDstPath); - FileStatus hamaStatus = fs.getFileStatus(hamaDstPath); - URL hamaReleaseUrl = ConverterUtils.getYarnUrlFromPath(hamaDstPath - .makeQualified(fs.getUri(), fs.getWorkingDirectory())); - LocalResource hamaReleaseRsrc = Records.newRecord(LocalResource.class); - hamaReleaseRsrc.setResource(hamaReleaseUrl); - hamaReleaseRsrc.setSize(hamaStatus.getLen()); - hamaReleaseRsrc.setTimestamp(hamaStatus.getModificationTime()); - hamaReleaseRsrc.setType(LocalResourceType.ARCHIVE); - hamaReleaseRsrc.setVisibility(LocalResourceVisibility.APPLICATION); + // add hama related jar files to localresources for container + List hamaJars = localJarfromPath(System.getProperty("hama.home.dir")); + String hamaPath = getSystemDir() + "/hama"; + for (File fileEntry : hamaJars) { + addToLocalResources(fs, fileEntry.getCanonicalPath(), + hamaPath, fileEntry.getName(), localResources); + } - localResources.put(YARNBSPConstants.HAMA_SYMLINK, hamaReleaseRsrc); - // Set the local resources into the launch context amContainer.setLocalResources(localResources); @@ -270,16 +253,12 @@ classPathEnv.append(File.pathSeparatorChar); classPathEnv.append(c.trim()); } - classPathEnv.append(File.pathSeparator); - classPathEnv.append("./" + YARNBSPConstants.HAMA_SYMLINK + "/hama-0.6.4/*"); env.put(YARNBSPConstants.HAMA_YARN_LOCATION, jarPath.toUri().toString()); env.put(YARNBSPConstants.HAMA_YARN_SIZE, Long.toString(jarStatus.getLen())); env.put(YARNBSPConstants.HAMA_YARN_TIMESTAMP, Long.toString(jarStatus.getModificationTime())); - env.put(YARNBSPConstants.HAMA_RELEASE_LOCATION, hamaDstPath.toUri().toString()); - env.put(YARNBSPConstants.HAMA_RELEASE_SIZE, Long.toString(hamaStatus.getLen())); - env.put(YARNBSPConstants.HAMA_RELEASE_TIMESTAMP, Long.toString(hamaStatus.getModificationTime())); + env.put(YARNBSPConstants.HAMA_LOCATION, hamaPath); env.put("CLASSPATH", classPathEnv.toString()); amContainer.setEnvironment(env); @@ -436,4 +415,28 @@ // throws an exception in case of failures yarnClient.killApplication(appId); } + + private List localJarfromPath(String path) throws IOException { + File hamaHome = new File(path); + String[] extensions = new String[]{"jar"}; + List files = (List)FileUtils.listFiles(hamaHome, extensions, true); + + return files; + } + + + private void addToLocalResources(FileSystem fs, String fileSrcPath, + String fileDstPath, String fileName, Map localResources) + throws IOException { + Path dstPath = new Path(fileDstPath, fileName); + dstPath = fs.makeQualified(dstPath); + fs.copyFromLocalFile(false, true, new Path(fileSrcPath), dstPath); + FileStatus fileStatus = fs.getFileStatus(dstPath); + LocalResource localRsrc = + LocalResource.newInstance( + ConverterUtils.getYarnUrlFromURI(dstPath.toUri()), + LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, + fileStatus.getLen(), fileStatus.getModificationTime()); + localResources.put(fileName, localRsrc); + } } Index: yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java (revision 1665130) +++ yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java (working copy) @@ -62,7 +62,7 @@ conf.setInt(Constants.MAX_TASKS, 10); YARNBSPJob job = new YARNBSPJob(conf); - job.setBoolean("hama.yarn.application", true); + System.out.println(conf.get("bsp.user.name")); job.setBspClass(HelloBSP.class); job.setJarByClass(HelloBSP.class); job.setJobName("Serialize Printing"); Index: . =================================================================== --- . (revision 1665130) +++ . (working copy) Property changes on: . ___________________________________________________________________ Modified: svn:ignore ## -1,3 +1,4 ## +.* .project .classpath build