Index: yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (revision 1564184) +++ yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (working copy) @@ -33,23 +33,22 @@ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.ipc.ProtocolSignature; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.RPC.Server; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.yarn.Clock; -import org.apache.hadoop.yarn.SystemClock; -import org.apache.hadoop.yarn.api.AMRMProtocol; -import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; +import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.Job.JobState; import org.apache.hama.bsp.sync.SyncServerRunner; @@ -73,7 +72,7 @@ private Clock clock; private YarnRPC yarnRPC; - private AMRMProtocol amrmRPC; + private ApplicationMasterProtocol amrmRPC; private ApplicationAttemptId appAttemptId; private String applicationName; @@ -146,8 +145,7 @@ } this.amrmRPC = getYarnRPCConnection(localConf); - registerApplicationMaster(amrmRPC, appAttemptId, hostname, clientPort, - "http://localhost:8080"); + registerApplicationMaster(amrmRPC, appAttemptId, hostname, clientPort, "http://localhost:8080"); } /** @@ -159,14 +157,30 @@ */ private void startRPCServers() throws IOException { // start the RPC server which talks to the client - this.clientServer = RPC.getServer(BSPClient.class, this, hostname, - clientPort, jobConf); + YarnRPC rpc = YarnRPC.create(jobConf); + InetSocketAddress BSPClientAddress = jobConf.getSocketAddr( + hostname, + hostname, + clientPort); + this.clientServer = + rpc.getServer( BSPClient.class, this, BSPClientAddress, + jobConf, null, + jobConf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT, + YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT)); + this.clientServer.start(); // start the RPC server which talks to the tasks this.taskServerPort = BSPNetUtils.getFreePort(10000); - this.taskServer = RPC.getServer(BSPPeerProtocol.class, this, hostname, - taskServerPort, jobConf); + InetSocketAddress BSPPeerProtocolAddress = jobConf.getSocketAddr( + hostname, + hostname, + taskServerPort); + this.taskServer = + rpc.getServer(BSPPeerProtocol.class, this, BSPPeerProtocolAddress, + jobConf, null, + jobConf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT, + YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT)); this.taskServer.start(); // readjusting the configuration to let the tasks know where we are. this.jobConf.set("hama.umbilical.address", hostname + ":" + taskServerPort); @@ -190,37 +204,38 @@ * * @param yarnConf * @return a new RPC connection to the Resource Manager. + * @throws IOException */ - private AMRMProtocol getYarnRPCConnection(Configuration yarnConf) { + private ApplicationMasterProtocol getYarnRPCConnection(Configuration yarnConf) throws IOException { // Connect to the Scheduler of the ResourceManager. - InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get( - YarnConfiguration.RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS)); - LOG.info("Connecting to ResourceManager at " + rmAddress); - return (AMRMProtocol) yarnRPC.getProxy(AMRMProtocol.class, rmAddress, - yarnConf); + return ClientRMProxy.createRMProxy(yarnConf, ApplicationMasterProtocol.class); } /** * Registers this application master with the Resource Manager and retrieves a * response which is used to launch additional containers. + * @throws IOException + * @throws YarnException */ private static RegisterApplicationMasterResponse registerApplicationMaster( - AMRMProtocol resourceManager, ApplicationAttemptId appAttemptID, + ApplicationMasterProtocol resourceManager, ApplicationAttemptId appAttemptID, String appMasterHostName, int appMasterRpcPort, - String appMasterTrackingUrl) throws YarnRemoteException { + String appMasterTrackingUrl) throws YarnException, IOException { RegisterApplicationMasterRequest appMasterRequest = Records .newRecord(RegisterApplicationMasterRequest.class); - appMasterRequest.setApplicationAttemptId(appAttemptID); + // Check where to use appAttemptID appMasterRequest.setHost(appMasterHostName); appMasterRequest.setRpcPort(appMasterRpcPort); - // TODO tracking URL + // TODO tracking URL appAttemptID appMasterRequest.setTrackingUrl(appMasterTrackingUrl); - RegisterApplicationMasterResponse response = resourceManager - .registerApplicationMaster(appMasterRequest); + RegisterApplicationMasterResponse response = resourceManager.registerApplicationMaster(appMasterRequest); LOG.debug("ApplicationMaster has maximum resource capability of: " + response.getMaximumResourceCapability().getMemory()); + // App Attempt id is set in the ClientToAMTokenMasterKey + response.getClientToAMTokenMasterKey(); + + return response; } @@ -233,13 +248,15 @@ */ private static ApplicationAttemptId getApplicationAttemptId() throws IOException { + // TODO Check Environment.CONTAINER_ID.name() which is found in YARN ApplicationMaster.java + // THe appAttemptID can also be get from register app master too, think about this later Map envs = System.getenv(); - if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) { + if (!envs.containsKey(Environment.CONTAINER_ID.name())) { throw new IllegalArgumentException( "ApplicationAttemptId not set in the environment"); } return ConverterUtils.toContainerId( - envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)) + envs.get(Environment.CONTAINER_ID.name())) .getApplicationAttemptId(); } @@ -259,7 +276,7 @@ } } - private void cleanup() throws YarnRemoteException { + private void cleanup() throws YarnException, IOException { syncServer.stop(); if (threadPool != null && !threadPool.isShutdown()) { threadPool.shutdownNow(); @@ -268,24 +285,25 @@ taskServer.stop(); FinishApplicationMasterRequest finishReq = Records .newRecord(FinishApplicationMasterRequest.class); - finishReq.setAppAttemptId(appAttemptId); + //TODO check where to set the app attampt id + //finishReq.setAppAttemptId(appAttemptId); switch (job.getState()) { case SUCCESS: - finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); break; case KILLED: - finishReq.setFinishApplicationStatus(FinalApplicationStatus.KILLED); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.KILLED); break; case FAILED: - finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.FAILED); break; default: - finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.FAILED); } this.amrmRPC.finishApplicationMaster(finishReq); } - public static void main(String[] args) throws YarnRemoteException { + public static void main(String[] args) throws YarnException, IOException { // we expect getting the qualified path of the job.xml as the first // element in the arguments BSPApplicationMaster master = null; Index: yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java (revision 1564184) +++ yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java (working copy) @@ -48,6 +48,7 @@ @SuppressWarnings({ "unchecked", "rawtypes" }) public BSPRunner(String jobId, int taskAttemptId, Path confPath) throws Exception { + conf = new HamaConfiguration(); conf.addResource(confPath); this.id = new TaskAttemptID(jobId, 0, taskAttemptId, 0); @@ -75,10 +76,10 @@ BSPJob job = new BSPJob(new HamaConfiguration(conf)); BSPTask task = (BSPTask) umbilical.getTask(id); - - peer = new BSPPeerImpl(job, conf, id, umbilical, id.id, task.splitClass, - task.split, counters); - + // TODO check + // The old BSPPeerImpl does not container superstep and state parameters + // Set to -1, TaskStatus.State.RUNNING currently according to BSPPeerImpl code + peer = new BSPPeerImpl(job, new HamaConfiguration(conf), id, umbilical, id.id, task.splitClass, task.split, counters, -1, TaskStatus.State.RUNNING); // this is a checked cast because we can only set a class via the BSPJob // class which only allows derivates of BSP. bspClass = (Class) conf.getClassByName(conf Index: yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (revision 1564184) +++ yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (working copy) @@ -19,8 +19,10 @@ import java.io.IOException; import java.net.URISyntaxException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -29,11 +31,15 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.ApplicationConstants; -import org.apache.hadoop.yarn.api.ContainerManager; -import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; -import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -41,7 +47,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.URL; -import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; @@ -51,17 +57,16 @@ private final Container allocatedContainer; private final int id; - private final ContainerManager cm; + private final ContainerManagementProtocol cm; private final Configuration conf; private String user; private final Path jobFile; private final BSPJobID jobId; - private GetContainerStatusRequest statusRequest; + private GetContainerStatusesRequest statusRequest; - public BSPTaskLauncher(int id, Container container, ContainerManager cm, - Configuration conf, Path jobFile, BSPJobID jobId) - throws YarnRemoteException { + public BSPTaskLauncher(int id, Container container, ContainerManagementProtocol cm, + Configuration conf, Path jobFile, BSPJobID jobId) { this.id = id; this.cm = cm; this.conf = conf; @@ -80,11 +85,28 @@ stopAndCleanup(); } - public void stopAndCleanup() throws YarnRemoteException { - StopContainerRequest stopRequest = Records - .newRecord(StopContainerRequest.class); - stopRequest.setContainerId(allocatedContainer.getId()); - cm.stopContainer(stopRequest); + public void stopAndCleanup() { + //TODO debug here + List containerIds = new ArrayList(); + containerIds.add(allocatedContainer.getId()); + StopContainersResponse response = null; + try { + response = cm.stopContainers(StopContainersRequest + .newInstance(containerIds)); + } catch (YarnException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + if (response.getFailedRequests() != null + && response.getFailedRequests().containsKey( + allocatedContainer.getId())) { + Throwable t = response.getFailedRequests() + .get(allocatedContainer.getId()).deSerialize(); + + } } public void start() throws IOException { @@ -103,19 +125,25 @@ */ public BSPTaskStatus poll() throws Exception { - ContainerStatus lastStatus; - if ((lastStatus = cm.getContainerStatus(statusRequest).getStatus()) - .getState() != ContainerState.COMPLETE) { - return null; - } - LOG.info(this.id + "\tLast report comes with existatus of " - + lastStatus.getExitStatus() + " and diagnose string of " - + lastStatus.getDiagnostics()); + ContainerStatus lastStatus = null; + GetContainerStatusesResponse getContainerStatusesResponse = cm.getContainerStatuses(statusRequest); + List ContainerStatuses= getContainerStatusesResponse.getContainerStatuses(); + // TODO old logic only process one status, debug here for more detail, ask edward which yarn version they use for integration before + for(ContainerStatus containerStatus:ContainerStatuses){ + if( + (lastStatus = containerStatus).getState() != ContainerState.COMPLETE + ){ + return null; + } + LOG.info(this.id + "\tLast report comes with existatus of " + + lastStatus.getExitStatus() + " and diagnose string of " + + lastStatus.getDiagnostics()); + } return new BSPTaskStatus(id, lastStatus.getExitStatus()); } - private GetContainerStatusRequest setupContainer( - Container allocatedContainer, ContainerManager cm, String user, int id) + private GetContainerStatusesRequest setupContainer( + Container allocatedContainer, ContainerManagementProtocol cm, String user, int id) throws IOException { LOG.info("Setting up a container for user " + user + " with id of " + id + " and containerID of " + allocatedContainer.getId() + " as " + user); @@ -122,11 +150,11 @@ // Now we setup a ContainerLaunchContext ContainerLaunchContext ctx = Records .newRecord(ContainerLaunchContext.class); + // TODO debug here, how to set container id, resource and user (token?) +// ctx.setContainerId(allocatedContainer.getId()); +// ctx.setResource(allocatedContainer.getResource()); +// ctx.setUser(user); - ctx.setContainerId(allocatedContainer.getId()); - ctx.setResource(allocatedContainer.getResource()); - ctx.setUser(user); - /* * jar */ @@ -182,11 +210,21 @@ StartContainerRequest startReq = Records .newRecord(StartContainerRequest.class); startReq.setContainerLaunchContext(ctx); - cm.startContainer(startReq); + + List list = new ArrayList(); + list.add(startReq); + StartContainersRequest requestList = StartContainersRequest.newInstance(list); + try { + cm.startContainers(requestList); + } catch (YarnException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } - GetContainerStatusRequest statusReq = Records - .newRecord(GetContainerStatusRequest.class); - statusReq.setContainerId(allocatedContainer.getId()); + GetContainerStatusesRequest statusReq = Records.newRecord(GetContainerStatusesRequest.class); + List containerList= new ArrayList(); + containerList.add(allocatedContainer.getId()); + statusReq.setContainerIds(containerList); return statusReq; } Index: yarn/src/main/java/org/apache/hama/bsp/Job.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/Job.java (revision 1564184) +++ yarn/src/main/java/org/apache/hama/bsp/Job.java (working copy) @@ -17,7 +17,7 @@ */ package org.apache.hama.bsp; -import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.exceptions.YarnException; /** * Main interface to interact with the job. Provides only getters. @@ -34,7 +34,7 @@ public JobState startJob() throws Exception; - public void cleanup() throws YarnRemoteException; + public void cleanup() throws YarnException; JobState getState(); Index: yarn/src/main/java/org/apache/hama/bsp/JobImpl.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/JobImpl.java (revision 1564184) +++ yarn/src/main/java/org/apache/hama/bsp/JobImpl.java (working copy) @@ -31,11 +31,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.yarn.api.AMRMProtocol; -import org.apache.hadoop.yarn.api.ContainerManager; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.records.AMResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -42,9 +41,9 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; -import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.hama.bsp.BSPTaskLauncher.BSPTaskStatus; @@ -66,7 +65,7 @@ private ApplicationAttemptId appAttemptId; private YarnRPC yarnRPC; - private AMRMProtocol resourceManager; + private ApplicationMasterProtocol resourceManager; private List allocatedContainers; private List releasedContainers = Collections.emptyList(); @@ -77,7 +76,7 @@ private int lastResponseID = 0; public JobImpl(ApplicationAttemptId appAttemptId, - Configuration jobConfiguration, YarnRPC yarnRPC, AMRMProtocol amrmRPC, + Configuration jobConfiguration, YarnRPC yarnRPC, ApplicationMasterProtocol amrmRPC, String jobFile, BSPJobID jobId) { super(); this.numBSPTasks = jobConfiguration.getInt("bsp.peers.num", 1); @@ -134,25 +133,29 @@ this.allocatedContainers = new ArrayList(numBSPTasks); while (allocatedContainers.size() < numBSPTasks) { + + //TODO debug here + AllocateRequest req = AllocateRequest.newInstance(this.lastResponseID, 0.0f, new ArrayList(), new ArrayList(), null); + + +// AllocateRequest req = AllocateRequest.newInstance( +// appAttemptId, +// lastResponseID, +// 0.0f, +// createBSPTaskRequest(numBSPTasks - allocatedContainers.size(), +// taskMemoryInMb, priority), releasedContainers); - AllocateRequest req = BuilderUtils.newAllocateRequest( - appAttemptId, - lastResponseID, - 0.0f, - createBSPTaskRequest(numBSPTasks - allocatedContainers.size(), - taskMemoryInMb, priority), releasedContainers); - AllocateResponse allocateResponse = resourceManager.allocate(req); - AMResponse amResponse = allocateResponse.getAMResponse(); - LOG.info("Got response! ID: " + amResponse.getResponseId() + + LOG.info("Got response! ID: " + allocateResponse.getResponseId() + " with num of containers: " - + amResponse.getAllocatedContainers().size() + + allocateResponse.getAllocatedContainers().size() + " and following resources: " - + amResponse.getAvailableResources().getMemory() + "mb"); - this.lastResponseID = amResponse.getResponseId(); + + allocateResponse.getAvailableResources().getMemory() + "mb"); + this.lastResponseID = allocateResponse.getResponseId(); // availableResources = amResponse.getAvailableResources(); - this.allocatedContainers.addAll(amResponse.getAllocatedContainers()); + this.allocatedContainers.addAll(allocateResponse.getAllocatedContainers()); LOG.info("Waiting to allocate " + (numBSPTasks - allocatedContainers.size()) + " more containers..."); Thread.sleep(1000l); @@ -162,12 +165,12 @@ int id = 0; for (Container allocatedContainer : allocatedContainers) { + // TODO check where to get the container status LOG.info("Launching task on a new container." + ", containerId=" + allocatedContainer.getId() + ", containerNode=" + allocatedContainer.getNodeId().getHost() + ":" + allocatedContainer.getNodeId().getPort() + ", containerNodeURI=" - + allocatedContainer.getNodeHttpAddress() + ", containerState" - + allocatedContainer.getState() + ", containerResourceMemory" + + allocatedContainer.getNodeHttpAddress() + ", containerResourceMemory" + allocatedContainer.getResource().getMemory()); // Connect to ContainerManager on the allocated container @@ -174,8 +177,9 @@ String cmIpPortStr = allocatedContainer.getNodeId().getHost() + ":" + allocatedContainer.getNodeId().getPort(); InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr); - ContainerManager cm = (ContainerManager) yarnRPC.getProxy( - ContainerManager.class, cmAddress, conf); + + // TODO debug here + ContainerManagementProtocol cm = ClientRMProxy.createRMProxy(conf, ContainerManagementProtocol.class, cmAddress); BSPTaskLauncher runnableLaunchContainer = new BSPTaskLauncher(id, allocatedContainer, cm, conf, jobFile, jobId); @@ -221,7 +225,7 @@ * @param id * @throws YarnRemoteException */ - private void cleanupTask(int id) throws YarnRemoteException { + private void cleanupTask(int id) throws YarnException { BSPTaskLauncher bspTaskLauncher = launchers.get(id); bspTaskLauncher.stopAndCleanup(); launchers.remove(id); @@ -229,7 +233,7 @@ } @Override - public void cleanup() throws YarnRemoteException { + public void cleanup() throws YarnException { for (BSPTaskLauncher launcher : completionQueue) { launcher.stopAndCleanup(); } @@ -247,7 +251,8 @@ // whether a particular rack/host is needed // useful for applications that are sensitive // to data locality - rsrcRequest.setHostName("*"); + // TODO YARN support a list of resource names + rsrcRequest.setResourceName("*"); // set the priority for the request Priority pri = Records.newRecord(Priority.class); Index: yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java (revision 1564184) +++ yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java (working copy) @@ -24,15 +24,15 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.yarn.api.ClientRMProtocol; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnRemoteException; -import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.Records; import org.apache.hama.HamaConfiguration; @@ -46,20 +46,16 @@ private BSPClient client; private boolean submitted; private ApplicationReport report; - private ClientRMProtocol applicationsManager; - private YarnRPC rpc; + private ApplicationClientProtocol applicationsManager; public YARNBSPJob(HamaConfiguration conf) throws IOException { super(conf); submitClient = new YARNBSPJobClient(conf); YarnConfiguration yarnConf = new YarnConfiguration(conf); - this.rpc = YarnRPC.create(conf); InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get( YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS)); LOG.info("Connecting to ResourceManager at " + rmAddress); - - this.applicationsManager = ((ClientRMProtocol) rpc.getProxy( - ClientRMProtocol.class, rmAddress, conf)); + this.applicationsManager = ClientRMProxy.createRMProxy(yarnConf, ApplicationClientProtocol.class, rmAddress); } public void setMemoryUsedPerTaskInMb(int mem) { @@ -66,7 +62,7 @@ conf.setInt("bsp.child.mem.in.mb", mem); } - public void kill() throws YarnRemoteException { + public void kill() throws YarnException, IOException { if (submitClient != null) { KillApplicationRequest killRequest = Records .newRecord(KillApplicationRequest.class); @@ -101,8 +97,13 @@ .newRecord(GetApplicationReportRequest.class); reportRequest.setApplicationId(submitClient.getId()); - GetApplicationReportResponse reportResponse = applicationsManager - .getApplicationReport(reportRequest); + GetApplicationReportResponse reportResponse = null; + try { + reportResponse = applicationsManager.getApplicationReport(reportRequest); + } catch (YarnException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } ApplicationReport localReport = reportResponse.getApplicationReport(); long clientSuperStep = -1L; while (localReport.getFinalApplicationStatus() != null @@ -116,7 +117,12 @@ LOG.info("Current supersteps number: " + clientSuperStep); } } - reportResponse = applicationsManager.getApplicationReport(reportRequest); + try { + reportResponse = applicationsManager.getApplicationReport(reportRequest); + } catch (YarnException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } localReport = reportResponse.getApplicationReport(); Thread.sleep(3000L); @@ -133,7 +139,7 @@ } - ClientRMProtocol getApplicationsManager() { + ApplicationClientProtocol getApplicationsManager() { return applicationsManager; } Index: yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java (revision 1564184) +++ yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java (working copy) @@ -33,6 +33,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -41,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.hama.HamaConfiguration; @@ -77,10 +80,14 @@ LOG.debug("Retrieved username: " + s); } - GetNewApplicationRequest request = Records - .newRecord(GetNewApplicationRequest.class); - GetNewApplicationResponse response = job.getApplicationsManager() - .getNewApplication(request); + GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class); + GetNewApplicationResponse response = null; + try { + response = job.getApplicationsManager().getNewApplication(request); + } catch (YarnException e1) { + // TODO Auto-generated catch block + e1.printStackTrace(); + } id = response.getApplicationId(); LOG.debug("Got new ApplicationId=" + id); @@ -92,6 +99,14 @@ // set the application name appContext.setApplicationName(job.getJobName()); + // we have at least 3 threads, which comsumes 1mb each, for each bsptask and + // a base usage of 100mb + Resource capability = Records.newRecord(Resource.class); + capability.setMemory(3 * job.getNumBspTask() + + getConf().getInt("hama.appmaster.memory.mb", 100)); + LOG.info("Set memory for the application master to " + + capability.getMemory() + "mb!"); + appContext.setResource(capability); // Create a new container launch context for the AM's container ContainerLaunchContext amContainer = Records .newRecord(ContainerLaunchContext.class); @@ -152,15 +167,6 @@ amContainer.setCommands(Collections.singletonList(command)); - Resource capability = Records.newRecord(Resource.class); - // we have at least 3 threads, which comsumes 1mb each, for each bsptask and - // a base usage of 100mb - capability.setMemory(3 * job.getNumBspTask() - + getConf().getInt("hama.appmaster.memory.mb", 100)); - LOG.info("Set memory for the application master to " - + capability.getMemory() + "mb!"); - amContainer.setResource(capability); - // Set the container launch content into the ApplicationSubmissionContext appContext.setAMContainerSpec(amContainer); @@ -168,14 +174,25 @@ SubmitApplicationRequest appRequest = Records .newRecord(SubmitApplicationRequest.class); appRequest.setApplicationSubmissionContext(appContext); - job.getApplicationsManager().submitApplication(appRequest); + try { + job.getApplicationsManager().submitApplication(appRequest); + } catch (YarnException e1) { + // TODO Auto-generated catch block + e1.printStackTrace(); + } GetApplicationReportRequest reportRequest = Records .newRecord(GetApplicationReportRequest.class); reportRequest.setApplicationId(id); while (report == null || report.getHost().equals("N/A")) { - GetApplicationReportResponse reportResponse = job - .getApplicationsManager().getApplicationReport(reportRequest); + GetApplicationReportResponse reportResponse = null; + try { + reportResponse = job + .getApplicationsManager().getApplicationReport(reportRequest); + } catch (YarnException e1) { + // TODO Auto-generated catch block + e1.printStackTrace(); + } report = reportResponse.getApplicationReport(); try { Thread.sleep(1000L); @@ -189,7 +206,6 @@ return new NetworkedJob(); } - @Override protected int checkTaskLimits(BSPJob job, int limitTasks) throws IOException { return Math.max(1, limitTasks); }