Index: core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (revision 1659147)
+++ core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (working copy)
@@ -303,14 +303,21 @@
BSPJob job = pJob;
job.setJobID(jobId);
- ClusterStatus clusterStatus = getClusterStatus(true);
- int maxTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB,
- clusterStatus.getMaxTasks() - clusterStatus.getTasks());
+ int maxTasks;
+ if (job.getConfiguration().getBoolean("hama.yarn.application", false)) {
+ int maxMem = job.getConfiguration().getInt("yarn.nodemanager.resource.memory-mb", 0);
+ int minAllocationMem = job.getConfiguration().getInt("yarn.scheduler.minimum-allocation-mb", 1024);
+ maxTasks = maxMem / minAllocationMem;
+ } else {
+ ClusterStatus clusterStatus = getClusterStatus(true);
+ maxTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB,
+ clusterStatus.getMaxTasks() - clusterStatus.getTasks());
- if (maxTasks < job.getNumBspTask()) {
- LOG.warn("The configured number of tasks has exceeded the maximum allowed. Job will run with "
- + maxTasks + " tasks.");
- job.setNumBspTask(maxTasks);
+ if (maxTasks < job.getNumBspTask()) {
+ LOG.warn("The configured number of tasks has exceeded the maximum allowed. Job will run with "
+ + maxTasks + " tasks.");
+ job.setNumBspTask(maxTasks);
+ }
}
Path submitJobDir = new Path(getSystemDir(), "submit_"
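For reference, a minimal standalone sketch of the capacity check the hunk above introduces for YARN mode: the task ceiling is approximated from the NodeManager memory and the scheduler's minimum container allocation. The class name and the sample values are illustrative, not part of the patch.

import org.apache.hadoop.conf.Configuration;

public class YarnTaskCapacitySketch {
  // Same arithmetic as the hunk above: available NodeManager memory divided by
  // the smallest container the scheduler will hand out.
  public static int maxTasksFor(Configuration conf) {
    int maxMem = conf.getInt("yarn.nodemanager.resource.memory-mb", 0);
    int minAllocationMem = conf.getInt("yarn.scheduler.minimum-allocation-mb", 1024);
    return maxMem / minAllocationMem;
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInt("yarn.nodemanager.resource.memory-mb", 8192);
    System.out.println(maxTasksFor(conf)); // 8192 / 1024 = at most 8 tasks
  }
}

Note that this bounds the job by the memory of a single NodeManager as seen from the client configuration, so it is a rough estimate rather than a cluster-wide capacity.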
Index: pom.xml
===================================================================
--- pom.xml (revision 1659147)
+++ pom.xml (working copy)
@@ -319,6 +319,7 @@
<module>examples</module>
<module>ml</module>
<module>mesos</module>
+ <module>yarn</module>
<module>dist</module>
Index: yarn/pom.xml
===================================================================
--- yarn/pom.xml (revision 1659147)
+++ yarn/pom.xml (working copy)
@@ -19,7 +19,7 @@
<groupId>org.apache.hama</groupId>
<artifactId>hama-parent</artifactId>
- <version>0.6.3-SNAPSHOT</version>
+ <version>0.7.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@@ -26,13 +26,9 @@
<groupId>org.apache.hama</groupId>
<artifactId>hama-yarn</artifactId>
<name>yarn</name>
- <version>0.6.3-SNAPSHOT</version>
+ <version>0.7.0-SNAPSHOT</version>
<packaging>jar</packaging>
- <properties>
- <hadoop.version>1.2.0</hadoop.version>
- </properties>
-
<dependencies>
<dependency>
<groupId>org.apache.hama</groupId>
@@ -54,27 +50,41 @@
<artifactId>avro</artifactId>
<version>1.5.3</version>
</dependency>
-
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
- <version>0.23.1</version>
+ <version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
- <version>0.23.1</version>
+ <version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-tests</artifactId>
- <version>0.23.1</version>
+ <version>${hadoop.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
Index: yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (revision 1659147)
+++ yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (working copy)
@@ -19,7 +19,11 @@
import java.io.DataInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.net.InetSocketAddress;
+import java.net.URI;
+import java.security.PrivilegedAction;
+import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -32,14 +36,14 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.ipc.ProtocolSignature;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.Clock;
-import org.apache.hadoop.yarn.SystemClock;
-import org.apache.hadoop.yarn.api.AMRMProtocol;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
@@ -46,18 +50,23 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.Job.JobState;
import org.apache.hama.bsp.sync.SyncServerRunner;
import org.apache.hama.bsp.sync.SyncServiceFactory;
import org.apache.hama.ipc.BSPPeerProtocol;
-import org.apache.hama.ipc.HamaRPCProtocolVersion;
+import org.apache.hama.ipc.RPC;
+import org.apache.hama.ipc.Server;
import org.apache.hama.util.BSPNetUtils;
+
/**
* BSPApplicationMaster is an application master for Apache Hama's BSP Engine.
*/
@@ -73,8 +82,9 @@
private Clock clock;
private YarnRPC yarnRPC;
- private AMRMProtocol amrmRPC;
+ private ApplicationMasterProtocol amrmRPC;
+
private ApplicationAttemptId appAttemptId;
private String applicationName;
private long startTime;
@@ -106,6 +116,7 @@
this.jobFile = args[0];
this.localConf = new YarnConfiguration();
this.jobConf = getSubmitConfiguration(jobFile);
+ fs = FileSystem.get(jobConf);
this.applicationName = jobConf.get("bsp.job.name",
"");
@@ -128,10 +139,12 @@
startSyncServer();
startRPCServers();
+
/*
* Make sure that this executes after the start the RPC servers, because we
* are readjusting the configuration.
*/
+
rewriteSubmitConfiguration(jobFile, jobConf);
String jobSplit = jobConf.get("bsp.job.split.file");
@@ -146,7 +159,7 @@
}
this.amrmRPC = getYarnRPCConnection(localConf);
- registerApplicationMaster(amrmRPC, appAttemptId, hostname, clientPort,
+ registerApplicationMaster(amrmRPC, hostname, clientPort,
"http://localhost:8080");
}
@@ -159,15 +172,14 @@
*/
private void startRPCServers() throws IOException {
// start the RPC server which talks to the client
- this.clientServer = RPC.getServer(BSPClient.class, this, hostname,
- clientPort, jobConf);
+ this.clientServer = RPC.getServer(BSPClient.class, hostname, clientPort, jobConf);
this.clientServer.start();
// start the RPC server which talks to the tasks
this.taskServerPort = BSPNetUtils.getFreePort(10000);
- this.taskServer = RPC.getServer(BSPPeerProtocol.class, this, hostname,
- taskServerPort, jobConf);
+ this.taskServer = RPC.getServer(this, hostname, taskServerPort, jobConf);
this.taskServer.start();
+
// readjusting the configuration to let the tasks know where we are.
this.jobConf.set("hama.umbilical.address", hostname + ":" + taskServerPort);
}
@@ -191,28 +203,55 @@
* @param yarnConf
* @return a new RPC connection to the Resource Manager.
*/
- private AMRMProtocol getYarnRPCConnection(Configuration yarnConf) {
+ private ApplicationMasterProtocol getYarnRPCConnection(Configuration yarnConf) throws IOException {
// Connect to the Scheduler of the ResourceManager.
- InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
+ UserGroupInformation currentUser = UserGroupInformation.createRemoteUser(appAttemptId.toString());
+ Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
+
+ final InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
+
+ Token<? extends TokenIdentifier> amRMToken = setupAndReturnAMRMToken(rmAddress, credentials.getAllTokens());
+ currentUser.addToken(amRMToken);
+
+ final Configuration conf = yarnConf;
+
+ ApplicationMasterProtocol client = currentUser
+ .doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
+ @Override
+ public ApplicationMasterProtocol run() {
+ return (ApplicationMasterProtocol) yarnRPC.getProxy(ApplicationMasterProtocol.class, rmAddress, conf);
+ }
+ });
LOG.info("Connecting to ResourceManager at " + rmAddress);
- return (AMRMProtocol) yarnRPC.getProxy(AMRMProtocol.class, rmAddress,
- yarnConf);
+ return client;
}
+ @SuppressWarnings("unchecked")
+ private Token<? extends TokenIdentifier> setupAndReturnAMRMToken(
+ InetSocketAddress rmBindAddress,
+ Collection<Token<? extends TokenIdentifier>> allTokens) {
+ for (Token<? extends TokenIdentifier> token : allTokens) {
+ if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+ SecurityUtil.setTokenService(token, rmBindAddress);
+ return token;
+ }
+ }
+ return null;
+ }
+
+
/**
* Registers this application master with the Resource Manager and retrieves a
* response which is used to launch additional containers.
*/
private static RegisterApplicationMasterResponse registerApplicationMaster(
- AMRMProtocol resourceManager, ApplicationAttemptId appAttemptID,
- String appMasterHostName, int appMasterRpcPort,
- String appMasterTrackingUrl) throws YarnRemoteException {
+ ApplicationMasterProtocol resourceManager, String appMasterHostName, int appMasterRpcPort,
+ String appMasterTrackingUrl) throws YarnException, IOException {
RegisterApplicationMasterRequest appMasterRequest = Records
.newRecord(RegisterApplicationMasterRequest.class);
- appMasterRequest.setApplicationAttemptId(appAttemptID);
appMasterRequest.setHost(appMasterHostName);
appMasterRequest.setRpcPort(appMasterRpcPort);
// TODO tracking URL
@@ -219,7 +258,7 @@
appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
RegisterApplicationMasterResponse response = resourceManager
.registerApplicationMaster(appMasterRequest);
- LOG.debug("ApplicationMaster has maximum resource capability of: "
+ LOG.info("ApplicationMaster has maximum resource capability of: "
+ response.getMaximumResourceCapability().getMemory());
return response;
}
@@ -234,12 +273,13 @@
private static ApplicationAttemptId getApplicationAttemptId()
throws IOException {
Map<String, String> envs = System.getenv();
- if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) {
+ if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
throw new IllegalArgumentException(
"ApplicationAttemptId not set in the environment");
}
+
return ConverterUtils.toContainerId(
- envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV))
+ envs.get(Environment.CONTAINER_ID.name()))
.getApplicationAttemptId();
}
@@ -259,33 +299,34 @@
}
}
- private void cleanup() throws YarnRemoteException {
+ private void cleanup() throws YarnException, IOException {
syncServer.stop();
+
if (threadPool != null && !threadPool.isShutdown()) {
threadPool.shutdownNow();
}
+
clientServer.stop();
taskServer.stop();
FinishApplicationMasterRequest finishReq = Records
.newRecord(FinishApplicationMasterRequest.class);
- finishReq.setAppAttemptId(appAttemptId);
switch (job.getState()) {
case SUCCESS:
- finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+ finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
break;
case KILLED:
- finishReq.setFinishApplicationStatus(FinalApplicationStatus.KILLED);
+ finishReq.setFinalApplicationStatus(FinalApplicationStatus.KILLED);
break;
case FAILED:
- finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
+ finishReq.setFinalApplicationStatus(FinalApplicationStatus.FAILED);
break;
default:
- finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
+ finishReq.setFinalApplicationStatus(FinalApplicationStatus.FAILED);
}
this.amrmRPC.finishApplicationMaster(finishReq);
}
- public static void main(String[] args) throws YarnRemoteException {
+ public static void main(String[] args) throws YarnException, IOException {
// we expect getting the qualified path of the job.xml as the first
// element in the arguments
BSPApplicationMaster master = null;
@@ -301,17 +342,19 @@
}
}
- /*
- * Some utility methods
- */
-
/**
* Reads the configuration from the given path.
*/
- private static Configuration getSubmitConfiguration(String path) {
+ private static Configuration getSubmitConfiguration(String path)
+ throws IOException {
Path jobSubmitPath = new Path(path);
Configuration jobConf = new HamaConfiguration();
- jobConf.addResource(jobSubmitPath);
+
+ FileSystem fs = FileSystem.get(URI.create(path), jobConf);
+
+ InputStream in = fs.open(jobSubmitPath);
+ jobConf.addResource(in);
+
return jobConf;
}
@@ -326,6 +369,7 @@
FSDataOutputStream out = fs.create(jobSubmitPath);
conf.writeXml(out);
out.close();
+
LOG.info("Written new configuration back to " + path);
}
@@ -340,12 +384,6 @@
}
@Override
- public ProtocolSignature getProtocolSignature(String protocol,
- long clientVersion, int clientMethodsHash) throws IOException {
- return new ProtocolSignature(HamaRPCProtocolVersion.versionID, null);
- }
-
- @Override
public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
throws IOException, InterruptedException {
if (taskStatus.getSuperstepCount() > superstep) {
@@ -372,7 +410,8 @@
@Override
public Task getTask(TaskAttemptID taskid) throws IOException {
BSPJobClient.RawSplit assignedSplit = null;
- String splitName = NullInputFormat.NullInputSplit.class.getCanonicalName();
+ String splitName = NullInputFormat.NullInputSplit.class.getName();
+ //String splitName = NullInputSplit.class.getCanonicalName();
if (splits != null) {
assignedSplit = splits[taskid.id];
splitName = assignedSplit.getClassName();
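The token handling added in getYarnRPCConnection() above is the part of this hunk most likely to trip people up, so here is a condensed sketch of just that path under the Hadoop 2 APIs: pick the AMRM token out of the container credentials, bind it to the scheduler address, and open the ApplicationMasterProtocol proxy inside doAs(). The class and method names below are illustrative only.

import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;

public class AmRmProxySketch {
  public static ApplicationMasterProtocol connect(final Configuration conf,
      final InetSocketAddress rmSchedulerAddress, String attemptId) throws Exception {
    UserGroupInformation appUser = UserGroupInformation.createRemoteUser(attemptId);
    // Find the AMRM token that YARN placed into the container credentials.
    for (Token<? extends TokenIdentifier> t :
        UserGroupInformation.getCurrentUser().getCredentials().getAllTokens()) {
      if (t.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
        SecurityUtil.setTokenService(t, rmSchedulerAddress); // point the token at the scheduler
        appUser.addToken(t);
      }
    }
    final YarnRPC rpc = YarnRPC.create(conf);
    // Open the scheduler proxy as the token-carrying user.
    return appUser.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
      @Override
      public ApplicationMasterProtocol run() {
        return (ApplicationMasterProtocol) rpc.getProxy(
            ApplicationMasterProtocol.class, rmSchedulerAddress, conf);
      }
    });
  }
}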
Index: yarn/src/main/java/org/apache/hama/bsp/BSPClient.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/BSPClient.java (revision 1659147)
+++ yarn/src/main/java/org/apache/hama/bsp/BSPClient.java (working copy)
@@ -18,7 +18,7 @@
package org.apache.hama.bsp;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hama.ipc.VersionedProtocol;
public interface BSPClient extends VersionedProtocol {
Index: yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java (revision 1659147)
+++ yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java (working copy)
@@ -17,15 +17,16 @@
*/
package org.apache.hama.bsp;
+import java.io.InputStream;
import java.net.InetSocketAddress;
import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.ipc.RPC;
+import org.apache.hama.ipc.RPC;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
@@ -37,7 +38,7 @@
private static final Log LOG = LogFactory.getLog(BSPRunner.class);
- private Configuration conf;
+ private HamaConfiguration conf;
private TaskAttemptID id;
private BSPPeerImpl<?, ?, ?, ?, ? extends Writable> peer;
private Counters counters = new Counters();
@@ -49,7 +50,10 @@
public BSPRunner(String jobId, int taskAttemptId, Path confPath)
throws Exception {
conf = new HamaConfiguration();
- conf.addResource(confPath);
+ FileSystem fs = FileSystem.get(confPath.toUri(), conf);
+ InputStream in = fs.open(confPath);
+ conf.addResource(in);
+
this.id = new TaskAttemptID(jobId, 0, taskAttemptId, 0);
this.id.id = taskAttemptId;
@@ -59,6 +63,7 @@
conf.set(Constants.PEER_HOST, BSPNetUtils.getCanonicalHostname());
String umbilicalAddress = conf.get("hama.umbilical.address");
+
if (umbilicalAddress == null || umbilicalAddress.isEmpty()
|| !umbilicalAddress.contains(":")) {
throw new IllegalArgumentException(
@@ -69,9 +74,10 @@
InetSocketAddress address = new InetSocketAddress(hostPort[0],
Integer.valueOf(hostPort[1]));
- BSPPeerProtocol umbilical = RPC.getProxy(BSPPeerProtocol.class,
- HamaRPCProtocolVersion.versionID, address, conf);
-
+ BSPPeerProtocol umbilical = (BSPPeerProtocol) RPC.getProxy(
+ BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID, address,
+ conf);
+
BSPJob job = new BSPJob(new HamaConfiguration(conf));
BSPTask task = (BSPTask) umbilical.getTask(id);
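Both BSPApplicationMaster and BSPRunner now load the submitted job.xml through an InputStream because the file lives on HDFS, while Configuration.addResource(Path) only examines the local filesystem. A minimal sketch of that pattern, with an illustrative class name:

import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class RemoteConfSketch {
  private RemoteConfSketch() {}

  public static Configuration load(String qualifiedPath) throws IOException {
    Path p = new Path(qualifiedPath);
    Configuration conf = new Configuration();
    // Resolve the filesystem from the path's own scheme (hdfs://, file://, ...).
    FileSystem fs = FileSystem.get(p.toUri(), conf);
    InputStream in = fs.open(p);
    // addResource(InputStream) reads the XML from the stream instead of a local file.
    conf.addResource(in);
    return conf;
  }
}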
Index: yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (revision 1659147)
+++ yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (working copy)
@@ -17,31 +17,22 @@
*/
package org.apache.hama.bsp;
+import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
-import java.util.Arrays;
-import java.util.Collections;
+import java.util.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ContainerManager;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.*;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
@@ -51,17 +42,22 @@
private final Container allocatedContainer;
private final int id;
- private final ContainerManager cm;
+ private final ContainerManagementProtocol cm;
private final Configuration conf;
private String user;
private final Path jobFile;
private final BSPJobID jobId;
- private GetContainerStatusRequest statusRequest;
+ private GetContainerStatusesRequest statusRequest;
+
+ @Override
+ protected void finalize() throws Throwable {
+ stopAndCleanup();
+ }
- public BSPTaskLauncher(int id, Container container, ContainerManager cm,
- Configuration conf, Path jobFile, BSPJobID jobId)
- throws YarnRemoteException {
+ public BSPTaskLauncher(int id, Container container, ContainerManagementProtocol cm,
+ Configuration conf, Path jobFile, BSPJobID jobId)
+ throws YarnException {
this.id = id;
this.cm = cm;
this.conf = conf;
@@ -75,19 +71,18 @@
}
}
- @Override
- protected void finalize() throws Throwable {
- stopAndCleanup();
- }
+ public void stopAndCleanup() throws YarnException, IOException {
+ StopContainersRequest stopRequest = Records.newRecord(StopContainersRequest.class);
+ List<ContainerId> containerIds = new ArrayList<ContainerId>();
+ containerIds.add(allocatedContainer.getId());
+ LOG.info("getId : " + allocatedContainer.getId());
+ stopRequest.setContainerIds(containerIds);
+ LOG.info("StopContainer : " + stopRequest.getContainerIds());
+ cm.stopContainers(stopRequest);
- public void stopAndCleanup() throws YarnRemoteException {
- StopContainerRequest stopRequest = Records
- .newRecord(StopContainerRequest.class);
- stopRequest.setContainerId(allocatedContainer.getId());
- cm.stopContainer(stopRequest);
}
- public void start() throws IOException {
+ public void start() throws IOException, YarnException {
LOG.info("Spawned task with id: " + this.id
+ " for allocated container id: "
+ this.allocatedContainer.getId().toString());
@@ -103,20 +98,33 @@
*/
public BSPTaskStatus poll() throws Exception {
- ContainerStatus lastStatus;
- if ((lastStatus = cm.getContainerStatus(statusRequest).getStatus())
- .getState() != ContainerState.COMPLETE) {
+ ContainerStatus lastStatus = null;
+ GetContainerStatusesResponse getContainerStatusesResponse = cm.getContainerStatuses(statusRequest);
+ List<ContainerStatus> containerStatuses = getContainerStatusesResponse.getContainerStatuses();
+ for (ContainerStatus containerStatus : containerStatuses) {
+ LOG.info("Got container status for containerID="
+ + containerStatus.getContainerId() + ", state="
+ + containerStatus.getState() + ", exitStatus="
+ + containerStatus.getExitStatus() + ", diagnostics="
+ + containerStatus.getDiagnostics());
+
+ if (containerStatus.getContainerId().equals(allocatedContainer.getId())) {
+ lastStatus = containerStatus;
+ break;
+ }
+ }
+ if (lastStatus == null || lastStatus.getState() != ContainerState.COMPLETE) {
return null;
}
- LOG.info(this.id + "\tLast report comes with existatus of "
+ LOG.info(this.id + " Last report comes with exitstatus of "
+ lastStatus.getExitStatus() + " and diagnose string of "
+ lastStatus.getDiagnostics());
+
return new BSPTaskStatus(id, lastStatus.getExitStatus());
}
- private GetContainerStatusRequest setupContainer(
- Container allocatedContainer, ContainerManager cm, String user, int id)
- throws IOException {
+ private GetContainerStatusesRequest setupContainer(
+ Container allocatedContainer, ContainerManagementProtocol cm, String user, int id) throws IOException, YarnException {
LOG.info("Setting up a container for user " + user + " with id of " + id
+ " and containerID of " + allocatedContainer.getId() + " as " + user);
// Now we setup a ContainerLaunchContext
@@ -123,18 +131,11 @@
ContainerLaunchContext ctx = Records
.newRecord(ContainerLaunchContext.class);
- ctx.setContainerId(allocatedContainer.getId());
- ctx.setResource(allocatedContainer.getResource());
- ctx.setUser(user);
-
- /*
- * jar
- */
+ // Set the local resources
+ Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
LocalResource packageResource = Records.newRecord(LocalResource.class);
FileSystem fs = FileSystem.get(conf);
- Path packageFile = new Path(conf.get("bsp.jar"));
- // FIXME there seems to be a problem with the converter utils and URL
- // transformation
+ Path packageFile = new Path(System.getenv(YARNBSPConstants.HAMA_YARN_LOCATION));
URL packageUrl = ConverterUtils.getYarnUrlFromPath(packageFile
.makeQualified(fs.getUri(), fs.getWorkingDirectory()));
LOG.info("PackageURL has been composed to " + packageUrl.toString());
@@ -145,16 +146,30 @@
LOG.fatal("If you see this error the workarround does not work", e);
}
- FileStatus fileStatus = fs.getFileStatus(packageFile);
packageResource.setResource(packageUrl);
- packageResource.setSize(fileStatus.getLen());
- packageResource.setTimestamp(fileStatus.getModificationTime());
- packageResource.setType(LocalResourceType.ARCHIVE);
+ packageResource.setSize(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_YARN_SIZE)));
+ packageResource.setTimestamp(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_YARN_TIMESTAMP)));
+ packageResource.setType(LocalResourceType.FILE);
packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
- LOG.info("Package resource: " + packageResource.getResource());
- ctx.setLocalResources(Collections.singletonMap("package", packageResource));
+ localResources.put(YARNBSPConstants.APP_MASTER_JAR_PATH, packageResource);
+ Path hamaReleaseFile = new Path(System.getenv(YARNBSPConstants.HAMA_RELEASE_LOCATION));
+ URL hamaReleaseUrl = ConverterUtils.getYarnUrlFromPath(hamaReleaseFile
+ .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
+ LOG.info("Hama release URL has been composed to " + hamaReleaseUrl.toString());
+
+ LocalResource hamaReleaseRsrc = Records.newRecord(LocalResource.class);
+ hamaReleaseRsrc.setResource(hamaReleaseUrl);
+ hamaReleaseRsrc.setSize(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_RELEASE_SIZE)));
+ hamaReleaseRsrc.setTimestamp(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_RELEASE_TIMESTAMP)));
+ hamaReleaseRsrc.setType(LocalResourceType.ARCHIVE);
+ hamaReleaseRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+
+ localResources.put(YARNBSPConstants.HAMA_SYMLINK, hamaReleaseRsrc);
+
+ ctx.setLocalResources(localResources);
+
/*
* TODO Package classpath seems not to work if you're in pseudo distributed
* mode, because the resource must not be moved, it will never be unpacked.
@@ -161,32 +176,64 @@
* So we will check if our jar file has the file:// prefix and put it into
* the CP directly
*/
- String cp = "$CLASSPATH:./*:./package/*:./*:";
- if (packageUrl.getScheme() != null && packageUrl.getScheme().equals("file")) {
- cp += packageFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())
- .toString() + ":";
- LOG.info("Localized file scheme detected, adjusting CP to: " + cp);
+
+ StringBuilder classPathEnv = new StringBuilder(
+ ApplicationConstants.Environment.CLASSPATH.$()).append(File.pathSeparatorChar)
+ .append("./*");
+ for (String c : conf.getStrings(
+ YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+ classPathEnv.append(File.pathSeparatorChar);
+ classPathEnv.append(c.trim());
}
- String[] cmds = {
- "${JAVA_HOME}" + "/bin/java -cp \"" + cp + "\" "
- + BSPRunner.class.getCanonicalName(),
- jobId.getJtIdentifier(),
- id + "",
- this.jobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())
- .toString(),
- " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
- " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr" };
- ctx.setCommands(Arrays.asList(cmds));
- LOG.info("Starting command: " + Arrays.toString(cmds));
+ classPathEnv.append(File.pathSeparator);
+ classPathEnv.append("./" + YARNBSPConstants.HAMA_SYMLINK +
+ "/" + YARNBSPConstants.HAMA_RELEASE_VERSION + "/*");
+ classPathEnv.append(File.pathSeparator);
+ classPathEnv.append("./" + YARNBSPConstants.HAMA_SYMLINK +
+ "/" + YARNBSPConstants.HAMA_RELEASE_VERSION + "/lib/*");
+
+ Vector<CharSequence> vargs = new Vector<CharSequence>();
+ vargs.add("${JAVA_HOME}/bin/java");
+ vargs.add("-cp " + classPathEnv + "");
+ vargs.add(BSPRunner.class.getCanonicalName());
+
+ vargs.add(jobId.getJtIdentifier());
+ vargs.add(Integer.toString(id));
+ vargs.add(this.jobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())
+ .toString());
+
+ vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/bsp.stdout");
+ vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/bsp.stderr");
+
+ // Get final command
+ StringBuilder command = new StringBuilder();
+ for (CharSequence str : vargs) {
+ command.append(str).append(" ");
+ }
+
+ List<String> commands = new ArrayList<String>();
+ commands.add(command.toString());
+
+ ctx.setCommands(commands);
+ LOG.info("Starting command: " + commands);
+
StartContainerRequest startReq = Records
.newRecord(StartContainerRequest.class);
startReq.setContainerLaunchContext(ctx);
- cm.startContainer(startReq);
+ startReq.setContainerToken(allocatedContainer.getContainerToken());
- GetContainerStatusRequest statusReq = Records
- .newRecord(GetContainerStatusRequest.class);
- statusReq.setContainerId(allocatedContainer.getId());
+ List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+ list.add(startReq);
+ StartContainersRequest requestList = StartContainersRequest.newInstance(list);
+ cm.startContainers(requestList);
+
+ GetContainerStatusesRequest statusReq = Records
+ .newRecord(GetContainerStatusesRequest.class);
+ List<ContainerId> containerIds = new ArrayList<ContainerId>();
+ containerIds.add(allocatedContainer.getId());
+ statusReq.setContainerIds(containerIds);
return statusReq;
}
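As a side-by-side reminder of the API change driving most of this file: Hadoop 2 replaced the single-container start/stop/status RPCs with list-based requests, so a single container gets wrapped in a singleton list. A minimal sketch of the same calls the launcher now makes, with illustrative names:

import java.util.Collections;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;

public final class ContainerCallsSketch {
  private ContainerCallsSketch() {}

  // Start one container and return the request used to poll its status.
  public static GetContainerStatusesRequest launch(ContainerManagementProtocol cm,
      Container container, ContainerLaunchContext ctx) throws Exception {
    StartContainerRequest start = StartContainerRequest.newInstance(ctx,
        container.getContainerToken());
    cm.startContainers(StartContainersRequest.newInstance(Collections.singletonList(start)));
    return GetContainerStatusesRequest.newInstance(
        Collections.singletonList(container.getId()));
  }

  // Stop the same container through the batched stop call.
  public static void stop(ContainerManagementProtocol cm, Container container) throws Exception {
    cm.stopContainers(StopContainersRequest.newInstance(
        Collections.singletonList(container.getId())));
  }
}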
Index: yarn/src/main/java/org/apache/hama/bsp/Job.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/Job.java (revision 1659147)
+++ yarn/src/main/java/org/apache/hama/bsp/Job.java (working copy)
@@ -17,8 +17,10 @@
*/
package org.apache.hama.bsp;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import java.io.IOException;
+
/**
* Main interface to interact with the job. Provides only getters.
*/
@@ -34,7 +36,7 @@
public JobState startJob() throws Exception;
- public void cleanup() throws YarnRemoteException;
+ public void cleanup() throws YarnException, IOException;
JobState getState();
Index: yarn/src/main/java/org/apache/hama/bsp/JobImpl.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/JobImpl.java (revision 1659147)
+++ yarn/src/main/java/org/apache/hama/bsp/JobImpl.java (working copy)
@@ -1,8 +1,8 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
+ * regarding copyright ownership. The ASF licenses this file
* distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
@@ -17,14 +17,10 @@
*/
package org.apache.hama.bsp;
+import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.security.PrivilegedAction;
+import java.util.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,20 +27,18 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.api.AMRMProtocol;
-import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.records.AMResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.NMTokenCache;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hama.bsp.BSPTaskLauncher.BSPTaskStatus;
@@ -66,7 +60,7 @@
private ApplicationAttemptId appAttemptId;
private YarnRPC yarnRPC;
- private AMRMProtocol resourceManager;
+ private ApplicationMasterProtocol resourceManager;
private List<Container> allocatedContainers;
private List<ContainerId> releasedContainers = Collections.emptyList();
@@ -76,11 +70,20 @@
private int lastResponseID = 0;
+ private int getMemoryRequirements() {
+ String newMemoryProperty = conf.get("bsp.child.mem.in.mb");
+ if (newMemoryProperty == null) {
+ LOG.warn("\"bsp.child.mem.in.mb\" was not set! Try parsing the child opts...");
+ return getMemoryFromOptString(childOpts);
+ } else {
+ return Integer.valueOf(newMemoryProperty);
+ }
+ }
+
public JobImpl(ApplicationAttemptId appAttemptId,
- Configuration jobConfiguration, YarnRPC yarnRPC, AMRMProtocol amrmRPC,
- String jobFile, BSPJobID jobId) {
+ Configuration jobConfiguration, YarnRPC yarnRPC, ApplicationMasterProtocol amrmRPC,
+ String jobFile, BSPJobID jobId) {
super();
- this.numBSPTasks = jobConfiguration.getInt("bsp.peers.num", 1);
this.appAttemptId = appAttemptId;
this.yarnRPC = yarnRPC;
this.resourceManager = amrmRPC;
@@ -88,35 +91,29 @@
this.state = JobState.NEW;
this.jobId = jobId;
this.conf = jobConfiguration;
+ this.numBSPTasks = conf.getInt("bsp.peers.num", 1);
this.childOpts = conf.get("bsp.child.java.opts");
this.taskMemoryInMb = getMemoryRequirements();
- LOG.info("Memory per task: " + taskMemoryInMb + "m!");
}
- private int getMemoryRequirements() {
- String newMemoryProperty = conf.get("bsp.child.mem.in.mb");
- if (newMemoryProperty == null) {
- LOG.warn("\"bsp.child.mem.in.mb\" was not set! Try parsing the child opts...");
- return getMemoryFromOptString(childOpts);
- } else {
- return Integer.valueOf(newMemoryProperty);
+ // This really needs a testcase
+ private static int getMemoryFromOptString(String opts) {
+ if (opts == null) {
+ return DEFAULT_MEMORY_MB;
}
- }
- // This really needs a testcase
- private static int getMemoryFromOptString(String opts) {
if (!opts.contains("-Xmx")) {
LOG.info("No \"-Xmx\" option found in child opts, using default amount of memory!");
return DEFAULT_MEMORY_MB;
} else {
// e.G: -Xmx512m
+
int startIndex = opts.indexOf("-Xmx") + 4;
- int endIndex = opts.indexOf(" ", startIndex);
- String xmxString = opts.substring(startIndex, endIndex);
+ String xmxString = opts.substring(startIndex);
char qualifier = xmxString.charAt(xmxString.length() - 1);
int memory = Integer.valueOf(xmxString.substring(0,
- xmxString.length() - 2));
+ xmxString.length() - 1));
if (qualifier == 'm') {
return memory;
} else if (qualifier == 'g') {
@@ -133,28 +130,28 @@
public JobState startJob() throws Exception {
this.allocatedContainers = new ArrayList<Container>(numBSPTasks);
+ NMTokenCache nmTokenCache = new NMTokenCache();
while (allocatedContainers.size() < numBSPTasks) {
+ AllocateRequest req = AllocateRequest.newInstance(lastResponseID, 0.0f,
+ createBSPTaskRequest(numBSPTasks - allocatedContainers.size(), taskMemoryInMb,
+ priority), releasedContainers, null);
- AllocateRequest req = BuilderUtils.newAllocateRequest(
- appAttemptId,
- lastResponseID,
- 0.0f,
- createBSPTaskRequest(numBSPTasks - allocatedContainers.size(),
- taskMemoryInMb, priority), releasedContainers);
+ AllocateResponse allocateResponse = resourceManager.allocate(req);
+ for (NMToken token : allocateResponse.getNMTokens()) {
+ nmTokenCache.setToken(token.getNodeId().toString(), token.getToken());
+ }
- AllocateResponse allocateResponse = resourceManager.allocate(req);
- AMResponse amResponse = allocateResponse.getAMResponse();
- LOG.info("Got response! ID: " + amResponse.getResponseId()
+ LOG.info("Got response ID: " + allocateResponse.getResponseId()
+ " with num of containers: "
- + amResponse.getAllocatedContainers().size()
+ + allocateResponse.getAllocatedContainers().size()
+ " and following resources: "
- + amResponse.getAvailableResources().getMemory() + "mb");
- this.lastResponseID = amResponse.getResponseId();
+ + allocateResponse.getAvailableResources().getMemory() + "mb");
+ this.lastResponseID = allocateResponse.getResponseId();
- // availableResources = amResponse.getAvailableResources();
- this.allocatedContainers.addAll(amResponse.getAllocatedContainers());
- LOG.info("Waiting to allocate "
- + (numBSPTasks - allocatedContainers.size()) + " more containers...");
+ this.allocatedContainers.addAll(allocateResponse.getAllocatedContainers());
+
+ LOG.info("Waiting to allocate " + (numBSPTasks - allocatedContainers.size()) + " more containers...");
+
Thread.sleep(1000l);
}
@@ -166,17 +163,26 @@
+ allocatedContainer.getId() + ", containerNode="
+ allocatedContainer.getNodeId().getHost() + ":"
+ allocatedContainer.getNodeId().getPort() + ", containerNodeURI="
- + allocatedContainer.getNodeHttpAddress() + ", containerState"
- + allocatedContainer.getState() + ", containerResourceMemory"
+ + allocatedContainer.getNodeHttpAddress() + ", containerResourceMemory"
+ allocatedContainer.getResource().getMemory());
// Connect to ContainerManager on the allocated container
- String cmIpPortStr = allocatedContainer.getNodeId().getHost() + ":"
- + allocatedContainer.getNodeId().getPort();
- InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
- ContainerManager cm = (ContainerManager) yarnRPC.getProxy(
- ContainerManager.class, cmAddress, conf);
+ String user = conf.get("bsp.user.name");
+ if (user == null) {
+ user = System.getenv(ApplicationConstants.Environment.USER.name());
+ }
+ ContainerManagementProtocol cm = null;
+ try {
+ cm = getContainerManagementProtocolProxy(yarnRPC,
+ nmTokenCache.getToken(allocatedContainer.getNodeId().toString()), allocatedContainer.getNodeId(), user);
+ } catch (Exception e) {
+ LOG.error("Failed to create ContainerManager...");
+ if (cm != null)
+ yarnRPC.stopProxy(cm, conf);
+ e.printStackTrace();
+ }
+
BSPTaskLauncher runnableLaunchContainer = new BSPTaskLauncher(id,
allocatedContainer, cm, conf, jobFile, jobId);
@@ -185,9 +191,13 @@
completionQueue.add(runnableLaunchContainer);
id++;
}
+
LOG.info("Waiting for tasks to finish...");
state = JobState.RUNNING;
int completed = 0;
+
+
+ List<Integer> cleanupTasks = new ArrayList<Integer>();
while (completed != numBSPTasks) {
for (BSPTaskLauncher task : completionQueue) {
BSPTaskStatus returnedTask = task.poll();
@@ -195,6 +205,7 @@
if (returnedTask != null) {
if (returnedTask.getExitStatus() != 0) {
LOG.error("Task with id \"" + returnedTask.getId() + "\" failed!");
+ cleanupTask(returnedTask.getId());
state = JobState.FAILED;
return state;
} else {
@@ -204,24 +215,58 @@
LOG.info("Waiting for " + (numBSPTasks - completed)
+ " tasks to finish!");
}
- cleanupTask(returnedTask.getId());
+ cleanupTasks.add(returnedTask.getId());
}
}
Thread.sleep(1000L);
}
+ for (Integer stopId : cleanupTasks) {
+ cleanupTask(stopId);
+ }
+
state = JobState.SUCCESS;
return state;
}
/**
+ * Creates an authenticated proxy to a NodeManager's ContainerManagementProtocol,
+ * adding the NM token (when one was returned for the node) to the remote user
+ * before opening the connection.
+ *
+ * @param rpc the YarnRPC instance used to create the proxy
+ * @param nmToken the NMToken the ResourceManager returned for this node, may be null
+ * @param nodeId the NodeManager to connect to
+ * @param user the user name the proxy should act on behalf of
+ * @return a ContainerManagementProtocol proxy bound to the given node
+ */
+ protected ContainerManagementProtocol getContainerManagementProtocolProxy(
+ final YarnRPC rpc, Token nmToken, NodeId nodeId, String user) {
+ ContainerManagementProtocol proxy;
+ UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+ final InetSocketAddress addr =
+ NetUtils.createSocketAddr(nodeId.getHost(), nodeId.getPort());
+ if (nmToken != null) {
+ ugi.addToken(ConverterUtils.convertFromYarn(nmToken, addr));
+ }
+
+ proxy = ugi
+ .doAs(new PrivilegedAction<ContainerManagementProtocol>() {
+ @Override
+ public ContainerManagementProtocol run() {
+ return (ContainerManagementProtocol) rpc.getProxy(
+ ContainerManagementProtocol.class,
+ addr, conf);
+ }
+ });
+ return proxy;
+ }
+
+ /**
* Makes a lookup for the taskid and stops its container and task. It also
* removes the task from the launcher so that we won't have to stop it twice.
*
* @param id
- * @throws YarnRemoteException
+ * @throws YarnException
*/
- private void cleanupTask(int id) throws YarnRemoteException {
+ private void cleanupTask(int id) throws YarnException, IOException {
BSPTaskLauncher bspTaskLauncher = launchers.get(id);
bspTaskLauncher.stopAndCleanup();
launchers.remove(id);
@@ -228,13 +273,6 @@
completionQueue.remove(bspTaskLauncher);
}
- @Override
- public void cleanup() throws YarnRemoteException {
- for (BSPTaskLauncher launcher : completionQueue) {
- launcher.stopAndCleanup();
- }
- }
-
private List<ResourceRequest> createBSPTaskRequest(int numTasks,
int memoryInMb, int priority) {
@@ -247,7 +285,7 @@
// whether a particular rack/host is needed
// useful for applications that are sensitive
// to data locality
- rsrcRequest.setHostName("*");
+ rsrcRequest.setResourceName("*");
// set the priority for the request
Priority pri = Records.newRecord(Priority.class);
@@ -269,6 +307,13 @@
}
@Override
+ public void cleanup() throws YarnException, IOException {
+ for (BSPTaskLauncher launcher : completionQueue) {
+ launcher.stopAndCleanup();
+ }
+ }
+
+ @Override
public JobState getState() {
return state;
}
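The rewritten getMemoryFromOptString() above still carries the "This really needs a testcase" remark, so here is a throwaway check of that parse logic. The default value and the fallback for unknown qualifiers are assumptions; also note the rewrite takes everything after -Xmx to the end of the string, so it assumes -Xmx is the last token in bsp.child.java.opts.

public class XmxParseCheck {
  // Assumed default; the real constant is DEFAULT_MEMORY_MB in JobImpl.
  static final int DEFAULT_MEMORY_MB = 256;

  // Parse logic mirroring the hunk above: strip "-Xmx", read the trailing
  // qualifier, and convert gigabytes to megabytes.
  static int getMemoryFromOptString(String opts) {
    if (opts == null || !opts.contains("-Xmx")) {
      return DEFAULT_MEMORY_MB;
    }
    String xmxString = opts.substring(opts.indexOf("-Xmx") + 4);
    char qualifier = xmxString.charAt(xmxString.length() - 1);
    int memory = Integer.valueOf(xmxString.substring(0, xmxString.length() - 1));
    if (qualifier == 'm') {
      return memory;
    } else if (qualifier == 'g') {
      return memory * 1024;
    }
    return DEFAULT_MEMORY_MB; // assumed fallback for unknown qualifiers
  }

  public static void main(String[] args) {
    System.out.println(getMemoryFromOptString("-Xmx512m")); // 512
    System.out.println(getMemoryFromOptString("-Xmx2g"));   // 2048
    System.out.println(getMemoryFromOptString(null));       // default
  }
}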
Index: yarn/src/main/java/org/apache/hama/bsp/YARNBSPConstants.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/YARNBSPConstants.java (revision 0)
+++ yarn/src/main/java/org/apache/hama/bsp/YARNBSPConstants.java (working copy)
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+/**
+ * Constants used in both Client and Application Master
+ */
+public class YARNBSPConstants {
+
+ /**
+ * Environment key name pointing to the hama-yarn's location
+ */
+ public static final String HAMA_YARN_LOCATION = "HAMAYARNJARLOCATION";
+
+ /**
+ * Environment key name denoting the file content length for the hama-yarn application.
+ * Used to validate the local resource.
+ */
+ public static final String HAMA_YARN_SIZE = "HAMAYARNJARSIZE";
+
+ /**
+ * Environment key name denoting the file timestamp for the hama-yarn application.
+ * Used to validate the local resource.
+ */
+ public static final String HAMA_YARN_TIMESTAMP = "HAMAYARNJARTIMESTAMP";
+
+ /**
+ * Environment key name pointing to the hama release's location
+ */
+ public static final String HAMA_RELEASE_LOCATION = "HAMARELEASELOCATION";
+
+ /**
+ * Environment key name denoting the file content length for the hama release.
+ * Used to validate the local resource.
+ */
+ public static final String HAMA_RELEASE_SIZE = "HAMARELEASESIZE";
+
+ /**
+ * Environment key name denoting the file timestamp for the hama release.
+ * Used to validate the local resource.
+ */
+ public static final String HAMA_RELEASE_TIMESTAMP = "HAMARELEASETIMESTAMP";
+
+ /**
+ * Symbolic link name for application master's jar file in container local resource
+ */
+ public static final String APP_MASTER_JAR_PATH = "AppMaster.jar";
+
+ /**
+ * Symbolic link name for hama release archive in container local resource
+ */
+ public static final String HAMA_SYMLINK = "hama";
+
+ /**
+ * Hama release file name
+ */
+ public static final String HAMA_RELEASE_FILE = "hama-0.6.4.tar.gz";
+
+ /**
+ * Hama release version
+ */
+ public static final String HAMA_RELEASE_VERSION = "hama-0.6.4";
+
+ /**
+ * Hama release file source location
+ */
+ public static final String HAMA_SRC_PATH = "/home/hadoop";
+}
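These constants are read back with System.getenv() in BSPTaskLauncher, so the client has to publish the resource metadata through the application master's environment. That wiring is not part of this hunk; the sketch below only illustrates the intended handshake, with an illustrative helper name.

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hama.bsp.YARNBSPConstants;

public final class ResourceEnvSketch {
  private ResourceEnvSketch() {}

  // Build the environment entries the launcher later uses to rebuild the
  // LocalResource for the hama-yarn jar without another HDFS lookup.
  public static Map<String, String> describe(String hdfsPath, FileStatus status) {
    Map<String, String> env = new HashMap<String, String>();
    env.put(YARNBSPConstants.HAMA_YARN_LOCATION, hdfsPath);
    env.put(YARNBSPConstants.HAMA_YARN_SIZE, Long.toString(status.getLen()));
    env.put(YARNBSPConstants.HAMA_YARN_TIMESTAMP, Long.toString(status.getModificationTime()));
    return env;
  }
}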
Index: yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java (revision 1659147)
+++ yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java (working copy)
@@ -22,16 +22,12 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hama.HamaConfiguration;
@@ -43,14 +39,12 @@
private static volatile int id = 0;
private YARNBSPJobClient submitClient;
- private BSPClient client;
private boolean submitted;
private ApplicationReport report;
- private ClientRMProtocol applicationsManager;
+ private ApplicationClientProtocol applicationsManager;
private YarnRPC rpc;
public YARNBSPJob(HamaConfiguration conf) throws IOException {
- super(conf);
submitClient = new YARNBSPJobClient(conf);
YarnConfiguration yarnConf = new YarnConfiguration(conf);
this.rpc = YarnRPC.create(conf);
@@ -58,8 +52,8 @@
YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS));
LOG.info("Connecting to ResourceManager at " + rmAddress);
- this.applicationsManager = ((ClientRMProtocol) rpc.getProxy(
- ClientRMProtocol.class, rmAddress, conf));
+ this.applicationsManager = ((ApplicationClientProtocol) rpc.getProxy(
+ ApplicationClientProtocol.class, rmAddress, conf));
}
public void setMemoryUsedPerTaskInMb(int mem) {
@@ -66,7 +60,7 @@
conf.setInt("bsp.child.mem.in.mb", mem);
}
- public void kill() throws YarnRemoteException {
+ public void kill() throws YarnException, IOException {
if (submitClient != null) {
KillApplicationRequest killRequest = Records
.newRecord(KillApplicationRequest.class);
@@ -79,6 +73,7 @@
public void submit() throws IOException, InterruptedException {
RunningJob submitJobInternal = submitClient.submitJobInternal(this,
new BSPJobID("hama_yarn", id++));
+
if (submitJobInternal != null) {
submitted = true;
report = submitClient.getReport();
@@ -95,45 +90,14 @@
this.submit();
}
- client = RPC.waitForProxy(BSPClient.class, BSPClient.versionID,
- NetUtils.createSocketAddr(report.getHost(), report.getRpcPort()), conf);
- GetApplicationReportRequest reportRequest = Records
- .newRecord(GetApplicationReportRequest.class);
- reportRequest.setApplicationId(submitClient.getId());
-
- GetApplicationReportResponse reportResponse = applicationsManager
- .getApplicationReport(reportRequest);
- ApplicationReport localReport = reportResponse.getApplicationReport();
- long clientSuperStep = -1L;
- while (localReport.getFinalApplicationStatus() != null
- && localReport.getFinalApplicationStatus().ordinal() == 0) {
- LOG.debug("currently in state: "
- + localReport.getFinalApplicationStatus());
- if (verbose) {
- long remoteSuperStep = client.getCurrentSuperStep().get();
- if (clientSuperStep < remoteSuperStep) {
- clientSuperStep = remoteSuperStep;
- LOG.info("Current supersteps number: " + clientSuperStep);
- }
- }
- reportResponse = applicationsManager.getApplicationReport(reportRequest);
- localReport = reportResponse.getApplicationReport();
-
- Thread.sleep(3000L);
- }
-
- if (localReport.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED) {
- LOG.info("Job succeeded!");
+ if (report != null && report.getApplicationId().equals(submitClient.getId())) {
return true;
} else {
- LOG.info("Job failed with status: "
- + localReport.getFinalApplicationStatus().toString() + "!");
return false;
}
-
}
- ClientRMProtocol getApplicationsManager() {
+ ApplicationClientProtocol getApplicationsManager() {
return applicationsManager;
}
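For completeness, a sketch of the kill() path above against the renamed client protocol (ClientRMProtocol became ApplicationClientProtocol). The helper name is illustrative; the call itself is the standard forceKillApplication RPC.

import java.io.IOException;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;

public final class KillAppSketch {
  private KillAppSketch() {}

  // Ask the ResourceManager to kill the given application.
  public static void kill(ApplicationClientProtocol rm, ApplicationId appId)
      throws YarnException, IOException {
    KillApplicationRequest req = Records.newRecord(KillApplicationRequest.class);
    req.setApplicationId(appId);
    rm.forceKillApplication(req);
  }
}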
Index: yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java (revision 1659147)
+++ yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java (working copy)
@@ -17,32 +17,37 @@
*/
package org.apache.hama.bsp;
+import java.io.DataOutputStream;
+import java.io.File;
import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import java.nio.ByteBuffer;
+import java.util.*;
+import org.apache.commons.beanutils.Converter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
+import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
public class YARNBSPJobClient extends BSPJobClient {
@@ -52,14 +57,93 @@
private ApplicationId id;
private ApplicationReport report;
+ // Configuration
+ private YarnClient yarnClient;
+ private YarnConfiguration yarnConf;
+
+ // Start time for client
+ private final long clientStartTime = System.currentTimeMillis();
+ // Timeout threshold for client. Kill app after time interval expires.
+ private long clientTimeout = 60000;
+
+ class NetworkedJob implements RunningJob {
+ @Override
+ public BSPJobID getID() {
+ return null;
+ }
+
+ @Override
+ public String getJobName() {
+ return null;
+ }
+
+ @Override
+ public long progress() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public boolean isComplete() throws IOException {
+ return false;
+ }
+
+ @Override
+ public boolean isSuccessful() throws IOException {
+ return false;
+ }
+
+ @Override
+ public void waitForCompletion() throws IOException {
+
+ }
+
+ @Override
+ public int getJobState() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public void killJob() throws IOException {
+
+ }
+
+ @Override
+ public void killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException {
+
+ }
+
+ @Override
+ public long getSuperstepCount() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public JobStatus getStatus() {
+ return null;
+ }
+
+ @Override
+ public TaskCompletionEvent[] getTaskCompletionEvents(int eventCounter) {
+ return new TaskCompletionEvent[0];
+ }
+
+ @Override
+ public String getJobFile() {
+ return null;
+ }
+ }
+
public YARNBSPJobClient(HamaConfiguration conf) {
setConf(conf);
+ yarnConf = new YarnConfiguration(conf);
+ yarnClient = YarnClient.createYarnClient();
+ yarnClient.init(yarnConf);
+ yarnClient.start();
}
@Override
protected RunningJob launchJob(BSPJobID jobId, BSPJob normalJob,
Path submitJobFile, FileSystem pFs) throws IOException {
-
YARNBSPJob job = (YARNBSPJob) normalJob;
LOG.info("Submitting job...");
@@ -66,6 +150,7 @@
if (getConf().get("bsp.child.mem.in.mb") == null) {
LOG.warn("BSP Child memory has not been set, YARN will guess your needs or use default values.");
}
+
FileSystem fs = pFs;
if (fs == null) {
fs = FileSystem.get(getConf());
@@ -77,124 +162,196 @@
LOG.debug("Retrieved username: " + s);
}
- GetNewApplicationRequest request = Records
- .newRecord(GetNewApplicationRequest.class);
- GetNewApplicationResponse response = job.getApplicationsManager()
- .getNewApplication(request);
- id = response.getApplicationId();
- LOG.debug("Got new ApplicationId=" + id);
+ try {
+ YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics();
+ LOG.info("Got Cluster metric info from ASM"
+ + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());
- // Create a new ApplicationSubmissionContext
- ApplicationSubmissionContext appContext = Records
- .newRecord(ApplicationSubmissionContext.class);
- // set the ApplicationId
- appContext.setApplicationId(this.id);
- // set the application name
- appContext.setApplicationName(job.getJobName());
+ List<NodeReport> clusterNodeReports = yarnClient.getNodeReports(
+ NodeState.RUNNING);
+ LOG.info("Got Cluster node info from ASM");
+ for (NodeReport node : clusterNodeReports) {
+ LOG.info("Got node report from ASM for"
+ + ", nodeId=" + node.getNodeId()
+ + ", nodeAddress" + node.getHttpAddress()
+ + ", nodeRackName" + node.getRackName()
+ + ", nodeNumContainers" + node.getNumContainers());
+ }
- // Create a new container launch context for the AM's container
- ContainerLaunchContext amContainer = Records
- .newRecord(ContainerLaunchContext.class);
+ QueueInfo queueInfo = yarnClient.getQueueInfo("default");
+ LOG.info("Queue info"
+ + ", queueName=" + queueInfo.getQueueName()
+ + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity()
+ + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity()
+ + ", queueApplicationCount=" + queueInfo.getApplications().size()
+ + ", queueChildQueueCount=" + queueInfo.getChildQueues().size());
- // Define the local resources required
- Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
- // Lets assume the jar we need for our ApplicationMaster is available in
- // HDFS at a certain known path to us and we want to make it available to
- // the ApplicationMaster in the launched container
- if (job.getJar() == null) {
- throw new IllegalArgumentException(
- "Jar must be set in order to run the application!");
- }
- Path jarPath = new Path(job.getWorkingDirectory(), id + "/app.jar");
- fs.copyFromLocalFile(job.getLocalPath(job.getJar()), jarPath);
- LOG.debug("Copying app jar to " + jarPath);
- getConf()
- .set(
- "bsp.jar",
- jarPath.makeQualified(fs.getUri(), fs.getWorkingDirectory())
- .toString());
- FileStatus jarStatus = fs.getFileStatus(jarPath);
- LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
- amJarRsrc.setType(LocalResourceType.FILE);
- amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
- amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
- amJarRsrc.setTimestamp(jarStatus.getModificationTime());
- amJarRsrc.setSize(jarStatus.getLen());
- // this creates a symlink in the working directory
- localResources.put("AppMaster.jar", amJarRsrc);
- // Set the local resources into the launch context
- amContainer.setLocalResources(localResources);
+ List<QueueUserACLInfo> listAclInfo = yarnClient.getQueueAclsInfo();
+ for (QueueUserACLInfo aclInfo : listAclInfo) {
+ for (QueueACL userAcl : aclInfo.getUserAcls()) {
+ LOG.info("User ACL Info for Queue"
+ + ", queueName=" + aclInfo.getQueueName()
+ + ", userAcl=" + userAcl.name());
+ }
+ }
- // Set up the environment needed for the launch context
- Map<String, String> env = new HashMap<String, String>();
- // Assuming our classes or jars are available as local resources in the
- // working directory from which the command will be run, we need to append
- // "." to the path.
- // By default, all the hadoop specific classpaths will already be available
- // in $CLASSPATH, so we should be careful not to overwrite it.
- String classPathEnv = "$CLASSPATH:./*:";
- env.put("CLASSPATH", classPathEnv);
- amContainer.setEnvironment(env);
+ GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class);
+ GetNewApplicationResponse response = job.getApplicationsManager().getNewApplication(request);
+ id = response.getApplicationId();
- // Construct the command to be executed on the launched container
- String command = "${JAVA_HOME}"
- + "/bin/java -cp "
- + classPathEnv
- + " "
- + BSPApplicationMaster.class.getCanonicalName()
- + " "
- + submitJobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())
- .toString() + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
- + "/stdout" + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
- + "/stderr";
+ // Create a new ApplicationSubmissionContext
+ ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
+ // set the ApplicationId
+ appContext.setApplicationId(this.id);
+ // set the application name
+ appContext.setApplicationName(job.getJobName());
- LOG.debug("Start command: " + command);
+ // Create a new container launch context for the AM's container
+ ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
- amContainer.setCommands(Collections.singletonList(command));
+ // Define the local resources required
+ Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+ // Let's assume the jar we need for our ApplicationMaster is available in
+ // HDFS at a certain known path to us and we want to make it available to
+ // the ApplicationMaster in the launched container
+ if (job.getJar() == null) {
+ throw new IllegalArgumentException("Jar must be set in order to run the application!");
+ }
+
+ Path jarPath = new Path(job.getJar());
+ jarPath = fs.makeQualified(jarPath);
+ getConf().set("bsp.jar", jarPath.makeQualified(fs.getUri(), jarPath).toString());
- Resource capability = Records.newRecord(Resource.class);
- // we have at least 3 threads, which comsumes 1mb each, for each bsptask and
- // a base usage of 100mb
- capability.setMemory(3 * job.getNumBspTask()
- + getConf().getInt("hama.appmaster.memory.mb", 100));
- LOG.info("Set memory for the application master to "
- + capability.getMemory() + "mb!");
- amContainer.setResource(capability);
+ FileStatus jarStatus = fs.getFileStatus(jarPath);
+ LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
+ amJarRsrc.setType(LocalResourceType.FILE);
+ amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+ amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
+ amJarRsrc.setTimestamp(jarStatus.getModificationTime());
+ amJarRsrc.setSize(jarStatus.getLen());
- // Set the container launch content into the ApplicationSubmissionContext
- appContext.setAMContainerSpec(amContainer);
+ // this creates a symlink in the working directory
+ localResources.put(YARNBSPConstants.APP_MASTER_JAR_PATH, amJarRsrc);
- // Create the request to send to the ApplicationsManager
- SubmitApplicationRequest appRequest = Records
- .newRecord(SubmitApplicationRequest.class);
- appRequest.setApplicationSubmissionContext(appContext);
- job.getApplicationsManager().submitApplication(appRequest);
+ // Copy from hama-${version}.tar.gz to HDFS
+ Path hamaDstPath = new Path(getSystemDir(), YARNBSPConstants.HAMA_RELEASE_FILE);
+ hamaDstPath = fs.makeQualified(hamaDstPath);
+ fs.copyFromLocalFile(false, true,
+ new Path(YARNBSPConstants.HAMA_SRC_PATH, YARNBSPConstants.HAMA_RELEASE_FILE),
+ hamaDstPath);
+ FileStatus hamaStatus = fs.getFileStatus(hamaDstPath);
+ URL hamaReleaseUrl = ConverterUtils.getYarnUrlFromPath(hamaDstPath
+ .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
+ LocalResource hamaReleaseRsrc = Records.newRecord(LocalResource.class);
+ hamaReleaseRsrc.setResource(hamaReleaseUrl);
+ hamaReleaseRsrc.setSize(hamaStatus.getLen());
+ hamaReleaseRsrc.setTimestamp(hamaStatus.getModificationTime());
+ hamaReleaseRsrc.setType(LocalResourceType.ARCHIVE);
+ hamaReleaseRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
- GetApplicationReportRequest reportRequest = Records
- .newRecord(GetApplicationReportRequest.class);
- reportRequest.setApplicationId(id);
- while (report == null || report.getHost().equals("N/A")) {
- GetApplicationReportResponse reportResponse = job
- .getApplicationsManager().getApplicationReport(reportRequest);
- report = reportResponse.getApplicationReport();
- try {
- Thread.sleep(1000L);
- } catch (InterruptedException e) {
- LOG.error(
- "Got interrupted while waiting for a response report from AM.", e);
+ localResources.put(YARNBSPConstants.HAMA_SYMLINK, hamaReleaseRsrc);
+
+ // Set the local resources into the launch context
+ amContainer.setLocalResources(localResources);
+
+ // Set up the environment needed for the launch context
+ Map<String, String> env = new HashMap<String, String>();
+ // Assuming our classes or jars are available as local resources in the
+ // working directory from which the command will be run, we need to append
+ // "." to the path.
+ // By default, all the hadoop specific classpaths will already be available
+ // in $CLASSPATH, so we should be careful not to overwrite it.
+ StringBuilder classPathEnv = new StringBuilder(
+ ApplicationConstants.Environment.CLASSPATH.$()).append(File.pathSeparatorChar)
+ .append("./*");
+ for (String c : yarnConf.getStrings(
+ YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+ classPathEnv.append(File.pathSeparatorChar);
+ classPathEnv.append(c.trim());
}
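+ // also add the extracted Hama release libraries to the container classpath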
+ classPathEnv.append(File.pathSeparator);
+ classPathEnv.append("./" + YARNBSPConstants.HAMA_SYMLINK + "/hama-0.6.4/*");
+
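+ // Pass location, size and timestamp of the hama-yarn jar and the Hama
+ // release to the ApplicationMaster through its environment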
+ env.put(YARNBSPConstants.HAMA_YARN_LOCATION, jarPath.toUri().toString());
+ env.put(YARNBSPConstants.HAMA_YARN_SIZE, Long.toString(jarStatus.getLen()));
+ env.put(YARNBSPConstants.HAMA_YARN_TIMESTAMP, Long.toString(jarStatus.getModificationTime()));
+
+ env.put(YARNBSPConstants.HAMA_RELEASE_LOCATION, hamaDstPath.toUri().toString());
+ env.put(YARNBSPConstants.HAMA_RELEASE_SIZE, Long.toString(hamaStatus.getLen()));
+ env.put(YARNBSPConstants.HAMA_RELEASE_TIMESTAMP, Long.toString(hamaStatus.getModificationTime()));
+ env.put("CLASSPATH", classPathEnv.toString());
+ amContainer.setEnvironment(env);
+
+ // Set the necessary command to execute on the allocated container
+ Vector<CharSequence> vargs = new Vector<CharSequence>(5);
+ vargs.add("${JAVA_HOME}/bin/java");
+ vargs.add("-cp " + classPathEnv + "");
+ vargs.add(BSPApplicationMaster.class.getCanonicalName());
+ vargs.add(submitJobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString());
+
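+ // Redirect the AM's stdout and stderr into the container log directory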
+ vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/hama-appmaster.stdout");
+ vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/hama-appmaster.stderr");
+
+ // Get final command
+ StringBuilder command = new StringBuilder();
+ for (CharSequence str : vargs) {
+ command.append(str).append(" ");
+ }
+
+ List<String> commands = new ArrayList<String>();
+ commands.add(command.toString());
+ amContainer.setCommands(commands);
+
+ LOG.debug("Start command: " + command);
+
+ Resource capability = Records.newRecord(Resource.class);
+ // each BSP task runs at least 3 threads, consuming about 1mb each, on top of
+ // a base usage of 100mb
+ capability.setMemory(3 * job.getNumBspTask() + getConf().getInt("hama.appmaster.memory.mb", 100));
+ LOG.info("Set memory for the application master to " + capability.getMemory() + "mb!");
+
+ // Set the resource requirements for the ApplicationMaster container
+ appContext.setResource(capability);
+
+ // Setup security tokens
+ if (UserGroupInformation.isSecurityEnabled()) {
+ // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce
+ Credentials credentials = new Credentials();
+ String tokenRenewer = yarnConf.get(YarnConfiguration.RM_PRINCIPAL);
+ if (tokenRenewer == null || tokenRenewer.length() == 0) {
+ throw new IOException(
+ "Can't get Master Kerberos principal for the RM to use as renewer");
+ }
+
+ // For now, only getting tokens for the default file-system.
+ final Token<?> tokens[] =
+ fs.addDelegationTokens(tokenRenewer, credentials);
+ if (tokens != null) {
+ for (Token<?> token : tokens) {
+ LOG.info("Got dt for " + fs.getUri() + "; " + token);
+ }
+ }
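+ // Serialize the collected credentials and attach them to the AM launch context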
+ DataOutputBuffer dob = new DataOutputBuffer();
+ credentials.writeTokenStorageToStream(dob);
+ ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ amContainer.setTokens(fsTokens);
+ }
+
+ appContext.setAMContainerSpec(amContainer);
+
+ // Submit the application to the ResourceManager via the YarnClient
+ ApplicationId appId = appContext.getApplicationId();
+ yarnClient.submitApplication(appContext);
+
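+ // Block until the application reaches a terminal state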
+ return monitorApplication(appId) ? new NetworkedJob() : null;
+ } catch (YarnException e) {
+ LOG.error("Error while submitting the YARN application.", e);
+ return null;
}
- LOG.info("Got report: " + report.getApplicationId() + " "
- + report.getHost() + ":" + report.getRpcPort());
- return new NetworkedJob();
}
@Override
- protected int checkTaskLimits(BSPJob job, int limitTasks) throws IOException {
- return Math.max(1, limitTasks);
- }
-
- @Override
public Path getSystemDir() {
return new Path(getConf().get("bsp.local.dir", "/tmp/hama-yarn/"));
}
@@ -207,4 +364,76 @@
return report;
}
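+ /**
+ * Monitor the submitted application until it finishes, fails, is killed, or
+ * the client timeout is reached.
+ * @param appId Application Id of the application to monitor.
+ * @return true if the application completed with SUCCEEDED status, false otherwise.
+ * @throws IOException
+ * @throws YarnException
+ */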
+ private boolean monitorApplication(ApplicationId appId)
+ throws IOException, YarnException {
+ while (true) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ LOG.debug("Thread sleep in monitoring loop interrupted");
+ }
+
+ // Get application report for the appId we are interested in
+ report = yarnClient.getApplicationReport(appId);
+
+ LOG.info("Got application report from ASM for" + ", appId="
+ + appId.getId() + ", clientToAMToken="
+ + report.getClientToAMToken() + ", appDiagnostics="
+ + report.getDiagnostics() + ", appMasterHost="
+ + report.getHost() + ", appQueue=" + report.getQueue()
+ + ", appMasterRpcPort=" + report.getRpcPort()
+ + ", appStartTime=" + report.getStartTime()
+ + ", yarnAppState="
+ + report.getYarnApplicationState().toString()
+ + ", distributedFinalState="
+ + report.getFinalApplicationStatus().toString()
+ + ", appTrackingUrl=" + report.getTrackingUrl()
+ + ", appUser=" + report.getUser());
+
+ YarnApplicationState state = report.getYarnApplicationState();
+ FinalApplicationStatus dsStatus = report
+ .getFinalApplicationStatus();
+ if (YarnApplicationState.FINISHED == state) {
+ if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
+ LOG.info("Application has completed successfully. Breaking monitoring loop");
+ return true;
+ } else {
+ LOG.info("Application did finished unsuccessfully."
+ + " YarnState=" + state.toString()
+ + ", DSFinalStatus=" + dsStatus.toString()
+ + ". Breaking monitoring loop");
+ return false;
+ }
+ } else if (YarnApplicationState.KILLED == state
+ || YarnApplicationState.FAILED == state) {
+ LOG.info("Application did not finish." + " YarnState="
+ + state.toString() + ", DSFinalStatus="
+ + dsStatus.toString() + ". Breaking monitoring loop");
+ return false;
+ }
+
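+ // Give up and kill the application if it has not reached a terminal state
+ // within the client timeout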
+ if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
+ LOG.info("Reached client specified timeout for application. Killing application");
+ forceKillApplication(appId);
+ return false;
+ }
+ }
+ }
+
+ /**
+ * Kill a submitted application by sending a call to the ASM
+ * @param appId Application Id to be killed.
+ * @throws YarnException
+ * @throws IOException
+ */
+ private void forceKillApplication(ApplicationId appId)
+ throws YarnException, IOException {
+ // TODO clarify whether multiple jobs with the same app id can be submitted and be running at
+ // the same time.
+ // If yes, can we kill a particular attempt only?
+
+ // Response can be ignored as it is non-null on success or
+ // throws an exception in case of failures
+ yarnClient.killApplication(appId);
+ }
}
Index: yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java (revision 1659147)
+++ yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java (working copy)
@@ -22,6 +22,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.sync.SyncException;
@@ -37,7 +38,7 @@
public void bsp(
BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, NullWritable> bspPeer)
throws IOException, SyncException, InterruptedException {
- num = bspPeer.getConfiguration().getInt("bsp.peers.num", 0);
+ num = bspPeer.getConfiguration().getInt("bsp.peers.num", 1);
LOG.info(bspPeer.getAllPeerNames());
int i = 0;
for (String otherPeer : bspPeer.getAllPeerNames()) {
@@ -57,15 +58,19 @@
InterruptedException, ClassNotFoundException {
HamaConfiguration conf = new HamaConfiguration();
// TODO some keys that should be within a conf
- conf.set("yarn.resourcemanager.address", "0.0.0.0:8040");
- conf.set("bsp.local.dir", "/tmp/bsp-yarn/");
+ conf.set("bsp.user.name", "root");
+ conf.setInt(Constants.MAX_TASKS, 10);
YARNBSPJob job = new YARNBSPJob(conf);
+ job.setBoolean("hama.yarn.application", true);
job.setBspClass(HelloBSP.class);
job.setJarByClass(HelloBSP.class);
job.setJobName("Serialize Printing");
- job.setMemoryUsedPerTaskInMb(50);
- job.setNumBspTask(2);
+ job.setInputFormat(NullInputFormat.class);
+ job.setOutputFormat(NullOutputFormat.class);
+
+ job.setMemoryUsedPerTaskInMb(100);
+ job.setNumBspTask(4);
job.waitForCompletion(true);
}
}