Index: conf/hama-site.xml
===================================================================
--- conf/hama-site.xml	(revision 1659147)
+++ conf/hama-site.xml	(working copy)
@@ -22,4 +22,32 @@
 */
 -->
 <configuration>
+
+  <property>
+    <name>bsp.master.address</name>
+    <value>host1.mydomain.com:40000</value>
+    <description>The address of the bsp master server. Either the
+      literal string "local" or a host:port for distributed mode
+    </description>
+  </property>
+
+  <property>
+    <name>fs.default.name</name>
+    <value>hdfs://host1.mydomain.com:9000/</value>
+    <description>
+      The name of the default file system. Either the literal string
+      "local" or a host:port for HDFS.
+    </description>
+  </property>
+
+  <property>
+    <name>hama.zookeeper.quorum</name>
+    <value>localhost</value>
+    <description>Comma separated list of servers in the ZooKeeper Quorum.
+      For example, "host1.mydomain.com,host2.mydomain.com,host3.mydomain.com".
+      By default this is set to localhost for local and pseudo-distributed modes
+      of operation. For a fully-distributed setup, this should be set to a full
+      list of ZooKeeper quorum servers. If HAMA_MANAGES_ZK is set in hama-env.sh
+      this is the list of servers which we will start/stop zookeeper on.
+    </description>
+  </property>
+
 </configuration>
Index: core/src/main/java/org/apache/hama/bsp/BSPJob.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/BSPJob.java	(revision 1659147)
+++ core/src/main/java/org/apache/hama/bsp/BSPJob.java	(working copy)
@@ -263,8 +263,7 @@
   @SuppressWarnings({ "rawtypes" })
   public InputFormat getInputFormat() {
     return ReflectionUtils.newInstance(
-        conf.getClass(Constants.INPUT_FORMAT_CLASS, TextInputFormat.class,
-            InputFormat.class), conf);
+        conf.getClass(Constants.INPUT_FORMAT_CLASS, TextInputFormat.class, InputFormat.class), conf);
   }
 
   @SuppressWarnings({ "rawtypes" })
Index: core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/BSPJobClient.java	(revision 1659147)
+++ core/src/main/java/org/apache/hama/bsp/BSPJobClient.java	(working copy)
@@ -56,6 +56,7 @@
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.ipc.HamaRPCProtocolVersion;
@@ -303,10 +304,18 @@
     BSPJob job = pJob;
     job.setJobID(jobId);
 
-    ClusterStatus clusterStatus = getClusterStatus(true);
-    int maxTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB,
-        clusterStatus.getMaxTasks() - clusterStatus.getTasks());
+    int maxTasks = 0;
+    if (job.getConfiguration().getBoolean("hama.yarn.application", false)) {
+      YarnConfiguration yarnConf = new YarnConfiguration(job.getConfiguration());
+      int maxMem = yarnConf.getInt("yarn.nodemanager.resource.memory-mb", 0);
+      int minAllocationMem = yarnConf.getInt("yarn.scheduler.minimum-allocation-mb", 1024);
+      maxTasks = maxMem / minAllocationMem;
+    } else {
+      ClusterStatus clusterStatus = getClusterStatus(true);
+      maxTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB, clusterStatus.getMaxTasks() - clusterStatus.getTasks());
+    }
+
     if (maxTasks < job.getNumBspTask()) {
       LOG.warn("The configured number of tasks has exceeded the maximum allowed.
Job will run with " + maxTasks + " tasks."); @@ -389,6 +398,7 @@ out.close(); } + return launchJob(jobId, job, submitJobFile, fs); } Index: core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (revision 1659147) +++ core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (working copy) @@ -249,6 +249,7 @@ try { if (splitClass != null) { inputSplit = (InputSplit) ReflectionUtils.newInstance( + //Class.forName(splitClass), getConfiguration()); getConfiguration().getClassByName(splitClass), getConfiguration()); } } catch (ClassNotFoundException exp) { Index: core/src/main/java/org/apache/hama/bsp/NullInputFormat.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/NullInputFormat.java (revision 1659147) +++ core/src/main/java/org/apache/hama/bsp/NullInputFormat.java (working copy) @@ -81,26 +81,4 @@ } } - - public static class NullInputSplit implements InputSplit { - @Override - public long getLength() { - return 0; - } - - @Override - public String[] getLocations() { - String[] locs = {}; - return locs; - } - - @Override - public void readFields(DataInput in) throws IOException { - } - - @Override - public void write(DataOutput out) throws IOException { - } - } - } Index: core/src/main/java/org/apache/hama/bsp/NullInputSplit.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/NullInputSplit.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/NullInputSplit.java (working copy) @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hama.bsp; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public class NullInputSplit implements InputSplit { + @Override + public long getLength() { + return 0; + } + + @Override + public String[] getLocations() { + String[] locs = {}; + return locs; + } + + @Override + public void readFields(DataInput in) throws IOException { + } + + @Override + public void write(DataOutput out) throws IOException { + } +} \ No newline at end of file Index: pom.xml =================================================================== --- pom.xml (revision 1659147) +++ pom.xml (working copy) @@ -319,6 +319,7 @@ examples ml mesos + yarn dist Index: yarn/pom.xml =================================================================== --- yarn/pom.xml (revision 1659147) +++ yarn/pom.xml (working copy) @@ -19,7 +19,7 @@ org.apache.hama hama-parent - 0.6.3-SNAPSHOT + 0.7.0-SNAPSHOT 4.0.0 @@ -26,13 +26,9 @@ org.apache.hama hama-yarn yarn - 0.6.3-SNAPSHOT + 0.7.0-SNAPSHOT jar - - 1.2.0 - - org.apache.hama @@ -54,27 +50,41 @@ avro 1.5.3 - org.apache.hadoop hadoop-yarn-api - 0.23.1 + ${hadoop.version} org.apache.hadoop hadoop-yarn-common - 0.23.1 + ${hadoop.version} org.apache.hadoop hadoop-yarn-server-tests - 0.23.1 + ${hadoop.version} + + org.apache.hadoop + hadoop-yarn-client + ${hadoop.version} + org.apache.zookeeper zookeeper + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + org.apache.hadoop + hadoop-yarn-client + ${hadoop.version} + Index: yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (revision 1659147) +++ yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (working copy) @@ -19,7 +19,11 @@ import java.io.DataInputStream; import java.io.IOException; +import java.io.InputStream; import java.net.InetSocketAddress; +import java.net.URI; +import java.security.PrivilegedAction; +import java.util.Collection; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -32,14 +36,14 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.ipc.ProtocolSignature; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.RPC.Server; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.yarn.Clock; -import org.apache.hadoop.yarn.SystemClock; -import org.apache.hadoop.yarn.api.AMRMProtocol; -import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; @@ -46,18 +50,24 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import 
org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; +import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; +import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.Job.JobState; import org.apache.hama.bsp.sync.SyncServerRunner; import org.apache.hama.bsp.sync.SyncServiceFactory; import org.apache.hama.ipc.BSPPeerProtocol; -import org.apache.hama.ipc.HamaRPCProtocolVersion; +import org.apache.hama.ipc.RPC; +import org.apache.hama.ipc.Server; import org.apache.hama.util.BSPNetUtils; + /** * BSPApplicationMaster is an application master for Apache Hamas BSP Engine. */ @@ -73,8 +83,10 @@ private Clock clock; private YarnRPC yarnRPC; - private AMRMProtocol amrmRPC; + private RPC rpc; + private ApplicationMasterProtocol amrmRPC; + private ApplicationAttemptId appAttemptId; private String applicationName; private long startTime; @@ -106,6 +118,7 @@ this.jobFile = args[0]; this.localConf = new YarnConfiguration(); this.jobConf = getSubmitConfiguration(jobFile); + fs = FileSystem.get(jobConf); this.applicationName = jobConf.get("bsp.job.name", ""); @@ -125,13 +138,15 @@ this.clientPort = BSPNetUtils.getFreePort(12000); // start our synchronization service - startSyncServer(); + //startSyncServer(); startRPCServers(); + /* * Make sure that this executes after the start the RPC servers, because we * are readjusting the configuration. */ + rewriteSubmitConfiguration(jobFile, jobConf); String jobSplit = jobConf.get("bsp.job.split.file"); @@ -146,7 +161,7 @@ } this.amrmRPC = getYarnRPCConnection(localConf); - registerApplicationMaster(amrmRPC, appAttemptId, hostname, clientPort, + registerApplicationMaster(amrmRPC, hostname, clientPort, "http://localhost:8080"); } @@ -159,15 +174,14 @@ */ private void startRPCServers() throws IOException { // start the RPC server which talks to the client - this.clientServer = RPC.getServer(BSPClient.class, this, hostname, - clientPort, jobConf); + this.clientServer = RPC.getServer(BSPClient.class, hostname, clientPort, jobConf); this.clientServer.start(); // start the RPC server which talks to the tasks this.taskServerPort = BSPNetUtils.getFreePort(10000); - this.taskServer = RPC.getServer(BSPPeerProtocol.class, this, hostname, - taskServerPort, jobConf); + this.taskServer = RPC.getServer(this, hostname, taskServerPort, jobConf); this.taskServer.start(); + // readjusting the configuration to let the tasks know where we are. this.jobConf.set("hama.umbilical.address", hostname + ":" + taskServerPort); } @@ -191,28 +205,55 @@ * @param yarnConf * @return a new RPC connection to the Resource Manager. */ - private AMRMProtocol getYarnRPCConnection(Configuration yarnConf) { + private ApplicationMasterProtocol getYarnRPCConnection(Configuration yarnConf) throws IOException { // Connect to the Scheduler of the ResourceManager. 
- InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get( + UserGroupInformation currentUser = UserGroupInformation.createRemoteUser(appAttemptId.toString()); + Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials(); + + final InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get( YarnConfiguration.RM_SCHEDULER_ADDRESS, YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS)); + + Token amRMToken = setupAndReturnAMRMToken(rmAddress, credentials.getAllTokens()); + currentUser.addToken(amRMToken); + + final Configuration conf = yarnConf; + + ApplicationMasterProtocol client = currentUser + .doAs(new PrivilegedAction() { + @Override + public ApplicationMasterProtocol run() { + return (ApplicationMasterProtocol) yarnRPC.getProxy(ApplicationMasterProtocol.class, rmAddress, conf); + } + }); LOG.info("Connecting to ResourceManager at " + rmAddress); - return (AMRMProtocol) yarnRPC.getProxy(AMRMProtocol.class, rmAddress, - yarnConf); + return client; } + @SuppressWarnings("unchecked") + private Token setupAndReturnAMRMToken( + InetSocketAddress rmBindAddress, + Collection> allTokens) { + for (Token token : allTokens) { + if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { + SecurityUtil.setTokenService(token, rmBindAddress); + return token; + } + } + return null; + } + + /** * Registers this application master with the Resource Manager and retrieves a * response which is used to launch additional containers. */ private static RegisterApplicationMasterResponse registerApplicationMaster( - AMRMProtocol resourceManager, ApplicationAttemptId appAttemptID, - String appMasterHostName, int appMasterRpcPort, - String appMasterTrackingUrl) throws YarnRemoteException { + ApplicationMasterProtocol resourceManager, String appMasterHostName, int appMasterRpcPort, + String appMasterTrackingUrl) throws YarnException, IOException { RegisterApplicationMasterRequest appMasterRequest = Records .newRecord(RegisterApplicationMasterRequest.class); - appMasterRequest.setApplicationAttemptId(appAttemptID); appMasterRequest.setHost(appMasterHostName); appMasterRequest.setRpcPort(appMasterRpcPort); // TODO tracking URL @@ -219,7 +260,7 @@ appMasterRequest.setTrackingUrl(appMasterTrackingUrl); RegisterApplicationMasterResponse response = resourceManager .registerApplicationMaster(appMasterRequest); - LOG.debug("ApplicationMaster has maximum resource capability of: " + LOG.info("ApplicationMaster has maximum resource capability of: " + response.getMaximumResourceCapability().getMemory()); return response; } @@ -234,12 +275,13 @@ private static ApplicationAttemptId getApplicationAttemptId() throws IOException { Map envs = System.getenv(); - if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) { + if (!envs.containsKey(Environment.CONTAINER_ID.name())) { throw new IllegalArgumentException( "ApplicationAttemptId not set in the environment"); } + return ConverterUtils.toContainerId( - envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)) + envs.get(Environment.CONTAINER_ID.name())) .getApplicationAttemptId(); } @@ -259,8 +301,9 @@ } } - private void cleanup() throws YarnRemoteException { - syncServer.stop(); + private void cleanup() throws YarnException, IOException { + // syncServer.stop(); + if (threadPool != null && !threadPool.isShutdown()) { threadPool.shutdownNow(); } @@ -268,24 +311,23 @@ taskServer.stop(); FinishApplicationMasterRequest finishReq = Records .newRecord(FinishApplicationMasterRequest.class); - finishReq.setAppAttemptId(appAttemptId); switch 
(job.getState()) { case SUCCESS: - finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); break; case KILLED: - finishReq.setFinishApplicationStatus(FinalApplicationStatus.KILLED); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.KILLED); break; case FAILED: - finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.FAILED); break; default: - finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.FAILED); } this.amrmRPC.finishApplicationMaster(finishReq); } - public static void main(String[] args) throws YarnRemoteException { + public static void main(String[] args) throws YarnException, IOException { // we expect getting the qualified path of the job.xml as the first // element in the arguments BSPApplicationMaster master = null; @@ -301,17 +343,19 @@ } } - /* - * Some utility methods - */ - /** * Reads the configuration from the given path. */ - private static Configuration getSubmitConfiguration(String path) { + private static Configuration getSubmitConfiguration(String path) + throws IOException { Path jobSubmitPath = new Path(path); Configuration jobConf = new HamaConfiguration(); - jobConf.addResource(jobSubmitPath); + + FileSystem fs = FileSystem.get(URI.create(path), jobConf); + + InputStream in =fs.open(jobSubmitPath); + jobConf.addResource(in); + return jobConf; } @@ -326,6 +370,7 @@ FSDataOutputStream out = fs.create(jobSubmitPath); conf.writeXml(out); out.close(); + LOG.info("Written new configuration back to " + path); } @@ -340,12 +385,6 @@ } @Override - public ProtocolSignature getProtocolSignature(String protocol, - long clientVersion, int clientMethodsHash) throws IOException { - return new ProtocolSignature(HamaRPCProtocolVersion.versionID, null); - } - - @Override public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus) throws IOException, InterruptedException { if (taskStatus.getSuperstepCount() > superstep) { @@ -372,7 +411,7 @@ @Override public Task getTask(TaskAttemptID taskid) throws IOException { BSPJobClient.RawSplit assignedSplit = null; - String splitName = NullInputFormat.NullInputSplit.class.getCanonicalName(); + String splitName = NullInputSplit.class.getCanonicalName(); if (splits != null) { assignedSplit = splits[taskid.id]; splitName = assignedSplit.getClassName(); Index: yarn/src/main/java/org/apache/hama/bsp/BSPClient.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/BSPClient.java (revision 1659147) +++ yarn/src/main/java/org/apache/hama/bsp/BSPClient.java (working copy) @@ -18,7 +18,8 @@ package org.apache.hama.bsp; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.ipc.VersionedProtocol; +import org.apache.hama.ipc.VersionedProtocol; +//import org.apache.hadoop.ipc.VersionedProtocol; public interface BSPClient extends VersionedProtocol { Index: yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java (revision 1659147) +++ yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java (working copy) @@ -17,15 +17,16 @@ */ package org.apache.hama.bsp; +import java.io.InputStream; import java.net.InetSocketAddress; import java.util.Arrays; import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.ipc.RPC; +import org.apache.hama.ipc.RPC; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hama.Constants; import org.apache.hama.HamaConfiguration; @@ -37,7 +38,7 @@ private static final Log LOG = LogFactory.getLog(BSPRunner.class); - private Configuration conf; + private HamaConfiguration conf; private TaskAttemptID id; private BSPPeerImpl peer; private Counters counters = new Counters(); @@ -49,7 +50,10 @@ public BSPRunner(String jobId, int taskAttemptId, Path confPath) throws Exception { conf = new HamaConfiguration(); - conf.addResource(confPath); + FileSystem fs = FileSystem.get(confPath.toUri(), conf); + InputStream in = fs.open(confPath); + conf.addResource(in); + this.id = new TaskAttemptID(jobId, 0, taskAttemptId, 0); this.id.id = taskAttemptId; @@ -59,6 +63,7 @@ conf.set(Constants.PEER_HOST, BSPNetUtils.getCanonicalHostname()); String umbilicalAddress = conf.get("hama.umbilical.address"); + if (umbilicalAddress == null || umbilicalAddress.isEmpty() || !umbilicalAddress.contains(":")) { throw new IllegalArgumentException( @@ -69,9 +74,10 @@ InetSocketAddress address = new InetSocketAddress(hostPort[0], Integer.valueOf(hostPort[1])); - BSPPeerProtocol umbilical = RPC.getProxy(BSPPeerProtocol.class, - HamaRPCProtocolVersion.versionID, address, conf); - + BSPPeerProtocol umbilical = (BSPPeerProtocol) RPC.getProxy( + BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID, address, + conf); + BSPJob job = new BSPJob(new HamaConfiguration(conf)); BSPTask task = (BSPTask) umbilical.getTask(id); Index: yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (revision 1659147) +++ yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (working copy) @@ -17,31 +17,24 @@ */ package org.apache.hama.bsp; +import java.io.File; import java.io.IOException; import java.net.URISyntaxException; -import java.util.Arrays; -import java.util.Collections; +import java.nio.ByteBuffer; +import java.util.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationConstants; -import org.apache.hadoop.yarn.api.ContainerManager; -import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; -import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; -import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.api.records.ContainerState; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.hadoop.yarn.api.records.LocalResourceType; -import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; -import org.apache.hadoop.yarn.api.records.URL; -import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import 
org.apache.hadoop.yarn.api.protocolrecords.*; +import org.apache.hadoop.yarn.api.records.*; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; @@ -51,17 +44,22 @@ private final Container allocatedContainer; private final int id; - private final ContainerManager cm; + private final ContainerManagementProtocol cm; private final Configuration conf; private String user; private final Path jobFile; private final BSPJobID jobId; - private GetContainerStatusRequest statusRequest; + private GetContainerStatusesRequest statusRequest; + + @Override + protected void finalize() throws Throwable { + stopAndCleanup(); + } - public BSPTaskLauncher(int id, Container container, ContainerManager cm, - Configuration conf, Path jobFile, BSPJobID jobId) - throws YarnRemoteException { + public BSPTaskLauncher(int id, Container container, ContainerManagementProtocol cm, + Configuration conf, Path jobFile, BSPJobID jobId) + throws YarnException { this.id = id; this.cm = cm; this.conf = conf; @@ -75,19 +73,15 @@ } } - @Override - protected void finalize() throws Throwable { - stopAndCleanup(); + public void stopAndCleanup() throws YarnException, IOException { + StopContainersRequest stopRequest = Records.newRecord(StopContainersRequest.class); + List containerIds = new ArrayList(); + containerIds.add(allocatedContainer.getId()); + stopRequest.setContainerIds(containerIds); + cm.stopContainers(stopRequest); } - public void stopAndCleanup() throws YarnRemoteException { - StopContainerRequest stopRequest = Records - .newRecord(StopContainerRequest.class); - stopRequest.setContainerId(allocatedContainer.getId()); - cm.stopContainer(stopRequest); - } - - public void start() throws IOException { + public void start() throws IOException, YarnException { LOG.info("Spawned task with id: " + this.id + " for allocated container id: " + this.allocatedContainer.getId().toString()); @@ -103,20 +97,24 @@ */ public BSPTaskStatus poll() throws Exception { - ContainerStatus lastStatus; - if ((lastStatus = cm.getContainerStatus(statusRequest).getStatus()) - .getState() != ContainerState.COMPLETE) { - return null; + ContainerStatus lastStatus = null; + GetContainerStatusesResponse getContainerStatusesResponse = cm.getContainerStatuses(statusRequest); + List containerStatuses = getContainerStatusesResponse.getContainerStatuses(); + for (ContainerStatus containerStatus : containerStatuses) { + if ((lastStatus = containerStatus).getState() != ContainerState.COMPLETE) { + return null; + } + LOG.info(this.id + "\tLast report comes with exitstatus of " + lastStatus.getExitStatus() + " and diagnose string of " + lastStatus.getDiagnostics()); } - LOG.info(this.id + "\tLast report comes with existatus of " - + lastStatus.getExitStatus() + " and diagnose string of " - + lastStatus.getDiagnostics()); + + if (lastStatus == null) + return new BSPTaskStatus(id, -1000); + return new BSPTaskStatus(id, lastStatus.getExitStatus()); } - private GetContainerStatusRequest setupContainer( - Container allocatedContainer, ContainerManager cm, String user, int id) - throws IOException { + private GetContainerStatusesRequest setupContainer( + Container allocatedContainer, ContainerManagementProtocol cm, String user, int id) throws IOException, YarnException { LOG.info("Setting up a container for user " + user + " with id of " + id + " and containerID of " + allocatedContainer.getId() + " as " + user); // 
Now we setup a ContainerLaunchContext @@ -123,18 +121,11 @@ ContainerLaunchContext ctx = Records .newRecord(ContainerLaunchContext.class); - ctx.setContainerId(allocatedContainer.getId()); - ctx.setResource(allocatedContainer.getResource()); - ctx.setUser(user); - - /* - * jar - */ + // Set the local resources + Map localResources = new HashMap(); LocalResource packageResource = Records.newRecord(LocalResource.class); FileSystem fs = FileSystem.get(conf); - Path packageFile = new Path(conf.get("bsp.jar")); - // FIXME there seems to be a problem with the converter utils and URL - // transformation + Path packageFile = new Path(System.getenv(YARNBSPConstants.HAMA_YARN_LOCATION)); URL packageUrl = ConverterUtils.getYarnUrlFromPath(packageFile .makeQualified(fs.getUri(), fs.getWorkingDirectory())); LOG.info("PackageURL has been composed to " + packageUrl.toString()); @@ -145,16 +136,30 @@ LOG.fatal("If you see this error the workarround does not work", e); } - FileStatus fileStatus = fs.getFileStatus(packageFile); packageResource.setResource(packageUrl); - packageResource.setSize(fileStatus.getLen()); - packageResource.setTimestamp(fileStatus.getModificationTime()); - packageResource.setType(LocalResourceType.ARCHIVE); + packageResource.setSize(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_YARN_SIZE))); + packageResource.setTimestamp(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_YARN_TIMESTAMP))); + packageResource.setType(LocalResourceType.FILE); packageResource.setVisibility(LocalResourceVisibility.APPLICATION); - LOG.info("Package resource: " + packageResource.getResource()); - ctx.setLocalResources(Collections.singletonMap("package", packageResource)); + localResources.put(YARNBSPConstants.APP_MASTER_JAR_PATH, packageResource); + Path hamaReleaseFile = new Path(System.getenv(YARNBSPConstants.HAMA_RELEASE_LOCATION)); + URL hamaReleaseUrl = ConverterUtils.getYarnUrlFromPath(hamaReleaseFile + .makeQualified(fs.getUri(), fs.getWorkingDirectory())); + LOG.info("Hama release URL has been composed to " + hamaReleaseUrl.toString()); + + LocalResource hamaReleaseRsrc = Records.newRecord(LocalResource.class); + hamaReleaseRsrc.setResource(hamaReleaseUrl); + hamaReleaseRsrc.setSize(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_RELEASE_SIZE))); + hamaReleaseRsrc.setTimestamp(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_RELEASE_TIMESTAMP))); + hamaReleaseRsrc.setType(LocalResourceType.ARCHIVE); + hamaReleaseRsrc.setVisibility(LocalResourceVisibility.APPLICATION); + + localResources.put(YARNBSPConstants.HAMA_SYMLINK, hamaReleaseRsrc); + + ctx.setLocalResources(localResources); + /* * TODO Package classpath seems not to work if you're in pseudo distributed * mode, because the resource must not be moved, it will never be unpacked. 
@@ -161,32 +166,57 @@ * So we will check if our jar file has the file:// prefix and put it into * the CP directly */ - String cp = "$CLASSPATH:./*:./package/*:./*:"; - if (packageUrl.getScheme() != null && packageUrl.getScheme().equals("file")) { - cp += packageFile.makeQualified(fs.getUri(), fs.getWorkingDirectory()) - .toString() + ":"; - LOG.info("Localized file scheme detected, adjusting CP to: " + cp); + + StringBuilder classPathEnv = new StringBuilder( + ApplicationConstants.Environment.CLASSPATH.$()).append(File.pathSeparatorChar) + .append("./*"); + for (String c : conf.getStrings( + YarnConfiguration.YARN_APPLICATION_CLASSPATH, + YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) { + classPathEnv.append(File.pathSeparatorChar); + classPathEnv.append(c.trim()); } - String[] cmds = { - "${JAVA_HOME}" + "/bin/java -cp \"" + cp + "\" " - + BSPRunner.class.getCanonicalName(), - jobId.getJtIdentifier(), - id + "", - this.jobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory()) - .toString(), - " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout", - " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr" }; - ctx.setCommands(Arrays.asList(cmds)); - LOG.info("Starting command: " + Arrays.toString(cmds)); + Vector vargs = new Vector(); + vargs.add("${JAVA_HOME}/bin/java"); + vargs.add("-cp " + classPathEnv + ""); + vargs.add(BSPRunner.class.getCanonicalName()); + + vargs.add(jobId.getJtIdentifier()); + vargs.add(Integer.toString(id)); + vargs.add(this.jobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory()) + .toString()); + + vargs.add("1>/var/log/hadoop/yarn/hama-yarncontainer.stdout"); + vargs.add("2>/var/log/hadoop/yarn/hama-yarncontainer.stderr"); + + // Get final commmand + StringBuilder command = new StringBuilder(); + for (CharSequence str : vargs) { + command.append(str).append(" "); + } + + List commands = new ArrayList(); + commands.add(command.toString()); + + ctx.setCommands(commands); + LOG.debug("Starting command: " + commands); + StartContainerRequest startReq = Records .newRecord(StartContainerRequest.class); startReq.setContainerLaunchContext(ctx); - cm.startContainer(startReq); + startReq.setContainerToken(allocatedContainer.getContainerToken()); - GetContainerStatusRequest statusReq = Records - .newRecord(GetContainerStatusRequest.class); - statusReq.setContainerId(allocatedContainer.getId()); + List list = new ArrayList(); + list.add(startReq); + StartContainersRequest requestList = StartContainersRequest.newInstance(list); + cm.startContainers(requestList); + + GetContainerStatusesRequest statusReq = Records + .newRecord(GetContainerStatusesRequest.class); + List containerIds = new ArrayList(); + containerIds.add(allocatedContainer.getId()); + statusReq.setContainerIds(containerIds); return statusReq; } Index: yarn/src/main/java/org/apache/hama/bsp/Job.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/Job.java (revision 1659147) +++ yarn/src/main/java/org/apache/hama/bsp/Job.java (working copy) @@ -17,8 +17,10 @@ */ package org.apache.hama.bsp; -import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import java.io.IOException; + /** * Main interface to interact with the job. Provides only getters. 
*/ @@ -34,7 +36,7 @@ public JobState startJob() throws Exception; - public void cleanup() throws YarnRemoteException; + public void cleanup() throws YarnException, IOException; JobState getState(); Index: yarn/src/main/java/org/apache/hama/bsp/JobImpl.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/JobImpl.java (revision 1659147) +++ yarn/src/main/java/org/apache/hama/bsp/JobImpl.java (working copy) @@ -17,14 +17,10 @@ */ package org.apache.hama.bsp; +import java.io.IOException; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Deque; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; +import java.security.PrivilegedAction; +import java.util.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,20 +27,17 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.yarn.api.AMRMProtocol; -import org.apache.hadoop.yarn.api.ContainerManager; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.records.AMResponse; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.api.records.*; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; -import org.apache.hadoop.yarn.util.BuilderUtils; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.hama.bsp.BSPTaskLauncher.BSPTaskStatus; @@ -66,7 +59,7 @@ private ApplicationAttemptId appAttemptId; private YarnRPC yarnRPC; - private AMRMProtocol resourceManager; + private ApplicationMasterProtocol resourceManager; private List allocatedContainers; private List releasedContainers = Collections.emptyList(); @@ -76,11 +69,20 @@ private int lastResponseID = 0; + private int getMemoryRequirements() { + String newMemoryProperty = conf.get("bsp.child.mem.in.mb"); + if (newMemoryProperty == null) { + LOG.warn("\"bsp.child.mem.in.mb\" was not set! 
Try parsing the child opts..."); + return getMemoryFromOptString(childOpts); + } else { + return Integer.valueOf(newMemoryProperty); + } + } + public JobImpl(ApplicationAttemptId appAttemptId, - Configuration jobConfiguration, YarnRPC yarnRPC, AMRMProtocol amrmRPC, - String jobFile, BSPJobID jobId) { + Configuration jobConfiguration, YarnRPC yarnRPC, ApplicationMasterProtocol amrmRPC, + String jobFile, BSPJobID jobId) { super(); - this.numBSPTasks = jobConfiguration.getInt("bsp.peers.num", 1); this.appAttemptId = appAttemptId; this.yarnRPC = yarnRPC; this.resourceManager = amrmRPC; @@ -88,35 +90,29 @@ this.state = JobState.NEW; this.jobId = jobId; this.conf = jobConfiguration; + this.numBSPTasks = conf.getInt("bsp.peers.num", 1); this.childOpts = conf.get("bsp.child.java.opts"); this.taskMemoryInMb = getMemoryRequirements(); - LOG.info("Memory per task: " + taskMemoryInMb + "m!"); } - private int getMemoryRequirements() { - String newMemoryProperty = conf.get("bsp.child.mem.in.mb"); - if (newMemoryProperty == null) { - LOG.warn("\"bsp.child.mem.in.mb\" was not set! Try parsing the child opts..."); - return getMemoryFromOptString(childOpts); - } else { - return Integer.valueOf(newMemoryProperty); + // This really needs a testcase + private static int getMemoryFromOptString(String opts) { + if (opts == null) { + return DEFAULT_MEMORY_MB; } - } - // This really needs a testcase - private static int getMemoryFromOptString(String opts) { if (!opts.contains("-Xmx")) { LOG.info("No \"-Xmx\" option found in child opts, using default amount of memory!"); return DEFAULT_MEMORY_MB; } else { // e.G: -Xmx512m + int startIndex = opts.indexOf("-Xmx") + 4; - int endIndex = opts.indexOf(" ", startIndex); - String xmxString = opts.substring(startIndex, endIndex); + String xmxString = opts.substring(startIndex); char qualifier = xmxString.charAt(xmxString.length() - 1); int memory = Integer.valueOf(xmxString.substring(0, - xmxString.length() - 2)); + xmxString.length() - 1)); if (qualifier == 'm') { return memory; } else if (qualifier == 'g') { @@ -133,28 +129,28 @@ public JobState startJob() throws Exception { this.allocatedContainers = new ArrayList(numBSPTasks); + List nmTokenList = null; while (allocatedContainers.size() < numBSPTasks) { + AllocateRequest req = AllocateRequest.newInstance(lastResponseID, 0.0f, + createBSPTaskRequest(numBSPTasks - allocatedContainers.size(), taskMemoryInMb, + priority), releasedContainers, null); - AllocateRequest req = BuilderUtils.newAllocateRequest( - appAttemptId, - lastResponseID, - 0.0f, - createBSPTaskRequest(numBSPTasks - allocatedContainers.size(), - taskMemoryInMb, priority), releasedContainers); - AllocateResponse allocateResponse = resourceManager.allocate(req); - AMResponse amResponse = allocateResponse.getAMResponse(); - LOG.info("Got response! 
ID: " + amResponse.getResponseId() + LOG.info("Got response ID: " + allocateResponse.getResponseId() + " with num of containers: " - + amResponse.getAllocatedContainers().size() + + allocateResponse.getAllocatedContainers().size() + " and following resources: " - + amResponse.getAvailableResources().getMemory() + "mb"); - this.lastResponseID = amResponse.getResponseId(); + + allocateResponse.getAvailableResources().getMemory() + "mb"); + this.lastResponseID = allocateResponse.getResponseId(); - // availableResources = amResponse.getAvailableResources(); - this.allocatedContainers.addAll(amResponse.getAllocatedContainers()); - LOG.info("Waiting to allocate " - + (numBSPTasks - allocatedContainers.size()) + " more containers..."); + this.allocatedContainers.addAll(allocateResponse.getAllocatedContainers()); + + LOG.info("Waiting to allocate " + (numBSPTasks - allocatedContainers.size()) + " more containers..."); + + if (allocateResponse.getNMTokens().size() > 0) { + nmTokenList = allocateResponse.getNMTokens(); + } + Thread.sleep(1000l); } @@ -166,17 +162,30 @@ + allocatedContainer.getId() + ", containerNode=" + allocatedContainer.getNodeId().getHost() + ":" + allocatedContainer.getNodeId().getPort() + ", containerNodeURI=" - + allocatedContainer.getNodeHttpAddress() + ", containerState" - + allocatedContainer.getState() + ", containerResourceMemory" + + allocatedContainer.getNodeHttpAddress() + ", containerResourceMemory" + allocatedContainer.getResource().getMemory()); // Connect to ContainerManager on the allocated container - String cmIpPortStr = allocatedContainer.getNodeId().getHost() + ":" - + allocatedContainer.getNodeId().getPort(); - InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr); - ContainerManager cm = (ContainerManager) yarnRPC.getProxy( - ContainerManager.class, cmAddress, conf); + String user = conf.get("bsp.user.name"); + if (user == null) { + user = System.getenv(ApplicationConstants.Environment.USER.name()); + } + Token nmToken = null; + for (NMToken token : nmTokenList) { + nmToken = token.getToken(); + } + ContainerManagementProtocol cm = null; + try { + cm = getContainerManagementProtocolProxy(yarnRPC, + nmToken, allocatedContainer.getNodeId(), user); + } catch (Exception e) { + LOG.error("Failed to create ContainerManager..."); + if (cm != null) + yarnRPC.stopProxy(cm, conf); + e.printStackTrace(); + } + BSPTaskLauncher runnableLaunchContainer = new BSPTaskLauncher(id, allocatedContainer, cm, conf, jobFile, jobId); @@ -185,7 +194,9 @@ completionQueue.add(runnableLaunchContainer); id++; } + LOG.info("Waiting for tasks to finish..."); + LOG.info(completionQueue.size()); state = JobState.RUNNING; int completed = 0; while (completed != numBSPTasks) { @@ -215,13 +226,43 @@ } /** + * + * @param rpc + * @param nmToken + * @param nodeId + * @param user + * @return + */ + protected ContainerManagementProtocol getContainerManagementProtocolProxy( + final YarnRPC rpc, Token nmToken, NodeId nodeId, String user) { + ContainerManagementProtocol proxy; + UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user); + final InetSocketAddress addr = + NetUtils.createSocketAddr(nodeId.getHost(), nodeId.getPort()); + if (nmToken != null) { + ugi.addToken(ConverterUtils.convertFromYarn(nmToken, addr)); + } + + proxy = ugi + .doAs(new PrivilegedAction() { + @Override + public ContainerManagementProtocol run() { + return (ContainerManagementProtocol) rpc.getProxy( + ContainerManagementProtocol.class, + addr, conf); + } + }); + return proxy; + } + + /** * Makes 
a lookup for the taskid and stops its container and task. It also * removes the task from the launcher so that we won't have to stop it twice. * * @param id - * @throws YarnRemoteException + * @throws YarnException */ - private void cleanupTask(int id) throws YarnRemoteException { + private void cleanupTask(int id) throws YarnException, IOException { BSPTaskLauncher bspTaskLauncher = launchers.get(id); bspTaskLauncher.stopAndCleanup(); launchers.remove(id); @@ -228,13 +269,6 @@ completionQueue.remove(bspTaskLauncher); } - @Override - public void cleanup() throws YarnRemoteException { - for (BSPTaskLauncher launcher : completionQueue) { - launcher.stopAndCleanup(); - } - } - private List createBSPTaskRequest(int numTasks, int memoryInMb, int priority) { @@ -247,7 +281,7 @@ // whether a particular rack/host is needed // useful for applications that are sensitive // to data locality - rsrcRequest.setHostName("*"); + rsrcRequest.setResourceName("*"); // set the priority for the request Priority pri = Records.newRecord(Priority.class); @@ -269,6 +303,13 @@ } @Override + public void cleanup() throws YarnException, IOException { + for (BSPTaskLauncher launcher : completionQueue) { + launcher.stopAndCleanup(); + } + } + + @Override public JobState getState() { return state; } Index: yarn/src/main/java/org/apache/hama/bsp/YARNBSPConstants.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/YARNBSPConstants.java (revision 0) +++ yarn/src/main/java/org/apache/hama/bsp/YARNBSPConstants.java (working copy) @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp; + +/** + * Contants used in both Client and Application Master + */ +public class YARNBSPConstants { + + /** + * Environment key name pointing to the hama-yarn's location + */ + public static final String HAMA_YARN_LOCATION = "HAMAYARNJARLOCATION"; + + /** + * Environment key name denoting the file content length for the hama-yarn application. + * Used to validate the local resource. + */ + public static final String HAMA_YARN_SIZE = "HAMAYARNJARSIZE"; + + /** + * Environment key name denoting the file timestamp for the hama-yarn application. + * Used to validate the local resource. + */ + public static final String HAMA_YARN_TIMESTAMP = "HAMAYARNJARTIMESTAMP"; + + /** + * Environment key name pointing to the hama release's location + */ + public static final String HAMA_RELEASE_LOCATION = "HAMARELEASELOCATION"; + + /** + * Environment key name denoting the file content length for the hama release. + * Used to validate the local resource. 
+ */ + public static final String HAMA_RELEASE_SIZE = "HAMARELEASESIZE"; + + /** + * Environment key name denoting the file timestamp for the hama release. + * Used to validate the local resource. + */ + public static final String HAMA_RELEASE_TIMESTAMP = "HAMARELEASETIMESTAMP"; + + /** + * Symbolic link name for application master's jar file in container local resource + */ + public static final String APP_MASTER_JAR_PATH = "AppMaster.jar"; + + /** + * Symbolic link name for hama release archive in container local resource + */ + public static final String HAMA_SYMLINK = "hama"; + + /** + * Hama release file name + */ + public static final String HAMA_RELEASE_FILE = "hama-0.6.4.tar.gz"; + + /** + * Hama release file source location + */ + public static final String HAMA_SRC_PATH = "/opt"; +} Index: yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java (revision 1659147) +++ yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java (working copy) @@ -22,16 +22,12 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.yarn.api.ClientRMProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.util.Records; import org.apache.hama.HamaConfiguration; @@ -43,14 +39,12 @@ private static volatile int id = 0; private YARNBSPJobClient submitClient; - private BSPClient client; private boolean submitted; private ApplicationReport report; - private ClientRMProtocol applicationsManager; + private ApplicationClientProtocol applicationsManager; private YarnRPC rpc; public YARNBSPJob(HamaConfiguration conf) throws IOException { - super(conf); submitClient = new YARNBSPJobClient(conf); YarnConfiguration yarnConf = new YarnConfiguration(conf); this.rpc = YarnRPC.create(conf); @@ -58,8 +52,8 @@ YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS)); LOG.info("Connecting to ResourceManager at " + rmAddress); - this.applicationsManager = ((ClientRMProtocol) rpc.getProxy( - ClientRMProtocol.class, rmAddress, conf)); + this.applicationsManager = ((ApplicationClientProtocol) rpc.getProxy( + ApplicationClientProtocol.class, rmAddress, conf)); } public void setMemoryUsedPerTaskInMb(int mem) { @@ -66,7 +60,7 @@ conf.setInt("bsp.child.mem.in.mb", mem); } - public void kill() throws YarnRemoteException { + public void kill() throws YarnException, IOException { if (submitClient != null) { KillApplicationRequest killRequest = Records .newRecord(KillApplicationRequest.class); @@ -79,6 +73,7 @@ public void submit() throws IOException, InterruptedException { RunningJob submitJobInternal = submitClient.submitJobInternal(this, new BSPJobID("hama_yarn", id++)); + if (submitJobInternal != null) { submitted = true; report = submitClient.getReport(); 
@@ -95,45 +90,55 @@ this.submit(); } - client = RPC.waitForProxy(BSPClient.class, BSPClient.versionID, + if (report != null && report.getApplicationId() == submitClient.getId()) { + return true; + } else { + return false; + } + + /* + LOG.info("host addr : " + report.getHost() + " & host rpc port : " + report.getRpcPort()); + client = (BSPClient)RPC.waitForProxy(BSPClient.class, BSPClient.versionID, NetUtils.createSocketAddr(report.getHost(), report.getRpcPort()), conf); - GetApplicationReportRequest reportRequest = Records - .newRecord(GetApplicationReportRequest.class); + LOG.info("client : " + client.getCurrentSuperStep()); + //client = RPC.waitForProxy(BSPClient.class, BSPClient.versionID, NetUtils.createSocketAddr(report.getHost(), report.getRpcPort()), conf); + GetApplicationReportRequest reportRequest = Records.newRecord(GetApplicationReportRequest.class); reportRequest.setApplicationId(submitClient.getId()); - GetApplicationReportResponse reportResponse = applicationsManager - .getApplicationReport(reportRequest); - ApplicationReport localReport = reportResponse.getApplicationReport(); - long clientSuperStep = -1L; - while (localReport.getFinalApplicationStatus() != null - && localReport.getFinalApplicationStatus().ordinal() == 0) { - LOG.debug("currently in state: " - + localReport.getFinalApplicationStatus()); - if (verbose) { - long remoteSuperStep = client.getCurrentSuperStep().get(); - if (clientSuperStep < remoteSuperStep) { - clientSuperStep = remoteSuperStep; - LOG.info("Current supersteps number: " + clientSuperStep); + try { + GetApplicationReportResponse reportResponse = applicationsManager.getApplicationReport(reportRequest); + ApplicationReport localReport = reportResponse.getApplicationReport(); + long clientSuperStep = -1L; + while (localReport.getFinalApplicationStatus() != null && localReport.getFinalApplicationStatus().ordinal() == 0) { + LOG.debug("currently in state: " + localReport.getFinalApplicationStatus()); + if (verbose) { + long remoteSuperStep = client.getCurrentSuperStep().get(); + if (clientSuperStep < remoteSuperStep) { + clientSuperStep = remoteSuperStep; + LOG.info("Current supersteps number: " + clientSuperStep); + } } + reportResponse = applicationsManager.getApplicationReport(reportRequest); + localReport = reportResponse.getApplicationReport(); + + Thread.sleep(3000L); } - reportResponse = applicationsManager.getApplicationReport(reportRequest); - localReport = reportResponse.getApplicationReport(); - Thread.sleep(3000L); - } - if (localReport.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED) { - LOG.info("Job succeeded!"); - return true; - } else { - LOG.info("Job failed with status: " - + localReport.getFinalApplicationStatus().toString() + "!"); + if (localReport.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED) { + LOG.info("Job succeeded!"); + return true; + } else { + LOG.info("Job failed with status: " + localReport.getFinalApplicationStatus().toString() + "!"); + return false; + } + } catch (YarnException e) { + e.printStackTrace(); return false; - } - + } */ } - ClientRMProtocol getApplicationsManager() { + ApplicationClientProtocol getApplicationsManager() { return applicationsManager; } Index: yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java (revision 1659147) +++ yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java (working copy) @@ -17,9 +17,12 @@ */ 
package org.apache.hama.bsp; +import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.commons.logging.Log; @@ -27,20 +30,17 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ApplicationConstants; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; -import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.hadoop.yarn.api.records.LocalResourceType; -import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; -import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.*; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.hama.HamaConfiguration; @@ -52,14 +52,93 @@ private ApplicationId id; private ApplicationReport report; + // Configuration + private YarnClient yarnClient; + private YarnConfiguration yarnConf; + + // Start time for client + private final long clientStartTime = System.currentTimeMillis(); + // Timeout threshold for client. Kill app after time interval expires. 
+ private long clientTimeout = 600000; + + class NetworkedJob implements RunningJob { + @Override + public BSPJobID getID() { + return null; + } + + @Override + public String getJobName() { + return null; + } + + @Override + public long progress() throws IOException { + return 0; + } + + @Override + public boolean isComplete() throws IOException { + return false; + } + + @Override + public boolean isSuccessful() throws IOException { + return false; + } + + @Override + public void waitForCompletion() throws IOException { + + } + + @Override + public int getJobState() throws IOException { + return 0; + } + + @Override + public void killJob() throws IOException { + + } + + @Override + public void killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException { + + } + + @Override + public long getSuperstepCount() throws IOException { + return 0; + } + + @Override + public JobStatus getStatus() { + return null; + } + + @Override + public TaskCompletionEvent[] getTaskCompletionEvents(int eventCounter) { + return new TaskCompletionEvent[0]; + } + + @Override + public String getJobFile() { + return null; + } + } + public YARNBSPJobClient(HamaConfiguration conf) { setConf(conf); + yarnConf = new YarnConfiguration(conf); + yarnClient = YarnClient.createYarnClient(); + yarnClient.init(yarnConf); + yarnClient.start(); } @Override protected RunningJob launchJob(BSPJobID jobId, BSPJob normalJob, Path submitJobFile, FileSystem pFs) throws IOException { - YARNBSPJob job = (YARNBSPJob) normalJob; LOG.info("Submitting job..."); @@ -66,6 +145,7 @@ if (getConf().get("bsp.child.mem.in.mb") == null) { LOG.warn("BSP Child memory has not been set, YARN will guess your needs or use default values."); } + FileSystem fs = pFs; if (fs == null) { fs = FileSystem.get(getConf()); @@ -77,124 +157,176 @@ LOG.debug("Retrieved username: " + s); } - GetNewApplicationRequest request = Records - .newRecord(GetNewApplicationRequest.class); - GetNewApplicationResponse response = job.getApplicationsManager() - .getNewApplication(request); - id = response.getApplicationId(); - LOG.debug("Got new ApplicationId=" + id); + try { + YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics(); + LOG.info("Got Cluster metric info from ASM" + + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers()); - // Create a new ApplicationSubmissionContext - ApplicationSubmissionContext appContext = Records - .newRecord(ApplicationSubmissionContext.class); - // set the ApplicationId - appContext.setApplicationId(this.id); - // set the application name - appContext.setApplicationName(job.getJobName()); + List clusterNodeReports = yarnClient.getNodeReports( + NodeState.RUNNING); + LOG.info("Got Cluster node info from ASM"); + for (NodeReport node : clusterNodeReports) { + LOG.info("Got node report from ASM for" + + ", nodeId=" + node.getNodeId() + + ", nodeAddress" + node.getHttpAddress() + + ", nodeRackName" + node.getRackName() + + ", nodeNumContainers" + node.getNumContainers()); + } - // Create a new container launch context for the AM's container - ContainerLaunchContext amContainer = Records - .newRecord(ContainerLaunchContext.class); + QueueInfo queueInfo = yarnClient.getQueueInfo("default"); + LOG.info("Queue info" + + ", queueName=" + queueInfo.getQueueName() + + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity() + + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity() + + ", queueApplicationCount=" + queueInfo.getApplications().size() + + ", queueChildQueueCount=" + queueInfo.getChildQueues().size()); - // 
- // Define the local resources required - Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); - // Lets assume the jar we need for our ApplicationMaster is available in - // HDFS at a certain known path to us and we want to make it available to - // the ApplicationMaster in the launched container - if (job.getJar() == null) { - throw new IllegalArgumentException( - "Jar must be set in order to run the application!"); - } - Path jarPath = new Path(job.getWorkingDirectory(), id + "/app.jar"); - fs.copyFromLocalFile(job.getLocalPath(job.getJar()), jarPath); - LOG.debug("Copying app jar to " + jarPath); - getConf() .set( "bsp.jar", jarPath.makeQualified(fs.getUri(), fs.getWorkingDirectory()) .toString()); - FileStatus jarStatus = fs.getFileStatus(jarPath); - LocalResource amJarRsrc = Records.newRecord(LocalResource.class); - amJarRsrc.setType(LocalResourceType.FILE); - amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION); - amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath)); - amJarRsrc.setTimestamp(jarStatus.getModificationTime()); - amJarRsrc.setSize(jarStatus.getLen()); - // this creates a symlink in the working directory - localResources.put("AppMaster.jar", amJarRsrc); - // Set the local resources into the launch context - amContainer.setLocalResources(localResources); + List<QueueUserACLInfo> listAclInfo = yarnClient.getQueueAclsInfo(); + for (QueueUserACLInfo aclInfo : listAclInfo) { + for (QueueACL userAcl : aclInfo.getUserAcls()) { + LOG.info("User ACL Info for Queue" + + ", queueName=" + aclInfo.getQueueName() + + ", userAcl=" + userAcl.name()); + } + }
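Note that the hunk below still obtains the new ApplicationId through the older getNewApplication(request) call on the ApplicationsManager, even though a YarnClient is already initialized in the constructor. An equivalent client-side route (not part of this patch; it would additionally need an import of org.apache.hadoop.yarn.client.api.YarnClientApplication) would be:

    // Sketch: the YarnClient convenience API hands back the ApplicationId and a
    // pre-created submission context in one call.
    YarnClientApplication app = yarnClient.createApplication();
    GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
    ApplicationId newAppId = appResponse.getApplicationId();
    ApplicationSubmissionContext submissionContext = app.getApplicationSubmissionContext();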
- // Set up the environment needed for the launch context - Map<String, String> env = new HashMap<String, String>(); - // Assuming our classes or jars are available as local resources in the - // working directory from which the command will be run, we need to append - // "." to the path. - // By default, all the hadoop specific classpaths will already be available - // in $CLASSPATH, so we should be careful not to overwrite it. - String classPathEnv = "$CLASSPATH:./*:"; - env.put("CLASSPATH", classPathEnv); - amContainer.setEnvironment(env); + GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class); + GetNewApplicationResponse response = job.getApplicationsManager().getNewApplication(request); + id = response.getApplicationId(); - // Construct the command to be executed on the launched container - String command = "${JAVA_HOME}" - + "/bin/java -cp " - + classPathEnv - + " " - + BSPApplicationMaster.class.getCanonicalName() - + " " - + submitJobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory()) - .toString() + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR - + "/stdout" + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR - + "/stderr"; + // Create a new ApplicationSubmissionContext + ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class); + // set the ApplicationId + appContext.setApplicationId(this.id); + // set the application name + appContext.setApplicationName(job.getJobName()); - LOG.debug("Start command: " + command); + // Create a new container launch context for the AM's container + ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); - amContainer.setCommands(Collections.singletonList(command)); + // Define the local resources required + Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); + // Let's assume the jar we need for our ApplicationMaster is available in + // HDFS at a certain known path to us and we want to make it available to + // the ApplicationMaster in the launched container + if (job.getJar() == null) { + throw new IllegalArgumentException("Jar must be set in order to run the application!"); + } + + Path jarPath = new Path(job.getJar()); + jarPath = fs.makeQualified(jarPath); + getConf().set("bsp.jar", jarPath.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString()); - Resource capability = Records.newRecord(Resource.class); - // we have at least 3 threads, which comsumes 1mb each, for each bsptask and - // a base usage of 100mb - capability.setMemory(3 * job.getNumBspTask() + getConf().getInt("hama.appmaster.memory.mb", 100)); - LOG.info("Set memory for the application master to " + capability.getMemory() + "mb!"); - amContainer.setResource(capability); + FileStatus jarStatus = fs.getFileStatus(jarPath); + LocalResource amJarRsrc = Records.newRecord(LocalResource.class); + amJarRsrc.setType(LocalResourceType.FILE); + amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION); + amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath)); + amJarRsrc.setTimestamp(jarStatus.getModificationTime()); + amJarRsrc.setSize(jarStatus.getLen()); - // Set the container launch content into the ApplicationSubmissionContext - appContext.setAMContainerSpec(amContainer); + // this creates a symlink in the working directory + localResources.put(YARNBSPConstants.APP_MASTER_JAR_PATH, amJarRsrc); - // Create the request to send to the ApplicationsManager - SubmitApplicationRequest appRequest = Records - .newRecord(SubmitApplicationRequest.class); - appRequest.setApplicationSubmissionContext(appContext); - job.getApplicationsManager().submitApplication(appRequest); + // Copy from hama-${version}.tar.gz to HDFS + Path hamaDstPath = new Path(getSystemDir(), YARNBSPConstants.HAMA_RELEASE_FILE); + hamaDstPath = fs.makeQualified(hamaDstPath); + fs.copyFromLocalFile(false, true, + new Path(YARNBSPConstants.HAMA_SRC_PATH, YARNBSPConstants.HAMA_RELEASE_FILE), + hamaDstPath); + FileStatus hamaStatus = fs.getFileStatus(hamaDstPath);
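The AppMaster jar and the Hama release archive are both registered through the same LocalResource recipe (type, visibility, URL, timestamp, size). A small helper along these lines (hypothetical, not part of the patch) captures the pattern:

    // Sketch: describe an HDFS file as a YARN LocalResource so the NodeManager
    // localizes it into the container's working directory before launch.
    private static LocalResource toLocalResource(FileSystem fs, Path path,
        LocalResourceType type) throws IOException {
      FileStatus status = fs.getFileStatus(path);
      LocalResource resource = Records.newRecord(LocalResource.class);
      resource.setType(type); // FILE for the jar; ARCHIVE would auto-extract a tarball
      resource.setVisibility(LocalResourceVisibility.APPLICATION);
      resource.setResource(ConverterUtils.getYarnUrlFromPath(path));
      resource.setTimestamp(status.getModificationTime());
      resource.setSize(status.getLen());
      return resource;
    }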
- GetApplicationReportRequest reportRequest = Records - .newRecord(GetApplicationReportRequest.class); - reportRequest.setApplicationId(id); - while (report == null || report.getHost().equals("N/A")) { - GetApplicationReportResponse reportResponse = job - .getApplicationsManager().getApplicationReport(reportRequest); - report = reportResponse.getApplicationReport(); - try { - Thread.sleep(1000L); - } catch (InterruptedException e) { - LOG.error( - "Got interrupted while waiting for a response report from AM.", e); + // Set the local resources into the launch context + amContainer.setLocalResources(localResources); + + // Set up the environment needed for the launch context + Map<String, String> env = new HashMap<String, String>(); + // Assuming our classes or jars are available as local resources in the + // working directory from which the command will be run, we need to append + // "." to the path. + // By default, all the hadoop specific classpaths will already be available + // in $CLASSPATH, so we should be careful not to overwrite it. + StringBuilder classPathEnv = new StringBuilder( ApplicationConstants.Environment.CLASSPATH.$()).append(File.pathSeparatorChar) .append("./*"); + for (String c : yarnConf.getStrings( YarnConfiguration.YARN_APPLICATION_CLASSPATH, YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) { + classPathEnv.append(File.pathSeparatorChar); + classPathEnv.append(c.trim()); } + + env.put(YARNBSPConstants.HAMA_YARN_LOCATION, jarPath.toUri().toString()); + env.put(YARNBSPConstants.HAMA_YARN_SIZE, Long.toString(jarStatus.getLen())); + env.put(YARNBSPConstants.HAMA_YARN_TIMESTAMP, Long.toString(jarStatus.getModificationTime())); + + env.put(YARNBSPConstants.HAMA_RELEASE_LOCATION, hamaDstPath.toUri().toString()); + env.put(YARNBSPConstants.HAMA_RELEASE_SIZE, Long.toString(hamaStatus.getLen())); + env.put(YARNBSPConstants.HAMA_RELEASE_TIMESTAMP, Long.toString(hamaStatus.getModificationTime())); + env.put("CLASSPATH", classPathEnv.toString()); + amContainer.setEnvironment(env); + + // Construct the command to be executed on the launched container + String command = "${JAVA_HOME}/bin/java -classpath " + classPathEnv + " " + + BSPApplicationMaster.class.getCanonicalName() + " " + + submitJobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory()) + + " 1>/var/log/hadoop/yarn/hama-yarn.stdout" + + " 2>/var/log/hadoop/yarn/hama-yarn.stderr"; + + LOG.debug("Start command: " + command); + + amContainer.setCommands(Collections.singletonList(command)); + + Resource capability = Records.newRecord(Resource.class); + // we have at least 3 threads, which consumes 1mb each, for each bsptask and + // a base usage of 100mb + capability.setMemory(3 * job.getNumBspTask() + getConf().getInt("hama.appmaster.memory.mb", 100)); + LOG.info("Set memory for the application master to " + capability.getMemory() + "mb!"); + + // Set the resource requirement for the ApplicationMaster container + appContext.setResource(capability);
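For a concrete feel of the sizing rule above: a job configured with 16 BSP tasks and the default of 100 for hama.appmaster.memory.mb requests 3 * 16 + 100 = 148 MB for the ApplicationMaster container, which the scheduler then normalizes up to its minimum allocation. The same arithmetic as a helper (illustrative only, not in the patch):

    // Sketch: ~3 MB of thread overhead per BSP task plus a configurable base amount.
    static int appMasterMemoryMb(int numBspTasks, HamaConfiguration conf) {
      return 3 * numBspTasks + conf.getInt("hama.appmaster.memory.mb", 100);
    }
    // appMasterMemoryMb(16, conf) returns 148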
+ + // Setup security tokens + if (UserGroupInformation.isSecurityEnabled()) { + // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce + LOG.info("Security is enabled; collecting delegation tokens for the ApplicationMaster."); + Credentials credentials = new Credentials(); + String tokenRenewer = yarnConf.get(YarnConfiguration.RM_PRINCIPAL); + if (tokenRenewer == null || tokenRenewer.length() == 0) { + throw new IOException( "Can't get Master Kerberos principal for the RM to use as renewer"); + } + + // For now, only getting tokens for the default file-system. + final Token<?>[] tokens = fs.addDelegationTokens(tokenRenewer, credentials); + if (tokens != null) { + for (Token<?> token : tokens) { + LOG.info("Got dt for " + fs.getUri() + "; " + token); + } + } + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + amContainer.setTokens(fsTokens); + } + + appContext.setAMContainerSpec(amContainer); + + // Create the request to send to the ApplicationsManager + ApplicationId appId = appContext.getApplicationId(); + LOG.info("Submitting application " + appId); + yarnClient.submitApplication(appContext); + + return monitorApplication(appId) ? new NetworkedJob() : null; + } catch (YarnException e) { + LOG.error("Failed to submit the application to YARN.", e); + return null; + } - LOG.info("Got report: " + report.getApplicationId() + " " - + report.getHost() + ":" + report.getRpcPort()); - return new NetworkedJob(); } @Override - protected int checkTaskLimits(BSPJob job, int limitTasks) throws IOException { - return Math.max(1, limitTasks); - } - - @Override public Path getSystemDir() { return new Path(getConf().get("bsp.local.dir", "/tmp/hama-yarn/")); } @@ -207,4 +339,76 @@ return report; } + private boolean monitorApplication(ApplicationId appId) + throws IOException, YarnException { + while (true) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.debug("Thread sleep in monitoring loop interrupted"); + } + + // Get application report for the appId we are interested in + report = yarnClient.getApplicationReport(appId); + + LOG.info("Got application report from ASM for" + ", appId=" + appId.getId() + ", clientToAMToken=" + report.getClientToAMToken() + ", appDiagnostics=" + report.getDiagnostics() + ", appMasterHost=" + report.getHost() + ", appQueue=" + report.getQueue() + ", appMasterRpcPort=" + report.getRpcPort() + ", appStartTime=" + report.getStartTime() + ", yarnAppState=" + report.getYarnApplicationState().toString() + ", distributedFinalState=" + report.getFinalApplicationStatus().toString() + ", appTrackingUrl=" + report.getTrackingUrl() + ", appUser=" + report.getUser()); + + YarnApplicationState state = report.getYarnApplicationState(); + FinalApplicationStatus dsStatus = report .getFinalApplicationStatus(); + if (YarnApplicationState.FINISHED == state) { + if (FinalApplicationStatus.SUCCEEDED == dsStatus) { + LOG.info("Application has completed successfully. Breaking monitoring loop"); + return true; + } else { + LOG.info("Application finished unsuccessfully." + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString() + ". Breaking monitoring loop"); + return false; + } + } else if (YarnApplicationState.KILLED == state || YarnApplicationState.FAILED == state) { + LOG.info("Application did not finish." + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString() + ". Breaking monitoring loop"); + return false; + } + + if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) { + LOG.info("Reached client specified timeout for application. Killing application"); + forceKillApplication(appId); + return false; + } + } + } + + /** + * Kill a submitted application by sending a call to the ASM + * @param appId Application Id to be killed.
+ * @throws YarnException + * @throws IOException + */ + private void forceKillApplication(ApplicationId appId) + throws YarnException, IOException { + // TODO clarify whether multiple jobs with the same app id can be submitted and be running at + // the same time. + // If yes, can we kill a particular attempt only? + + // Response can be ignored as it is non-null on success or + // throws an exception in case of failures + yarnClient.killApplication(appId); + } } Index: yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java (revision 1659147) +++ yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java (working copy) @@ -22,6 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.NullWritable; +import org.apache.hama.Constants; import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.sync.SyncException; @@ -37,7 +38,7 @@ public void bsp( BSPPeer bspPeer) throws IOException, SyncException, InterruptedException { - num = bspPeer.getConfiguration().getInt("bsp.peers.num", 0); + num = bspPeer.getConfiguration().getInt("bsp.peers.num", 1); LOG.info(bspPeer.getAllPeerNames()); int i = 0; for (String otherPeer : bspPeer.getAllPeerNames()) { @@ -57,15 +58,24 @@ InterruptedException, ClassNotFoundException { HamaConfiguration conf = new HamaConfiguration(); // TODO some keys that should be within a conf - conf.set("yarn.resourcemanager.address", "0.0.0.0:8040"); - conf.set("bsp.local.dir", "/tmp/bsp-yarn/"); + conf.set("yarn.resourcemanager.address", "localhost:8032"); + conf.set("bsp.user.name", "root"); + conf.set("bsp.groom.report.address", "127.0.0.1:50001"); + conf.setInt(Constants.MAX_TASKS, 10); YARNBSPJob job = new YARNBSPJob(conf); + job.set("hama.zookeeper.quorum", "localhost"); + job.set("hama.zookeeper.property.clientPort", "21810"); + job.setBoolean("hama.yarn.application", true); job.setBspClass(HelloBSP.class); job.setJarByClass(HelloBSP.class); job.setJobName("Serialize Printing"); + job.setInputFormat(NullInputFormat.class); + job.setOutputFormat(NullOutputFormat.class); + job.setMemoryUsedPerTaskInMb(50); job.setNumBspTask(2); + System.out.println("bsp.peers.num : " + job.getConfiguration().get("bsp.peers.num")); job.waitForCompletion(true); } }
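Returning to the client pieces added in YARNBSPJobClient above: monitorApplication and forceKillApplication together implement a plain poll-until-terminal-state loop with a client-side deadline. Reduced to its essentials, and with the timeout made configurable (the property name hama.yarn.client.timeout.ms is an assumption, not something this patch defines), the pattern looks like:

    // Sketch: poll the ResourceManager until the application reaches a terminal
    // state, killing it once a client-side deadline has passed. Exceptions are
    // left to the enclosing method, as in monitorApplication above.
    long timeoutMs = getConf().getLong("hama.yarn.client.timeout.ms", 600000L);
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (true) {
      ApplicationReport r = yarnClient.getApplicationReport(appId);
      YarnApplicationState state = r.getYarnApplicationState();
      if (state == YarnApplicationState.FINISHED
          || state == YarnApplicationState.KILLED
          || state == YarnApplicationState.FAILED) {
        break;
      }
      if (System.currentTimeMillis() > deadline) {
        yarnClient.killApplication(appId);
        break;
      }
      Thread.sleep(1000);
    }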