Index: core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/BSPJobClient.java	(revision 1654753)
+++ core/src/main/java/org/apache/hama/bsp/BSPJobClient.java	(working copy)
@@ -303,10 +303,14 @@
     BSPJob job = pJob;
     job.setJobID(jobId);
 
-    ClusterStatus clusterStatus = getClusterStatus(true);
-    int maxTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB,
-        clusterStatus.getMaxTasks() - clusterStatus.getTasks());
+    int maxTasks = job.getConfiguration().getInt(Constants.MAX_TASKS, 0);
+    if(maxTasks == 0) {
+      ClusterStatus clusterStatus = getClusterStatus(true);
+      maxTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB,
+          clusterStatus.getMaxTasks() - clusterStatus.getTasks());
+    }
+
     if (maxTasks < job.getNumBspTask()) {
       LOG.warn("The configured number of tasks has exceeded the maximum allowed. Job will run with "
           + maxTasks + " tasks.");
Index: pom.xml
===================================================================
--- pom.xml	(revision 1654753)
+++ pom.xml	(working copy)
@@ -320,6 +320,7 @@
     <module>ml</module>
     <module>mesos</module>
     <module>dist</module>
+    <module>yarn</module>
Index: yarn/pom.xml
===================================================================
--- yarn/pom.xml	(revision 1654753)
+++ yarn/pom.xml	(working copy)
@@ -19,7 +19,7 @@
   <parent>
     <groupId>org.apache.hama</groupId>
     <artifactId>hama-parent</artifactId>
-    <version>0.6.3-SNAPSHOT</version>
+    <version>0.7.0-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
@@ -26,13 +26,15 @@
   <groupId>org.apache.hama</groupId>
   <artifactId>hama-yarn</artifactId>
   <name>yarn</name>
-  <version>0.6.3-SNAPSHOT</version>
+  <version>0.7.0-SNAPSHOT</version>
   <packaging>jar</packaging>
 
-    <hadoop.version>1.2.0</hadoop.version>
+
 
       <groupId>org.apache.hama</groupId>
@@ -54,27 +56,36 @@
       <artifactId>avro</artifactId>
       <version>1.5.3</version>
     </dependency>
-
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-api</artifactId>
-      <version>0.23.1</version>
+      <version>${hadoop.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-common</artifactId>
-      <version>0.23.1</version>
+      <version>${hadoop.version}</version>
    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-server-tests</artifactId>
-      <version>0.23.1</version>
+      <version>${hadoop.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-client</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.zookeeper</groupId>
       <artifactId>zookeeper</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
Index: yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java	(revision 1654753)
+++ yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java	(working copy)
@@ -33,13 +33,11 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.ipc.ProtocolSignature;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.Clock;
-import org.apache.hadoop.yarn.SystemClock;
-import org.apache.hadoop.yarn.api.AMRMProtocol;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
@@ -46,10 +44,12 @@
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.Job.JobState;
 import org.apache.hama.bsp.sync.SyncServerRunner;
@@ -73,7 +73,7 @@
 
   private Clock clock;
   private YarnRPC yarnRPC;
-  private AMRMProtocol amrmRPC;
+  private ApplicationMasterProtocol amrmRPC;
 
   private ApplicationAttemptId appAttemptId;
   private String applicationName;
@@ -159,14 +159,21 @@
    */
   private void startRPCServers() throws IOException {
     // start the RPC server which talks to the client
-    this.clientServer = RPC.getServer(BSPClient.class, this, hostname,
-        clientPort, jobConf);
+    YarnRPC rpc = YarnRPC.create(jobConf);
+    InetSocketAddress BSPClientAddress = jobConf.getSocketAddr(hostname, hostname, clientPort);
+    this.clientServer = rpc.getServer(BSPClient.class, this, BSPClientAddress, jobConf, null,
+        jobConf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT,
+            YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT));
     this.clientServer.start();
 
     // start the RPC server which talks to the tasks
     this.taskServerPort = BSPNetUtils.getFreePort(10000);
-    this.taskServer = RPC.getServer(BSPPeerProtocol.class, this, hostname,
-        taskServerPort, jobConf);
+    InetSocketAddress BSPPeerProtocolAddress = jobConf
+        .getSocketAddr(hostname, hostname, taskServerPort);
+    this.taskServer = rpc.getServer(BSPPeerProtocol.class, this, BSPPeerProtocolAddress,
+        jobConf, null,
+        jobConf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT,
+            YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT));
     this.taskServer.start();
 
     // readjusting the configuration to let the tasks know where we are.
     this.jobConf.set("hama.umbilical.address", hostname + ":" + taskServerPort);
@@ -191,13 +198,13 @@
    * @param yarnConf
    * @return a new RPC connection to the Resource Manager.
    */
-  private AMRMProtocol getYarnRPCConnection(Configuration yarnConf) {
+  private ApplicationMasterProtocol getYarnRPCConnection(Configuration yarnConf) {
     // Connect to the Scheduler of the ResourceManager.
     InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
         YarnConfiguration.RM_SCHEDULER_ADDRESS,
         YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
     LOG.info("Connecting to ResourceManager at " + rmAddress);
-    return (AMRMProtocol) yarnRPC.getProxy(AMRMProtocol.class, rmAddress,
+    return (ApplicationMasterProtocol) yarnRPC.getProxy(ApplicationMasterProtocol.class, rmAddress,
         yarnConf);
   }
 
@@ -206,13 +213,13 @@
    * response which is used to launch additional containers.
    */
   private static RegisterApplicationMasterResponse registerApplicationMaster(
-      AMRMProtocol resourceManager, ApplicationAttemptId appAttemptID,
+      ApplicationMasterProtocol resourceManager, ApplicationAttemptId appAttemptID,
       String appMasterHostName, int appMasterRpcPort,
-      String appMasterTrackingUrl) throws YarnRemoteException {
+      String appMasterTrackingUrl) throws YarnException, IOException {
     RegisterApplicationMasterRequest appMasterRequest = Records
         .newRecord(RegisterApplicationMasterRequest.class);
-    appMasterRequest.setApplicationAttemptId(appAttemptID);
+    //appMasterRequest.setApplicationAttemptId(appAttemptID);
     appMasterRequest.setHost(appMasterHostName);
     appMasterRequest.setRpcPort(appMasterRpcPort);
     // TODO tracking URL
@@ -234,12 +241,13 @@
   private static ApplicationAttemptId getApplicationAttemptId()
       throws IOException {
     Map<String, String> envs = System.getenv();
-    if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) {
+    if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
       throw new IllegalArgumentException(
           "ApplicationAttemptId not set in the environment");
     }
+
     return ConverterUtils.toContainerId(
-        envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV))
+        envs.get(Environment.CONTAINER_ID.name()))
         .getApplicationAttemptId();
   }
@@ -259,7 +267,7 @@
     }
   }
 
-  private void cleanup() throws YarnRemoteException {
+  private void cleanup() throws YarnException, IOException {
     syncServer.stop();
     if (threadPool != null && !threadPool.isShutdown()) {
       threadPool.shutdownNow();
@@ -268,24 +276,24 @@
     taskServer.stop();
     FinishApplicationMasterRequest finishReq = Records
         .newRecord(FinishApplicationMasterRequest.class);
-    finishReq.setAppAttemptId(appAttemptId);
+    //finishReq.setAppAttemptId(appAttemptId);
     switch (job.getState()) {
       case SUCCESS:
-        finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+        finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
         break;
       case KILLED:
-        finishReq.setFinishApplicationStatus(FinalApplicationStatus.KILLED);
+        finishReq.setFinalApplicationStatus(FinalApplicationStatus.KILLED);
         break;
       case FAILED:
-        finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
+        finishReq.setFinalApplicationStatus(FinalApplicationStatus.FAILED);
         break;
       default:
-        finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
+        finishReq.setFinalApplicationStatus(FinalApplicationStatus.FAILED);
     }
     this.amrmRPC.finishApplicationMaster(finishReq);
   }
 
-  public static void main(String[] args) throws YarnRemoteException {
+  public static void main(String[] args) throws YarnException, IOException {
     // we expect getting the qualified path of the job.xml as the first
     // element in the arguments
     BSPApplicationMaster master = null;
Index: yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java	(revision 1654753)
+++ yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java	(working copy)
@@ -37,7 +37,7 @@
 
   private static final Log LOG = LogFactory.getLog(BSPRunner.class);
 
-  private Configuration conf;
+  private HamaConfiguration conf;
   private TaskAttemptID id;
   private BSPPeerImpl peer;
   private Counters counters = new Counters();
Index: yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java	(revision 1654753)
+++ yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java	(working copy)
@@ -19,8 +19,10 @@
 
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -29,19 +31,10 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ContainerManager;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.*;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
@@ -51,17 +44,17 @@
 
   private final Container allocatedContainer;
   private final int id;
-  private final ContainerManager cm;
+  private final ContainerManagementProtocol cm;
   private final Configuration conf;
   private String user;
   private final Path jobFile;
   private final BSPJobID jobId;
-  private GetContainerStatusRequest statusRequest;
+  private GetContainerStatusesRequest statusRequest;
 
-  public BSPTaskLauncher(int id, Container container, ContainerManager cm,
+  public BSPTaskLauncher(int id, Container container, ContainerManagementProtocol cm,
       Configuration conf, Path jobFile, BSPJobID jobId)
-      throws YarnRemoteException {
+      throws YarnException {
     this.id = id;
     this.cm = cm;
     this.conf = conf;
@@ -80,14 +73,15 @@
     stopAndCleanup();
   }
 
-  public void stopAndCleanup() throws YarnRemoteException {
-    StopContainerRequest stopRequest = Records
-        .newRecord(StopContainerRequest.class);
-    stopRequest.setContainerId(allocatedContainer.getId());
-    cm.stopContainer(stopRequest);
+  public void stopAndCleanup() throws YarnException, IOException {
+    StopContainersRequest stopRequest = Records.newRecord(StopContainersRequest.class);
+    List<ContainerId> containerIds = new ArrayList<ContainerId>();
+    containerIds.add(allocatedContainer.getId());
+    stopRequest.setContainerIds(containerIds);
+    cm.stopContainers(stopRequest);
   }
 
-  public void start() throws IOException {
+  public void start() throws IOException, YarnException {
     LOG.info("Spawned task with id: " + this.id
         + " for allocated container id: "
         + this.allocatedContainer.getId().toString());
@@ -103,20 +97,22 @@
    */
   public BSPTaskStatus poll() throws Exception {
 
-    ContainerStatus lastStatus;
-    if ((lastStatus = cm.getContainerStatus(statusRequest).getStatus())
-        .getState() != ContainerState.COMPLETE) {
-      return null;
+    ContainerStatus lastStatus = null;
+    GetContainerStatusesResponse getContainerStatusesResponse = cm.getContainerStatuses(statusRequest);
+    List<ContainerStatus> containerStatuses = getContainerStatusesResponse.getContainerStatuses();
+    for (ContainerStatus containerStatus : containerStatuses) {
+      if ((lastStatus = containerStatus).getState() != ContainerState.COMPLETE) {
+        return null;
+      }
+      LOG.info(this.id + "\tLast report comes with existatus of "
+          + lastStatus.getExitStatus() + " and diagnose string of "
+          + lastStatus.getDiagnostics());
     }
-    LOG.info(this.id + "\tLast report comes with existatus of "
-        + lastStatus.getExitStatus() + " and diagnose string of "
-        + lastStatus.getDiagnostics());
 
     return new BSPTaskStatus(id, lastStatus.getExitStatus());
   }
 
-  private GetContainerStatusRequest setupContainer(
-      Container allocatedContainer, ContainerManager cm, String user, int id)
-      throws IOException {
+  private GetContainerStatusesRequest setupContainer(
+      Container allocatedContainer, ContainerManagementProtocol cm, String user, int id) throws IOException, YarnException {
     LOG.info("Setting up a container for user " + user + " with id of " + id
         + " and containerID of " + allocatedContainer.getId() + " as " + user);
     // Now we setup a ContainerLaunchContext
@@ -123,9 +119,9 @@
     ContainerLaunchContext ctx = Records
         .newRecord(ContainerLaunchContext.class);
 
-    ctx.setContainerId(allocatedContainer.getId());
-    ctx.setResource(allocatedContainer.getResource());
-    ctx.setUser(user);
+//    ctx.setContainerId(allocatedContainer.getId());
+//    ctx.setResource(allocatedContainer.getResource());
+//    ctx.setUser(user);
 
     /*
      * jar
@@ -182,11 +178,17 @@
     StartContainerRequest startReq = Records
         .newRecord(StartContainerRequest.class);
     startReq.setContainerLaunchContext(ctx);
-    cm.startContainer(startReq);
 
-    GetContainerStatusRequest statusReq = Records
-        .newRecord(GetContainerStatusRequest.class);
-    statusReq.setContainerId(allocatedContainer.getId());
+    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    list.add(startReq);
+    StartContainersRequest requestList = StartContainersRequest.newInstance(list);
+    cm.startContainers(requestList);
+
+    GetContainerStatusesRequest statusReq = Records
+        .newRecord(GetContainerStatusesRequest.class);
+    List<ContainerId> containerIds = new ArrayList<ContainerId>();
+    containerIds.add(allocatedContainer.getId());
+    statusReq.setContainerIds(containerIds);
     return statusReq;
   }
Index: yarn/src/main/java/org/apache/hama/bsp/Job.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/Job.java	(revision 1654753)
+++ yarn/src/main/java/org/apache/hama/bsp/Job.java	(working copy)
@@ -17,8 +17,10 @@
  */
 package org.apache.hama.bsp;
 
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 
+import java.io.IOException;
+
 /**
  * Main interface to interact with the job. Provides only getters.
  */
@@ -34,7 +36,7 @@
 
   public JobState startJob() throws Exception;
 
-  public void cleanup() throws YarnRemoteException;
+  public void cleanup() throws YarnException, IOException;
 
   JobState getState();
Index: yarn/src/main/java/org/apache/hama/bsp/JobImpl.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/JobImpl.java	(revision 1654753)
+++ yarn/src/main/java/org/apache/hama/bsp/JobImpl.java	(working copy)
@@ -17,6 +17,7 @@
  */
 package org.apache.hama.bsp;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -31,11 +32,10 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.api.AMRMProtocol;
-import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.records.AMResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -42,9 +42,8 @@
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hama.bsp.BSPTaskLauncher.BSPTaskStatus;
@@ -66,7 +65,7 @@
 
   private ApplicationAttemptId appAttemptId;
   private YarnRPC yarnRPC;
-  private AMRMProtocol resourceManager;
+  private ApplicationMasterProtocol resourceManager;
 
   private List<Container> allocatedContainers;
   private List<ContainerId> releasedContainers = Collections.emptyList();
@@ -77,7 +76,7 @@
   private int lastResponseID = 0;
 
   public JobImpl(ApplicationAttemptId appAttemptId,
-      Configuration jobConfiguration, YarnRPC yarnRPC, AMRMProtocol amrmRPC,
+      Configuration jobConfiguration, YarnRPC yarnRPC, ApplicationMasterProtocol amrmRPC,
       String jobFile, BSPJobID jobId) {
     super();
     this.numBSPTasks = jobConfiguration.getInt("bsp.peers.num", 1);
@@ -135,24 +134,27 @@
 
     this.allocatedContainers = new ArrayList<Container>(numBSPTasks);
     while (allocatedContainers.size() < numBSPTasks) {
-      AllocateRequest req = BuilderUtils.newAllocateRequest(
-          appAttemptId,
-          lastResponseID,
-          0.0f,
-          createBSPTaskRequest(numBSPTasks - allocatedContainers.size(),
-              taskMemoryInMb, priority), releasedContainers);
+//      AllocateRequest req = BuilderUtils.newAllocateRequest(
+//          appAttemptId,
+//          lastResponseID,
+//          0.0f,
+//          createBSPTaskRequest(numBSPTasks - allocatedContainers.size(),
+//              taskMemoryInMb, priority), releasedContainers);
+      AllocateRequest req = AllocateRequest.newInstance(lastResponseID, 0.0f,
+          createBSPTaskRequest(numBSPTasks - allocatedContainers.size(), taskMemoryInMb,
+              priority), releasedContainers, null);
+
       AllocateResponse allocateResponse = resourceManager.allocate(req);
-      AMResponse amResponse = allocateResponse.getAMResponse();
-      LOG.info("Got response! ID: " + amResponse.getResponseId()
+      LOG.info("Got response! ID: " + allocateResponse.getResponseId()
           + " with num of containers: "
-          + amResponse.getAllocatedContainers().size()
+          + allocateResponse.getAllocatedContainers().size()
           + " and following resources: "
-          + amResponse.getAvailableResources().getMemory() + "mb");
-      this.lastResponseID = amResponse.getResponseId();
+          + allocateResponse.getAvailableResources().getMemory() + "mb");
+      this.lastResponseID = allocateResponse.getResponseId();
       // availableResources = amResponse.getAvailableResources();
-      this.allocatedContainers.addAll(amResponse.getAllocatedContainers());
+      this.allocatedContainers.addAll(allocateResponse.getAllocatedContainers());
       LOG.info("Waiting to allocate "
           + (numBSPTasks - allocatedContainers.size()) + " more containers...");
       Thread.sleep(1000l);
@@ -166,8 +168,7 @@
           + allocatedContainer.getId() + ", containerNode="
           + allocatedContainer.getNodeId().getHost() + ":"
           + allocatedContainer.getNodeId().getPort() + ", containerNodeURI="
-          + allocatedContainer.getNodeHttpAddress() + ", containerState"
-          + allocatedContainer.getState() + ", containerResourceMemory"
+          + allocatedContainer.getNodeHttpAddress() + ", containerResourceMemory"
           + allocatedContainer.getResource().getMemory());
 
       // Connect to ContainerManager on the allocated container
@@ -174,8 +175,8 @@
       String cmIpPortStr = allocatedContainer.getNodeId().getHost() + ":"
           + allocatedContainer.getNodeId().getPort();
       InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
-      ContainerManager cm = (ContainerManager) yarnRPC.getProxy(
-          ContainerManager.class, cmAddress, conf);
+      ContainerManagementProtocol cm = (ContainerManagementProtocol) yarnRPC.getProxy(
+          ContainerManagementProtocol.class, cmAddress, conf);
       BSPTaskLauncher runnableLaunchContainer = new BSPTaskLauncher(id,
           allocatedContainer, cm, conf, jobFile, jobId);
@@ -219,9 +220,9 @@
    * removes the task from the launcher so that we won't have to stop it twice.
    *
    * @param id
-   * @throws YarnRemoteException
+   * @throws YarnException
    */
-  private void cleanupTask(int id) throws YarnRemoteException {
+  private void cleanupTask(int id) throws YarnException, IOException {
     BSPTaskLauncher bspTaskLauncher = launchers.get(id);
     bspTaskLauncher.stopAndCleanup();
     launchers.remove(id);
@@ -229,7 +230,7 @@
   }
 
   @Override
-  public void cleanup() throws YarnRemoteException {
+  public void cleanup() throws YarnException, IOException {
     for (BSPTaskLauncher launcher : completionQueue) {
       launcher.stopAndCleanup();
     }
@@ -247,7 +248,7 @@
     // whether a particular rack/host is needed
     // useful for applications that are sensitive
     // to data locality
-    rsrcRequest.setHostName("*");
+    rsrcRequest.setResourceName("*");
 
     // set the priority for the request
     Priority pri = Records.newRecord(Priority.class);
Index: yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java	(revision 1654753)
+++ yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java	(working copy)
@@ -24,7 +24,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
@@ -31,7 +31,7 @@
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hama.HamaConfiguration;
@@ -46,7 +46,7 @@
   private BSPClient client;
   private boolean submitted;
   private ApplicationReport report;
-  private ClientRMProtocol applicationsManager;
+  private ApplicationClientProtocol applicationsManager;
   private YarnRPC rpc;
 
   public YARNBSPJob(HamaConfiguration conf) throws IOException {
@@ -58,8 +58,8 @@
         YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS));
     LOG.info("Connecting to ResourceManager at " + rmAddress);
-    this.applicationsManager = ((ClientRMProtocol) rpc.getProxy(
-        ClientRMProtocol.class, rmAddress, conf));
+    this.applicationsManager = ((ApplicationClientProtocol) rpc.getProxy(
+        ApplicationClientProtocol.class, rmAddress, conf));
   }
 
   public void setMemoryUsedPerTaskInMb(int mem) {
@@ -66,7 +66,7 @@
     conf.setInt("bsp.child.mem.in.mb", mem);
   }
 
-  public void kill() throws YarnRemoteException {
+  public void kill() throws YarnException, IOException {
     if (submitClient != null) {
       KillApplicationRequest killRequest = Records
           .newRecord(KillApplicationRequest.class);
@@ -95,45 +95,44 @@
       this.submit();
     }
 
-    client = RPC.waitForProxy(BSPClient.class, BSPClient.versionID,
-        NetUtils.createSocketAddr(report.getHost(), report.getRpcPort()), conf);
-    GetApplicationReportRequest reportRequest = Records
-        .newRecord(GetApplicationReportRequest.class);
-    reportRequest.setApplicationId(submitClient.getId());
+    client = RPC.waitForProxy(BSPClient.class, BSPClient.versionID, NetUtils.createSocketAddr(report.getHost(), report.getRpcPort()), conf);
+    GetApplicationReportRequest reportRequest = Records.newRecord(GetApplicationReportRequest.class);
+    reportRequest.setApplicationId(submitClient.getId());
 
-    GetApplicationReportResponse reportResponse = applicationsManager
-        .getApplicationReport(reportRequest);
-    ApplicationReport localReport = reportResponse.getApplicationReport();
-    long clientSuperStep = -1L;
-    while (localReport.getFinalApplicationStatus() != null
-        && localReport.getFinalApplicationStatus().ordinal() == 0) {
-      LOG.debug("currently in state: "
-          + localReport.getFinalApplicationStatus());
-      if (verbose) {
-        long remoteSuperStep = client.getCurrentSuperStep().get();
-        if (clientSuperStep < remoteSuperStep) {
-          clientSuperStep = remoteSuperStep;
-          LOG.info("Current supersteps number: " + clientSuperStep);
+    try {
+      GetApplicationReportResponse reportResponse = applicationsManager.getApplicationReport(reportRequest);
+      ApplicationReport localReport = reportResponse.getApplicationReport();
+      long clientSuperStep = -1L;
+      while (localReport.getFinalApplicationStatus() != null && localReport.getFinalApplicationStatus().ordinal() == 0) {
+        LOG.debug("currently in state: " + localReport.getFinalApplicationStatus());
+        if (verbose) {
+          long remoteSuperStep = client.getCurrentSuperStep().get();
+          if (clientSuperStep < remoteSuperStep) {
+            clientSuperStep = remoteSuperStep;
+            LOG.info("Current supersteps number: " + clientSuperStep);
+          }
         }
+        reportResponse = applicationsManager.getApplicationReport(reportRequest);
+        localReport = reportResponse.getApplicationReport();
+
+        Thread.sleep(3000L);
       }
-      reportResponse = applicationsManager.getApplicationReport(reportRequest);
-      localReport = reportResponse.getApplicationReport();
-      Thread.sleep(3000L);
-    }
 
-    if (localReport.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED) {
-      LOG.info("Job succeeded!");
-      return true;
-    } else {
-      LOG.info("Job failed with status: "
-          + localReport.getFinalApplicationStatus().toString() + "!");
+      if (localReport.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED) {
+        LOG.info("Job succeeded!");
+        return true;
+      } else {
+        LOG.info("Job failed with status: " + localReport.getFinalApplicationStatus().toString() + "!");
+        return false;
+      }
+    } catch (YarnException e) {
+      e.printStackTrace();
       return false;
     }
-  }
 
-  ClientRMProtocol getApplicationsManager() {
+  ApplicationClientProtocol getApplicationsManager() {
     return applicationsManager;
   }
Index: yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java	(revision 1654753)
+++ yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java	(working copy)
@@ -41,6 +41,7 @@
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hama.HamaConfiguration;
@@ -77,122 +78,105 @@
       LOG.debug("Retrieved username: " + s);
     }
 
-    GetNewApplicationRequest request = Records
-        .newRecord(GetNewApplicationRequest.class);
-    GetNewApplicationResponse response = job.getApplicationsManager()
-        .getNewApplication(request);
-    id = response.getApplicationId();
-    LOG.debug("Got new ApplicationId=" + id);
+    try {
+      GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class);
-    // Create a new ApplicationSubmissionContext
-    ApplicationSubmissionContext appContext = Records
-        .newRecord(ApplicationSubmissionContext.class);
-    // set the ApplicationId
-    appContext.setApplicationId(this.id);
-    // set the application name
-    appContext.setApplicationName(job.getJobName());
-    // Create a new container launch context for the AM's container
-    ContainerLaunchContext amContainer = Records
-        .newRecord(ContainerLaunchContext.class);
+      GetNewApplicationResponse response = job.getApplicationsManager().getNewApplication(request);
+      id = response.getApplicationId();
+      LOG.debug("Got new ApplicationId=" + id);
 
-    // Define the local resources required
-    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
-    // Lets assume the jar we need for our ApplicationMaster is available in
-    // HDFS at a certain known path to us and we want to make it available to
-    // the ApplicationMaster in the launched container
-    if (job.getJar() == null) {
-      throw new IllegalArgumentException(
-          "Jar must be set in order to run the application!");
-    }
-    Path jarPath = new Path(job.getWorkingDirectory(), id + "/app.jar");
-    fs.copyFromLocalFile(job.getLocalPath(job.getJar()), jarPath);
-    LOG.debug("Copying app jar to " + jarPath);
-    getConf()
-        .set(
-            "bsp.jar",
-            jarPath.makeQualified(fs.getUri(), fs.getWorkingDirectory())
                .toString());
-    FileStatus jarStatus = fs.getFileStatus(jarPath);
-    LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
-    amJarRsrc.setType(LocalResourceType.FILE);
-    amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
-    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
-    amJarRsrc.setTimestamp(jarStatus.getModificationTime());
-    amJarRsrc.setSize(jarStatus.getLen());
-    // this creates a symlink in the working directory
-    localResources.put("AppMaster.jar", amJarRsrc);
-    // Set the local resources into the launch context
-    amContainer.setLocalResources(localResources);
+      // Create a new ApplicationSubmissionContext
+      ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
+      // set the ApplicationId
+      appContext.setApplicationId(this.id);
+      // set the application name
+      appContext.setApplicationName(job.getJobName());
 
-    // Set up the environment needed for the launch context
-    Map<String, String> env = new HashMap<String, String>();
-    // Assuming our classes or jars are available as local resources in the
-    // working directory from which the command will be run, we need to append
-    // "." to the path.
-    // By default, all the hadoop specific classpaths will already be available
-    // in $CLASSPATH, so we should be careful not to overwrite it.
-    String classPathEnv = "$CLASSPATH:./*:";
-    env.put("CLASSPATH", classPathEnv);
-    amContainer.setEnvironment(env);
+      // Create a new container launch context for the AM's container
+      ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
 
-    // Construct the command to be executed on the launched container
-    String command = "${JAVA_HOME}"
-        + "/bin/java -cp "
-        + classPathEnv
-        + " "
-        + BSPApplicationMaster.class.getCanonicalName()
-        + " "
-        + submitJobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())
            .toString() + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
-        + "/stdout" + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
-        + "/stderr";
+      // Define the local resources required
+      Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+      // Lets assume the jar we need for our ApplicationMaster is available in
+      // HDFS at a certain known path to us and we want to make it available to
+      // the ApplicationMaster in the launched container
+      if (job.getJar() == null) {
+        throw new IllegalArgumentException("Jar must be set in order to run the application!");
+      }
+      Path jarPath = new Path(job.getWorkingDirectory(), id + "/app.jar");
+      fs.copyFromLocalFile(job.getLocalPath(job.getJar()), jarPath);
+      LOG.debug("Copying app jar to " + jarPath);
+      getConf().set("bsp.jar", jarPath.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString());
+      FileStatus jarStatus = fs.getFileStatus(jarPath);
+      LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
+      amJarRsrc.setType(LocalResourceType.FILE);
+      amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+      amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
+      amJarRsrc.setTimestamp(jarStatus.getModificationTime());
+      amJarRsrc.setSize(jarStatus.getLen());
+      // this creates a symlink in the working directory
+      localResources.put("AppMaster.jar", amJarRsrc);
+      // Set the local resources into the launch context
+      amContainer.setLocalResources(localResources);
 
-    LOG.debug("Start command: " + command);
+      // Set up the environment needed for the launch context
+      Map<String, String> env = new HashMap<String, String>();
+      // Assuming our classes or jars are available as local resources in the
+      // working directory from which the command will be run, we need to append
+      // "." to the path.
+      // By default, all the hadoop specific classpaths will already be available
+      // in $CLASSPATH, so we should be careful not to overwrite it.
+      String classPathEnv = "$CLASSPATH:./*:";
+      env.put("CLASSPATH", classPathEnv);
+      amContainer.setEnvironment(env);
 
-    amContainer.setCommands(Collections.singletonList(command));
+      // Construct the command to be executed on the launched container
+      String command = "${JAVA_HOME}" + "/bin/java -cp " + classPathEnv + " " + BSPApplicationMaster.class.getCanonicalName() + " " + submitJobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString() + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr";
 
-    Resource capability = Records.newRecord(Resource.class);
-    // we have at least 3 threads, which comsumes 1mb each, for each bsptask and
-    // a base usage of 100mb
-    capability.setMemory(3 * job.getNumBspTask()
-        + getConf().getInt("hama.appmaster.memory.mb", 100));
-    LOG.info("Set memory for the application master to "
-        + capability.getMemory() + "mb!");
-    amContainer.setResource(capability);
+      LOG.debug("Start command: " + command);
 
-    // Set the container launch content into the ApplicationSubmissionContext
-    appContext.setAMContainerSpec(amContainer);
+      amContainer.setCommands(Collections.singletonList(command));
 
-    // Create the request to send to the ApplicationsManager
-    SubmitApplicationRequest appRequest = Records
-        .newRecord(SubmitApplicationRequest.class);
-    appRequest.setApplicationSubmissionContext(appContext);
-    job.getApplicationsManager().submitApplication(appRequest);
+      Resource capability = Records.newRecord(Resource.class);
+      // we have at least 3 threads, which comsumes 1mb each, for each bsptask and
+      // a base usage of 100mb
+      capability.setMemory(3 * job.getNumBspTask() + getConf().getInt("hama.appmaster.memory.mb", 100));
+      LOG.info("Set memory for the application master to " + capability.getMemory() + "mb!");
+      //amContainer.setResource(capability);
 
-    GetApplicationReportRequest reportRequest = Records
-        .newRecord(GetApplicationReportRequest.class);
-    reportRequest.setApplicationId(id);
-    while (report == null || report.getHost().equals("N/A")) {
-      GetApplicationReportResponse reportResponse = job
-          .getApplicationsManager().getApplicationReport(reportRequest);
-      report = reportResponse.getApplicationReport();
-      try {
-        Thread.sleep(1000L);
-      } catch (InterruptedException e) {
-        LOG.error(
-            "Got interrupted while waiting for a response report from AM.", e);
+      // Set the container launch content into the ApplicationSubmissionContext
+      appContext.setResource(capability);
+      appContext.setAMContainerSpec(amContainer);
+
+      // Create the request to send to the ApplicationsManager
+      SubmitApplicationRequest appRequest = Records.newRecord(SubmitApplicationRequest.class);
+      appRequest.setApplicationSubmissionContext(appContext);
+      job.getApplicationsManager().submitApplication(appRequest);
+
+      GetApplicationReportRequest reportRequest = Records.newRecord(GetApplicationReportRequest.class);
+      reportRequest.setApplicationId(id);
+      while (report == null || report.getHost().equals("N/A")) {
+        GetApplicationReportResponse reportResponse = job.getApplicationsManager().getApplicationReport(reportRequest);
+        report = reportResponse.getApplicationReport();
+        try {
+          Thread.sleep(1000L);
+        } catch (InterruptedException e) {
+          LOG.error("Got interrupted while waiting for a response report from AM.", e);
+        }
       }
+      LOG.info("Got report: " + report.getApplicationId() + " " + report.getHost() + ":" + report.getRpcPort());
+      return new NetworkedJob();
+    } catch (YarnException e) {
+      e.printStackTrace();
+      return null;
     }
-    LOG.info("Got report: " + report.getApplicationId() + " "
-        + report.getHost() + ":" + report.getRpcPort());
-    return new NetworkedJob();
   }
 
-  @Override
-  protected int checkTaskLimits(BSPJob job, int limitTasks) throws IOException {
-    return Math.max(1, limitTasks);
-  }
+//  @Override
+//  protected int checkTaskLimits(BSPJob job, int limitTasks) throws IOException {
+//    return Math.max(1, limitTasks);
+//  }
 
   @Override
   public Path getSystemDir() {
Index: yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java	(revision 1654753)
+++ yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java	(working copy)
@@ -22,6 +22,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.sync.SyncException;
@@ -57,8 +58,11 @@
       InterruptedException, ClassNotFoundException {
     HamaConfiguration conf = new HamaConfiguration();
     // TODO some keys that should be within a conf
-    conf.set("yarn.resourcemanager.address", "0.0.0.0:8040");
-    conf.set("bsp.local.dir", "/tmp/bsp-yarn/");
+    conf.set("yarn.resourcemanager.address", "slave1.hama.com:8050");
+    conf.set("bsp.user.name", "minho");
+    conf.set("bsp.local.dir", "/root/");
+    conf.set("bsp.working.dir", "/user/root/bsp-yarn/");
+    conf.setInt(Constants.MAX_TASKS, 10);
 
     YARNBSPJob job = new YARNBSPJob(conf);
     job.setBspClass(HelloBSP.class);
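
A minimal driver sketch, not part of the patch: it shows how a client could use the new Constants.MAX_TASKS key that the BSPJobClient change above consults before falling back to MAX_TASKS_PER_JOB. The ResourceManager address and MyBSP class are placeholders, and setJobName, setNumBspTask and waitForCompletion are assumed from the existing Hama BSPJob API.

    import org.apache.hama.Constants;
    import org.apache.hama.HamaConfiguration;
    import org.apache.hama.bsp.YARNBSPJob;

    public class YarnSubmitSketch {
      public static void main(String[] args) throws Exception {
        HamaConfiguration conf = new HamaConfiguration();
        // placeholder ResourceManager address; point this at your cluster's RM
        conf.set("yarn.resourcemanager.address", "resourcemanager.example.com:8050");
        // hard upper bound on tasks, read first by BSPJobClient after this patch
        conf.setInt(Constants.MAX_TASKS, 10);

        YARNBSPJob job = new YARNBSPJob(conf);
        job.setJobName("yarn-bsp-sketch");
        job.setBspClass(MyBSP.class); // MyBSP is a placeholder for your BSP implementation
        job.setNumBspTask(4);         // must stay at or below the MAX_TASKS cap
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }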