Index: yarn/pom.xml
===================================================================
--- yarn/pom.xml (revision 1654753)
+++ yarn/pom.xml (working copy)
@@ -19,7 +19,7 @@
    <groupId>org.apache.hama</groupId>
    <artifactId>hama-parent</artifactId>
-   <version>0.6.3-SNAPSHOT</version>
+   <version>0.7.0-SNAPSHOT</version>
  <modelVersion>4.0.0</modelVersion>
@@ -26,13 +26,15 @@
  <groupId>org.apache.hama</groupId>
  <artifactId>hama-yarn</artifactId>
  <name>yarn</name>
- <version>0.6.3-SNAPSHOT</version>
+ <version>0.7.0-SNAPSHOT</version>
  <packaging>jar</packaging>
- 1.2.0
+
-
+
      <groupId>org.apache.hama</groupId>
@@ -54,27 +56,41 @@
      <artifactId>avro</artifactId>
      <version>1.5.3</version>
-
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-yarn-api</artifactId>
-     <version>0.23.1</version>
+     <version>${hadoop.version}</version>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-yarn-common</artifactId>
-     <version>0.23.1</version>
+     <version>${hadoop.version}</version>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-yarn-server-tests</artifactId>
-     <version>0.23.1</version>
+     <version>${hadoop.version}</version>
+   <dependency>
+     <groupId>org.apache.hadoop</groupId>
+     <artifactId>hadoop-yarn-client</artifactId>
+     <version>${hadoop.version}</version>
+   </dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
+   <dependency>
+     <groupId>org.apache.hadoop</groupId>
+     <artifactId>hadoop-common</artifactId>
+     <version>${hadoop.version}</version>
+   </dependency>
+   <dependency>
+     <groupId>org.apache.hadoop</groupId>
+     <artifactId>hadoop-yarn-client</artifactId>
+     <version>2.4.0.2.1.5.0-695</version>
+   </dependency>
Index: yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (revision 1654753)
+++ yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (working copy)
@@ -20,6 +20,8 @@
import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
+import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -33,13 +35,16 @@
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.ipc.ProtocolSignature;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RPC.Server;
+//import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.Clock;
-import org.apache.hadoop.yarn.SystemClock;
-import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
@@ -46,10 +51,14 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+//import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.Job.JobState;
import org.apache.hama.bsp.sync.SyncServerRunner;
@@ -56,6 +65,8 @@
import org.apache.hama.bsp.sync.SyncServiceFactory;
import org.apache.hama.ipc.BSPPeerProtocol;
import org.apache.hama.ipc.HamaRPCProtocolVersion;
+import org.apache.hama.ipc.RPC;
+import org.apache.hama.ipc.Server;
import org.apache.hama.util.BSPNetUtils;
/**
@@ -73,8 +84,10 @@
private Clock clock;
private YarnRPC yarnRPC;
- private AMRMProtocol amrmRPC;
+ private RPC rpc;
+ private ApplicationMasterProtocol amrmRPC;
+
private ApplicationAttemptId appAttemptId;
private String applicationName;
private long startTime;
@@ -104,6 +117,7 @@
}
this.jobFile = args[0];
+ LOG.info("jobFile >>>> " + jobFile);
this.localConf = new YarnConfiguration();
this.jobConf = getSubmitConfiguration(jobFile);
@@ -159,14 +173,23 @@
*/
private void startRPCServers() throws IOException {
// start the RPC server which talks to the client
- this.clientServer = RPC.getServer(BSPClient.class, this, hostname,
- clientPort, jobConf);
+ InetSocketAddress BSPClientAddress = jobConf.getSocketAddr(hostname, hostname, clientPort);
+ LOG.info("BSPClientAddress >>>> " + BSPClientAddress);
+ this.clientServer = rpc.getServer(BSPClient.class, hostname, clientPort, jobConf);
+// this.clientServer = rpc.getServer(BSPClient.class, this, BSPClientAddress, jobConf, null,
+// jobConf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT,
+// YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT));
this.clientServer.start();
// start the RPC server which talks to the tasks
this.taskServerPort = BSPNetUtils.getFreePort(10000);
- this.taskServer = RPC.getServer(BSPPeerProtocol.class, this, hostname,
- taskServerPort, jobConf);
+ InetSocketAddress BSPPeerProtocolAddress = jobConf
+ .getSocketAddr(hostname, hostname, taskServerPort);
+ this.taskServer = rpc.getServer(BSPPeerProtocol.class, hostname, taskServerPort, jobConf);
+// this.taskServer = rpc.getServer(BSPPeerProtocol.class, this, BSPPeerProtocolAddress,
+// jobConf, null,
+// jobConf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT,
+// YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT));
this.taskServer.start();
// readjusting the configuration to let the tasks know where we are.
this.jobConf.set("hama.umbilical.address", hostname + ":" + taskServerPort);
@@ -191,28 +214,57 @@
* @param yarnConf
* @return a new RPC connection to the Resource Manager.
*/
- private AMRMProtocol getYarnRPCConnection(Configuration yarnConf) {
+ private ApplicationMasterProtocol getYarnRPCConnection(Configuration yarnConf) throws IOException {
// Connect to the Scheduler of the ResourceManager.
- InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
+ UserGroupInformation currentUser = UserGroupInformation.createRemoteUser(appAttemptId.toString());
+ LOG.info("current User & appAttemptId : " + currentUser + " " + appAttemptId.toString());
+ Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
+ final InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
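+    // The RM passes the AM-RM token to this container through its credentials; attach it to a
+    // UGI for the application attempt so the scheduler proxy created below authenticates with it.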
+    Token<? extends TokenIdentifier> amRMToken = setupAndReturnAMRMToken(rmAddress, credentials.getAllTokens());
+    currentUser.addToken(amRMToken);
+    final Configuration conf = yarnConf;
+    ApplicationMasterProtocol client = currentUser
+        .doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
+          @Override
+          public ApplicationMasterProtocol run() {
+            return (ApplicationMasterProtocol) yarnRPC.getProxy(
+                ApplicationMasterProtocol.class, rmAddress, conf);
+          }
+        });
LOG.info("Connecting to ResourceManager at " + rmAddress);
- return (AMRMProtocol) yarnRPC.getProxy(AMRMProtocol.class, rmAddress,
- yarnConf);
+ return client;
+// return (ApplicationMasterProtocol)yarnRPC
+// .getProxy(ApplicationMasterProtocol.class, rmAddress, yarnConf);
}
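+  /**
+   * Finds the AM-RM token among the credentials handed over by the RM and binds its service name
+   * to the scheduler address so the RPC layer can select it.
+   */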
+  @SuppressWarnings("unchecked")
+  private Token<? extends TokenIdentifier> setupAndReturnAMRMToken(
+      InetSocketAddress rmBindAddress,
+      Collection<Token<? extends TokenIdentifier>> allTokens) {
+    for (Token<? extends TokenIdentifier> token : allTokens) {
+      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+        SecurityUtil.setTokenService(token, rmBindAddress);
+        return (Token) token;
+      }
+    }
+    return null;
+  }
+
+
/**
* Registers this application master with the Resource Manager and retrieves a
* response which is used to launch additional containers.
*/
private static RegisterApplicationMasterResponse registerApplicationMaster(
- AMRMProtocol resourceManager, ApplicationAttemptId appAttemptID,
+ ApplicationMasterProtocol resourceManager, ApplicationAttemptId appAttemptID,
String appMasterHostName, int appMasterRpcPort,
- String appMasterTrackingUrl) throws YarnRemoteException {
+ String appMasterTrackingUrl) throws YarnException, IOException {
+
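+    // In Hadoop 2.x the RM identifies the AM by its AM-RM token, so the register request no
+    // longer carries the application attempt id.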
RegisterApplicationMasterRequest appMasterRequest = Records
.newRecord(RegisterApplicationMasterRequest.class);
- appMasterRequest.setApplicationAttemptId(appAttemptID);
+ //appMasterRequest.setApplicationAttemptId(appAttemptID);
appMasterRequest.setHost(appMasterHostName);
appMasterRequest.setRpcPort(appMasterRpcPort);
// TODO tracking URL
@@ -219,7 +271,7 @@
appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
RegisterApplicationMasterResponse response = resourceManager
.registerApplicationMaster(appMasterRequest);
- LOG.debug("ApplicationMaster has maximum resource capability of: "
+ LOG.info("ApplicationMaster has maximum resource capability of: "
+ response.getMaximumResourceCapability().getMemory());
return response;
}
@@ -234,12 +286,13 @@
private static ApplicationAttemptId getApplicationAttemptId()
throws IOException {
     Map<String, String> envs = System.getenv();
- if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) {
+ if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
throw new IllegalArgumentException(
"ApplicationAttemptId not set in the environment");
}
+
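+    // The container id is exposed through the CONTAINER_ID environment variable; the
+    // application attempt id is derived from it.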
return ConverterUtils.toContainerId(
- envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV))
+ envs.get(Environment.CONTAINER_ID.name()))
.getApplicationAttemptId();
}
@@ -247,6 +300,7 @@
JobState finalState = null;
try {
job = new JobImpl(appAttemptId, jobConf, yarnRPC, amrmRPC, jobFile, jobId);
+ LOG.info("job : " + job.getState());
finalState = job.startJob();
} finally {
if (finalState != null) {
@@ -259,7 +313,7 @@
}
}
- private void cleanup() throws YarnRemoteException {
+ private void cleanup() throws YarnException, IOException {
syncServer.stop();
if (threadPool != null && !threadPool.isShutdown()) {
threadPool.shutdownNow();
@@ -268,29 +322,31 @@
taskServer.stop();
FinishApplicationMasterRequest finishReq = Records
.newRecord(FinishApplicationMasterRequest.class);
- finishReq.setAppAttemptId(appAttemptId);
+ //finishReq.setAppAttemptId(appAttemptId);
switch (job.getState()) {
case SUCCESS:
- finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+ finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
break;
case KILLED:
- finishReq.setFinishApplicationStatus(FinalApplicationStatus.KILLED);
+ finishReq.setFinalApplicationStatus(FinalApplicationStatus.KILLED);
break;
case FAILED:
- finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
+ finishReq.setFinalApplicationStatus(FinalApplicationStatus.FAILED);
break;
default:
- finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
+ finishReq.setFinalApplicationStatus(FinalApplicationStatus.FAILED);
}
this.amrmRPC.finishApplicationMaster(finishReq);
}
- public static void main(String[] args) throws YarnRemoteException {
+ public static void main(String[] args) throws YarnException, IOException {
// we expect getting the qualified path of the job.xml as the first
// element in the arguments
BSPApplicationMaster master = null;
try {
+ LOG.info("Start Application Master!!!!!");
master = new BSPApplicationMaster(args);
+ LOG.info("master : " + master.getCurrentSuperStep());
master.start();
} catch (Exception e) {
LOG.fatal("Error starting BSPApplicationMaster", e);
Index: yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java (revision 1654753)
+++ yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java (working copy)
@@ -37,7 +37,7 @@
private static final Log LOG = LogFactory.getLog(BSPRunner.class);
- private Configuration conf;
+ private HamaConfiguration conf;
private TaskAttemptID id;
   private BSPPeerImpl<?, ?, ?, ?, ? extends Writable> peer;
private Counters counters = new Counters();
Index: yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (revision 1654753)
+++ yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (working copy)
@@ -19,8 +19,8 @@
import java.io.IOException;
import java.net.URISyntaxException;
-import java.util.Arrays;
-import java.util.Collections;
+import java.nio.ByteBuffer;
+import java.util.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -28,20 +28,17 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ContainerManager;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.*;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
@@ -51,17 +48,20 @@
private final Container allocatedContainer;
private final int id;
- private final ContainerManager cm;
+ private final ContainerManagementProtocol cm;
private final Configuration conf;
private String user;
private final Path jobFile;
private final BSPJobID jobId;
- private GetContainerStatusRequest statusRequest;
+ private ByteBuffer allTokens;
+ private UserGroupInformation appSubmitterUgi;
- public BSPTaskLauncher(int id, Container container, ContainerManager cm,
+ private GetContainerStatusesRequest statusRequest;
+
+ public BSPTaskLauncher(int id, Container container, ContainerManagementProtocol cm,
Configuration conf, Path jobFile, BSPJobID jobId)
- throws YarnRemoteException {
+ throws YarnException {
this.id = id;
this.cm = cm;
this.conf = conf;
@@ -71,7 +71,8 @@
// FIXME why does this contain mapreduce here?
this.user = conf.get("bsp.user.name");
if (this.user == null) {
- this.user = conf.get("mapreduce.job.user.name");
+ //this.user = conf.get("mapreduce.job.user.name");
+ user = System.getenv(ApplicationConstants.Environment.USER.name());
}
}
@@ -80,14 +81,15 @@
stopAndCleanup();
}
- public void stopAndCleanup() throws YarnRemoteException {
- StopContainerRequest stopRequest = Records
- .newRecord(StopContainerRequest.class);
- stopRequest.setContainerId(allocatedContainer.getId());
- cm.stopContainer(stopRequest);
+ public void stopAndCleanup() throws YarnException, IOException {
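+    // ContainerManagementProtocol stops containers in batches, so the single container id is
+    // wrapped in a list before issuing the request.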
+ StopContainersRequest stopRequest = Records.newRecord(StopContainersRequest.class);
+    List<ContainerId> containerIds = new ArrayList<ContainerId>();
+ containerIds.add(allocatedContainer.getId());
+ stopRequest.setContainerIds(containerIds);
+ cm.stopContainers(stopRequest);
}
- public void start() throws IOException {
+ public void start() throws IOException, YarnException {
LOG.info("Spawned task with id: " + this.id
+ " for allocated container id: "
+ this.allocatedContainer.getId().toString());
@@ -103,20 +105,22 @@
*/
public BSPTaskStatus poll() throws Exception {
- ContainerStatus lastStatus;
- if ((lastStatus = cm.getContainerStatus(statusRequest).getStatus())
- .getState() != ContainerState.COMPLETE) {
- return null;
+ ContainerStatus lastStatus = null;
+ GetContainerStatusesResponse getContainerStatusesResponse = cm.getContainerStatuses(statusRequest);
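+    // The batched call returns one ContainerStatus per requested id; this launcher only tracks a
+    // single container.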
+    List<ContainerStatus> containerStatuses = getContainerStatusesResponse.getContainerStatuses();
+ for (ContainerStatus containerStatus : containerStatuses) {
+ if ((lastStatus = containerStatus).getState() != ContainerState.COMPLETE) {
+ return null;
+ }
+      LOG.info(this.id + "\tLast report comes with exit status of "
+          + lastStatus.getExitStatus() + " and diagnostics of "
+          + lastStatus.getDiagnostics());
}
- LOG.info(this.id + "\tLast report comes with existatus of "
- + lastStatus.getExitStatus() + " and diagnose string of "
- + lastStatus.getDiagnostics());
return new BSPTaskStatus(id, lastStatus.getExitStatus());
}
- private GetContainerStatusRequest setupContainer(
- Container allocatedContainer, ContainerManager cm, String user, int id)
- throws IOException {
+ private GetContainerStatusesRequest setupContainer(
+ Container allocatedContainer, ContainerManagementProtocol cm, String user, int id) throws IOException, YarnException {
LOG.info("Setting up a container for user " + user + " with id of " + id
+ " and containerID of " + allocatedContainer.getId() + " as " + user);
// Now we setup a ContainerLaunchContext
@@ -123,16 +127,49 @@
ContainerLaunchContext ctx = Records
.newRecord(ContainerLaunchContext.class);
- ctx.setContainerId(allocatedContainer.getId());
- ctx.setResource(allocatedContainer.getResource());
- ctx.setUser(user);
+// ctx.setContainerId(allocatedContainer.getId());
+// ctx.setResource(allocatedContainer.getResource());
+// ctx.setUser(user);
+    // Note: the Credentials, Token, UserGroupInformation and DataOutputBuffer classes
+    // are marked as LimitedPrivate
+
/*
+ Credentials credentials =
+ UserGroupInformation.getCurrentUser().getCredentials();
+ DataOutputBuffer dob = new DataOutputBuffer();
+ credentials.writeTokenStorageToStream(dob);
+ // Now remove the AM->RM token so that containers cannot access it.
+  Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
+  LOG.info("Executing with tokens:");
+  while (iter.hasNext()) {
+    Token<?> token = iter.next();
+ LOG.info("token is " + token);
+ if (token.getKind().equals(ContainerTokenIdentifier.KIND)) {
+ iter.remove();
+ }
+ }
+ allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ ctx.setTokens(allTokens.duplicate());
+
+
+ // Create appSubmitterUgi and add original tokens to it
+ String appSubmitterUserName =
+ System.getenv(ApplicationConstants.Environment.USER.name());
+ appSubmitterUgi =
+ UserGroupInformation.createRemoteUser(appSubmitterUserName);
+ appSubmitterUgi.addCredentials(credentials);
+*/
+
+
+ /*
* jar
*/
LocalResource packageResource = Records.newRecord(LocalResource.class);
FileSystem fs = FileSystem.get(conf);
- Path packageFile = new Path(conf.get("bsp.jar"));
+ LOG.info("bsp.jar : " + conf.get("bsp.jar"));
+ LOG.info("app.jar resource : " + System.getenv("HAMAYARNJARLOCATION"));
+ Path packageFile = new Path(System.getenv("HAMAYARNJARLOCATION"));
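+    // HAMAYARNJARLOCATION is set by YARNBSPJobClient when it stages the hama-yarn jar, so the
+    // task containers localize the same artifact that launched the application master.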
// FIXME there seems to be a problem with the converter utils and URL
// transformation
URL packageUrl = ConverterUtils.getYarnUrlFromPath(packageFile
@@ -167,26 +204,57 @@
.toString() + ":";
LOG.info("Localized file scheme detected, adjusting CP to: " + cp);
}
- String[] cmds = {
- "${JAVA_HOME}" + "/bin/java -cp \"" + cp + "\" "
- + BSPRunner.class.getCanonicalName(),
- jobId.getJtIdentifier(),
- id + "",
- this.jobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())
- .toString(),
- " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
- " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr" };
- ctx.setCommands(Arrays.asList(cmds));
- LOG.info("Starting command: " + Arrays.toString(cmds));
+// String[] cmds = {
+// "${JAVA_HOME}" + "/bin/java -cp \"" + cp + "\" "
+// + BSPRunner.class.getCanonicalName(),
+// jobId.getJtIdentifier(),
+// id + "",
+// this.jobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())
+// .toString(),
+// " 1>/var/log/hadoop-yarn/yarn/hama-yarncontainer.stdout",
+// " 2>/var/log/hadoop-yarn/yarn/hama-yarncontainer.stderr" };
+// //" 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
+// //" 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr" };
+    Vector<CharSequence> vargs = new Vector<CharSequence>();
+ vargs.add("${JAVA_HOME}/bin/java");
+ vargs.add("-cp \"" + cp + "\"");
+ vargs.add(BSPRunner.class.getCanonicalName());
+ vargs.add(jobId.getJtIdentifier());
+ vargs.add(Integer.toString(id));
+ vargs.add(this.jobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())
+ .toString());
+ vargs.add("1>/var/log/hadoop-yarn/yarn/hama-yarncontainer.stdout");
+ vargs.add("2>/var/log/hadoop-yarn/yarn/hama-yarncontainer.stderr");
+    // Get final command
+ StringBuilder command = new StringBuilder();
+ for (CharSequence str : vargs) {
+ command.append(str).append(" ");
+ }
+
+    List<String> commands = new ArrayList<String>();
+ commands.add(command.toString());
+
+ ctx.setCommands(commands);
+ LOG.info("Starting command: " + commands);
+
StartContainerRequest startReq = Records
.newRecord(StartContainerRequest.class);
startReq.setContainerLaunchContext(ctx);
- cm.startContainer(startReq);
- GetContainerStatusRequest statusReq = Records
- .newRecord(GetContainerStatusRequest.class);
- statusReq.setContainerId(allocatedContainer.getId());
+    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+ list.add(startReq);
+ StartContainersRequest requestList = StartContainersRequest.newInstance(list);
+ LOG.info("container token" + allocatedContainer.getContainerToken().getKind());
+ LOG.info (" requestList : " + requestList.getStartContainerRequests());
+ LOG.info("container manager: " + cm.toString());
+ cm.startContainers(requestList);
+
+ GetContainerStatusesRequest statusReq = Records
+ .newRecord(GetContainerStatusesRequest.class);
+    List<ContainerId> containerIds = new ArrayList<ContainerId>();
+ containerIds.add(allocatedContainer.getId());
+ statusReq.setContainerIds(containerIds);
return statusReq;
}
Index: yarn/src/main/java/org/apache/hama/bsp/Job.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/Job.java (revision 1654753)
+++ yarn/src/main/java/org/apache/hama/bsp/Job.java (working copy)
@@ -17,8 +17,10 @@
*/
package org.apache.hama.bsp;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import java.io.IOException;
+
/**
* Main interface to interact with the job. Provides only getters.
*/
@@ -34,7 +36,7 @@
public JobState startJob() throws Exception;
- public void cleanup() throws YarnRemoteException;
+ public void cleanup() throws YarnException, IOException;
JobState getState();
Index: yarn/src/main/java/org/apache/hama/bsp/JobImpl.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/JobImpl.java (revision 1654753)
+++ yarn/src/main/java/org/apache/hama/bsp/JobImpl.java (working copy)
@@ -17,34 +17,43 @@
*/
package org.apache.hama.bsp;
+import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
+import java.util.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.api.AMRMProtocol;
-import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+//import org.apache.hadoop.security.token.Token;
+//import org.apache.hadoop.security.token.TokenIdentifier;
+//import org.apache.hadoop.security.token.*;
+//import org.apache.hadoop.security.token.Token;
+//import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.records.AMResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hama.bsp.BSPTaskLauncher.BSPTaskStatus;
@@ -66,7 +75,7 @@
private ApplicationAttemptId appAttemptId;
private YarnRPC yarnRPC;
- private AMRMProtocol resourceManager;
+ private ApplicationMasterProtocol resourceManager;
   private List<Container> allocatedContainers;
   private List<ContainerId> releasedContainers = Collections.emptyList();
@@ -76,9 +85,19 @@
private int lastResponseID = 0;
+ private int getMemoryRequirements() {
+ String newMemoryProperty = conf.get("bsp.child.mem.in.mb");
+ if (newMemoryProperty == null) {
+ LOG.warn("\"bsp.child.mem.in.mb\" was not set! Try parsing the child opts...");
+ return getMemoryFromOptString(childOpts);
+ } else {
+ return Integer.valueOf(newMemoryProperty);
+ }
+ }
+
public JobImpl(ApplicationAttemptId appAttemptId,
- Configuration jobConfiguration, YarnRPC yarnRPC, AMRMProtocol amrmRPC,
- String jobFile, BSPJobID jobId) {
+ Configuration jobConfiguration, YarnRPC yarnRPC, ApplicationMasterProtocol amrmRPC,
+ String jobFile, BSPJobID jobId) {
super();
this.numBSPTasks = jobConfiguration.getInt("bsp.peers.num", 1);
this.appAttemptId = appAttemptId;
@@ -91,21 +110,16 @@
this.childOpts = conf.get("bsp.child.java.opts");
this.taskMemoryInMb = getMemoryRequirements();
+
LOG.info("Memory per task: " + taskMemoryInMb + "m!");
}
- private int getMemoryRequirements() {
- String newMemoryProperty = conf.get("bsp.child.mem.in.mb");
- if (newMemoryProperty == null) {
- LOG.warn("\"bsp.child.mem.in.mb\" was not set! Try parsing the child opts...");
- return getMemoryFromOptString(childOpts);
- } else {
- return Integer.valueOf(newMemoryProperty);
+ // This really needs a testcase
+ private static int getMemoryFromOptString(String opts) {
+ if (opts == null) {
+ return DEFAULT_MEMORY_MB;
}
- }
- // This really needs a testcase
- private static int getMemoryFromOptString(String opts) {
if (!opts.contains("-Xmx")) {
LOG.info("No \"-Xmx\" option found in child opts, using default amount of memory!");
return DEFAULT_MEMORY_MB;
@@ -133,28 +147,35 @@
public JobState startJob() throws Exception {
     this.allocatedContainers = new ArrayList<Container>(numBSPTasks);
+    List<NMToken> nmTokenList = null;
while (allocatedContainers.size() < numBSPTasks) {
- AllocateRequest req = BuilderUtils.newAllocateRequest(
- appAttemptId,
- lastResponseID,
- 0.0f,
- createBSPTaskRequest(numBSPTasks - allocatedContainers.size(),
- taskMemoryInMb, priority), releasedContainers);
+// AllocateRequest req = BuilderUtils.newAllocateRequest(
+// appAttemptId,
+// lastResponseID,
+// 0.0f,
+// createBSPTaskRequest(numBSPTasks - allocatedContainers.size(),
+// taskMemoryInMb, priority), releasedContainers);
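+      // BuilderUtils is no longer part of the public YARN API; AllocateRequest.newInstance()
+      // takes the response id, progress, ask and release lists directly.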
+ AllocateRequest req = AllocateRequest.newInstance(lastResponseID, 0.0f,
+ createBSPTaskRequest(numBSPTasks - allocatedContainers.size(), taskMemoryInMb,
+ priority), releasedContainers, null);
+
AllocateResponse allocateResponse = resourceManager.allocate(req);
- AMResponse amResponse = allocateResponse.getAMResponse();
- LOG.info("Got response! ID: " + amResponse.getResponseId()
+ LOG.info("Got response! ID: " + allocateResponse.getResponseId()
+ " with num of containers: "
- + amResponse.getAllocatedContainers().size()
+ + allocateResponse.getAllocatedContainers().size()
+ " and following resources: "
- + amResponse.getAvailableResources().getMemory() + "mb");
- this.lastResponseID = amResponse.getResponseId();
+ + allocateResponse.getAvailableResources().getMemory() + "mb");
+ this.lastResponseID = allocateResponse.getResponseId();
// availableResources = amResponse.getAvailableResources();
- this.allocatedContainers.addAll(amResponse.getAllocatedContainers());
- LOG.info("Waiting to allocate "
- + (numBSPTasks - allocatedContainers.size()) + " more containers...");
+ this.allocatedContainers.addAll(allocateResponse.getAllocatedContainers());
+ LOG.info("Waiting to allocate " + (numBSPTasks - allocatedContainers.size()) + " more containers...");
+
+ LOG.info("getNMToken : " + allocateResponse.getNMTokens());
+ nmTokenList = allocateResponse.getNMTokens();
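+      // The NM tokens returned with the allocation are needed later to open authenticated
+      // ContainerManagementProtocol connections to the NodeManagers hosting the containers.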
+
Thread.sleep(1000l);
}
@@ -166,17 +187,36 @@
+ allocatedContainer.getId() + ", containerNode="
+ allocatedContainer.getNodeId().getHost() + ":"
+ allocatedContainer.getNodeId().getPort() + ", containerNodeURI="
- + allocatedContainer.getNodeHttpAddress() + ", containerState"
- + allocatedContainer.getState() + ", containerResourceMemory"
+ + allocatedContainer.getNodeHttpAddress() + ", containerResourceMemory"
+ allocatedContainer.getResource().getMemory());
// Connect to ContainerManager on the allocated container
- String cmIpPortStr = allocatedContainer.getNodeId().getHost() + ":"
- + allocatedContainer.getNodeId().getPort();
- InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
- ContainerManager cm = (ContainerManager) yarnRPC.getProxy(
- ContainerManager.class, cmAddress, conf);
+ String user = conf.get("bsp.user.name");
+ if (user == null) {
+ user = System.getenv(ApplicationConstants.Environment.USER.name());
+ }
+ LOG.info("AppAttemptId : " + appAttemptId + ", AllocatedContainer NodeId : "
+ + allocatedContainer.getNodeId() + ", User : " + user);
+ Token nmToken = null;
+ for (NMToken token : nmTokenList) {
+ LOG.info("token : " + token.getToken());
+ nmToken = token.getToken();
+ }
+ ContainerManagementProtocol cm = null;
+ LOG.info("Token!!! " + nmToken);
+
+ try {
+ cm = getContainerManagementProtocolProxy(yarnRPC,
+ nmToken, allocatedContainer.getNodeId(), user);
+ } catch (Exception e) {
+ LOG.error("failed to create cm!!!");
+ if (cm != null)
+ yarnRPC.stopProxy(cm, conf);
+ e.printStackTrace();
+ }
+
+
BSPTaskLauncher runnableLaunchContainer = new BSPTaskLauncher(id,
allocatedContainer, cm, conf, jobFile, jobId);
@@ -214,14 +254,50 @@
return state;
}
+  /*
+  private org.apache.hadoop.security.token.Token<? extends NMTokenIdentifier> setupAndReturnAMRMToken(
+      InetSocketAddress rmBindAddress,
+      Collection<org.apache.hadoop.security.token.Token<? extends TokenIdentifier>> allTokens) {
+    for (org.apache.hadoop.security.token.Token<? extends TokenIdentifier> token : allTokens) {
+      if (token.getKind().equals(NMTokenIdentifier.KIND)) {
+        SecurityUtil.setTokenService(token, rmBindAddress);
+        return (org.apache.hadoop.security.token.Token) token;
+      }
+    }
+    return null;
+  }*/
+
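+  /**
+   * Builds a ContainerManagementProtocol proxy as the given user, adding the NM token (when
+   * present) so that container start/stop/status calls authenticate against the NodeManager.
+   */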
+ protected ContainerManagementProtocol getContainerManagementProtocolProxy(
+ final YarnRPC rpc, Token nmToken, NodeId nodeId, String user) {
+ ContainerManagementProtocol proxy;
+ UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+ final InetSocketAddress addr =
+ NetUtils.createSocketAddr(nodeId.getHost(), nodeId.getPort());
+ if (nmToken != null) {
+ ugi.addToken(ConverterUtils.convertFromYarn(nmToken, addr));
+ }
+
+    proxy = ugi
+        .doAs(new PrivilegedAction<ContainerManagementProtocol>() {
+ @Override
+ public ContainerManagementProtocol run() {
+ LOG.info("ContainerManagementProtocol running!!!");
+ return (ContainerManagementProtocol) rpc.getProxy(
+ ContainerManagementProtocol.class,
+ addr, conf);
+ }
+ });
+ return proxy;
+ }
+
/**
* Makes a lookup for the taskid and stops its container and task. It also
* removes the task from the launcher so that we won't have to stop it twice.
*
* @param id
- * @throws YarnRemoteException
+ * @throws YarnException
*/
- private void cleanupTask(int id) throws YarnRemoteException {
+ private void cleanupTask(int id) throws YarnException, IOException {
BSPTaskLauncher bspTaskLauncher = launchers.get(id);
bspTaskLauncher.stopAndCleanup();
launchers.remove(id);
@@ -229,7 +305,7 @@
}
@Override
- public void cleanup() throws YarnRemoteException {
+ public void cleanup() throws YarnException, IOException {
for (BSPTaskLauncher launcher : completionQueue) {
launcher.stopAndCleanup();
}
@@ -247,7 +323,7 @@
// whether a particular rack/host is needed
// useful for applications that are sensitive
// to data locality
- rsrcRequest.setHostName("*");
+ rsrcRequest.setResourceName("*");
// set the priority for the request
Priority pri = Records.newRecord(Priority.class);
Index: yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java (revision 1654753)
+++ yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java (working copy)
@@ -24,7 +24,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
@@ -31,7 +31,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hama.HamaConfiguration;
@@ -46,7 +46,7 @@
private BSPClient client;
private boolean submitted;
private ApplicationReport report;
- private ClientRMProtocol applicationsManager;
+ private ApplicationClientProtocol applicationsManager;
private YarnRPC rpc;
public YARNBSPJob(HamaConfiguration conf) throws IOException {
@@ -58,8 +58,8 @@
YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS));
LOG.info("Connecting to ResourceManager at " + rmAddress);
- this.applicationsManager = ((ClientRMProtocol) rpc.getProxy(
- ClientRMProtocol.class, rmAddress, conf));
+ this.applicationsManager = ((ApplicationClientProtocol) rpc.getProxy(
+ ApplicationClientProtocol.class, rmAddress, conf));
}
public void setMemoryUsedPerTaskInMb(int mem) {
@@ -66,7 +66,7 @@
conf.setInt("bsp.child.mem.in.mb", mem);
}
- public void kill() throws YarnRemoteException {
+ public void kill() throws YarnException, IOException {
if (submitClient != null) {
KillApplicationRequest killRequest = Records
.newRecord(KillApplicationRequest.class);
@@ -95,45 +95,44 @@
this.submit();
}
- client = RPC.waitForProxy(BSPClient.class, BSPClient.versionID,
- NetUtils.createSocketAddr(report.getHost(), report.getRpcPort()), conf);
- GetApplicationReportRequest reportRequest = Records
- .newRecord(GetApplicationReportRequest.class);
- reportRequest.setApplicationId(submitClient.getId());
+ client = RPC.waitForProxy(BSPClient.class, BSPClient.versionID, NetUtils.createSocketAddr(report.getHost(), report.getRpcPort()), conf);
+ GetApplicationReportRequest reportRequest = Records.newRecord(GetApplicationReportRequest.class);
+ reportRequest.setApplicationId(submitClient.getId());
- GetApplicationReportResponse reportResponse = applicationsManager
- .getApplicationReport(reportRequest);
- ApplicationReport localReport = reportResponse.getApplicationReport();
- long clientSuperStep = -1L;
- while (localReport.getFinalApplicationStatus() != null
- && localReport.getFinalApplicationStatus().ordinal() == 0) {
- LOG.debug("currently in state: "
- + localReport.getFinalApplicationStatus());
- if (verbose) {
- long remoteSuperStep = client.getCurrentSuperStep().get();
- if (clientSuperStep < remoteSuperStep) {
- clientSuperStep = remoteSuperStep;
- LOG.info("Current supersteps number: " + clientSuperStep);
+ try {
+ GetApplicationReportResponse reportResponse = applicationsManager.getApplicationReport(reportRequest);
+ ApplicationReport localReport = reportResponse.getApplicationReport();
+ long clientSuperStep = -1L;
+ while (localReport.getFinalApplicationStatus() != null && localReport.getFinalApplicationStatus().ordinal() == 0) {
+ LOG.debug("currently in state: " + localReport.getFinalApplicationStatus());
+ if (verbose) {
+ long remoteSuperStep = client.getCurrentSuperStep().get();
+ if (clientSuperStep < remoteSuperStep) {
+ clientSuperStep = remoteSuperStep;
+ LOG.info("Current supersteps number: " + clientSuperStep);
+ }
}
+ reportResponse = applicationsManager.getApplicationReport(reportRequest);
+ localReport = reportResponse.getApplicationReport();
+
+ Thread.sleep(3000L);
}
- reportResponse = applicationsManager.getApplicationReport(reportRequest);
- localReport = reportResponse.getApplicationReport();
- Thread.sleep(3000L);
- }
- if (localReport.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED) {
- LOG.info("Job succeeded!");
- return true;
- } else {
- LOG.info("Job failed with status: "
- + localReport.getFinalApplicationStatus().toString() + "!");
+ if (localReport.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED) {
+ LOG.info("Job succeeded!");
+ return true;
+ } else {
+ LOG.info("Job failed with status: " + localReport.getFinalApplicationStatus().toString() + "!");
+ return false;
+ }
+ } catch (YarnException e) {
+ e.printStackTrace();
return false;
}
-
}
- ClientRMProtocol getApplicationsManager() {
+ ApplicationClientProtocol getApplicationsManager() {
return applicationsManager;
}
Index: yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java (revision 1654753)
+++ yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java (working copy)
@@ -17,9 +17,12 @@
*/
package org.apache.hama.bsp;
+import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
@@ -27,6 +30,10 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
@@ -33,14 +40,10 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hama.HamaConfiguration;
@@ -61,6 +64,10 @@
Path submitJobFile, FileSystem pFs) throws IOException {
YARNBSPJob job = (YARNBSPJob) normalJob;
+ YarnConfiguration conf = new YarnConfiguration();
+ YarnClient yarnClient = YarnClient.createYarnClient();
+ yarnClient.init(conf);
+ yarnClient.start();
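+    // A YarnClient is started alongside the existing ApplicationClientProtocol proxy; it is used
+    // below to submit the application and to poll its report.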
LOG.info("Submitting job...");
if (getConf().get("bsp.child.mem.in.mb") == null) {
@@ -77,122 +84,243 @@
LOG.debug("Retrieved username: " + s);
}
- GetNewApplicationRequest request = Records
- .newRecord(GetNewApplicationRequest.class);
- GetNewApplicationResponse response = job.getApplicationsManager()
- .getNewApplication(request);
- id = response.getApplicationId();
- LOG.debug("Got new ApplicationId=" + id);
+ try {
+      List<QueueUserACLInfo> listAclInfo = yarnClient.getQueueAclsInfo();
+ for (QueueUserACLInfo aclInfo : listAclInfo) {
+ for (QueueACL userAcl : aclInfo.getUserAcls()) {
+ LOG.info("User ACL Info for Queue"
+ + ", queueName=" + aclInfo.getQueueName()
+ + ", userAcl=" + userAcl.name());
+ }
+ }
+ GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class);
- // Create a new ApplicationSubmissionContext
- ApplicationSubmissionContext appContext = Records
- .newRecord(ApplicationSubmissionContext.class);
- // set the ApplicationId
- appContext.setApplicationId(this.id);
- // set the application name
- appContext.setApplicationName(job.getJobName());
+ GetNewApplicationResponse response = job.getApplicationsManager().getNewApplication(request);
+ id = response.getApplicationId();
+ LOG.debug("Got new ApplicationId=" + id);
- // Create a new container launch context for the AM's container
- ContainerLaunchContext amContainer = Records
- .newRecord(ContainerLaunchContext.class);
+ // Create a new ApplicationSubmissionContext
+ ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
+ // set the ApplicationId
+ appContext.setApplicationId(this.id);
+ // set the application name
+ appContext.setApplicationName(job.getJobName());
- // Define the local resources required
-    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
- // Lets assume the jar we need for our ApplicationMaster is available in
- // HDFS at a certain known path to us and we want to make it available to
- // the ApplicationMaster in the launched container
- if (job.getJar() == null) {
- throw new IllegalArgumentException(
- "Jar must be set in order to run the application!");
- }
- Path jarPath = new Path(job.getWorkingDirectory(), id + "/app.jar");
- fs.copyFromLocalFile(job.getLocalPath(job.getJar()), jarPath);
- LOG.debug("Copying app jar to " + jarPath);
- getConf()
- .set(
- "bsp.jar",
- jarPath.makeQualified(fs.getUri(), fs.getWorkingDirectory())
- .toString());
- FileStatus jarStatus = fs.getFileStatus(jarPath);
- LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
- amJarRsrc.setType(LocalResourceType.FILE);
- amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
- amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
- amJarRsrc.setTimestamp(jarStatus.getModificationTime());
- amJarRsrc.setSize(jarStatus.getLen());
- // this creates a symlink in the working directory
- localResources.put("AppMaster.jar", amJarRsrc);
- // Set the local resources into the launch context
- amContainer.setLocalResources(localResources);
+ // Create a new container launch context for the AM's container
+ ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
- // Set up the environment needed for the launch context
-    Map<String, String> env = new HashMap<String, String>();
- // Assuming our classes or jars are available as local resources in the
- // working directory from which the command will be run, we need to append
- // "." to the path.
- // By default, all the hadoop specific classpaths will already be available
- // in $CLASSPATH, so we should be careful not to overwrite it.
- String classPathEnv = "$CLASSPATH:./*:";
- env.put("CLASSPATH", classPathEnv);
- amContainer.setEnvironment(env);
+ // Define the local resources required
+      Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+ // Lets assume the jar we need for our ApplicationMaster is available in
+ // HDFS at a certain known path to us and we want to make it available to
+ // the ApplicationMaster in the launched container
+ if (job.getJar() == null) {
+ throw new IllegalArgumentException("Jar must be set in order to run the application!");
+ }
+ Path jarPath = new Path(fs.getHomeDirectory(), "app.jar");
+ LOG.info(fs.getHomeDirectory());
+ LOG.info(job.getLocalPath("hama-yarn-0.7.0-SNAPSHOT.jar"));
+ fs.copyFromLocalFile(false, true, job.getLocalPath("hama-yarn-0.7.0-SNAPSHOT.jar"), jarPath);
+ //fs.copyFromLocalFile(false, true, job.getLocalPath(job.getJar()), jarPath);
+ LOG.info("DST : " + jarPath);
+ LOG.debug("Copying app jar to " + jarPath);
+ //getConf().set("bsp.jar", jarPath.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString());
+ getConf().set("bsp.jar", jarPath.makeQualified(fs.getUri(), jarPath).toString());
- // Construct the command to be executed on the launched container
- String command = "${JAVA_HOME}"
- + "/bin/java -cp "
- + classPathEnv
- + " "
- + BSPApplicationMaster.class.getCanonicalName()
- + " "
- + submitJobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())
- .toString() + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
- + "/stdout" + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
- + "/stderr";
+ FileStatus jarStatus = fs.getFileStatus(jarPath);
+ LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
+ amJarRsrc.setType(LocalResourceType.FILE);
+ amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+ amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
+ amJarRsrc.setTimestamp(jarStatus.getModificationTime());
+ amJarRsrc.setSize(jarStatus.getLen());
+ // this creates a symlink in the working directory
+ localResources.put("AppMaster.jar", amJarRsrc);
- LOG.debug("Start command: " + command);
+ String hamaYarnLocation = jarPath.toUri().toString();
+ FileStatus hamaYarnFileStatus = fs.getFileStatus(jarPath);
+ long hamaYarnLen = hamaYarnFileStatus.getLen();
+ long hamaYarnTimestamp = hamaYarnFileStatus.getModificationTime();
- amContainer.setCommands(Collections.singletonList(command));
+ //
+ Path hamaCoreJarPath = new Path(fs.getHomeDirectory(), "hama-core.jar");
+ LOG.info("SRC : " + job.getLocalPath("/hama-0.6.4/hama-core-0.7.0-SNAPSHOT.jar"));
+ fs.copyFromLocalFile(false, true,
+ new Path("/root/hama-0.6.4/hama-core-0.7.0-SNAPSHOT.jar"), hamaCoreJarPath);
+ LOG.info("DST : " + hamaCoreJarPath);
- Resource capability = Records.newRecord(Resource.class);
- // we have at least 3 threads, which comsumes 1mb each, for each bsptask and
- // a base usage of 100mb
- capability.setMemory(3 * job.getNumBspTask()
- + getConf().getInt("hama.appmaster.memory.mb", 100));
- LOG.info("Set memory for the application master to "
- + capability.getMemory() + "mb!");
- amContainer.setResource(capability);
+ jarStatus = fs.getFileStatus(hamaCoreJarPath);
+ LocalResource hamaCoreJarRsrc = Records.newRecord(LocalResource.class);
+ hamaCoreJarRsrc.setType(LocalResourceType.FILE);
+ hamaCoreJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+ hamaCoreJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(hamaCoreJarPath));
+ hamaCoreJarRsrc.setTimestamp(jarStatus.getModificationTime());
+ hamaCoreJarRsrc.setSize(jarStatus.getLen());
+ localResources.put("HamaCore.jar", hamaCoreJarRsrc);
- // Set the container launch content into the ApplicationSubmissionContext
- appContext.setAMContainerSpec(amContainer);
+ // Set the local resources into the launch context
+ amContainer.setLocalResources(localResources);
- // Create the request to send to the ApplicationsManager
- SubmitApplicationRequest appRequest = Records
- .newRecord(SubmitApplicationRequest.class);
- appRequest.setApplicationSubmissionContext(appContext);
- job.getApplicationsManager().submitApplication(appRequest);
+ String hamaCoreLocation = hamaCoreJarPath.toUri().toString();
+ FileStatus hamaCoreFileStatus = fs.getFileStatus(hamaCoreJarPath);
+ long hamaCoreLen = hamaCoreFileStatus.getLen();
+ long hamaCoreTimestamp = hamaCoreFileStatus.getModificationTime();
- GetApplicationReportRequest reportRequest = Records
- .newRecord(GetApplicationReportRequest.class);
- reportRequest.setApplicationId(id);
- while (report == null || report.getHost().equals("N/A")) {
- GetApplicationReportResponse reportResponse = job
- .getApplicationsManager().getApplicationReport(reportRequest);
- report = reportResponse.getApplicationReport();
- try {
- Thread.sleep(1000L);
- } catch (InterruptedException e) {
- LOG.error(
- "Got interrupted while waiting for a response report from AM.", e);
+ // Set up the environment needed for the launch context
+      Map<String, String> env = new HashMap<String, String>();
+ // Assuming our classes or jars are available as local resources in the
+ // working directory from which the command will be run, we need to append
+ // "." to the path.
+ // By default, all the hadoop specific classpaths will already be available
+ // in $CLASSPATH, so we should be careful not to overwrite it.
+ StringBuilder classPathEnv = new StringBuilder(
+ ApplicationConstants.Environment.CLASSPATH.$()).append(File.pathSeparatorChar)
+ .append("./*");
+ for (String c : conf.getStrings(
+ YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+ classPathEnv.append(File.pathSeparatorChar);
+ classPathEnv.append(c.trim());
}
+
+ env.put("HAMAYARNJARLOCATION", hamaYarnLocation);
+ env.put("HAMAYARNJARSIZE", Long.toString(hamaYarnLen));
+ env.put("HAMAYARNJARTIMESTAMP", Long.toString(hamaYarnTimestamp));
+
+ env.put("HAMACOREJARLOCATION", hamaCoreLocation);
+ env.put("HAMACOREJARSIZE", Long.toString(hamaCoreLen));
+ env.put("HAMACOREJARTIMESTAMP", Long.toString(hamaCoreTimestamp));
+
+ env.put("CLASSPATH", classPathEnv.toString());
+ amContainer.setEnvironment(env);
+
+ // Construct the command to be executed on the launched container
+ String command = "${JAVA_HOME}" + "/bin/java -classpath " + classPathEnv + " "
+ + BSPApplicationMaster.class.getCanonicalName() + " "
+ + jarPath.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString()
+ + " 1>/var/log/hadoop-yarn/yarn/hama-yarn.stdout"
+ + " 2>/var/log/hadoop-yarn/yarn/hama-yarn.stderr";
+
+ LOG.info("Start command: " + command);
+ LOG.debug("Start command: " + command);
+
+ amContainer.setCommands(Collections.singletonList(command));
+
+ Resource capability = Records.newRecord(Resource.class);
+      // we have at least 3 threads, which consume 1mb each, for each bsp task and
+ // a base usage of 100mb
+ capability.setMemory(3 * job.getNumBspTask() + getConf().getInt("hama.appmaster.memory.mb", 100));
+ LOG.info("Set memory for the application master to " + capability.getMemory() + "mb!");
+ //amContainer.setResource(capability);
+
+ // Set the container launch content into the ApplicationSubmissionContext
+ appContext.setResource(capability);
+
+
+ // Setup security tokens
+ if (UserGroupInformation.isSecurityEnabled()) {
+ // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce
+ LOG.info("SecurityEnable!!!");
+ Credentials credentials = new Credentials();
+ String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
+ if (tokenRenewer == null || tokenRenewer.length() == 0) {
+ throw new IOException(
+ "Can't get Master Kerberos principal for the RM to use as renewer");
+ }
+
+ // For now, only getting tokens for the default file-system.
+        final Token<?> tokens[] =
+            fs.addDelegationTokens(tokenRenewer, credentials);
+        if (tokens != null) {
+          for (Token<?> token : tokens) {
+ LOG.info("Got dt for " + fs.getUri() + "; " + token);
+ }
+ }
+ DataOutputBuffer dob = new DataOutputBuffer();
+ credentials.writeTokenStorageToStream(dob);
+ ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ amContainer.setTokens(fsTokens);
+ }
+
+ appContext.setAMContainerSpec(amContainer);
+
+ // Create the request to send to the ApplicationsManager
+// SubmitApplicationRequest appRequest = Records.newRecord(SubmitApplicationRequest.class);
+// appRequest.setApplicationSubmissionContext(appContext);
+ //job.getApplicationsManager().submitApplication(appRequest);
+ ApplicationId appId = appContext.getApplicationId();
+ LOG.info("Submitting application " + appId);
+ yarnClient.submitApplication(appContext);
+
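+      // Poll the application report once per second until the job reaches a terminal state.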
+ while (true) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ LOG.debug("Thread sleep in monitoring loop interrupted");
+ }
+
+ // Get application report for the appId we are interested in
+ ApplicationReport report = yarnClient.getApplicationReport(appId);
+
+ LOG.info("Got application report from ASM for" + ", appId="
+ + appId.getId() + ", clientToAMToken="
+ + report.getClientToAMToken() + ", appDiagnostics="
+ + report.getDiagnostics() + ", appMasterHost="
+ + report.getHost() + ", appQueue=" + report.getQueue()
+ + ", appMasterRpcPort=" + report.getRpcPort()
+ + ", appStartTime=" + report.getStartTime()
+ + ", yarnAppState="
+ + report.getYarnApplicationState().toString()
+ + ", distributedFinalState="
+ + report.getFinalApplicationStatus().toString()
+ + ", appTrackingUrl=" + report.getTrackingUrl()
+ + ", appUser=" + report.getUser());
+
+ YarnApplicationState state = report.getYarnApplicationState();
+ FinalApplicationStatus dsStatus = report
+ .getFinalApplicationStatus();
+ if (YarnApplicationState.FINISHED == state) {
+ if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
+ LOG.info("Application has completed successfully. Breaking monitoring loop");
+ return new NetworkedJob();
+ } else {
+ LOG.info("Application did finished unsuccessfully."
+ + " YarnState=" + state.toString()
+ + ", DSFinalStatus=" + dsStatus.toString()
+ + ". Breaking monitoring loop");
+ return null;
+ }
+ } else if (YarnApplicationState.KILLED == state
+ || YarnApplicationState.FAILED == state) {
+ LOG.info("Application did not finish." + " YarnState="
+ + state.toString() + ", DSFinalStatus="
+ + dsStatus.toString() + ". Breaking monitoring loop");
+ return null;
+ }
+ }
+// GetApplicationReportRequest reportRequest = Records.newRecord(GetApplicationReportRequest.class);
+// reportRequest.setApplicationId(id);
+// while (report == null || report.getHost().equals("N/A")) {
+// GetApplicationReportResponse reportResponse = job.getApplicationsManager().getApplicationReport(reportRequest);
+// report = reportResponse.getApplicationReport();
+// try {
+// Thread.sleep(1000L);
+// } catch (InterruptedException e) {
+// LOG.error("Got interrupted while waiting for a response report from AM.", e);
+// }
+// }
+// LOG.info("Got report: " + report.getApplicationId() + " " + report.getHost() + ":" + report.getRpcPort());
+ //return new NetworkedJob();
+ } catch (YarnException e) {
+ e.printStackTrace();
+ return null;
}
- LOG.info("Got report: " + report.getApplicationId() + " "
- + report.getHost() + ":" + report.getRpcPort());
- return new NetworkedJob();
}
- @Override
- protected int checkTaskLimits(BSPJob job, int limitTasks) throws IOException {
- return Math.max(1, limitTasks);
- }
+// @Override
+// protected int checkTaskLimits(BSPJob job, int limitTasks) throws IOException {
+// return Math.max(1, limitTasks);
+// }
@Override
public Path getSystemDir() {
Index: yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java (revision 1654753)
+++ yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java (working copy)
@@ -22,6 +22,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.sync.SyncException;
@@ -57,8 +58,13 @@
InterruptedException, ClassNotFoundException {
HamaConfiguration conf = new HamaConfiguration();
// TODO some keys that should be within a conf
- conf.set("yarn.resourcemanager.address", "0.0.0.0:8040");
- conf.set("bsp.local.dir", "/tmp/bsp-yarn/");
+ conf.set("yarn.resourcemanager.address", "slave1.hama.com:8050");
+ conf.set("bsp.user.name", "minho");
+ conf.set("bsp.local.dir", "/root/");
+ conf.set("bsp.working.dir", "/user/root/bsp-yarn/");
+ conf.set("hama.zookeeper.quorum", "slave1.hama.com,slave2.hama.com,master.hama.com");
+ conf.set("hama.zookeeper.property.clientPort", "2181");
+ conf.setInt(Constants.MAX_TASKS, 10);
YARNBSPJob job = new YARNBSPJob(conf);
job.setBspClass(HelloBSP.class);