Index: core/src/main/java/org/apache/hama/bsp/BSPJobClient.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (revision 1654753) +++ core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (working copy) @@ -303,10 +303,14 @@ BSPJob job = pJob; job.setJobID(jobId); - ClusterStatus clusterStatus = getClusterStatus(true); - int maxTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB, - clusterStatus.getMaxTasks() - clusterStatus.getTasks()); + int maxTasks = job.getConfiguration().getInt(Constants.MAX_TASKS, 0); + if(maxTasks == 0) { + ClusterStatus clusterStatus = getClusterStatus(true); + maxTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB, + clusterStatus.getMaxTasks() - clusterStatus.getTasks()); + } + if (maxTasks < job.getNumBspTask()) { LOG.warn("The configured number of tasks has exceeded the maximum allowed. Job will run with " + maxTasks + " tasks."); Index: pom.xml =================================================================== --- pom.xml (revision 1654753) +++ pom.xml (working copy) @@ -320,6 +320,7 @@ ml mesos dist + yarn Index: yarn/pom.xml =================================================================== --- yarn/pom.xml (revision 1654753) +++ yarn/pom.xml (working copy) @@ -19,7 +19,7 @@ org.apache.hama hama-parent - 0.6.3-SNAPSHOT + 0.7.0-SNAPSHOT 4.0.0 @@ -26,13 +26,15 @@ org.apache.hama hama-yarn yarn - 0.6.3-SNAPSHOT + 0.7.0-SNAPSHOT jar - 1.2.0 + - + org.apache.hama @@ -54,27 +56,41 @@ avro 1.5.3 - org.apache.hadoop hadoop-yarn-api - 0.23.1 + ${hadoop.version} org.apache.hadoop hadoop-yarn-common - 0.23.1 + ${hadoop.version} org.apache.hadoop hadoop-yarn-server-tests - 0.23.1 + ${hadoop.version} + + org.apache.hadoop + hadoop-yarn-client + ${hadoop.version} + org.apache.zookeeper zookeeper + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + org.apache.hadoop + hadoop-yarn-client + 2.4.0.2.1.5.0-695 + Index: yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (revision 1654753) +++ yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (working copy) @@ -20,6 +20,8 @@ import java.io.DataInputStream; import java.io.IOException; import java.net.InetSocketAddress; +import java.security.PrivilegedAction; +import java.util.Collection; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -33,13 +35,16 @@ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.ipc.ProtocolSignature; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.RPC.Server; +//import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.yarn.Clock; -import org.apache.hadoop.yarn.SystemClock; -import org.apache.hadoop.yarn.api.AMRMProtocol; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; @@ -46,10 +51,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.exceptions.YarnException; +//import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; +import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.Job.JobState; import org.apache.hama.bsp.sync.SyncServerRunner; @@ -56,6 +65,8 @@ import org.apache.hama.bsp.sync.SyncServiceFactory; import org.apache.hama.ipc.BSPPeerProtocol; import org.apache.hama.ipc.HamaRPCProtocolVersion; +import org.apache.hama.ipc.RPC; +import org.apache.hama.ipc.Server; import org.apache.hama.util.BSPNetUtils; /** @@ -73,8 +84,10 @@ private Clock clock; private YarnRPC yarnRPC; - private AMRMProtocol amrmRPC; + private RPC rpc; + private ApplicationMasterProtocol amrmRPC; + private ApplicationAttemptId appAttemptId; private String applicationName; private long startTime; @@ -104,6 +117,7 @@ } this.jobFile = args[0]; + LOG.info("jobFile >>>> " + jobFile); this.localConf = new YarnConfiguration(); this.jobConf = getSubmitConfiguration(jobFile); @@ -159,14 +173,23 @@ */ private void startRPCServers() throws IOException { // start the RPC server which talks to the client - this.clientServer = RPC.getServer(BSPClient.class, this, hostname, - clientPort, jobConf); + InetSocketAddress BSPClientAddress = jobConf.getSocketAddr(hostname, hostname, clientPort); + LOG.info("BSPClientAddress >>>> " + BSPClientAddress); + this.clientServer = rpc.getServer(BSPClient.class, hostname, clientPort, jobConf); +// this.clientServer = rpc.getServer(BSPClient.class, this, BSPClientAddress, jobConf, null, +// jobConf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT, +// YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT)); this.clientServer.start(); // start the RPC server which talks to the tasks this.taskServerPort = BSPNetUtils.getFreePort(10000); - this.taskServer = RPC.getServer(BSPPeerProtocol.class, this, hostname, - taskServerPort, jobConf); + InetSocketAddress BSPPeerProtocolAddress = jobConf + .getSocketAddr(hostname, hostname, taskServerPort); + this.taskServer = rpc.getServer(BSPPeerProtocol.class, hostname, taskServerPort, jobConf); +// this.taskServer = rpc.getServer(BSPPeerProtocol.class, this, BSPPeerProtocolAddress, +// jobConf, null, +// jobConf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT, +// YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT)); this.taskServer.start(); // readjusting the configuration to let the tasks know where we are. this.jobConf.set("hama.umbilical.address", hostname + ":" + taskServerPort); @@ -191,28 +214,57 @@ * @param yarnConf * @return a new RPC connection to the Resource Manager. */ - private AMRMProtocol getYarnRPCConnection(Configuration yarnConf) { + private ApplicationMasterProtocol getYarnRPCConnection(Configuration yarnConf) throws IOException { // Connect to the Scheduler of the ResourceManager. - InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get( + UserGroupInformation currentUser = UserGroupInformation.createRemoteUser(appAttemptId.toString()); + LOG.info("current User & appAttemptId : " + currentUser + " " + appAttemptId.toString()); + Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials(); + final InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get( YarnConfiguration.RM_SCHEDULER_ADDRESS, YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS)); + Token amRMToken = setupAndReturnAMRMToken(rmAddress, credentials.getAllTokens()); + currentUser.addToken(amRMToken); + final Configuration conf = yarnConf; + ApplicationMasterProtocol client = currentUser + .doAs(new PrivilegedAction() { + @Override + public ApplicationMasterProtocol run() { + return (ApplicationMasterProtocol) yarnRPC.getProxy(ApplicationMasterProtocol.class, rmAddress, conf); + } + }); LOG.info("Connecting to ResourceManager at " + rmAddress); - return (AMRMProtocol) yarnRPC.getProxy(AMRMProtocol.class, rmAddress, - yarnConf); + return client; +// return (ApplicationMasterProtocol)yarnRPC +// .getProxy(ApplicationMasterProtocol.class, rmAddress, yarnConf); } + @SuppressWarnings("unchecked") + private Token setupAndReturnAMRMToken( + InetSocketAddress rmBindAddress, + Collection> allTokens) { + for (Token token : allTokens) { + if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { + SecurityUtil.setTokenService(token, rmBindAddress); + return token; + } + } + return null; + } + + /** * Registers this application master with the Resource Manager and retrieves a * response which is used to launch additional containers. */ private static RegisterApplicationMasterResponse registerApplicationMaster( - AMRMProtocol resourceManager, ApplicationAttemptId appAttemptID, + ApplicationMasterProtocol resourceManager, ApplicationAttemptId appAttemptID, String appMasterHostName, int appMasterRpcPort, - String appMasterTrackingUrl) throws YarnRemoteException { + String appMasterTrackingUrl) throws YarnException, IOException { + RegisterApplicationMasterRequest appMasterRequest = Records .newRecord(RegisterApplicationMasterRequest.class); - appMasterRequest.setApplicationAttemptId(appAttemptID); + //appMasterRequest.setApplicationAttemptId(appAttemptID); appMasterRequest.setHost(appMasterHostName); appMasterRequest.setRpcPort(appMasterRpcPort); // TODO tracking URL @@ -219,7 +271,7 @@ appMasterRequest.setTrackingUrl(appMasterTrackingUrl); RegisterApplicationMasterResponse response = resourceManager .registerApplicationMaster(appMasterRequest); - LOG.debug("ApplicationMaster has maximum resource capability of: " + LOG.info("ApplicationMaster has maximum resource capability of: " + response.getMaximumResourceCapability().getMemory()); return response; } @@ -234,12 +286,13 @@ private static ApplicationAttemptId getApplicationAttemptId() throws IOException { Map envs = System.getenv(); - if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) { + if (!envs.containsKey(Environment.CONTAINER_ID.name())) { throw new IllegalArgumentException( "ApplicationAttemptId not set in the environment"); } + return ConverterUtils.toContainerId( - envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)) + envs.get(Environment.CONTAINER_ID.name())) .getApplicationAttemptId(); } @@ -247,6 +300,7 @@ JobState finalState = null; try { job = new JobImpl(appAttemptId, jobConf, yarnRPC, amrmRPC, jobFile, jobId); + LOG.info("job : " + job.getState()); finalState = job.startJob(); } finally { if (finalState != null) { @@ -259,7 +313,7 @@ } } - private void cleanup() throws YarnRemoteException { + private void cleanup() throws YarnException, IOException { syncServer.stop(); if (threadPool != null && !threadPool.isShutdown()) { threadPool.shutdownNow(); @@ -268,29 +322,31 @@ taskServer.stop(); FinishApplicationMasterRequest finishReq = Records .newRecord(FinishApplicationMasterRequest.class); - finishReq.setAppAttemptId(appAttemptId); + //finishReq.setAppAttemptId(appAttemptId); switch (job.getState()) { case SUCCESS: - finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); break; case KILLED: - finishReq.setFinishApplicationStatus(FinalApplicationStatus.KILLED); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.KILLED); break; case FAILED: - finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.FAILED); break; default: - finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.FAILED); } this.amrmRPC.finishApplicationMaster(finishReq); } - public static void main(String[] args) throws YarnRemoteException { + public static void main(String[] args) throws YarnException, IOException { // we expect getting the qualified path of the job.xml as the first // element in the arguments BSPApplicationMaster master = null; try { + LOG.info("Start Application Master!!!!!"); master = new BSPApplicationMaster(args); + LOG.info("master : " + master.getCurrentSuperStep()); master.start(); } catch (Exception e) { LOG.fatal("Error starting BSPApplicationMaster", e); Index: yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java (revision 1654753) +++ yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java (working copy) @@ -37,7 +37,7 @@ private static final Log LOG = LogFactory.getLog(BSPRunner.class); - private Configuration conf; + private HamaConfiguration conf; private TaskAttemptID id; private BSPPeerImpl peer; private Counters counters = new Counters(); Index: yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (revision 1654753) +++ yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (working copy) @@ -17,11 +17,13 @@ */ package org.apache.hama.bsp; +import java.io.File; import java.io.IOException; import java.net.URISyntaxException; -import java.util.Arrays; -import java.util.Collections; +import java.nio.ByteBuffer; +import java.util.*; +import com.sun.tools.javac.util.Convert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -28,20 +30,20 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ApplicationConstants; -import org.apache.hadoop.yarn.api.ContainerManager; -import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; -import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; -import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.api.records.ContainerState; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.hadoop.yarn.api.records.LocalResourceType; -import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; -import org.apache.hadoop.yarn.api.records.URL; -import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.impl.pb.client.ContainerManagementProtocolPBClientImpl; +import org.apache.hadoop.yarn.api.protocolrecords.*; +import org.apache.hadoop.yarn.api.records.*; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; @@ -51,17 +53,20 @@ private final Container allocatedContainer; private final int id; - private final ContainerManager cm; + private final ContainerManagementProtocol cm; private final Configuration conf; private String user; private final Path jobFile; private final BSPJobID jobId; - private GetContainerStatusRequest statusRequest; + private ByteBuffer allTokens; + private UserGroupInformation appSubmitterUgi; - public BSPTaskLauncher(int id, Container container, ContainerManager cm, + private GetContainerStatusesRequest statusRequest; + + public BSPTaskLauncher(int id, Container container, ContainerManagementProtocol cm, Configuration conf, Path jobFile, BSPJobID jobId) - throws YarnRemoteException { + throws YarnException { this.id = id; this.cm = cm; this.conf = conf; @@ -71,7 +76,8 @@ // FIXME why does this contain mapreduce here? this.user = conf.get("bsp.user.name"); if (this.user == null) { - this.user = conf.get("mapreduce.job.user.name"); + //this.user = conf.get("mapreduce.job.user.name"); + user = System.getenv(ApplicationConstants.Environment.USER.name()); } } @@ -80,14 +86,15 @@ stopAndCleanup(); } - public void stopAndCleanup() throws YarnRemoteException { - StopContainerRequest stopRequest = Records - .newRecord(StopContainerRequest.class); - stopRequest.setContainerId(allocatedContainer.getId()); - cm.stopContainer(stopRequest); + public void stopAndCleanup() throws YarnException, IOException { + StopContainersRequest stopRequest = Records.newRecord(StopContainersRequest.class); + List containerIds = new ArrayList(); + containerIds.add(allocatedContainer.getId()); + stopRequest.setContainerIds(containerIds); + cm.stopContainers(stopRequest); } - public void start() throws IOException { + public void start() throws IOException, YarnException { LOG.info("Spawned task with id: " + this.id + " for allocated container id: " + this.allocatedContainer.getId().toString()); @@ -103,20 +110,28 @@ */ public BSPTaskStatus poll() throws Exception { - ContainerStatus lastStatus; - if ((lastStatus = cm.getContainerStatus(statusRequest).getStatus()) - .getState() != ContainerState.COMPLETE) { - return null; + ContainerStatus lastStatus = null; + GetContainerStatusesResponse getContainerStatusesResponse = cm.getContainerStatuses(statusRequest); + List containerStatuses = getContainerStatusesResponse.getContainerStatuses(); + LOG.info("getContainerStatusesREsponse : " + getContainerStatusesResponse); + LOG.info("ContainerStatuses : " + containerStatuses); + LOG.info("ContainerStatuses Size : " + containerStatuses.size()); + for (ContainerStatus containerStatus : containerStatuses) { + LOG.info("ContainerStatus getStatus : " + containerStatus.getState()); + if ((lastStatus = containerStatus).getState() != ContainerState.COMPLETE) { + return null; + } + LOG.info(this.id + "\tLast report comes with exitstatus of " + lastStatus.getExitStatus() + " and diagnose string of " + lastStatus.getDiagnostics()); } - LOG.info(this.id + "\tLast report comes with existatus of " - + lastStatus.getExitStatus() + " and diagnose string of " - + lastStatus.getDiagnostics()); + + if (lastStatus == null) + return new BSPTaskStatus(id, -1000); + return new BSPTaskStatus(id, lastStatus.getExitStatus()); } - private GetContainerStatusRequest setupContainer( - Container allocatedContainer, ContainerManager cm, String user, int id) - throws IOException { + private GetContainerStatusesRequest setupContainer( + Container allocatedContainer, ContainerManagementProtocol cm, String user, int id) throws IOException, YarnException { LOG.info("Setting up a container for user " + user + " with id of " + id + " and containerID of " + allocatedContainer.getId() + " as " + user); // Now we setup a ContainerLaunchContext @@ -123,16 +138,48 @@ ContainerLaunchContext ctx = Records .newRecord(ContainerLaunchContext.class); - ctx.setContainerId(allocatedContainer.getId()); - ctx.setResource(allocatedContainer.getResource()); - ctx.setUser(user); +// ctx.setContainerId(allocatedContainer.getId()); +// ctx.setResource(allocatedContainer.getResource()); +// ctx.setUser(user); + // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class + // are marked as LimitedPrivate + + /* + Credentials credentials = + UserGroupInformation.getCurrentUser().getCredentials();/ + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + // Now remove the AM->RM token so that containers cannot access it. + Iterator> iter = credentials.getAllTokens().iterator(); + LOG.info("Executing with tokens:"); + while (iter.hasNext()) { + Token token = iter.next(); + LOG.info("token is " + token); + if (token.getKind().equals(ContainerTokenIdentifier.KIND)) { + iter.remove(); + } + } + allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + ctx.setTokens(allTokens.duplicate()); + + + // Create appSubmitterUgi and add original tokens to it + String appSubmitterUserName = + System.getenv(ApplicationConstants.Environment.USER.name()); + appSubmitterUgi = + UserGroupInformation.createRemoteUser(appSubmitterUserName); + appSubmitterUgi.addCredentials(credentials); + */ + + /* * jar */ + Map localResources = new HashMap(); LocalResource packageResource = Records.newRecord(LocalResource.class); FileSystem fs = FileSystem.get(conf); - Path packageFile = new Path(conf.get("bsp.jar")); + Path packageFile = new Path(System.getenv("HAMAYARNJARLOCATION")); // FIXME there seems to be a problem with the converter utils and URL // transformation URL packageUrl = ConverterUtils.getYarnUrlFromPath(packageFile @@ -147,14 +194,32 @@ FileStatus fileStatus = fs.getFileStatus(packageFile); packageResource.setResource(packageUrl); - packageResource.setSize(fileStatus.getLen()); - packageResource.setTimestamp(fileStatus.getModificationTime()); - packageResource.setType(LocalResourceType.ARCHIVE); + //packageResource.setSize(fileStatus.getLen()); + //packageResource.setTimestamp(fileStatus.getModificationTime()); + packageResource.setSize(Long.parseLong(System.getenv("HAMAYARNJARSIZE"))); + packageResource.setTimestamp(Long.parseLong(System.getenv("HAMAYARNJARTIMESTAMP"))); + packageResource.setType(LocalResourceType.FILE); packageResource.setVisibility(LocalResourceVisibility.APPLICATION); - LOG.info("Package resource: " + packageResource.getResource()); - ctx.setLocalResources(Collections.singletonMap("package", packageResource)); + LocalResource hamaResource = Records.newRecord(LocalResource.class); + Path hamaCoreJar = new Path(System.getenv("HAMACOREJARLOCATION")); + URL hamaCoreUrl = ConverterUtils.getYarnUrlFromPath(hamaCoreJar + .makeQualified(fs.getUri(), fs.getWorkingDirectory())); + FileStatus HamaCoreStatus = fs.getFileStatus(hamaCoreJar); + hamaResource.setResource(hamaCoreUrl); + hamaResource.setSize(HamaCoreStatus.getLen()); + hamaResource.setTimestamp(HamaCoreStatus.getModificationTime()); + hamaResource.setType(LocalResourceType.FILE); + hamaResource.setVisibility(LocalResourceVisibility.APPLICATION); + + localResources.put("HamaCore.jar", hamaResource); + localResources.put("AppMaster.jar", packageResource); + LOG.info("LocalResources" + localResources); + + //ctx.setLocalResources(Collections.singletonMap("package", packageResource)); + ctx.setLocalResources(localResources); + /* * TODO Package classpath seems not to work if you're in pseudo distributed * mode, because the resource must not be moved, it will never be unpacked. @@ -161,32 +226,87 @@ * So we will check if our jar file has the file:// prefix and put it into * the CP directly */ - String cp = "$CLASSPATH:./*:./package/*:./*:"; + String cp = "$CLASSPATH:./*:./package/*:./*:./AppMaster.jar:./HamaCore.jar"; if (packageUrl.getScheme() != null && packageUrl.getScheme().equals("file")) { cp += packageFile.makeQualified(fs.getUri(), fs.getWorkingDirectory()) .toString() + ":"; LOG.info("Localized file scheme detected, adjusting CP to: " + cp); } - String[] cmds = { - "${JAVA_HOME}" + "/bin/java -cp \"" + cp + "\" " - + BSPRunner.class.getCanonicalName(), - jobId.getJtIdentifier(), - id + "", - this.jobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory()) - .toString(), - " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout", - " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr" }; - ctx.setCommands(Arrays.asList(cmds)); - LOG.info("Starting command: " + Arrays.toString(cmds)); + String containerId = allocatedContainer.getId().toString(); + String applicationId = allocatedContainer.getId().getApplicationAttemptId() + .getApplicationId().toString(); + + String containerHome = conf.get("yarn.nodemanager.local-dirs") + + File.separator + ContainerLocalizer.USERCACHE + + File.separator + + System.getenv().get(ApplicationConstants.Environment.USER.toString()) + + File.separator + ContainerLocalizer.APPCACHE + + File.separator + applicationId + File.separator + + containerId; + cp = "$CLASSPATH:./*:" + containerHome + "/AppMaster.jar"; + + StringBuilder classPathEnv = new StringBuilder( + ApplicationConstants.Environment.CLASSPATH.$()).append(File.pathSeparatorChar) + .append("./*"); + for (String c : conf.getStrings( + YarnConfiguration.YARN_APPLICATION_CLASSPATH, + YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) { + classPathEnv.append(File.pathSeparatorChar); + classPathEnv.append(c.trim()); + } + //classPathEnv +// String[] cmds = { +// "${JAVA_HOME}" + "/bin/java -cp \"" + cp + "\" " +// + BSPRunner.class.getCanonicalName(), +// jobId.getJtIdentifier(), +// id + "", +// this.jobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory()) +// .toString(), +// " 1>/var/log/hadoop-yarn/yarn/hama-yarncontainer.stdout", +// " 2>/var/log/hadoop-yarn/yarn/hama-yarncontainer.stderr" }; +// //" 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout", +// //" 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr" }; + Vector vargs = new Vector(); + vargs.add("${JAVA_HOME}/bin/java"); + vargs.add("-cp " + classPathEnv + ""); + //vargs.add("-cp " + cp + ""); + LOG.info("ClassPath : " + cp); + vargs.add(BSPRunner.class.getCanonicalName()); + vargs.add(jobId.getJtIdentifier()); + vargs.add(Integer.toString(id)); + vargs.add(this.jobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory()) + .toString()); + vargs.add("1>/var/log/hadoop/yarn/hama-yarncontainer.stdout"); + vargs.add("2>/var/log/hadoop/yarn/hama-yarncontainer.stderr"); + + // Get final commmand + StringBuilder command = new StringBuilder(); + for (CharSequence str : vargs) { + command.append(str).append(" "); + } + + List commands = new ArrayList(); + commands.add(command.toString()); + + ctx.setCommands(commands); + LOG.info("Starting command: " + commands); + StartContainerRequest startReq = Records .newRecord(StartContainerRequest.class); startReq.setContainerLaunchContext(ctx); - cm.startContainer(startReq); + startReq.setContainerToken(allocatedContainer.getContainerToken()); - GetContainerStatusRequest statusReq = Records - .newRecord(GetContainerStatusRequest.class); - statusReq.setContainerId(allocatedContainer.getId()); + List list = new ArrayList(); + list.add(startReq); + StartContainersRequest requestList = StartContainersRequest.newInstance(list); + cm.startContainers(requestList); + + GetContainerStatusesRequest statusReq = Records + .newRecord(GetContainerStatusesRequest.class); + List containerIds = new ArrayList(); + containerIds.add(allocatedContainer.getId()); + statusReq.setContainerIds(containerIds); return statusReq; } Index: yarn/src/main/java/org/apache/hama/bsp/Job.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/Job.java (revision 1654753) +++ yarn/src/main/java/org/apache/hama/bsp/Job.java (working copy) @@ -17,8 +17,10 @@ */ package org.apache.hama.bsp; -import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import java.io.IOException; + /** * Main interface to interact with the job. Provides only getters. */ @@ -34,7 +36,7 @@ public JobState startJob() throws Exception; - public void cleanup() throws YarnRemoteException; + public void cleanup() throws YarnException, IOException; JobState getState(); Index: yarn/src/main/java/org/apache/hama/bsp/JobImpl.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/JobImpl.java (revision 1654753) +++ yarn/src/main/java/org/apache/hama/bsp/JobImpl.java (working copy) @@ -17,34 +17,43 @@ */ package org.apache.hama.bsp; +import java.io.IOException; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Deque; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; +import java.nio.ByteBuffer; +import java.security.PrivilegedAction; +import java.util.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.yarn.api.AMRMProtocol; -import org.apache.hadoop.yarn.api.ContainerManager; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +//import org.apache.hadoop.security.token.Token; +//import org.apache.hadoop.security.token.TokenIdentifier; +//import org.apache.hadoop.security.token.*; +//import org.apache.hadoop.security.token.Token; +//import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.records.AMResponse; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.api.records.*; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; -import org.apache.hadoop.yarn.util.BuilderUtils; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.security.NMTokenIdentifier; +import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.hama.bsp.BSPTaskLauncher.BSPTaskStatus; @@ -66,7 +75,7 @@ private ApplicationAttemptId appAttemptId; private YarnRPC yarnRPC; - private AMRMProtocol resourceManager; + private ApplicationMasterProtocol resourceManager; private List allocatedContainers; private List releasedContainers = Collections.emptyList(); @@ -76,9 +85,19 @@ private int lastResponseID = 0; + private int getMemoryRequirements() { + String newMemoryProperty = conf.get("bsp.child.mem.in.mb"); + if (newMemoryProperty == null) { + LOG.warn("\"bsp.child.mem.in.mb\" was not set! Try parsing the child opts..."); + return getMemoryFromOptString(childOpts); + } else { + return Integer.valueOf(newMemoryProperty); + } + } + public JobImpl(ApplicationAttemptId appAttemptId, - Configuration jobConfiguration, YarnRPC yarnRPC, AMRMProtocol amrmRPC, - String jobFile, BSPJobID jobId) { + Configuration jobConfiguration, YarnRPC yarnRPC, ApplicationMasterProtocol amrmRPC, + String jobFile, BSPJobID jobId) { super(); this.numBSPTasks = jobConfiguration.getInt("bsp.peers.num", 1); this.appAttemptId = appAttemptId; @@ -91,32 +110,30 @@ this.childOpts = conf.get("bsp.child.java.opts"); this.taskMemoryInMb = getMemoryRequirements(); + LOG.info("Memory per task: " + taskMemoryInMb + "m!"); } - private int getMemoryRequirements() { - String newMemoryProperty = conf.get("bsp.child.mem.in.mb"); - if (newMemoryProperty == null) { - LOG.warn("\"bsp.child.mem.in.mb\" was not set! Try parsing the child opts..."); - return getMemoryFromOptString(childOpts); - } else { - return Integer.valueOf(newMemoryProperty); + // This really needs a testcase + private static int getMemoryFromOptString(String opts) { + if (opts == null) { + return DEFAULT_MEMORY_MB; } - } - // This really needs a testcase - private static int getMemoryFromOptString(String opts) { if (!opts.contains("-Xmx")) { LOG.info("No \"-Xmx\" option found in child opts, using default amount of memory!"); return DEFAULT_MEMORY_MB; } else { // e.G: -Xmx512m + int startIndex = opts.indexOf("-Xmx") + 4; - int endIndex = opts.indexOf(" ", startIndex); - String xmxString = opts.substring(startIndex, endIndex); + //int endIndex = opts.indexOf(" ", startIndex); + //LOG.info("start & end index : " + startIndex + ":" + endIndex); + //String xmxString = opts.substring(startIndex, endIndex); + String xmxString = opts.substring(startIndex); char qualifier = xmxString.charAt(xmxString.length() - 1); int memory = Integer.valueOf(xmxString.substring(0, - xmxString.length() - 2)); + xmxString.length() - 1)); if (qualifier == 'm') { return memory; } else if (qualifier == 'g') { @@ -133,28 +150,39 @@ public JobState startJob() throws Exception { this.allocatedContainers = new ArrayList(numBSPTasks); + List nmTokenList = null; + LOG.info("allocatedContainer size : " + allocatedContainers.size() + + " & numBSPTasks : " + numBSPTasks); while (allocatedContainers.size() < numBSPTasks) { - AllocateRequest req = BuilderUtils.newAllocateRequest( - appAttemptId, - lastResponseID, - 0.0f, - createBSPTaskRequest(numBSPTasks - allocatedContainers.size(), - taskMemoryInMb, priority), releasedContainers); +// AllocateRequest req = BuilderUtils.newAllocateRequest( +// appAttemptId, +// lastResponseID, +// 0.0f, +// createBSPTaskRequest(numBSPTasks - allocatedContainers.size(), +// taskMemoryInMb, priority), releasedContainers); + AllocateRequest req = AllocateRequest.newInstance(lastResponseID, 0.0f, + createBSPTaskRequest(numBSPTasks - allocatedContainers.size(), taskMemoryInMb, + priority), releasedContainers, null); + AllocateResponse allocateResponse = resourceManager.allocate(req); - AMResponse amResponse = allocateResponse.getAMResponse(); - LOG.info("Got response! ID: " + amResponse.getResponseId() + LOG.info("Got response! ID: " + allocateResponse.getResponseId() + " with num of containers: " - + amResponse.getAllocatedContainers().size() + + allocateResponse.getAllocatedContainers().size() + " and following resources: " - + amResponse.getAvailableResources().getMemory() + "mb"); - this.lastResponseID = amResponse.getResponseId(); + + allocateResponse.getAvailableResources().getMemory() + "mb"); + this.lastResponseID = allocateResponse.getResponseId(); // availableResources = amResponse.getAvailableResources(); - this.allocatedContainers.addAll(amResponse.getAllocatedContainers()); - LOG.info("Waiting to allocate " - + (numBSPTasks - allocatedContainers.size()) + " more containers..."); + + this.allocatedContainers.addAll(allocateResponse.getAllocatedContainers()); + LOG.info("allocatedContainer.size : " + allocatedContainers.size() + " numBSPTasks : " + numBSPTasks); + + LOG.info("Waiting to allocate " + (numBSPTasks - allocatedContainers.size()) + " more containers..."); + + nmTokenList = allocateResponse.getNMTokens(); + Thread.sleep(1000l); } @@ -166,20 +194,42 @@ + allocatedContainer.getId() + ", containerNode=" + allocatedContainer.getNodeId().getHost() + ":" + allocatedContainer.getNodeId().getPort() + ", containerNodeURI=" - + allocatedContainer.getNodeHttpAddress() + ", containerState" - + allocatedContainer.getState() + ", containerResourceMemory" + + allocatedContainer.getNodeHttpAddress() + ", containerResourceMemory" + allocatedContainer.getResource().getMemory()); // Connect to ContainerManager on the allocated container - String cmIpPortStr = allocatedContainer.getNodeId().getHost() + ":" - + allocatedContainer.getNodeId().getPort(); - InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr); - ContainerManager cm = (ContainerManager) yarnRPC.getProxy( - ContainerManager.class, cmAddress, conf); + String user = conf.get("bsp.user.name"); + if (user == null) { + user = System.getenv(ApplicationConstants.Environment.USER.name()); + } + LOG.info("AppAttemptId : " + appAttemptId + ", AllocatedContainer NodeId : " + + allocatedContainer.getNodeId() + ", User : " + user); + //NMTokenSecretManagerInRM secretMgr = NMTokenSecretManagerInRM.newInstance('') + //LOG.info("secretManager : " + secretMgr.getCurrentKey()); + Token nmToken = null; + for (NMToken token : nmTokenList) { + nmToken = token.getToken(); + } + ContainerManagementProtocol cm = null; + + try { + cm = getContainerManagementProtocolProxy(yarnRPC, + nmToken, allocatedContainer.getNodeId(), user); + } catch (Exception e) { + LOG.error("failed to create cm!!!"); + if (cm != null) + yarnRPC.stopProxy(cm, conf); + e.printStackTrace(); + } + + LOG.info("ContainerId" + allocatedContainer.getId()); + BSPTaskLauncher runnableLaunchContainer = new BSPTaskLauncher(id, allocatedContainer, cm, conf, jobFile, jobId); + LOG.info("allocate container : " + allocatedContainer.getId()); + launchers.put(id, runnableLaunchContainer); runnableLaunchContainer.start(); completionQueue.add(runnableLaunchContainer); @@ -186,6 +236,7 @@ id++; } LOG.info("Waiting for tasks to finish..."); + LOG.info(completionQueue.size()); state = JobState.RUNNING; int completed = 0; while (completed != numBSPTasks) { @@ -214,14 +265,50 @@ return state; } + /* + private org.apache.hadoop.security.token.Token setupAndReturnAMRMToken( + InetSocketAddress rmBindAddress, + Collection> allTokens) { + for (org.apache.hadoop.security.token.Token token : allTokens) { + if (token.getKind().equals(NMTokenIdentifier.KIND)) { + SecurityUtil.setTokenService(token, rmBindAddress); + return (org.apache.hadoop.security.token.Token) token; + } + } + return null; + }*/ + + protected ContainerManagementProtocol getContainerManagementProtocolProxy( + final YarnRPC rpc, Token nmToken, NodeId nodeId, String user) { + ContainerManagementProtocol proxy; + UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user); + final InetSocketAddress addr = + NetUtils.createSocketAddr(nodeId.getHost(), nodeId.getPort()); + if (nmToken != null) { + ugi.addToken(ConverterUtils.convertFromYarn(nmToken, addr)); + } + + proxy = ugi + .doAs(new PrivilegedAction() { + @Override + public ContainerManagementProtocol run() { + LOG.info("ContainerManagementProtocol running!!!"); + return (ContainerManagementProtocol) rpc.getProxy( + ContainerManagementProtocol.class, + addr, conf); + } + }); + return proxy; + } + /** * Makes a lookup for the taskid and stops its container and task. It also * removes the task from the launcher so that we won't have to stop it twice. * * @param id - * @throws YarnRemoteException + * @throws YarnException */ - private void cleanupTask(int id) throws YarnRemoteException { + private void cleanupTask(int id) throws YarnException, IOException { BSPTaskLauncher bspTaskLauncher = launchers.get(id); bspTaskLauncher.stopAndCleanup(); launchers.remove(id); @@ -229,7 +316,7 @@ } @Override - public void cleanup() throws YarnRemoteException { + public void cleanup() throws YarnException, IOException { for (BSPTaskLauncher launcher : completionQueue) { launcher.stopAndCleanup(); } @@ -247,7 +334,7 @@ // whether a particular rack/host is needed // useful for applications that are sensitive // to data locality - rsrcRequest.setHostName("*"); + rsrcRequest.setResourceName("*"); // set the priority for the request Priority pri = Records.newRecord(Priority.class); Index: yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java (revision 1654753) +++ yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java (working copy) @@ -24,7 +24,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.yarn.api.ClientRMProtocol; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; @@ -31,7 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.util.Records; import org.apache.hama.HamaConfiguration; @@ -46,7 +46,7 @@ private BSPClient client; private boolean submitted; private ApplicationReport report; - private ClientRMProtocol applicationsManager; + private ApplicationClientProtocol applicationsManager; private YarnRPC rpc; public YARNBSPJob(HamaConfiguration conf) throws IOException { @@ -58,8 +58,8 @@ YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS)); LOG.info("Connecting to ResourceManager at " + rmAddress); - this.applicationsManager = ((ClientRMProtocol) rpc.getProxy( - ClientRMProtocol.class, rmAddress, conf)); + this.applicationsManager = ((ApplicationClientProtocol) rpc.getProxy( + ApplicationClientProtocol.class, rmAddress, conf)); } public void setMemoryUsedPerTaskInMb(int mem) { @@ -66,7 +66,7 @@ conf.setInt("bsp.child.mem.in.mb", mem); } - public void kill() throws YarnRemoteException { + public void kill() throws YarnException, IOException { if (submitClient != null) { KillApplicationRequest killRequest = Records .newRecord(KillApplicationRequest.class); @@ -95,45 +95,44 @@ this.submit(); } - client = RPC.waitForProxy(BSPClient.class, BSPClient.versionID, - NetUtils.createSocketAddr(report.getHost(), report.getRpcPort()), conf); - GetApplicationReportRequest reportRequest = Records - .newRecord(GetApplicationReportRequest.class); - reportRequest.setApplicationId(submitClient.getId()); + client = RPC.waitForProxy(BSPClient.class, BSPClient.versionID, NetUtils.createSocketAddr(report.getHost(), report.getRpcPort()), conf); + GetApplicationReportRequest reportRequest = Records.newRecord(GetApplicationReportRequest.class); + reportRequest.setApplicationId(submitClient.getId()); - GetApplicationReportResponse reportResponse = applicationsManager - .getApplicationReport(reportRequest); - ApplicationReport localReport = reportResponse.getApplicationReport(); - long clientSuperStep = -1L; - while (localReport.getFinalApplicationStatus() != null - && localReport.getFinalApplicationStatus().ordinal() == 0) { - LOG.debug("currently in state: " - + localReport.getFinalApplicationStatus()); - if (verbose) { - long remoteSuperStep = client.getCurrentSuperStep().get(); - if (clientSuperStep < remoteSuperStep) { - clientSuperStep = remoteSuperStep; - LOG.info("Current supersteps number: " + clientSuperStep); + try { + GetApplicationReportResponse reportResponse = applicationsManager.getApplicationReport(reportRequest); + ApplicationReport localReport = reportResponse.getApplicationReport(); + long clientSuperStep = -1L; + while (localReport.getFinalApplicationStatus() != null && localReport.getFinalApplicationStatus().ordinal() == 0) { + LOG.debug("currently in state: " + localReport.getFinalApplicationStatus()); + if (verbose) { + long remoteSuperStep = client.getCurrentSuperStep().get(); + if (clientSuperStep < remoteSuperStep) { + clientSuperStep = remoteSuperStep; + LOG.info("Current supersteps number: " + clientSuperStep); + } } + reportResponse = applicationsManager.getApplicationReport(reportRequest); + localReport = reportResponse.getApplicationReport(); + + Thread.sleep(3000L); } - reportResponse = applicationsManager.getApplicationReport(reportRequest); - localReport = reportResponse.getApplicationReport(); - Thread.sleep(3000L); - } - if (localReport.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED) { - LOG.info("Job succeeded!"); - return true; - } else { - LOG.info("Job failed with status: " - + localReport.getFinalApplicationStatus().toString() + "!"); + if (localReport.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED) { + LOG.info("Job succeeded!"); + return true; + } else { + LOG.info("Job failed with status: " + localReport.getFinalApplicationStatus().toString() + "!"); + return false; + } + } catch (YarnException e) { + e.printStackTrace(); return false; } - } - ClientRMProtocol getApplicationsManager() { + ApplicationClientProtocol getApplicationsManager() { return applicationsManager; } Index: yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java (revision 1654753) +++ yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java (working copy) @@ -17,9 +17,12 @@ */ package org.apache.hama.bsp; +import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.commons.logging.Log; @@ -27,6 +30,10 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; @@ -33,14 +40,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.hadoop.yarn.api.records.LocalResourceType; -import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; -import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.*; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.hama.HamaConfiguration; @@ -61,6 +64,10 @@ Path submitJobFile, FileSystem pFs) throws IOException { YARNBSPJob job = (YARNBSPJob) normalJob; + YarnConfiguration conf = new YarnConfiguration(); + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(conf); + yarnClient.start(); LOG.info("Submitting job..."); if (getConf().get("bsp.child.mem.in.mb") == null) { @@ -77,122 +84,246 @@ LOG.debug("Retrieved username: " + s); } - GetNewApplicationRequest request = Records - .newRecord(GetNewApplicationRequest.class); - GetNewApplicationResponse response = job.getApplicationsManager() - .getNewApplication(request); - id = response.getApplicationId(); - LOG.debug("Got new ApplicationId=" + id); + try { + List listAclInfo = yarnClient.getQueueAclsInfo(); + for (QueueUserACLInfo aclInfo : listAclInfo) { + for (QueueACL userAcl : aclInfo.getUserAcls()) { + LOG.info("User ACL Info for Queue" + + ", queueName=" + aclInfo.getQueueName() + + ", userAcl=" + userAcl.name()); + } + } + GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class); - // Create a new ApplicationSubmissionContext - ApplicationSubmissionContext appContext = Records - .newRecord(ApplicationSubmissionContext.class); - // set the ApplicationId - appContext.setApplicationId(this.id); - // set the application name - appContext.setApplicationName(job.getJobName()); + GetNewApplicationResponse response = job.getApplicationsManager().getNewApplication(request); + id = response.getApplicationId(); + LOG.debug("Got new ApplicationId=" + id); - // Create a new container launch context for the AM's container - ContainerLaunchContext amContainer = Records - .newRecord(ContainerLaunchContext.class); + // Create a new ApplicationSubmissionContext + ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class); + // set the ApplicationId + appContext.setApplicationId(this.id); + // set the application name + appContext.setApplicationName(job.getJobName()); - // Define the local resources required - Map localResources = new HashMap(); - // Lets assume the jar we need for our ApplicationMaster is available in - // HDFS at a certain known path to us and we want to make it available to - // the ApplicationMaster in the launched container - if (job.getJar() == null) { - throw new IllegalArgumentException( - "Jar must be set in order to run the application!"); - } - Path jarPath = new Path(job.getWorkingDirectory(), id + "/app.jar"); - fs.copyFromLocalFile(job.getLocalPath(job.getJar()), jarPath); - LOG.debug("Copying app jar to " + jarPath); - getConf() - .set( - "bsp.jar", - jarPath.makeQualified(fs.getUri(), fs.getWorkingDirectory()) - .toString()); - FileStatus jarStatus = fs.getFileStatus(jarPath); - LocalResource amJarRsrc = Records.newRecord(LocalResource.class); - amJarRsrc.setType(LocalResourceType.FILE); - amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION); - amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath)); - amJarRsrc.setTimestamp(jarStatus.getModificationTime()); - amJarRsrc.setSize(jarStatus.getLen()); - // this creates a symlink in the working directory - localResources.put("AppMaster.jar", amJarRsrc); - // Set the local resources into the launch context - amContainer.setLocalResources(localResources); + // Create a new container launch context for the AM's container + ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); - // Set up the environment needed for the launch context - Map env = new HashMap(); - // Assuming our classes or jars are available as local resources in the - // working directory from which the command will be run, we need to append - // "." to the path. - // By default, all the hadoop specific classpaths will already be available - // in $CLASSPATH, so we should be careful not to overwrite it. - String classPathEnv = "$CLASSPATH:./*:"; - env.put("CLASSPATH", classPathEnv); - amContainer.setEnvironment(env); + // Define the local resources required + Map localResources = new HashMap(); + // Lets assume the jar we need for our ApplicationMaster is available in + // HDFS at a certain known path to us and we want to make it available to + // the ApplicationMaster in the launched container + if (job.getJar() == null) { + throw new IllegalArgumentException("Jar must be set in order to run the application!"); + } + Path jarPath = new Path(fs.getHomeDirectory(), "hama-yarn.jar"); + LOG.info(fs.getHomeDirectory()); + LOG.info(job.getLocalPath("hama-yarn-0.7.0-SNAPSHOT.jar")); + fs.copyFromLocalFile(false, true, + new Path("/opt/hama-yarn-0.7.0-SNAPSHOT.jar"), jarPath); + //fs.copyFromLocalFile(false, true, job.getLocalPath(job.getJar()), jarPath); + LOG.info("DST : " + jarPath); + LOG.debug("Copying app jar to " + jarPath); + //getConf().set("bsp.jar", jarPath.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString()); + getConf().set("bsp.jar", jarPath.makeQualified(fs.getUri(), jarPath).toString()); - // Construct the command to be executed on the launched container - String command = "${JAVA_HOME}" - + "/bin/java -cp " - + classPathEnv - + " " - + BSPApplicationMaster.class.getCanonicalName() - + " " - + submitJobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory()) - .toString() + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR - + "/stdout" + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR - + "/stderr"; + FileStatus jarStatus = fs.getFileStatus(jarPath); + LocalResource amJarRsrc = Records.newRecord(LocalResource.class); + amJarRsrc.setType(LocalResourceType.FILE); + amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION); + amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath)); + amJarRsrc.setTimestamp(jarStatus.getModificationTime()); + LOG.info("hama-yarn jar timestamp : " + jarStatus.getModificationTime()); + amJarRsrc.setSize(jarStatus.getLen()); + // this creates a symlink in the working directory + localResources.put("AppMaster.jar", amJarRsrc); + //localResources.put("package", amJarRsrc); - LOG.debug("Start command: " + command); + String hamaYarnLocation = jarPath.toUri().toString(); + FileStatus hamaYarnFileStatus = fs.getFileStatus(jarPath); + long hamaYarnLen = hamaYarnFileStatus.getLen(); + long hamaYarnTimestamp = hamaYarnFileStatus.getModificationTime(); - amContainer.setCommands(Collections.singletonList(command)); + Path hamaCoreJarPath = new Path(fs.getHomeDirectory(), "hama-core.jar"); + fs.copyFromLocalFile(false, true, + new Path("/opt/hama-0.6.4/hama-core-0.7.0-SNAPSHOT.jar"), hamaCoreJarPath); + LOG.info("DST : " + hamaCoreJarPath); - Resource capability = Records.newRecord(Resource.class); - // we have at least 3 threads, which comsumes 1mb each, for each bsptask and - // a base usage of 100mb - capability.setMemory(3 * job.getNumBspTask() - + getConf().getInt("hama.appmaster.memory.mb", 100)); - LOG.info("Set memory for the application master to " - + capability.getMemory() + "mb!"); - amContainer.setResource(capability); + FileStatus hamaCoreJarStatus = fs.getFileStatus(hamaCoreJarPath); + LocalResource hamaCoreJarRsrc = Records.newRecord(LocalResource.class); + hamaCoreJarRsrc.setType(LocalResourceType.FILE); + hamaCoreJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION); + hamaCoreJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(hamaCoreJarPath)); + hamaCoreJarRsrc.setTimestamp(hamaCoreJarStatus.getModificationTime()); + hamaCoreJarRsrc.setSize(hamaCoreJarStatus.getLen()); + localResources.put("HamaCore.jar", hamaCoreJarRsrc); - // Set the container launch content into the ApplicationSubmissionContext - appContext.setAMContainerSpec(amContainer); + // Set the local resources into the launch context + amContainer.setLocalResources(localResources); - // Create the request to send to the ApplicationsManager - SubmitApplicationRequest appRequest = Records - .newRecord(SubmitApplicationRequest.class); - appRequest.setApplicationSubmissionContext(appContext); - job.getApplicationsManager().submitApplication(appRequest); + String hamaCoreLocation = hamaCoreJarPath.toUri().toString(); + FileStatus hamaCoreFileStatus = fs.getFileStatus(hamaCoreJarPath); + long hamaCoreLen = hamaCoreFileStatus.getLen(); + long hamaCoreTimestamp = hamaCoreFileStatus.getModificationTime(); - GetApplicationReportRequest reportRequest = Records - .newRecord(GetApplicationReportRequest.class); - reportRequest.setApplicationId(id); - while (report == null || report.getHost().equals("N/A")) { - GetApplicationReportResponse reportResponse = job - .getApplicationsManager().getApplicationReport(reportRequest); - report = reportResponse.getApplicationReport(); - try { - Thread.sleep(1000L); - } catch (InterruptedException e) { - LOG.error( - "Got interrupted while waiting for a response report from AM.", e); + // Set up the environment needed for the launch context + Map env = new HashMap(); + // Assuming our classes or jars are available as local resources in the + // working directory from which the command will be run, we need to append + // "." to the path. + // By default, all the hadoop specific classpaths will already be available + // in $CLASSPATH, so we should be careful not to overwrite it. + StringBuilder classPathEnv = new StringBuilder( + ApplicationConstants.Environment.CLASSPATH.$()).append(File.pathSeparatorChar) + .append("./*"); + for (String c : conf.getStrings( + YarnConfiguration.YARN_APPLICATION_CLASSPATH, + YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) { + classPathEnv.append(File.pathSeparatorChar); + classPathEnv.append(c.trim()); } + + env.put("HAMAYARNJARLOCATION", hamaYarnLocation); + env.put("HAMAYARNJARSIZE", Long.toString(hamaYarnLen)); + env.put("HAMAYARNJARTIMESTAMP", Long.toString(hamaYarnTimestamp)); + LOG.info("hamayarn>>>>>>>>>>" + Long.toString(hamaYarnTimestamp)); + + env.put("HAMACOREJARLOCATION", hamaCoreLocation); + env.put("HAMACOREJARSIZE", Long.toString(hamaCoreLen)); + env.put("HAMACOREJARTIMESTAMP", Long.toString(hamaCoreTimestamp)); + LOG.info("hamacore>>>>>>>>>>" + Long.toString(hamaCoreTimestamp)); + + env.put("CLASSPATH", classPathEnv.toString()); + amContainer.setEnvironment(env); + + // Construct the command to be executed on the launched container + String command = "${JAVA_HOME}" + "/bin/java -classpath " + classPathEnv + " " + + BSPApplicationMaster.class.getCanonicalName() + " " + + jarPath.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString() + + " 1>/var/log/hadoop/yarn/hama-yarn.stdout" + + " 2>/var/log/hadoop/yarn/hama-yarn.stderr"; + + LOG.info("Start command: " + command); + LOG.debug("Start command: " + command); + + amContainer.setCommands(Collections.singletonList(command)); + + Resource capability = Records.newRecord(Resource.class); + // we have at least 3 threads, which comsumes 1mb each, for each bsptask and + // a base usage of 100mb + capability.setMemory(3 * job.getNumBspTask() + getConf().getInt("hama.appmaster.memory.mb", 100)); + LOG.info("Set memory for the application master to " + capability.getMemory() + "mb!"); + //amContainer.setResource(capability); + + // Set the container launch content into the ApplicationSubmissionContext + appContext.setResource(capability); + + + // Setup security tokens + if (UserGroupInformation.isSecurityEnabled()) { + // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce + LOG.info("SecurityEnable!!!"); + Credentials credentials = new Credentials(); + String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL); + if (tokenRenewer == null || tokenRenewer.length() == 0) { + throw new IOException( + "Can't get Master Kerberos principal for the RM to use as renewer"); + } + + // For now, only getting tokens for the default file-system. + final Token tokens[] = + fs.addDelegationTokens(tokenRenewer, credentials); + if (tokens != null) { + for (Token token : tokens) { + LOG.info("Got dt for " + fs.getUri() + "; " + token); + } + } + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + amContainer.setTokens(fsTokens); + } + + appContext.setAMContainerSpec(amContainer); + + // Create the request to send to the ApplicationsManager +// SubmitApplicationRequest appRequest = Records.newRecord(SubmitApplicationRequest.class); +// appRequest.setApplicationSubmissionContext(appContext); + //job.getApplicationsManager().submitApplication(appRequest); + ApplicationId appId = appContext.getApplicationId(); + LOG.info("Submitting application " + appId); + yarnClient.submitApplication(appContext); + + while (true) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.debug("Thread sleep in monitoring loop interrupted"); + } + + // Get application report for the appId we are interested in + ApplicationReport report = yarnClient.getApplicationReport(appId); + + LOG.info("Got application report from ASM for" + ", appId=" + + appId.getId() + ", clientToAMToken=" + + report.getClientToAMToken() + ", appDiagnostics=" + + report.getDiagnostics() + ", appMasterHost=" + + report.getHost() + ", appQueue=" + report.getQueue() + + ", appMasterRpcPort=" + report.getRpcPort() + + ", appStartTime=" + report.getStartTime() + + ", yarnAppState=" + + report.getYarnApplicationState().toString() + + ", distributedFinalState=" + + report.getFinalApplicationStatus().toString() + + ", appTrackingUrl=" + report.getTrackingUrl() + + ", appUser=" + report.getUser()); + + YarnApplicationState state = report.getYarnApplicationState(); + FinalApplicationStatus dsStatus = report + .getFinalApplicationStatus(); + if (YarnApplicationState.FINISHED == state) { + if (FinalApplicationStatus.SUCCEEDED == dsStatus) { + LOG.info("Application has completed successfully. Breaking monitoring loop"); + return new NetworkedJob(); + } else { + LOG.info("Application did finished unsuccessfully." + + " YarnState=" + state.toString() + + ", DSFinalStatus=" + dsStatus.toString() + + ". Breaking monitoring loop"); + return null; + } + } else if (YarnApplicationState.KILLED == state + || YarnApplicationState.FAILED == state) { + LOG.info("Application did not finish." + " YarnState=" + + state.toString() + ", DSFinalStatus=" + + dsStatus.toString() + ". Breaking monitoring loop"); + return null; + } + } +// GetApplicationReportRequest reportRequest = Records.newRecord(GetApplicationReportRequest.class); +// reportRequest.setApplicationId(id); +// while (report == null || report.getHost().equals("N/A")) { +// GetApplicationReportResponse reportResponse = job.getApplicationsManager().getApplicationReport(reportRequest); +// report = reportResponse.getApplicationReport(); +// try { +// Thread.sleep(1000L); +// } catch (InterruptedException e) { +// LOG.error("Got interrupted while waiting for a response report from AM.", e); +// } +// } +// LOG.info("Got report: " + report.getApplicationId() + " " + report.getHost() + ":" + report.getRpcPort()); + //return new NetworkedJob(); + } catch (YarnException e) { + e.printStackTrace(); + return null; } - LOG.info("Got report: " + report.getApplicationId() + " " - + report.getHost() + ":" + report.getRpcPort()); - return new NetworkedJob(); } - @Override - protected int checkTaskLimits(BSPJob job, int limitTasks) throws IOException { - return Math.max(1, limitTasks); - } +// @Override +// protected int checkTaskLimits(BSPJob job, int limitTasks) throws IOException { +// return Math.max(1, limitTasks); +// } @Override public Path getSystemDir() { Index: yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java (revision 1654753) +++ yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java (working copy) @@ -22,6 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.NullWritable; +import org.apache.hama.Constants; import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.sync.SyncException; @@ -57,8 +58,15 @@ InterruptedException, ClassNotFoundException { HamaConfiguration conf = new HamaConfiguration(); // TODO some keys that should be within a conf - conf.set("yarn.resourcemanager.address", "0.0.0.0:8040"); - conf.set("bsp.local.dir", "/tmp/bsp-yarn/"); + //conf.set("yarn.resourcemanager.address", "slave1.hama.com:8050"); + conf.set("yarn.resourcemanager.address", "localhost:8032"); + conf.set("bsp.user.name", "root"); + conf.set("bsp.local.dir", "/opt/"); + conf.set("bsp.working.dir", "/user/root/bsp-yarn/"); + conf.set("bsp.child.mem.in.mb", "256"); + //conf.set("hama.zookeeper.quorum", "slave1.hama.com,slave2.hama.com,master.hama.com"); + //conf.set("hama.zookeeper.property.clientPort", "2181"); + conf.setInt(Constants.MAX_TASKS, 10); YARNBSPJob job = new YARNBSPJob(conf); job.setBspClass(HelloBSP.class);