Index: core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (revision 1654753)
+++ core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (working copy)
@@ -303,10 +303,14 @@
BSPJob job = pJob;
job.setJobID(jobId);
- ClusterStatus clusterStatus = getClusterStatus(true);
- int maxTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB,
- clusterStatus.getMaxTasks() - clusterStatus.getTasks());
+ int maxTasks = job.getConfiguration().getInt(Constants.MAX_TASKS, 0);
+ if(maxTasks == 0) {
+ ClusterStatus clusterStatus = getClusterStatus(true);
+ maxTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB,
+ clusterStatus.getMaxTasks() - clusterStatus.getTasks());
+ }
+
if (maxTasks < job.getNumBspTask()) {
LOG.warn("The configured number of tasks has exceeded the maximum allowed. Job will run with "
+ maxTasks + " tasks.");
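Note: with this change a job can cap its own task count up front through the new Constants.MAX_TASKS key; the ClusterStatus lookup only remains as the fallback when the key is unset. A minimal client-side sketch, illustrative only (it mirrors the YarnSerializePrinting example further down in this patch):

    import org.apache.hama.Constants;
    import org.apache.hama.HamaConfiguration;
    import org.apache.hama.bsp.BSPJob;

    HamaConfiguration conf = new HamaConfiguration();
    conf.setInt(Constants.MAX_TASKS, 10);  // hard cap for this job
    BSPJob job = new BSPJob(conf);         // throws IOException
    job.setNumBspTask(10);                 // anything above MAX_TASKS runs with a warning and the capped count
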
Index: pom.xml
===================================================================
--- pom.xml (revision 1654753)
+++ pom.xml (working copy)
@@ -320,6 +320,7 @@
<module>ml</module>
<module>mesos</module>
<module>dist</module>
+ <module>yarn</module>
Index: yarn/pom.xml
===================================================================
--- yarn/pom.xml (revision 1654753)
+++ yarn/pom.xml (working copy)
@@ -19,7 +19,7 @@
<groupId>org.apache.hama</groupId>
<artifactId>hama-parent</artifactId>
- <version>0.6.3-SNAPSHOT</version>
+ <version>0.7.0-SNAPSHOT</version>
<modelVersion>4.0.0</modelVersion>
@@ -26,13 +26,15 @@
<groupId>org.apache.hama</groupId>
<artifactId>hama-yarn</artifactId>
<name>yarn</name>
- <version>0.6.3-SNAPSHOT</version>
+ <version>0.7.0-SNAPSHOT</version>
<packaging>jar</packaging>
- 1.2.0
+
-
+
org.apache.hama
@@ -54,27 +56,36 @@
<artifactId>avro</artifactId>
<version>1.5.3</version>
-
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
- <version>0.23.1</version>
+ <version>${hadoop.version}</version>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
- <version>0.23.1</version>
+ <version>${hadoop.version}</version>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-tests</artifactId>
- <version>0.23.1</version>
+ <version>${hadoop.version}</version>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
Index: yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (revision 1654753)
+++ yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (working copy)
@@ -33,13 +33,11 @@
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.ipc.ProtocolSignature;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.Clock;
-import org.apache.hadoop.yarn.SystemClock;
-import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
@@ -46,10 +44,12 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.Job.JobState;
import org.apache.hama.bsp.sync.SyncServerRunner;
@@ -73,7 +73,7 @@
private Clock clock;
private YarnRPC yarnRPC;
- private AMRMProtocol amrmRPC;
+ private ApplicationMasterProtocol amrmRPC;
private ApplicationAttemptId appAttemptId;
private String applicationName;
@@ -159,14 +159,21 @@
*/
private void startRPCServers() throws IOException {
// start the RPC server which talks to the client
- this.clientServer = RPC.getServer(BSPClient.class, this, hostname,
- clientPort, jobConf);
+ YarnRPC rpc = YarnRPC.create(jobConf);
+ InetSocketAddress BSPClientAddress = jobConf.getSocketAddr(hostname, hostname, clientPort);
+ this.clientServer = rpc.getServer(BSPClient.class, this, BSPClientAddress, jobConf, null,
+ jobConf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT,
+ YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT));
this.clientServer.start();
// start the RPC server which talks to the tasks
this.taskServerPort = BSPNetUtils.getFreePort(10000);
- this.taskServer = RPC.getServer(BSPPeerProtocol.class, this, hostname,
- taskServerPort, jobConf);
+ InetSocketAddress BSPPeerProtocolAddress = jobConf
+ .getSocketAddr(hostname, hostname, taskServerPort);
+ this.taskServer = rpc.getServer(BSPPeerProtocol.class, this, BSPPeerProtocolAddress,
+ jobConf, null,
+ jobConf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT,
+ YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT));
this.taskServer.start();
// readjusting the configuration to let the tasks know where we are.
this.jobConf.set("hama.umbilical.address", hostname + ":" + taskServerPort);
@@ -191,13 +198,13 @@
* @param yarnConf
* @return a new RPC connection to the Resource Manager.
*/
- private AMRMProtocol getYarnRPCConnection(Configuration yarnConf) {
+ private ApplicationMasterProtocol getYarnRPCConnection(Configuration yarnConf) {
// Connect to the Scheduler of the ResourceManager.
InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
LOG.info("Connecting to ResourceManager at " + rmAddress);
- return (AMRMProtocol) yarnRPC.getProxy(AMRMProtocol.class, rmAddress,
+ return (ApplicationMasterProtocol) yarnRPC.getProxy(ApplicationMasterProtocol.class, rmAddress,
yarnConf);
}
@@ -206,13 +213,13 @@
* response which is used to launch additional containers.
*/
private static RegisterApplicationMasterResponse registerApplicationMaster(
- AMRMProtocol resourceManager, ApplicationAttemptId appAttemptID,
+ ApplicationMasterProtocol resourceManager, ApplicationAttemptId appAttemptID,
String appMasterHostName, int appMasterRpcPort,
- String appMasterTrackingUrl) throws YarnRemoteException {
+ String appMasterTrackingUrl) throws YarnException, IOException {
RegisterApplicationMasterRequest appMasterRequest = Records
.newRecord(RegisterApplicationMasterRequest.class);
- appMasterRequest.setApplicationAttemptId(appAttemptID);
+ //appMasterRequest.setApplicationAttemptId(appAttemptID);
appMasterRequest.setHost(appMasterHostName);
appMasterRequest.setRpcPort(appMasterRpcPort);
// TODO tracking URL
@@ -234,12 +241,13 @@
private static ApplicationAttemptId getApplicationAttemptId()
throws IOException {
Map<String, String> envs = System.getenv();
- if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) {
+ if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
throw new IllegalArgumentException(
"ApplicationAttemptId not set in the environment");
}
+
return ConverterUtils.toContainerId(
- envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV))
+ envs.get(Environment.CONTAINER_ID.name()))
.getApplicationAttemptId();
}
@@ -259,7 +267,7 @@
}
}
- private void cleanup() throws YarnRemoteException {
+ private void cleanup() throws YarnException, IOException {
syncServer.stop();
if (threadPool != null && !threadPool.isShutdown()) {
threadPool.shutdownNow();
@@ -268,24 +276,24 @@
taskServer.stop();
FinishApplicationMasterRequest finishReq = Records
.newRecord(FinishApplicationMasterRequest.class);
- finishReq.setAppAttemptId(appAttemptId);
+ //finishReq.setAppAttemptId(appAttemptId);
switch (job.getState()) {
case SUCCESS:
- finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+ finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
break;
case KILLED:
- finishReq.setFinishApplicationStatus(FinalApplicationStatus.KILLED);
+ finishReq.setFinalApplicationStatus(FinalApplicationStatus.KILLED);
break;
case FAILED:
- finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
+ finishReq.setFinalApplicationStatus(FinalApplicationStatus.FAILED);
break;
default:
- finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
+ finishReq.setFinalApplicationStatus(FinalApplicationStatus.FAILED);
}
this.amrmRPC.finishApplicationMaster(finishReq);
}
- public static void main(String[] args) throws YarnRemoteException {
+ public static void main(String[] args) throws YarnException, IOException {
// we expect getting the qualified path of the job.xml as the first
// element in the arguments
BSPApplicationMaster master = null;
Index: yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java (revision 1654753)
+++ yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java (working copy)
@@ -37,7 +37,7 @@
private static final Log LOG = LogFactory.getLog(BSPRunner.class);
- private Configuration conf;
+ private HamaConfiguration conf;
private TaskAttemptID id;
private BSPPeerImpl<?, ?, ?, ?, ? extends Writable> peer;
private Counters counters = new Counters();
Index: yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (revision 1654753)
+++ yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (working copy)
@@ -19,8 +19,10 @@
import java.io.IOException;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -29,19 +31,10 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ContainerManager;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.*;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
@@ -51,17 +44,17 @@
private final Container allocatedContainer;
private final int id;
- private final ContainerManager cm;
+ private final ContainerManagementProtocol cm;
private final Configuration conf;
private String user;
private final Path jobFile;
private final BSPJobID jobId;
- private GetContainerStatusRequest statusRequest;
+ private GetContainerStatusesRequest statusRequest;
- public BSPTaskLauncher(int id, Container container, ContainerManager cm,
+ public BSPTaskLauncher(int id, Container container, ContainerManagementProtocol cm,
Configuration conf, Path jobFile, BSPJobID jobId)
- throws YarnRemoteException {
+ throws YarnException {
this.id = id;
this.cm = cm;
this.conf = conf;
@@ -80,14 +73,15 @@
stopAndCleanup();
}
- public void stopAndCleanup() throws YarnRemoteException {
- StopContainerRequest stopRequest = Records
- .newRecord(StopContainerRequest.class);
- stopRequest.setContainerId(allocatedContainer.getId());
- cm.stopContainer(stopRequest);
+ public void stopAndCleanup() throws YarnException, IOException {
+ StopContainersRequest stopRequest = Records.newRecord(StopContainersRequest.class);
+ List<ContainerId> containerIds = new ArrayList<ContainerId>();
+ containerIds.add(allocatedContainer.getId());
+ stopRequest.setContainerIds(containerIds);
+ cm.stopContainers(stopRequest);
}
- public void start() throws IOException {
+ public void start() throws IOException, YarnException {
LOG.info("Spawned task with id: " + this.id
+ " for allocated container id: "
+ this.allocatedContainer.getId().toString());
@@ -103,20 +97,22 @@
*/
public BSPTaskStatus poll() throws Exception {
- ContainerStatus lastStatus;
- if ((lastStatus = cm.getContainerStatus(statusRequest).getStatus())
- .getState() != ContainerState.COMPLETE) {
- return null;
+ ContainerStatus lastStatus = null;
+ GetContainerStatusesResponse getContainerStatusesResponse = cm.getContainerStatuses(statusRequest);
+ List<ContainerStatus> containerStatuses = getContainerStatusesResponse.getContainerStatuses();
+ for (ContainerStatus containerStatus : containerStatuses) {
+ if ((lastStatus = containerStatus).getState() != ContainerState.COMPLETE) {
+ return null;
+ }
+ LOG.info(this.id + "\tLast report comes with existatus of "
+ + lastStatus.getExitStatus() + " and diagnose string of "
+ + lastStatus.getDiagnostics());
}
- LOG.info(this.id + "\tLast report comes with existatus of "
- + lastStatus.getExitStatus() + " and diagnose string of "
- + lastStatus.getDiagnostics());
return new BSPTaskStatus(id, lastStatus.getExitStatus());
}
- private GetContainerStatusRequest setupContainer(
- Container allocatedContainer, ContainerManager cm, String user, int id)
- throws IOException {
+ private GetContainerStatusesRequest setupContainer(
+ Container allocatedContainer, ContainerManagementProtocol cm, String user, int id) throws IOException, YarnException {
LOG.info("Setting up a container for user " + user + " with id of " + id
+ " and containerID of " + allocatedContainer.getId() + " as " + user);
// Now we setup a ContainerLaunchContext
@@ -123,9 +119,9 @@
ContainerLaunchContext ctx = Records
.newRecord(ContainerLaunchContext.class);
- ctx.setContainerId(allocatedContainer.getId());
- ctx.setResource(allocatedContainer.getResource());
- ctx.setUser(user);
+// ctx.setContainerId(allocatedContainer.getId());
+// ctx.setResource(allocatedContainer.getResource());
+// ctx.setUser(user);
/*
* jar
@@ -182,11 +178,17 @@
StartContainerRequest startReq = Records
.newRecord(StartContainerRequest.class);
startReq.setContainerLaunchContext(ctx);
- cm.startContainer(startReq);
- GetContainerStatusRequest statusReq = Records
- .newRecord(GetContainerStatusRequest.class);
- statusReq.setContainerId(allocatedContainer.getId());
+ List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+ list.add(startReq);
+ StartContainersRequest requestList = StartContainersRequest.newInstance(list);
+ cm.startContainers(requestList);
+
+ GetContainerStatusesRequest statusReq = Records
+ .newRecord(GetContainerStatusesRequest.class);
+ List<ContainerId> containerIds = new ArrayList<ContainerId>();
+ containerIds.add(allocatedContainer.getId());
+ statusReq.setContainerIds(containerIds);
return statusReq;
}
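
For reference, the container lifecycle that the patched BSPTaskLauncher now drives is, in condensed form, the sketch below. The names cm, ctx and containerId stand in for the launcher's own fields, and, like the patch, the sketch omits the container token that a secured cluster would additionally require on StartContainerRequest:

    // cm:          ContainerManagementProtocol proxy to the NodeManager
    // ctx:         prepared ContainerLaunchContext (resources, env, command)
    // containerId: id of the allocated container
    StartContainerRequest start = Records.newRecord(StartContainerRequest.class);
    start.setContainerLaunchContext(ctx);
    cm.startContainers(StartContainersRequest.newInstance(
        Collections.singletonList(start)));

    // Poll until the container reports COMPLETE, then read its exit status.
    GetContainerStatusesRequest statusReq = GetContainerStatusesRequest
        .newInstance(Collections.singletonList(containerId));
    for (ContainerStatus status : cm.getContainerStatuses(statusReq)
        .getContainerStatuses()) {
      if (status.getState() == ContainerState.COMPLETE) {
        LOG.info("Exit status: " + status.getExitStatus());
      }
    }

    // Tear the container down once the task is done.
    cm.stopContainers(StopContainersRequest.newInstance(
        Collections.singletonList(containerId)));
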
Index: yarn/src/main/java/org/apache/hama/bsp/Job.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/Job.java (revision 1654753)
+++ yarn/src/main/java/org/apache/hama/bsp/Job.java (working copy)
@@ -17,8 +17,10 @@
*/
package org.apache.hama.bsp;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import java.io.IOException;
+
/**
* Main interface to interact with the job. Provides only getters.
*/
@@ -34,7 +36,7 @@
public JobState startJob() throws Exception;
- public void cleanup() throws YarnRemoteException;
+ public void cleanup() throws YarnException, IOException;
JobState getState();
Index: yarn/src/main/java/org/apache/hama/bsp/JobImpl.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/JobImpl.java (revision 1654753)
+++ yarn/src/main/java/org/apache/hama/bsp/JobImpl.java (working copy)
@@ -17,6 +17,7 @@
*/
package org.apache.hama.bsp;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
@@ -31,11 +32,10 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.api.AMRMProtocol;
-import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -42,9 +42,8 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hama.bsp.BSPTaskLauncher.BSPTaskStatus;
@@ -66,7 +65,7 @@
private ApplicationAttemptId appAttemptId;
private YarnRPC yarnRPC;
- private AMRMProtocol resourceManager;
+ private ApplicationMasterProtocol resourceManager;
private List<Container> allocatedContainers;
private List<ContainerId> releasedContainers = Collections.emptyList();
@@ -77,7 +76,7 @@
private int lastResponseID = 0;
public JobImpl(ApplicationAttemptId appAttemptId,
- Configuration jobConfiguration, YarnRPC yarnRPC, AMRMProtocol amrmRPC,
+ Configuration jobConfiguration, YarnRPC yarnRPC, ApplicationMasterProtocol amrmRPC,
String jobFile, BSPJobID jobId) {
super();
this.numBSPTasks = jobConfiguration.getInt("bsp.peers.num", 1);
@@ -135,24 +134,27 @@
this.allocatedContainers = new ArrayList<Container>(numBSPTasks);
while (allocatedContainers.size() < numBSPTasks) {
- AllocateRequest req = BuilderUtils.newAllocateRequest(
- appAttemptId,
- lastResponseID,
- 0.0f,
- createBSPTaskRequest(numBSPTasks - allocatedContainers.size(),
- taskMemoryInMb, priority), releasedContainers);
+// AllocateRequest req = BuilderUtils.newAllocateRequest(
+// appAttemptId,
+// lastResponseID,
+// 0.0f,
+// createBSPTaskRequest(numBSPTasks - allocatedContainers.size(),
+// taskMemoryInMb, priority), releasedContainers);
+ AllocateRequest req = AllocateRequest.newInstance(lastResponseID, 0.0f,
+ createBSPTaskRequest(numBSPTasks - allocatedContainers.size(), taskMemoryInMb,
+ priority), releasedContainers, null);
+
AllocateResponse allocateResponse = resourceManager.allocate(req);
- AMResponse amResponse = allocateResponse.getAMResponse();
- LOG.info("Got response! ID: " + amResponse.getResponseId()
+ LOG.info("Got response! ID: " + allocateResponse.getResponseId()
+ " with num of containers: "
- + amResponse.getAllocatedContainers().size()
+ + allocateResponse.getAllocatedContainers().size()
+ " and following resources: "
- + amResponse.getAvailableResources().getMemory() + "mb");
- this.lastResponseID = amResponse.getResponseId();
+ + allocateResponse.getAvailableResources().getMemory() + "mb");
+ this.lastResponseID = allocateResponse.getResponseId();
// availableResources = amResponse.getAvailableResources();
- this.allocatedContainers.addAll(amResponse.getAllocatedContainers());
+ this.allocatedContainers.addAll(allocateResponse.getAllocatedContainers());
LOG.info("Waiting to allocate "
+ (numBSPTasks - allocatedContainers.size()) + " more containers...");
Thread.sleep(1000l);
@@ -166,8 +168,7 @@
+ allocatedContainer.getId() + ", containerNode="
+ allocatedContainer.getNodeId().getHost() + ":"
+ allocatedContainer.getNodeId().getPort() + ", containerNodeURI="
- + allocatedContainer.getNodeHttpAddress() + ", containerState"
- + allocatedContainer.getState() + ", containerResourceMemory"
+ + allocatedContainer.getNodeHttpAddress() + ", containerResourceMemory"
+ allocatedContainer.getResource().getMemory());
// Connect to ContainerManager on the allocated container
@@ -174,8 +175,8 @@
String cmIpPortStr = allocatedContainer.getNodeId().getHost() + ":"
+ allocatedContainer.getNodeId().getPort();
InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
- ContainerManager cm = (ContainerManager) yarnRPC.getProxy(
- ContainerManager.class, cmAddress, conf);
+ ContainerManagementProtocol cm = (ContainerManagementProtocol) yarnRPC.getProxy(
+ ContainerManagementProtocol.class, cmAddress, conf);
BSPTaskLauncher runnableLaunchContainer = new BSPTaskLauncher(id,
allocatedContainer, cm, conf, jobFile, jobId);
@@ -219,9 +220,9 @@
* removes the task from the launcher so that we won't have to stop it twice.
*
* @param id
- * @throws YarnRemoteException
+ * @throws YarnException
*/
- private void cleanupTask(int id) throws YarnRemoteException {
+ private void cleanupTask(int id) throws YarnException, IOException {
BSPTaskLauncher bspTaskLauncher = launchers.get(id);
bspTaskLauncher.stopAndCleanup();
launchers.remove(id);
@@ -229,7 +230,7 @@
}
@Override
- public void cleanup() throws YarnRemoteException {
+ public void cleanup() throws YarnException, IOException {
for (BSPTaskLauncher launcher : completionQueue) {
launcher.stopAndCleanup();
}
@@ -247,7 +248,7 @@
// whether a particular rack/host is needed
// useful for applications that are sensitive
// to data locality
- rsrcRequest.setHostName("*");
+ rsrcRequest.setResourceName("*");
// set the priority for the request
Priority pri = Records.newRecord(Priority.class);
Index: yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java (revision 1654753)
+++ yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java (working copy)
@@ -24,7 +24,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
@@ -31,7 +31,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hama.HamaConfiguration;
@@ -46,7 +46,7 @@
private BSPClient client;
private boolean submitted;
private ApplicationReport report;
- private ClientRMProtocol applicationsManager;
+ private ApplicationClientProtocol applicationsManager;
private YarnRPC rpc;
public YARNBSPJob(HamaConfiguration conf) throws IOException {
@@ -58,8 +58,8 @@
YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS));
LOG.info("Connecting to ResourceManager at " + rmAddress);
- this.applicationsManager = ((ClientRMProtocol) rpc.getProxy(
- ClientRMProtocol.class, rmAddress, conf));
+ this.applicationsManager = ((ApplicationClientProtocol) rpc.getProxy(
+ ApplicationClientProtocol.class, rmAddress, conf));
}
public void setMemoryUsedPerTaskInMb(int mem) {
@@ -66,7 +66,7 @@
conf.setInt("bsp.child.mem.in.mb", mem);
}
- public void kill() throws YarnRemoteException {
+ public void kill() throws YarnException, IOException {
if (submitClient != null) {
KillApplicationRequest killRequest = Records
.newRecord(KillApplicationRequest.class);
@@ -95,45 +95,44 @@
this.submit();
}
- client = RPC.waitForProxy(BSPClient.class, BSPClient.versionID,
- NetUtils.createSocketAddr(report.getHost(), report.getRpcPort()), conf);
- GetApplicationReportRequest reportRequest = Records
- .newRecord(GetApplicationReportRequest.class);
- reportRequest.setApplicationId(submitClient.getId());
+ client = RPC.waitForProxy(BSPClient.class, BSPClient.versionID, NetUtils.createSocketAddr(report.getHost(), report.getRpcPort()), conf);
+ GetApplicationReportRequest reportRequest = Records.newRecord(GetApplicationReportRequest.class);
+ reportRequest.setApplicationId(submitClient.getId());
- GetApplicationReportResponse reportResponse = applicationsManager
- .getApplicationReport(reportRequest);
- ApplicationReport localReport = reportResponse.getApplicationReport();
- long clientSuperStep = -1L;
- while (localReport.getFinalApplicationStatus() != null
- && localReport.getFinalApplicationStatus().ordinal() == 0) {
- LOG.debug("currently in state: "
- + localReport.getFinalApplicationStatus());
- if (verbose) {
- long remoteSuperStep = client.getCurrentSuperStep().get();
- if (clientSuperStep < remoteSuperStep) {
- clientSuperStep = remoteSuperStep;
- LOG.info("Current supersteps number: " + clientSuperStep);
+ try {
+ GetApplicationReportResponse reportResponse = applicationsManager.getApplicationReport(reportRequest);
+ ApplicationReport localReport = reportResponse.getApplicationReport();
+ long clientSuperStep = -1L;
+ while (localReport.getFinalApplicationStatus() != null && localReport.getFinalApplicationStatus().ordinal() == 0) {
+ LOG.debug("currently in state: " + localReport.getFinalApplicationStatus());
+ if (verbose) {
+ long remoteSuperStep = client.getCurrentSuperStep().get();
+ if (clientSuperStep < remoteSuperStep) {
+ clientSuperStep = remoteSuperStep;
+ LOG.info("Current supersteps number: " + clientSuperStep);
+ }
}
+ reportResponse = applicationsManager.getApplicationReport(reportRequest);
+ localReport = reportResponse.getApplicationReport();
+
+ Thread.sleep(3000L);
}
- reportResponse = applicationsManager.getApplicationReport(reportRequest);
- localReport = reportResponse.getApplicationReport();
- Thread.sleep(3000L);
- }
- if (localReport.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED) {
- LOG.info("Job succeeded!");
- return true;
- } else {
- LOG.info("Job failed with status: "
- + localReport.getFinalApplicationStatus().toString() + "!");
+ if (localReport.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED) {
+ LOG.info("Job succeeded!");
+ return true;
+ } else {
+ LOG.info("Job failed with status: " + localReport.getFinalApplicationStatus().toString() + "!");
+ return false;
+ }
+ } catch (YarnException e) {
+ e.printStackTrace();
return false;
}
-
}
- ClientRMProtocol getApplicationsManager() {
+ ApplicationClientProtocol getApplicationsManager() {
return applicationsManager;
}
Index: yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java (revision 1654753)
+++ yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java (working copy)
@@ -41,6 +41,7 @@
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hama.HamaConfiguration;
@@ -77,122 +78,105 @@
LOG.debug("Retrieved username: " + s);
}
- GetNewApplicationRequest request = Records
- .newRecord(GetNewApplicationRequest.class);
- GetNewApplicationResponse response = job.getApplicationsManager()
- .getNewApplication(request);
- id = response.getApplicationId();
- LOG.debug("Got new ApplicationId=" + id);
+ try {
+ GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class);
- // Create a new ApplicationSubmissionContext
- ApplicationSubmissionContext appContext = Records
- .newRecord(ApplicationSubmissionContext.class);
- // set the ApplicationId
- appContext.setApplicationId(this.id);
- // set the application name
- appContext.setApplicationName(job.getJobName());
- // Create a new container launch context for the AM's container
- ContainerLaunchContext amContainer = Records
- .newRecord(ContainerLaunchContext.class);
+ GetNewApplicationResponse response = job.getApplicationsManager().getNewApplication(request);
+ id = response.getApplicationId();
+ LOG.debug("Got new ApplicationId=" + id);
- // Define the local resources required
- Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
- // Lets assume the jar we need for our ApplicationMaster is available in
- // HDFS at a certain known path to us and we want to make it available to
- // the ApplicationMaster in the launched container
- if (job.getJar() == null) {
- throw new IllegalArgumentException(
- "Jar must be set in order to run the application!");
- }
- Path jarPath = new Path(job.getWorkingDirectory(), id + "/app.jar");
- fs.copyFromLocalFile(job.getLocalPath(job.getJar()), jarPath);
- LOG.debug("Copying app jar to " + jarPath);
- getConf()
- .set(
- "bsp.jar",
- jarPath.makeQualified(fs.getUri(), fs.getWorkingDirectory())
- .toString());
- FileStatus jarStatus = fs.getFileStatus(jarPath);
- LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
- amJarRsrc.setType(LocalResourceType.FILE);
- amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
- amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
- amJarRsrc.setTimestamp(jarStatus.getModificationTime());
- amJarRsrc.setSize(jarStatus.getLen());
- // this creates a symlink in the working directory
- localResources.put("AppMaster.jar", amJarRsrc);
- // Set the local resources into the launch context
- amContainer.setLocalResources(localResources);
+ // Create a new ApplicationSubmissionContext
+ ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
+ // set the ApplicationId
+ appContext.setApplicationId(this.id);
+ // set the application name
+ appContext.setApplicationName(job.getJobName());
- // Set up the environment needed for the launch context
- Map<String, String> env = new HashMap<String, String>();
- // Assuming our classes or jars are available as local resources in the
- // working directory from which the command will be run, we need to append
- // "." to the path.
- // By default, all the hadoop specific classpaths will already be available
- // in $CLASSPATH, so we should be careful not to overwrite it.
- String classPathEnv = "$CLASSPATH:./*:";
- env.put("CLASSPATH", classPathEnv);
- amContainer.setEnvironment(env);
+ // Create a new container launch context for the AM's container
+ ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
- // Construct the command to be executed on the launched container
- String command = "${JAVA_HOME}"
- + "/bin/java -cp "
- + classPathEnv
- + " "
- + BSPApplicationMaster.class.getCanonicalName()
- + " "
- + submitJobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())
- .toString() + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
- + "/stdout" + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
- + "/stderr";
+ // Define the local resources required
+ Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+ // Lets assume the jar we need for our ApplicationMaster is available in
+ // HDFS at a certain known path to us and we want to make it available to
+ // the ApplicationMaster in the launched container
+ if (job.getJar() == null) {
+ throw new IllegalArgumentException("Jar must be set in order to run the application!");
+ }
+ Path jarPath = new Path(job.getWorkingDirectory(), id + "/app.jar");
+ fs.copyFromLocalFile(job.getLocalPath(job.getJar()), jarPath);
+ LOG.debug("Copying app jar to " + jarPath);
+ getConf().set("bsp.jar", jarPath.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString());
+ FileStatus jarStatus = fs.getFileStatus(jarPath);
+ LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
+ amJarRsrc.setType(LocalResourceType.FILE);
+ amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+ amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
+ amJarRsrc.setTimestamp(jarStatus.getModificationTime());
+ amJarRsrc.setSize(jarStatus.getLen());
+ // this creates a symlink in the working directory
+ localResources.put("AppMaster.jar", amJarRsrc);
+ // Set the local resources into the launch context
+ amContainer.setLocalResources(localResources);
- LOG.debug("Start command: " + command);
+ // Set up the environment needed for the launch context
+ Map<String, String> env = new HashMap<String, String>();
+ // Assuming our classes or jars are available as local resources in the
+ // working directory from which the command will be run, we need to append
+ // "." to the path.
+ // By default, all the hadoop specific classpaths will already be available
+ // in $CLASSPATH, so we should be careful not to overwrite it.
+ String classPathEnv = "$CLASSPATH:./*:";
+ env.put("CLASSPATH", classPathEnv);
+ amContainer.setEnvironment(env);
- amContainer.setCommands(Collections.singletonList(command));
+ // Construct the command to be executed on the launched container
+ String command = "${JAVA_HOME}" + "/bin/java -cp " + classPathEnv + " " + BSPApplicationMaster.class.getCanonicalName() + " " + submitJobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString() + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr";
- Resource capability = Records.newRecord(Resource.class);
- // we have at least 3 threads, which comsumes 1mb each, for each bsptask and
- // a base usage of 100mb
- capability.setMemory(3 * job.getNumBspTask()
- + getConf().getInt("hama.appmaster.memory.mb", 100));
- LOG.info("Set memory for the application master to "
- + capability.getMemory() + "mb!");
- amContainer.setResource(capability);
+ LOG.debug("Start command: " + command);
- // Set the container launch content into the ApplicationSubmissionContext
- appContext.setAMContainerSpec(amContainer);
+ amContainer.setCommands(Collections.singletonList(command));
- // Create the request to send to the ApplicationsManager
- SubmitApplicationRequest appRequest = Records
- .newRecord(SubmitApplicationRequest.class);
- appRequest.setApplicationSubmissionContext(appContext);
- job.getApplicationsManager().submitApplication(appRequest);
+ Resource capability = Records.newRecord(Resource.class);
+ // we have at least 3 threads, which consume 1mb each, for each bsptask and
+ // a base usage of 100mb
+ capability.setMemory(3 * job.getNumBspTask() + getConf().getInt("hama.appmaster.memory.mb", 100));
+ LOG.info("Set memory for the application master to " + capability.getMemory() + "mb!");
+ //amContainer.setResource(capability);
- GetApplicationReportRequest reportRequest = Records
- .newRecord(GetApplicationReportRequest.class);
- reportRequest.setApplicationId(id);
- while (report == null || report.getHost().equals("N/A")) {
- GetApplicationReportResponse reportResponse = job
- .getApplicationsManager().getApplicationReport(reportRequest);
- report = reportResponse.getApplicationReport();
- try {
- Thread.sleep(1000L);
- } catch (InterruptedException e) {
- LOG.error(
- "Got interrupted while waiting for a response report from AM.", e);
+ // Set the container launch content into the ApplicationSubmissionContext
+ appContext.setResource(capability);
+ appContext.setAMContainerSpec(amContainer);
+
+ // Create the request to send to the ApplicationsManager
+ SubmitApplicationRequest appRequest = Records.newRecord(SubmitApplicationRequest.class);
+ appRequest.setApplicationSubmissionContext(appContext);
+ job.getApplicationsManager().submitApplication(appRequest);
+
+ GetApplicationReportRequest reportRequest = Records.newRecord(GetApplicationReportRequest.class);
+ reportRequest.setApplicationId(id);
+ while (report == null || report.getHost().equals("N/A")) {
+ GetApplicationReportResponse reportResponse = job.getApplicationsManager().getApplicationReport(reportRequest);
+ report = reportResponse.getApplicationReport();
+ try {
+ Thread.sleep(1000L);
+ } catch (InterruptedException e) {
+ LOG.error("Got interrupted while waiting for a response report from AM.", e);
+ }
}
+ LOG.info("Got report: " + report.getApplicationId() + " " + report.getHost() + ":" + report.getRpcPort());
+ return new NetworkedJob();
+ } catch (YarnException e) {
+ e.printStackTrace();
+ return null;
}
- LOG.info("Got report: " + report.getApplicationId() + " "
- + report.getHost() + ":" + report.getRpcPort());
- return new NetworkedJob();
}
- @Override
- protected int checkTaskLimits(BSPJob job, int limitTasks) throws IOException {
- return Math.max(1, limitTasks);
- }
+// @Override
+// protected int checkTaskLimits(BSPJob job, int limitTasks) throws IOException {
+// return Math.max(1, limitTasks);
+// }
@Override
public Path getSystemDir() {
Index: yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java (revision 1654753)
+++ yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java (working copy)
@@ -22,6 +22,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.sync.SyncException;
@@ -57,8 +58,11 @@
InterruptedException, ClassNotFoundException {
HamaConfiguration conf = new HamaConfiguration();
// TODO some keys that should be within a conf
- conf.set("yarn.resourcemanager.address", "0.0.0.0:8040");
- conf.set("bsp.local.dir", "/tmp/bsp-yarn/");
+ conf.set("yarn.resourcemanager.address", "slave1.hama.com:8050");
+ conf.set("bsp.user.name", "minho");
+ conf.set("bsp.local.dir", "/root/");
+ conf.set("bsp.working.dir", "/user/root/bsp-yarn/");
+ conf.setInt(Constants.MAX_TASKS, 10);
YARNBSPJob job = new YARNBSPJob(conf);
job.setBspClass(HelloBSP.class);