Index: conf/hama-site.xml
===================================================================
--- conf/hama-site.xml (revision 1659147)
+++ conf/hama-site.xml (working copy)
@@ -22,4 +22,32 @@
*/
-->
+  <property>
+    <name>bsp.master.address</name>
+    <value>host1.mydomain.com:40000</value>
+    <description>The address of the bsp master server. Either the
+    literal string "local" or a host:port for distributed mode
+    </description>
+  </property>
+
+  <property>
+    <name>fs.default.name</name>
+    <value>hdfs://host1.mydomain.com:9000/</value>
+    <description>
+    The name of the default file system. Either the literal string
+    "local" or a host:port for HDFS.
+    </description>
+  </property>
+
+  <property>
+    <name>hama.zookeeper.quorum</name>
+    <value>localhost</value>
+    <description>Comma separated list of servers in the ZooKeeper Quorum.
+    For example, "host1.mydomain.com,host2.mydomain.com,host3.mydomain.com".
+    By default this is set to localhost for local and pseudo-distributed modes
+    of operation. For a fully-distributed setup, this should be set to a full
+    list of ZooKeeper quorum servers. If HAMA_MANAGES_ZK is set in hama-env.sh
+    this is the list of servers which we will start/stop zookeeper on.
+    </description>
+  </property>
Index: core/src/main/java/org/apache/hama/bsp/BSPJob.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/BSPJob.java (revision 1659147)
+++ core/src/main/java/org/apache/hama/bsp/BSPJob.java (working copy)
@@ -263,8 +263,7 @@
@SuppressWarnings({ "rawtypes" })
public InputFormat getInputFormat() {
return ReflectionUtils.newInstance(
- conf.getClass(Constants.INPUT_FORMAT_CLASS, TextInputFormat.class,
- InputFormat.class), conf);
+ conf.getClass(Constants.INPUT_FORMAT_CLASS, TextInputFormat.class, InputFormat.class), conf);
}
@SuppressWarnings({ "rawtypes" })
Index: core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (revision 1659147)
+++ core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (working copy)
@@ -303,9 +303,12 @@
BSPJob job = pJob;
job.setJobID(jobId);
- ClusterStatus clusterStatus = getClusterStatus(true);
- int maxTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB,
- clusterStatus.getMaxTasks() - clusterStatus.getTasks());
+ int maxTasks = 0;
+ if (this.jobSubmitClient != null) {
+ ClusterStatus clusterStatus = getClusterStatus(true);
+ maxTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB,
+ clusterStatus.getMaxTasks() - clusterStatus.getTasks());
+ }
if (maxTasks < job.getNumBspTask()) {
LOG.warn("The configured number of tasks has exceeded the maximum allowed. Job will run with "
Index: core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (revision 1659147)
+++ core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (working copy)
@@ -249,6 +249,7 @@
try {
if (splitClass != null) {
inputSplit = (InputSplit) ReflectionUtils.newInstance(
+ //Class.forName(splitClass), getConfiguration());
getConfiguration().getClassByName(splitClass), getConfiguration());
}
} catch (ClassNotFoundException exp) {
Index: core/src/main/java/org/apache/hama/bsp/NullInputFormat.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/NullInputFormat.java (revision 1659147)
+++ core/src/main/java/org/apache/hama/bsp/NullInputFormat.java (working copy)
@@ -81,26 +81,4 @@
}
}
-
- public static class NullInputSplit implements InputSplit {
- @Override
- public long getLength() {
- return 0;
- }
-
- @Override
- public String[] getLocations() {
- String[] locs = {};
- return locs;
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- }
- }
-
}
Index: core/src/main/java/org/apache/hama/bsp/NullInputSplit.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/NullInputSplit.java (revision 0)
+++ core/src/main/java/org/apache/hama/bsp/NullInputSplit.java (working copy)
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class NullInputSplit implements InputSplit {
+ @Override
+ public long getLength() {
+ return 0;
+ }
+
+ @Override
+ public String[] getLocations() {
+ String[] locs = {};
+ return locs;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ }
+}
\ No newline at end of file
Index: pom.xml
===================================================================
--- pom.xml (revision 1659147)
+++ pom.xml (working copy)
@@ -319,6 +319,7 @@
examples
ml
mesos
+ yarn
dist
Index: yarn/pom.xml
===================================================================
--- yarn/pom.xml (revision 1659147)
+++ yarn/pom.xml (working copy)
@@ -19,7 +19,7 @@
org.apache.hama
hama-parent
- 0.6.3-SNAPSHOT
+ 0.7.0-SNAPSHOT
4.0.0
@@ -26,13 +26,9 @@
org.apache.hama
hama-yarn
yarn
- 0.6.3-SNAPSHOT
+ 0.7.0-SNAPSHOT
jar
-
- 1.2.0
-
-
org.apache.hama
@@ -54,27 +50,41 @@
avro
1.5.3
-
org.apache.hadoop
hadoop-yarn-api
- 0.23.1
+ ${hadoop.version}
org.apache.hadoop
hadoop-yarn-common
- 0.23.1
+ ${hadoop.version}
org.apache.hadoop
hadoop-yarn-server-tests
- 0.23.1
+ ${hadoop.version}
+
+ org.apache.hadoop
+ hadoop-yarn-client
+ ${hadoop.version}
+
org.apache.zookeeper
zookeeper
+
+ org.apache.hadoop
+ hadoop-common
+ ${hadoop.version}
+
+
+ org.apache.hadoop
+ hadoop-yarn-client
+ ${hadoop.version}
+
Index: yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (revision 1659147)
+++ yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (working copy)
@@ -18,8 +18,11 @@
package org.apache.hama.bsp;
import java.io.DataInputStream;
+import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
+import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -33,13 +36,16 @@
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.ipc.ProtocolSignature;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RPC.Server;
+//import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.Clock;
-import org.apache.hadoop.yarn.SystemClock;
-import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
@@ -46,18 +52,27 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+//import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.Job.JobState;
+import org.apache.hama.bsp.NullInputSplit;
import org.apache.hama.bsp.sync.SyncServerRunner;
import org.apache.hama.bsp.sync.SyncServiceFactory;
import org.apache.hama.ipc.BSPPeerProtocol;
import org.apache.hama.ipc.HamaRPCProtocolVersion;
+import org.apache.hama.ipc.RPC;
+import org.apache.hama.ipc.Server;
import org.apache.hama.util.BSPNetUtils;
+
/**
* BSPApplicationMaster is an application master for Apache Hamas BSP Engine.
*/
@@ -73,8 +88,10 @@
private Clock clock;
private YarnRPC yarnRPC;
- private AMRMProtocol amrmRPC;
+ private RPC rpc;
+ private ApplicationMasterProtocol amrmRPC;
+
private ApplicationAttemptId appAttemptId;
private String applicationName;
private long startTime;
@@ -104,8 +121,12 @@
}
this.jobFile = args[0];
+ LOG.info("jobFile >>>> " + jobFile);
this.localConf = new YarnConfiguration();
this.jobConf = getSubmitConfiguration(jobFile);
+ fs = FileSystem.get(jobConf);
+ LOG.info("jobConf : " + jobConf.get(Constants.INPUT_FORMAT_CLASS));
+ fs.copyToLocalFile(false, new Path(jobFile), new Path("/root/job2.xml"));
this.applicationName = jobConf.get("bsp.job.name",
"");
@@ -125,15 +146,19 @@
this.clientPort = BSPNetUtils.getFreePort(12000);
// start our synchronization service
- startSyncServer();
+ // startSyncServer();
startRPCServers();
+ fs.copyToLocalFile(false, new Path(jobFile), new Path("/root/jobAfterStartRPC.xml"));
/*
* Make sure that this executes after the start the RPC servers, because we
* are readjusting the configuration.
*/
+
rewriteSubmitConfiguration(jobFile, jobConf);
+ fs.copyToLocalFile(false, new Path(jobFile), new Path("/root/jobAfterRewrite.xml"));
+
String jobSplit = jobConf.get("bsp.job.split.file");
splits = null;
if (jobSplit != null) {
@@ -159,14 +184,23 @@
*/
private void startRPCServers() throws IOException {
// start the RPC server which talks to the client
- this.clientServer = RPC.getServer(BSPClient.class, this, hostname,
- clientPort, jobConf);
+ InetSocketAddress BSPClientAddress = jobConf.getSocketAddr(hostname, hostname, clientPort);
+ LOG.info("BSPClientAddress >>>> " + BSPClientAddress);
+ this.clientServer = rpc.getServer(BSPClient.class, hostname, clientPort, jobConf);
+// this.clientServer = rpc.getServer(BSPClient.class, this, BSPClientAddress, jobConf, null,
+// jobConf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT,
+// YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT));
this.clientServer.start();
// start the RPC server which talks to the tasks
- this.taskServerPort = BSPNetUtils.getFreePort(10000);
- this.taskServer = RPC.getServer(BSPPeerProtocol.class, this, hostname,
- taskServerPort, jobConf);
+ this.taskServerPort = 10008; // BSPNetUtils.getFreePort(10000);
+ InetSocketAddress BSPPeerProtocolAddress = jobConf
+ .getSocketAddr("127.0.0.1", "127.0.0.1", taskServerPort);
+ this.taskServer = RPC.getServer(this, "127.0.0.1", taskServerPort, jobConf);
+// this.taskServer = rpc.getServer(BSPPeerProtocol.class, this, BSPPeerProtocolAddress,
+// jobConf, null,
+// jobConf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT,
+// YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT));
this.taskServer.start();
// readjusting the configuration to let the tasks know where we are.
this.jobConf.set("hama.umbilical.address", hostname + ":" + taskServerPort);
@@ -191,28 +225,57 @@
* @param yarnConf
* @return a new RPC connection to the Resource Manager.
*/
- private AMRMProtocol getYarnRPCConnection(Configuration yarnConf) {
+ private ApplicationMasterProtocol getYarnRPCConnection(Configuration yarnConf) throws IOException {
// Connect to the Scheduler of the ResourceManager.
- InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
+ UserGroupInformation currentUser = UserGroupInformation.createRemoteUser(appAttemptId.toString());
+ LOG.info("current User & appAttemptId : " + currentUser + " " + appAttemptId.toString());
+ Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
+ final InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
+    Token<? extends TokenIdentifier> amRMToken = setupAndReturnAMRMToken(rmAddress, credentials.getAllTokens());
+ currentUser.addToken(amRMToken);
+ final Configuration conf = yarnConf;
+ ApplicationMasterProtocol client = currentUser
+        .doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
+ @Override
+ public ApplicationMasterProtocol run() {
+ return (ApplicationMasterProtocol) yarnRPC.getProxy(ApplicationMasterProtocol.class, rmAddress, conf);
+ }
+ });
LOG.info("Connecting to ResourceManager at " + rmAddress);
- return (AMRMProtocol) yarnRPC.getProxy(AMRMProtocol.class, rmAddress,
- yarnConf);
+ return client;
+// return (ApplicationMasterProtocol)yarnRPC
+// .getProxy(ApplicationMasterProtocol.class, rmAddress, yarnConf);
}
+ @SuppressWarnings("unchecked")
+  private Token<? extends TokenIdentifier> setupAndReturnAMRMToken(
+      InetSocketAddress rmBindAddress,
+      Collection<Token<? extends TokenIdentifier>> allTokens) {
+    for (Token<? extends TokenIdentifier> token : allTokens) {
+ if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+ SecurityUtil.setTokenService(token, rmBindAddress);
+ return token;
+ }
+ }
+ return null;
+ }
+
+
/**
* Registers this application master with the Resource Manager and retrieves a
* response which is used to launch additional containers.
*/
private static RegisterApplicationMasterResponse registerApplicationMaster(
- AMRMProtocol resourceManager, ApplicationAttemptId appAttemptID,
+ ApplicationMasterProtocol resourceManager, ApplicationAttemptId appAttemptID,
String appMasterHostName, int appMasterRpcPort,
- String appMasterTrackingUrl) throws YarnRemoteException {
+ String appMasterTrackingUrl) throws YarnException, IOException {
+
RegisterApplicationMasterRequest appMasterRequest = Records
.newRecord(RegisterApplicationMasterRequest.class);
- appMasterRequest.setApplicationAttemptId(appAttemptID);
+ //appMasterRequest.setApplicationAttemptId(appAttemptID);
appMasterRequest.setHost(appMasterHostName);
appMasterRequest.setRpcPort(appMasterRpcPort);
// TODO tracking URL
@@ -219,7 +282,7 @@
appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
RegisterApplicationMasterResponse response = resourceManager
.registerApplicationMaster(appMasterRequest);
- LOG.debug("ApplicationMaster has maximum resource capability of: "
+ LOG.info("ApplicationMaster has maximum resource capability of: "
+ response.getMaximumResourceCapability().getMemory());
return response;
}
@@ -234,12 +297,13 @@
private static ApplicationAttemptId getApplicationAttemptId()
throws IOException {
     Map<String, String> envs = System.getenv();
- if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) {
+ if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
throw new IllegalArgumentException(
"ApplicationAttemptId not set in the environment");
}
+
return ConverterUtils.toContainerId(
- envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV))
+ envs.get(Environment.CONTAINER_ID.name()))
.getApplicationAttemptId();
}
@@ -247,6 +311,7 @@
JobState finalState = null;
try {
job = new JobImpl(appAttemptId, jobConf, yarnRPC, amrmRPC, jobFile, jobId);
+ LOG.info("job Conf : " + jobConf.get(Constants.INPUT_FORMAT_CLASS));
finalState = job.startJob();
} finally {
if (finalState != null) {
@@ -259,8 +324,9 @@
}
}
- private void cleanup() throws YarnRemoteException {
- syncServer.stop();
+ private void cleanup() throws YarnException, IOException {
+ // syncServer.stop();
+
if (threadPool != null && !threadPool.isShutdown()) {
threadPool.shutdownNow();
}
@@ -268,29 +334,30 @@
taskServer.stop();
FinishApplicationMasterRequest finishReq = Records
.newRecord(FinishApplicationMasterRequest.class);
- finishReq.setAppAttemptId(appAttemptId);
+ //finishReq.setAppAttemptId(appAttemptId);
switch (job.getState()) {
case SUCCESS:
- finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+ finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
break;
case KILLED:
- finishReq.setFinishApplicationStatus(FinalApplicationStatus.KILLED);
+ finishReq.setFinalApplicationStatus(FinalApplicationStatus.KILLED);
break;
case FAILED:
- finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
+ finishReq.setFinalApplicationStatus(FinalApplicationStatus.FAILED);
break;
default:
- finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
+ finishReq.setFinalApplicationStatus(FinalApplicationStatus.FAILED);
}
this.amrmRPC.finishApplicationMaster(finishReq);
}
- public static void main(String[] args) throws YarnRemoteException {
+ public static void main(String[] args) throws YarnException, IOException {
// we expect getting the qualified path of the job.xml as the first
// element in the arguments
BSPApplicationMaster master = null;
try {
master = new BSPApplicationMaster(args);
+ LOG.info("master : " + master.getCurrentSuperStep());
master.start();
} catch (Exception e) {
LOG.fatal("Error starting BSPApplicationMaster", e);
@@ -326,6 +393,8 @@
FSDataOutputStream out = fs.create(jobSubmitPath);
conf.writeXml(out);
out.close();
+ fs.copyToLocalFile(false, jobSubmitPath, new Path("/root/job1.xml"));
+
LOG.info("Written new configuration back to " + path);
}
@@ -372,7 +441,7 @@
@Override
public Task getTask(TaskAttemptID taskid) throws IOException {
BSPJobClient.RawSplit assignedSplit = null;
- String splitName = NullInputFormat.NullInputSplit.class.getCanonicalName();
+ String splitName = NullInputSplit.class.getCanonicalName();
if (splits != null) {
assignedSplit = splits[taskid.id];
splitName = assignedSplit.getClassName();
Index: yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java (revision 1659147)
+++ yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java (working copy)
@@ -22,10 +22,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.ipc.RPC;
+import org.apache.hama.ipc.RPC;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
@@ -37,7 +36,7 @@
private static final Log LOG = LogFactory.getLog(BSPRunner.class);
- private Configuration conf;
+ private HamaConfiguration conf;
private TaskAttemptID id;
   private BSPPeerImpl<?, ?, ?, ?, ? extends Writable> peer;
private Counters counters = new Counters();
@@ -57,8 +56,8 @@
int port = BSPNetUtils.getFreePort(taskAttemptId * 2 + 16000);
conf.setInt(Constants.PEER_PORT, port);
conf.set(Constants.PEER_HOST, BSPNetUtils.getCanonicalHostname());
+ String umbilicalAddress = "127.0.0.1:10008";
- String umbilicalAddress = conf.get("hama.umbilical.address");
if (umbilicalAddress == null || umbilicalAddress.isEmpty()
|| !umbilicalAddress.contains(":")) {
throw new IllegalArgumentException(
@@ -69,13 +68,25 @@
InetSocketAddress address = new InetSocketAddress(hostPort[0],
Integer.valueOf(hostPort[1]));
- BSPPeerProtocol umbilical = RPC.getProxy(BSPPeerProtocol.class,
- HamaRPCProtocolVersion.versionID, address, conf);
-
+ BSPPeerProtocol umbilical = (BSPPeerProtocol) RPC.getProxy(
+ BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID, address,
+ conf);
+
BSPJob job = new BSPJob(new HamaConfiguration(conf));
+ LOG.info("InputFormat : " + job.getInputFormat());
+ job.setInputFormat(NullInputFormat.class);
+ job.setOutputFormat(NullOutputFormat.class);
+ conf.set("hama.zookeeper.quorum", "localhost");
+ conf.set("hama.zookeeper.property.clientPort", "2181");
BSPTask task = (BSPTask) umbilical.getTask(id);
+ LOG.info("TaskAttemptID : " + id);
+ LOG.info("BSPTask splitClass : " + task.splitClass);
+ //conf.set("bsp.input.format.class", NullInputFormat.class.getCanonicalName());
+
+ LOG.info("bsp.input.format.class : " + job.getConfiguration().get("bsp.input.format.class"));
+
peer = new BSPPeerImpl(job, conf, id, umbilical, id.id, task.splitClass,
task.split, counters);
Index: yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (revision 1659147)
+++ yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (working copy)
@@ -17,10 +17,12 @@
*/
package org.apache.hama.bsp;
+import java.io.File;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.net.URISyntaxException;
-import java.util.Arrays;
-import java.util.Collections;
+import java.nio.ByteBuffer;
+import java.util.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -28,22 +30,25 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ContainerManager;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.impl.pb.client.ContainerManagementProtocolPBClientImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.*;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
+import org.apache.hama.ipc.RPC;
+import org.apache.hama.ipc.Server;
+import org.apache.hama.util.BSPNetUtils;
public class BSPTaskLauncher {
@@ -51,17 +56,20 @@
private final Container allocatedContainer;
private final int id;
- private final ContainerManager cm;
+ private final ContainerManagementProtocol cm;
private final Configuration conf;
private String user;
private final Path jobFile;
private final BSPJobID jobId;
- private GetContainerStatusRequest statusRequest;
+ private ByteBuffer allTokens;
+ private UserGroupInformation appSubmitterUgi;
- public BSPTaskLauncher(int id, Container container, ContainerManager cm,
+ private GetContainerStatusesRequest statusRequest;
+
+ public BSPTaskLauncher(int id, Container container, ContainerManagementProtocol cm,
Configuration conf, Path jobFile, BSPJobID jobId)
- throws YarnRemoteException {
+ throws YarnException {
this.id = id;
this.cm = cm;
this.conf = conf;
@@ -71,7 +79,8 @@
// FIXME why does this contain mapreduce here?
this.user = conf.get("bsp.user.name");
if (this.user == null) {
- this.user = conf.get("mapreduce.job.user.name");
+ //this.user = conf.get("mapreduce.job.user.name");
+ user = System.getenv(ApplicationConstants.Environment.USER.name());
}
}
@@ -80,14 +89,15 @@
stopAndCleanup();
}
- public void stopAndCleanup() throws YarnRemoteException {
- StopContainerRequest stopRequest = Records
- .newRecord(StopContainerRequest.class);
- stopRequest.setContainerId(allocatedContainer.getId());
- cm.stopContainer(stopRequest);
+ public void stopAndCleanup() throws YarnException, IOException {
+ StopContainersRequest stopRequest = Records.newRecord(StopContainersRequest.class);
+    List<ContainerId> containerIds = new ArrayList<ContainerId>();
+ containerIds.add(allocatedContainer.getId());
+ stopRequest.setContainerIds(containerIds);
+ cm.stopContainers(stopRequest);
}
- public void start() throws IOException {
+ public void start() throws IOException, YarnException {
LOG.info("Spawned task with id: " + this.id
+ " for allocated container id: "
+ this.allocatedContainer.getId().toString());
@@ -103,20 +113,28 @@
*/
public BSPTaskStatus poll() throws Exception {
- ContainerStatus lastStatus;
- if ((lastStatus = cm.getContainerStatus(statusRequest).getStatus())
- .getState() != ContainerState.COMPLETE) {
- return null;
+ ContainerStatus lastStatus = null;
+ GetContainerStatusesResponse getContainerStatusesResponse = cm.getContainerStatuses(statusRequest);
+    List<ContainerStatus> containerStatuses = getContainerStatusesResponse.getContainerStatuses();
+ LOG.info("getContainerStatusesREsponse : " + getContainerStatusesResponse);
+ LOG.info("ContainerStatuses : " + containerStatuses);
+ LOG.info("ContainerStatuses Size : " + containerStatuses.size());
+ for (ContainerStatus containerStatus : containerStatuses) {
+ LOG.info("ContainerStatus getStatus : " + containerStatus.getState());
+ if ((lastStatus = containerStatus).getState() != ContainerState.COMPLETE) {
+ return null;
+ }
+ LOG.info(this.id + "\tLast report comes with exitstatus of " + lastStatus.getExitStatus() + " and diagnose string of " + lastStatus.getDiagnostics());
}
- LOG.info(this.id + "\tLast report comes with existatus of "
- + lastStatus.getExitStatus() + " and diagnose string of "
- + lastStatus.getDiagnostics());
+
+ if (lastStatus == null)
+ return new BSPTaskStatus(id, -1000);
+
return new BSPTaskStatus(id, lastStatus.getExitStatus());
}
- private GetContainerStatusRequest setupContainer(
- Container allocatedContainer, ContainerManager cm, String user, int id)
- throws IOException {
+ private GetContainerStatusesRequest setupContainer(
+ Container allocatedContainer, ContainerManagementProtocol cm, String user, int id) throws IOException, YarnException {
LOG.info("Setting up a container for user " + user + " with id of " + id
+ " and containerID of " + allocatedContainer.getId() + " as " + user);
// Now we setup a ContainerLaunchContext
@@ -123,16 +141,48 @@
ContainerLaunchContext ctx = Records
.newRecord(ContainerLaunchContext.class);
- ctx.setContainerId(allocatedContainer.getId());
- ctx.setResource(allocatedContainer.getResource());
- ctx.setUser(user);
+// ctx.setContainerId(allocatedContainer.getId());
+// ctx.setResource(allocatedContainer.getResource());
+// ctx.setUser(user);
+ // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
+ // are marked as LimitedPrivate
+
+
/*
+ Credentials credentials =
+ UserGroupInformation.getCurrentUser().getCredentials();/
+ DataOutputBuffer dob = new DataOutputBuffer();
+ credentials.writeTokenStorageToStream(dob);
+ // Now remove the AM->RM token so that containers cannot access it.
+    Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
+    LOG.info("Executing with tokens:");
+    while (iter.hasNext()) {
+      Token<?> token = iter.next();
+ LOG.info("token is " + token);
+ if (token.getKind().equals(ContainerTokenIdentifier.KIND)) {
+ iter.remove();
+ }
+ }
+ allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ ctx.setTokens(allTokens.duplicate());
+
+
+ // Create appSubmitterUgi and add original tokens to it
+ String appSubmitterUserName =
+ System.getenv(ApplicationConstants.Environment.USER.name());
+ appSubmitterUgi =
+ UserGroupInformation.createRemoteUser(appSubmitterUserName);
+ appSubmitterUgi.addCredentials(credentials);
+ */
+
+ /*
* jar
*/
+    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
LocalResource packageResource = Records.newRecord(LocalResource.class);
FileSystem fs = FileSystem.get(conf);
- Path packageFile = new Path(conf.get("bsp.jar"));
+ Path packageFile = new Path(System.getenv("HAMAYARNJARLOCATION"));
// FIXME there seems to be a problem with the converter utils and URL
// transformation
URL packageUrl = ConverterUtils.getYarnUrlFromPath(packageFile
@@ -147,14 +197,32 @@
FileStatus fileStatus = fs.getFileStatus(packageFile);
packageResource.setResource(packageUrl);
- packageResource.setSize(fileStatus.getLen());
- packageResource.setTimestamp(fileStatus.getModificationTime());
- packageResource.setType(LocalResourceType.ARCHIVE);
+ //packageResource.setSize(fileStatus.getLen());
+ //packageResource.setTimestamp(fileStatus.getModificationTime());
+ packageResource.setSize(Long.parseLong(System.getenv("HAMAYARNJARSIZE")));
+ packageResource.setTimestamp(Long.parseLong(System.getenv("HAMAYARNJARTIMESTAMP")));
+ packageResource.setType(LocalResourceType.FILE);
packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
- LOG.info("Package resource: " + packageResource.getResource());
- ctx.setLocalResources(Collections.singletonMap("package", packageResource));
+ LocalResource hamaResource = Records.newRecord(LocalResource.class);
+ Path hamaCoreJar = new Path(System.getenv("HAMACOREJARLOCATION"));
+ URL hamaCoreUrl = ConverterUtils.getYarnUrlFromPath(hamaCoreJar
+ .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
+ FileStatus HamaCoreStatus = fs.getFileStatus(hamaCoreJar);
+ hamaResource.setResource(hamaCoreUrl);
+ hamaResource.setSize(HamaCoreStatus.getLen());
+ hamaResource.setTimestamp(HamaCoreStatus.getModificationTime());
+ hamaResource.setType(LocalResourceType.ARCHIVE);
+ hamaResource.setVisibility(LocalResourceVisibility.APPLICATION);
+
+ localResources.put("HamaCore.jar", hamaResource);
+ localResources.put("AppMaster.jar", packageResource);
+ LOG.info("LocalResources" + localResources);
+
+ //ctx.setLocalResources(Collections.singletonMap("package", packageResource));
+ ctx.setLocalResources(localResources);
+
/*
* TODO Package classpath seems not to work if you're in pseudo distributed
* mode, because the resource must not be moved, it will never be unpacked.
@@ -161,32 +229,97 @@
* So we will check if our jar file has the file:// prefix and put it into
* the CP directly
*/
- String cp = "$CLASSPATH:./*:./package/*:./*:";
+ String cp = "$CLASSPATH:./*:./package/*:./*:./AppMaster.jar:./HamaCore.jar";
if (packageUrl.getScheme() != null && packageUrl.getScheme().equals("file")) {
cp += packageFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())
.toString() + ":";
LOG.info("Localized file scheme detected, adjusting CP to: " + cp);
}
- String[] cmds = {
- "${JAVA_HOME}" + "/bin/java -cp \"" + cp + "\" "
- + BSPRunner.class.getCanonicalName(),
- jobId.getJtIdentifier(),
- id + "",
- this.jobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())
- .toString(),
- " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
- " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr" };
- ctx.setCommands(Arrays.asList(cmds));
- LOG.info("Starting command: " + Arrays.toString(cmds));
+ String containerId = allocatedContainer.getId().toString();
+ String applicationId = allocatedContainer.getId().getApplicationAttemptId()
+ .getApplicationId().toString();
+
+ String containerHome = conf.get("yarn.nodemanager.local-dirs")
+ + File.separator + ContainerLocalizer.USERCACHE
+ + File.separator
+ + System.getenv().get(ApplicationConstants.Environment.USER.toString())
+ + File.separator + ContainerLocalizer.APPCACHE
+ + File.separator + applicationId + File.separator
+ + containerId;
+ cp = "$CLASSPATH:./*:" + containerHome + "/*";
+
+ StringBuilder classPathEnv = new StringBuilder(
+ ApplicationConstants.Environment.CLASSPATH.$()).append(File.pathSeparatorChar)
+ .append("./*");
+ for (String c : conf.getStrings(
+ YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+ classPathEnv.append(File.pathSeparatorChar);
+ classPathEnv.append(c.trim());
+ }
+
+ // TODO: remove this; containerHome is appended to the classpath below as a temporary workaround.
+ classPathEnv.append(File.pathSeparatorChar);
+ classPathEnv.append(containerHome + "/*");
+ //classPathEnv.append("/opt/hama-0.6.4/hama-core-0.7.0-SNAPSHOT.jar");
+ //classPathEnv
+// String[] cmds = {
+// "${JAVA_HOME}" + "/bin/java -cp \"" + cp + "\" "
+// + BSPRunner.class.getCanonicalName(),
+// jobId.getJtIdentifier(),
+// id + "",
+// this.jobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())
+// .toString(),
+// " 1>/var/log/hadoop-yarn/yarn/hama-yarncontainer.stdout",
+// " 2>/var/log/hadoop-yarn/yarn/hama-yarncontainer.stderr" };
+// //" 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
+// //" 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr" };
+ Vector<CharSequence> vargs = new Vector<CharSequence>();
+ vargs.add("${JAVA_HOME}/bin/java");
+ vargs.add("-cp " + classPathEnv + "");
+ //vargs.add("-cp " + cp + "");
+ vargs.add(BSPRunner.class.getCanonicalName());
+
+ vargs.add(jobId.getJtIdentifier());
+ vargs.add(Integer.toString(id));
+ vargs.add(this.jobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())
+ .toString());
+
+ vargs.add("1>/var/log/hadoop/yarn/hama-yarncontainer.stdout");
+ vargs.add("2>/var/log/hadoop/yarn/hama-yarncontainer.stderr");
+
+ LOG.info("ARG3 : " + this.jobFile);
+ LOG.info("fs.getworking directory : " + fs.getWorkingDirectory());
+ LOG.info("fs.getURI : " + fs.getUri());
+
+ // Get final command
+ StringBuilder command = new StringBuilder();
+ for (CharSequence str : vargs) {
+ command.append(str).append(" ");
+ }
+
+ List<String> commands = new ArrayList<String>();
+ commands.add(command.toString());
+
+ ctx.setCommands(commands);
+ LOG.info("Starting command: " + commands);
+
StartContainerRequest startReq = Records
.newRecord(StartContainerRequest.class);
startReq.setContainerLaunchContext(ctx);
- cm.startContainer(startReq);
+ startReq.setContainerToken(allocatedContainer.getContainerToken());
- GetContainerStatusRequest statusReq = Records
- .newRecord(GetContainerStatusRequest.class);
- statusReq.setContainerId(allocatedContainer.getId());
+ List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+ list.add(startReq);
+ StartContainersRequest requestList = StartContainersRequest.newInstance(list);
+ cm.startContainers(requestList);
+
+ GetContainerStatusesRequest statusReq = Records
+ .newRecord(GetContainerStatusesRequest.class);
+ List<ContainerId> containerIds = new ArrayList<ContainerId>();
+ containerIds.add(allocatedContainer.getId());
+ statusReq.setContainerIds(containerIds);
return statusReq;
}
Index: yarn/src/main/java/org/apache/hama/bsp/Job.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/Job.java (revision 1659147)
+++ yarn/src/main/java/org/apache/hama/bsp/Job.java (working copy)
@@ -17,8 +17,10 @@
*/
package org.apache.hama.bsp;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import java.io.IOException;
+
/**
* Main interface to interact with the job. Provides only getters.
*/
@@ -34,7 +36,7 @@
public JobState startJob() throws Exception;
- public void cleanup() throws YarnRemoteException;
+ public void cleanup() throws YarnException, IOException;
JobState getState();
Index: yarn/src/main/java/org/apache/hama/bsp/JobImpl.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/JobImpl.java (revision 1659147)
+++ yarn/src/main/java/org/apache/hama/bsp/JobImpl.java (working copy)
@@ -17,34 +17,43 @@
*/
package org.apache.hama.bsp;
+import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
+import java.util.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.api.AMRMProtocol;
-import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+//import org.apache.hadoop.security.token.Token;
+//import org.apache.hadoop.security.token.TokenIdentifier;
+//import org.apache.hadoop.security.token.*;
+//import org.apache.hadoop.security.token.Token;
+//import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.records.AMResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hama.bsp.BSPTaskLauncher.BSPTaskStatus;
@@ -66,7 +75,7 @@
private ApplicationAttemptId appAttemptId;
private YarnRPC yarnRPC;
- private AMRMProtocol resourceManager;
+ private ApplicationMasterProtocol resourceManager;
private List<Container> allocatedContainers;
private List<ContainerId> releasedContainers = Collections.emptyList();
@@ -76,9 +85,19 @@
private int lastResponseID = 0;
+ private int getMemoryRequirements() {
+ String newMemoryProperty = conf.get("bsp.child.mem.in.mb");
+ if (newMemoryProperty == null) {
+ LOG.warn("\"bsp.child.mem.in.mb\" was not set! Try parsing the child opts...");
+ return getMemoryFromOptString(childOpts);
+ } else {
+ return Integer.valueOf(newMemoryProperty);
+ }
+ }
+
public JobImpl(ApplicationAttemptId appAttemptId,
- Configuration jobConfiguration, YarnRPC yarnRPC, AMRMProtocol amrmRPC,
- String jobFile, BSPJobID jobId) {
+ Configuration jobConfiguration, YarnRPC yarnRPC, ApplicationMasterProtocol amrmRPC,
+ String jobFile, BSPJobID jobId) {
super();
this.numBSPTasks = jobConfiguration.getInt("bsp.peers.num", 1);
this.appAttemptId = appAttemptId;
@@ -91,32 +110,30 @@
this.childOpts = conf.get("bsp.child.java.opts");
this.taskMemoryInMb = getMemoryRequirements();
+
LOG.info("Memory per task: " + taskMemoryInMb + "m!");
}
- private int getMemoryRequirements() {
- String newMemoryProperty = conf.get("bsp.child.mem.in.mb");
- if (newMemoryProperty == null) {
- LOG.warn("\"bsp.child.mem.in.mb\" was not set! Try parsing the child opts...");
- return getMemoryFromOptString(childOpts);
- } else {
- return Integer.valueOf(newMemoryProperty);
+ // This really needs a testcase
+ private static int getMemoryFromOptString(String opts) {
+ if (opts == null) {
+ return DEFAULT_MEMORY_MB;
}
- }
- // This really needs a testcase
- private static int getMemoryFromOptString(String opts) {
if (!opts.contains("-Xmx")) {
LOG.info("No \"-Xmx\" option found in child opts, using default amount of memory!");
return DEFAULT_MEMORY_MB;
} else {
// e.G: -Xmx512m
+
int startIndex = opts.indexOf("-Xmx") + 4;
- int endIndex = opts.indexOf(" ", startIndex);
- String xmxString = opts.substring(startIndex, endIndex);
+ //int endIndex = opts.indexOf(" ", startIndex);
+ //LOG.info("start & end index : " + startIndex + ":" + endIndex);
+ //String xmxString = opts.substring(startIndex, endIndex);
+ String xmxString = opts.substring(startIndex);
char qualifier = xmxString.charAt(xmxString.length() - 1);
int memory = Integer.valueOf(xmxString.substring(0,
- xmxString.length() - 2));
+ xmxString.length() - 1));
if (qualifier == 'm') {
return memory;
} else if (qualifier == 'g') {
@@ -133,28 +150,39 @@
public JobState startJob() throws Exception {
this.allocatedContainers = new ArrayList<Container>(numBSPTasks);
+ List<NMToken> nmTokenList = null;
+ LOG.info("allocatedContainer size : " + allocatedContainers.size()
+ + " & numBSPTasks : " + numBSPTasks);
while (allocatedContainers.size() < numBSPTasks) {
- AllocateRequest req = BuilderUtils.newAllocateRequest(
- appAttemptId,
- lastResponseID,
- 0.0f,
- createBSPTaskRequest(numBSPTasks - allocatedContainers.size(),
- taskMemoryInMb, priority), releasedContainers);
+// AllocateRequest req = BuilderUtils.newAllocateRequest(
+// appAttemptId,
+// lastResponseID,
+// 0.0f,
+// createBSPTaskRequest(numBSPTasks - allocatedContainers.size(),
+// taskMemoryInMb, priority), releasedContainers);
+ AllocateRequest req = AllocateRequest.newInstance(lastResponseID, 0.0f,
+ createBSPTaskRequest(numBSPTasks - allocatedContainers.size(), taskMemoryInMb,
+ priority), releasedContainers, null);
+
AllocateResponse allocateResponse = resourceManager.allocate(req);
- AMResponse amResponse = allocateResponse.getAMResponse();
- LOG.info("Got response! ID: " + amResponse.getResponseId()
+ LOG.info("Got response! ID: " + allocateResponse.getResponseId()
+ " with num of containers: "
- + amResponse.getAllocatedContainers().size()
+ + allocateResponse.getAllocatedContainers().size()
+ " and following resources: "
- + amResponse.getAvailableResources().getMemory() + "mb");
- this.lastResponseID = amResponse.getResponseId();
+ + allocateResponse.getAvailableResources().getMemory() + "mb");
+ this.lastResponseID = allocateResponse.getResponseId();
// availableResources = amResponse.getAvailableResources();
- this.allocatedContainers.addAll(amResponse.getAllocatedContainers());
- LOG.info("Waiting to allocate "
- + (numBSPTasks - allocatedContainers.size()) + " more containers...");
+
+ this.allocatedContainers.addAll(allocateResponse.getAllocatedContainers());
+ LOG.info("allocatedContainer.size : " + allocatedContainers.size() + " numBSPTasks : " + numBSPTasks);
+
+ LOG.info("Waiting to allocate " + (numBSPTasks - allocatedContainers.size()) + " more containers...");
+
+ nmTokenList = allocateResponse.getNMTokens();
+
Thread.sleep(1000l);
}
@@ -166,20 +194,42 @@
+ allocatedContainer.getId() + ", containerNode="
+ allocatedContainer.getNodeId().getHost() + ":"
+ allocatedContainer.getNodeId().getPort() + ", containerNodeURI="
- + allocatedContainer.getNodeHttpAddress() + ", containerState"
- + allocatedContainer.getState() + ", containerResourceMemory"
+ + allocatedContainer.getNodeHttpAddress() + ", containerResourceMemory"
+ allocatedContainer.getResource().getMemory());
// Connect to ContainerManager on the allocated container
- String cmIpPortStr = allocatedContainer.getNodeId().getHost() + ":"
- + allocatedContainer.getNodeId().getPort();
- InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
- ContainerManager cm = (ContainerManager) yarnRPC.getProxy(
- ContainerManager.class, cmAddress, conf);
+ String user = conf.get("bsp.user.name");
+ if (user == null) {
+ user = System.getenv(ApplicationConstants.Environment.USER.name());
+ }
+ LOG.info("AppAttemptId : " + appAttemptId + ", AllocatedContainer NodeId : "
+ + allocatedContainer.getNodeId() + ", User : " + user);
+ //NMTokenSecretManagerInRM secretMgr = NMTokenSecretManagerInRM.newInstance('')
+ //LOG.info("secretManager : " + secretMgr.getCurrentKey());
+ Token nmToken = null;
+ for (NMToken token : nmTokenList) {
+ nmToken = token.getToken();
+ }
+ ContainerManagementProtocol cm = null;
+
+ try {
+ cm = getContainerManagementProtocolProxy(yarnRPC,
+ nmToken, allocatedContainer.getNodeId(), user);
+ } catch (Exception e) {
+ LOG.error("failed to create cm!!!");
+ if (cm != null)
+ yarnRPC.stopProxy(cm, conf);
+ e.printStackTrace();
+ }
+
+ LOG.info("ContainerId" + allocatedContainer.getId());
+
BSPTaskLauncher runnableLaunchContainer = new BSPTaskLauncher(id,
allocatedContainer, cm, conf, jobFile, jobId);
+ LOG.info("allocate container : " + allocatedContainer.getId());
+
launchers.put(id, runnableLaunchContainer);
runnableLaunchContainer.start();
completionQueue.add(runnableLaunchContainer);
@@ -186,6 +236,7 @@
id++;
}
LOG.info("Waiting for tasks to finish...");
+ LOG.info(completionQueue.size());
state = JobState.RUNNING;
int completed = 0;
while (completed != numBSPTasks) {
@@ -214,14 +265,50 @@
return state;
}
+ /*
+ private org.apache.hadoop.security.token.Token<? extends NMTokenIdentifier> setupAndReturnAMRMToken(
+ InetSocketAddress rmBindAddress,
+ Collection<org.apache.hadoop.security.token.Token<? extends TokenIdentifier>> allTokens) {
+ for (org.apache.hadoop.security.token.Token<? extends TokenIdentifier> token : allTokens) {
+ if (token.getKind().equals(NMTokenIdentifier.KIND)) {
+ SecurityUtil.setTokenService(token, rmBindAddress);
+ return (org.apache.hadoop.security.token.Token<? extends NMTokenIdentifier>) token;
+ }
+ }
+ return null;
+ }*/
+
+ protected ContainerManagementProtocol getContainerManagementProtocolProxy(
+ final YarnRPC rpc, Token nmToken, NodeId nodeId, String user) {
+ ContainerManagementProtocol proxy;
+ UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+ final InetSocketAddress addr =
+ NetUtils.createSocketAddr(nodeId.getHost(), nodeId.getPort());
+ if (nmToken != null) {
+ ugi.addToken(ConverterUtils.convertFromYarn(nmToken, addr));
+ }
+
+ proxy = ugi
+ .doAs(new PrivilegedAction<ContainerManagementProtocol>() {
+ @Override
+ public ContainerManagementProtocol run() {
+ LOG.info("ContainerManagementProtocol running!!!");
+ return (ContainerManagementProtocol) rpc.getProxy(
+ ContainerManagementProtocol.class,
+ addr, conf);
+ }
+ });
+ return proxy;
+ }
+
/**
* Makes a lookup for the taskid and stops its container and task. It also
* removes the task from the launcher so that we won't have to stop it twice.
*
* @param id
- * @throws YarnRemoteException
+ * @throws YarnException
*/
- private void cleanupTask(int id) throws YarnRemoteException {
+ private void cleanupTask(int id) throws YarnException, IOException {
BSPTaskLauncher bspTaskLauncher = launchers.get(id);
bspTaskLauncher.stopAndCleanup();
launchers.remove(id);
@@ -229,7 +316,7 @@
}
@Override
- public void cleanup() throws YarnRemoteException {
+ public void cleanup() throws YarnException, IOException {
for (BSPTaskLauncher launcher : completionQueue) {
launcher.stopAndCleanup();
}
@@ -247,7 +334,7 @@
// whether a particular rack/host is needed
// useful for applications that are sensitive
// to data locality
- rsrcRequest.setHostName("*");
+ rsrcRequest.setResourceName("*");
// set the priority for the request
Priority pri = Records.newRecord(Priority.class);
Index: yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java (revision 1659147)
+++ yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java (working copy)
@@ -20,11 +20,12 @@
import java.io.IOException;
import java.net.InetSocketAddress;
+import com.sun.tools.internal.jxc.apt.Const;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
@@ -31,9 +32,10 @@
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.Records;
+import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
public class YARNBSPJob extends BSPJob {
@@ -46,20 +48,22 @@
private BSPClient client;
private boolean submitted;
private ApplicationReport report;
- private ClientRMProtocol applicationsManager;
+ private ApplicationClientProtocol applicationsManager;
private YarnRPC rpc;
public YARNBSPJob(HamaConfiguration conf) throws IOException {
- super(conf);
submitClient = new YARNBSPJobClient(conf);
YarnConfiguration yarnConf = new YarnConfiguration(conf);
this.rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS));
+ LOG.info("zookeeper info : " + conf.get("hama.zookeeper.quorum"));
+ LOG.info("zookeeper info : " + this.getConfiguration().get("hama.zookeeper.quorum"));
LOG.info("Connecting to ResourceManager at " + rmAddress);
- this.applicationsManager = ((ClientRMProtocol) rpc.getProxy(
- ClientRMProtocol.class, rmAddress, conf));
+
+ this.applicationsManager = ((ApplicationClientProtocol) rpc.getProxy(
+ ApplicationClientProtocol.class, rmAddress, conf));
}
public void setMemoryUsedPerTaskInMb(int mem) {
@@ -66,7 +70,7 @@
conf.setInt("bsp.child.mem.in.mb", mem);
}
- public void kill() throws YarnRemoteException {
+ public void kill() throws YarnException, IOException {
if (submitClient != null) {
KillApplicationRequest killRequest = Records
.newRecord(KillApplicationRequest.class);
@@ -90,50 +94,50 @@
InterruptedException, ClassNotFoundException {
LOG.info("Starting job...");
+ LOG.info("InputFormatPath : " + this.getConfiguration().get(Constants.INPUT_FORMAT_CLASS));
if (!submitted) {
this.submit();
}
- client = RPC.waitForProxy(BSPClient.class, BSPClient.versionID,
- NetUtils.createSocketAddr(report.getHost(), report.getRpcPort()), conf);
- GetApplicationReportRequest reportRequest = Records
- .newRecord(GetApplicationReportRequest.class);
- reportRequest.setApplicationId(submitClient.getId());
+ client = RPC.waitForProxy(BSPClient.class, BSPClient.versionID, NetUtils.createSocketAddr(report.getHost(), report.getRpcPort()), conf);
+ GetApplicationReportRequest reportRequest = Records.newRecord(GetApplicationReportRequest.class);
+ reportRequest.setApplicationId(submitClient.getId());
- GetApplicationReportResponse reportResponse = applicationsManager
- .getApplicationReport(reportRequest);
- ApplicationReport localReport = reportResponse.getApplicationReport();
- long clientSuperStep = -1L;
- while (localReport.getFinalApplicationStatus() != null
- && localReport.getFinalApplicationStatus().ordinal() == 0) {
- LOG.debug("currently in state: "
- + localReport.getFinalApplicationStatus());
- if (verbose) {
- long remoteSuperStep = client.getCurrentSuperStep().get();
- if (clientSuperStep < remoteSuperStep) {
- clientSuperStep = remoteSuperStep;
- LOG.info("Current supersteps number: " + clientSuperStep);
+ try {
+ GetApplicationReportResponse reportResponse = applicationsManager.getApplicationReport(reportRequest);
+ ApplicationReport localReport = reportResponse.getApplicationReport();
+ long clientSuperStep = -1L;
+ while (localReport.getFinalApplicationStatus() != null && localReport.getFinalApplicationStatus().ordinal() == 0) {
+ LOG.debug("currently in state: " + localReport.getFinalApplicationStatus());
+ if (verbose) {
+ long remoteSuperStep = client.getCurrentSuperStep().get();
+ if (clientSuperStep < remoteSuperStep) {
+ clientSuperStep = remoteSuperStep;
+ LOG.info("Current supersteps number: " + clientSuperStep);
+ }
}
+ reportResponse = applicationsManager.getApplicationReport(reportRequest);
+ localReport = reportResponse.getApplicationReport();
+
+ Thread.sleep(3000L);
}
- reportResponse = applicationsManager.getApplicationReport(reportRequest);
- localReport = reportResponse.getApplicationReport();
- Thread.sleep(3000L);
- }
- if (localReport.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED) {
- LOG.info("Job succeeded!");
- return true;
- } else {
- LOG.info("Job failed with status: "
- + localReport.getFinalApplicationStatus().toString() + "!");
+ if (localReport.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED) {
+ LOG.info("Job succeeded!");
+ return true;
+ } else {
+ LOG.info("Job failed with status: " + localReport.getFinalApplicationStatus().toString() + "!");
+ return false;
+ }
+ } catch (YarnException e) {
+ e.printStackTrace();
return false;
}
-
}
- ClientRMProtocol getApplicationsManager() {
+ ApplicationClientProtocol getApplicationsManager() {
return applicationsManager;
}
Index: yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java (revision 1659147)
+++ yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java (working copy)
@@ -17,9 +17,12 @@
*/
package org.apache.hama.bsp;
+import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
@@ -27,6 +30,10 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
@@ -33,16 +40,13 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
+import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
public class YARNBSPJobClient extends BSPJobClient {
@@ -61,7 +65,21 @@
Path submitJobFile, FileSystem pFs) throws IOException {
YARNBSPJob job = (YARNBSPJob) normalJob;
+ YarnConfiguration yarnConf = new YarnConfiguration(job.getConfiguration());
+ YarnClient yarnClient = YarnClient.createYarnClient();
+ yarnClient.init(yarnConf);
+ yarnClient.start();
+ HamaConfiguration testConf = new HamaConfiguration();
+ testConf.addResource(submitJobFile);
+
+ LOG.info("job configuration : " + testConf);
+ LOG.info("job InputFileFormat : " + testConf.get(Constants.INPUT_FORMAT_CLASS));
+
+ LOG.info("job configuration : " + job.getConfiguration());
+ LOG.info("job InputFileFormat : " + job.getInputFormat());
+ LOG.info("job Name : " + job.getJobName());
+
LOG.info("Submitting job...");
if (getConf().get("bsp.child.mem.in.mb") == null) {
LOG.warn("BSP Child memory has not been set, YARN will guess your needs or use default values.");
@@ -71,6 +89,8 @@
fs = FileSystem.get(getConf());
}
+ fs.copyToLocalFile(false, submitJobFile, new Path("/root"));
+
if (getConf().get("bsp.user.name") == null) {
String s = getUnixUserName();
getConf().set("bsp.user.name", s);
@@ -77,122 +97,252 @@
LOG.debug("Retrieved username: " + s);
}
- GetNewApplicationRequest request = Records
- .newRecord(GetNewApplicationRequest.class);
- GetNewApplicationResponse response = job.getApplicationsManager()
- .getNewApplication(request);
- id = response.getApplicationId();
- LOG.debug("Got new ApplicationId=" + id);
+ try {
+ List listAclInfo = yarnClient.getQueueAclsInfo();
+ for (QueueUserACLInfo aclInfo : listAclInfo) {
+ for (QueueACL userAcl : aclInfo.getUserAcls()) {
+ LOG.info("User ACL Info for Queue"
+ + ", queueName=" + aclInfo.getQueueName()
+ + ", userAcl=" + userAcl.name());
+ }
+ }
+ GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class);
- // Create a new ApplicationSubmissionContext
- ApplicationSubmissionContext appContext = Records
- .newRecord(ApplicationSubmissionContext.class);
- // set the ApplicationId
- appContext.setApplicationId(this.id);
- // set the application name
- appContext.setApplicationName(job.getJobName());
+ GetNewApplicationResponse response = job.getApplicationsManager().getNewApplication(request);
+ id = response.getApplicationId();
+ LOG.debug("Got new ApplicationId=" + id);
- // Create a new container launch context for the AM's container
- ContainerLaunchContext amContainer = Records
- .newRecord(ContainerLaunchContext.class);
+ // Create a new ApplicationSubmissionContext
+ ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
+ // set the ApplicationId
+ appContext.setApplicationId(this.id);
+ // set the application name
+ appContext.setApplicationName(job.getJobName());
- // Define the local resources required
- Map localResources = new HashMap();
- // Lets assume the jar we need for our ApplicationMaster is available in
- // HDFS at a certain known path to us and we want to make it available to
- // the ApplicationMaster in the launched container
- if (job.getJar() == null) {
- throw new IllegalArgumentException(
- "Jar must be set in order to run the application!");
- }
- Path jarPath = new Path(job.getWorkingDirectory(), id + "/app.jar");
- fs.copyFromLocalFile(job.getLocalPath(job.getJar()), jarPath);
- LOG.debug("Copying app jar to " + jarPath);
- getConf()
- .set(
- "bsp.jar",
- jarPath.makeQualified(fs.getUri(), fs.getWorkingDirectory())
- .toString());
- FileStatus jarStatus = fs.getFileStatus(jarPath);
- LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
- amJarRsrc.setType(LocalResourceType.FILE);
- amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
- amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
- amJarRsrc.setTimestamp(jarStatus.getModificationTime());
- amJarRsrc.setSize(jarStatus.getLen());
- // this creates a symlink in the working directory
- localResources.put("AppMaster.jar", amJarRsrc);
- // Set the local resources into the launch context
- amContainer.setLocalResources(localResources);
+ // Create a new container launch context for the AM's container
+ ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
- // Set up the environment needed for the launch context
- Map env = new HashMap();
- // Assuming our classes or jars are available as local resources in the
- // working directory from which the command will be run, we need to append
- // "." to the path.
- // By default, all the hadoop specific classpaths will already be available
- // in $CLASSPATH, so we should be careful not to overwrite it.
- String classPathEnv = "$CLASSPATH:./*:";
- env.put("CLASSPATH", classPathEnv);
- amContainer.setEnvironment(env);
+ // Define the local resources required
+ Map localResources = new HashMap();
+ // Let's assume the jar we need for our ApplicationMaster is available in
+ // HDFS at a certain known path to us and we want to make it available to
+ // the ApplicationMaster in the launched container
+ if (job.getJar() == null) {
+ throw new IllegalArgumentException("Jar must be set in order to run the application!");
+ }
+
+ //Path jarPath = new Path(fs.getHomeDirectory(), "hama-yarn.jar");
+ //LOG.info(fs.getHomeDirectory());
+ //LOG.info(job.getLocalPath("hama-yarn-0.7.0-SNAPSHOT.jar"));
- // Construct the command to be executed on the launched container
- String command = "${JAVA_HOME}"
- + "/bin/java -cp "
- + classPathEnv
- + " "
- + BSPApplicationMaster.class.getCanonicalName()
- + " "
- + submitJobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())
- .toString() + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
- + "/stdout" + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
- + "/stderr";
+ Path jarPath = new Path(job.getJar());
+ LOG.info("DST : " + jarPath);
+ jarPath = fs.makeQualified(jarPath);
+
+ LOG.debug("Copying app jar to " + jarPath);
+
+ //fs.copyFromLocalFile(false, true, job.getLocalPath(job.getJar()), jarPath);
+ LOG.info("DST : " + jarPath);
+ // LOG.debug("Copying app jar to " + jarPath);
+ //getConf().set("bsp.jar", jarPath.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString());
+ getConf().set("bsp.jar", jarPath.makeQualified(fs.getUri(), jarPath).toString());
- LOG.debug("Start command: " + command);
+ FileStatus jarStatus = fs.getFileStatus(jarPath);
+ LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
+ amJarRsrc.setType(LocalResourceType.FILE);
+ amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+ amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
+ amJarRsrc.setTimestamp(jarStatus.getModificationTime());
+ LOG.info("hama-yarn jar timestamp : " + jarStatus.getModificationTime());
+ amJarRsrc.setSize(jarStatus.getLen());
+ // this creates a symlink in the working directory
+ localResources.put("AppMaster.jar", amJarRsrc);
+ //localResources.put("package", amJarRsrc);
- amContainer.setCommands(Collections.singletonList(command));
+ String hamaYarnLocation = jarPath.toUri().toString();
+ FileStatus hamaYarnFileStatus = fs.getFileStatus(jarPath);
+ long hamaYarnLen = hamaYarnFileStatus.getLen();
+ long hamaYarnTimestamp = hamaYarnFileStatus.getModificationTime();
- Resource capability = Records.newRecord(Resource.class);
- // we have at least 3 threads, which comsumes 1mb each, for each bsptask and
- // a base usage of 100mb
- capability.setMemory(3 * job.getNumBspTask()
- + getConf().getInt("hama.appmaster.memory.mb", 100));
- LOG.info("Set memory for the application master to "
- + capability.getMemory() + "mb!");
- amContainer.setResource(capability);
+ Path hamaCoreJarPath = new Path(fs.getHomeDirectory(), "hama-core.jar");
+ fs.copyFromLocalFile(false, true,
+ new Path("/opt/hama-0.6.4/hama-core-0.7.0-SNAPSHOT.jar"), hamaCoreJarPath);
+ LOG.info("DST : " + hamaCoreJarPath);
- // Set the container launch content into the ApplicationSubmissionContext
- appContext.setAMContainerSpec(amContainer);
+ FileStatus hamaCoreJarStatus = fs.getFileStatus(hamaCoreJarPath);
+ LocalResource hamaCoreJarRsrc = Records.newRecord(LocalResource.class);
+ hamaCoreJarRsrc.setType(LocalResourceType.FILE);
+ hamaCoreJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+ hamaCoreJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(hamaCoreJarPath));
+ hamaCoreJarRsrc.setTimestamp(hamaCoreJarStatus.getModificationTime());
+ hamaCoreJarRsrc.setSize(hamaCoreJarStatus.getLen());
+ localResources.put("HamaCore.jar", hamaCoreJarRsrc);
- // Create the request to send to the ApplicationsManager
- SubmitApplicationRequest appRequest = Records
- .newRecord(SubmitApplicationRequest.class);
- appRequest.setApplicationSubmissionContext(appContext);
- job.getApplicationsManager().submitApplication(appRequest);
+ // Set the local resources into the launch context
+ amContainer.setLocalResources(localResources);
- GetApplicationReportRequest reportRequest = Records
- .newRecord(GetApplicationReportRequest.class);
- reportRequest.setApplicationId(id);
- while (report == null || report.getHost().equals("N/A")) {
- GetApplicationReportResponse reportResponse = job
- .getApplicationsManager().getApplicationReport(reportRequest);
- report = reportResponse.getApplicationReport();
- try {
- Thread.sleep(1000L);
- } catch (InterruptedException e) {
- LOG.error(
- "Got interrupted while waiting for a response report from AM.", e);
+ String hamaCoreLocation = hamaCoreJarPath.toUri().toString();
+ FileStatus hamaCoreFileStatus = fs.getFileStatus(hamaCoreJarPath);
+ long hamaCoreLen = hamaCoreFileStatus.getLen();
+ long hamaCoreTimestamp = hamaCoreFileStatus.getModificationTime();
+
+ // Set up the environment needed for the launch context
+ Map env = new HashMap();
+ // Assuming our classes or jars are available as local resources in the
+ // working directory from which the command will be run, we need to append
+ // "." to the path.
+ // By default, all the hadoop specific classpaths will already be available
+ // in $CLASSPATH, so we should be careful not to overwrite it.
+ StringBuilder classPathEnv = new StringBuilder(
+ ApplicationConstants.Environment.CLASSPATH.$()).append(File.pathSeparatorChar)
+ .append("./*");
+ for (String c : yarnConf.getStrings(
+ YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+ classPathEnv.append(File.pathSeparatorChar);
+ classPathEnv.append(c.trim());
}
+
+ env.put("HAMAYARNJARLOCATION", hamaYarnLocation);
+ env.put("HAMAYARNJARSIZE", Long.toString(hamaYarnLen));
+ env.put("HAMAYARNJARTIMESTAMP", Long.toString(hamaYarnTimestamp));
+ LOG.info("hamayarn>>>>>>>>>>" + Long.toString(hamaYarnTimestamp));
+
+ env.put("HAMACOREJARLOCATION", hamaCoreLocation);
+ env.put("HAMACOREJARSIZE", Long.toString(hamaCoreLen));
+ env.put("HAMACOREJARTIMESTAMP", Long.toString(hamaCoreTimestamp));
+ LOG.info("hamacore>>>>>>>>>>" + Long.toString(hamaCoreTimestamp));
+
+ env.put("CLASSPATH", classPathEnv.toString());
+ amContainer.setEnvironment(env);
+
+ // Construct the command to be executed on the launched container
+ String command = "${JAVA_HOME}/bin/java -classpath " + classPathEnv + " "
+ + BSPApplicationMaster.class.getCanonicalName() + " "
+ + submitJobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())
+ //+ jarPath.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString()
+ + " 1>/var/log/hadoop/yarn/hama-yarn.stdout"
+ + " 2>/var/log/hadoop/yarn/hama-yarn.stderr";
+
+ LOG.info("Start command: " + command);
+
+ amContainer.setCommands(Collections.singletonList(command));
+
+ Resource capability = Records.newRecord(Resource.class);
+ // we have at least 3 threads, which consumes 1mb each, for each bsptask and
+ // a base usage of 100mb
+ capability.setMemory(3 * job.getNumBspTask() + getConf().getInt("hama.appmaster.memory.mb", 100));
+ LOG.info("Set memory for the application master to " + capability.getMemory() + "mb!");
+ //amContainer.setResource(capability);
+
+ // Set the container launch content into the ApplicationSubmissionContext
+ appContext.setResource(capability);
+
+
+ // Setup security tokens
+ if (UserGroupInformation.isSecurityEnabled()) {
+ // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce
+ LOG.info("SecurityEnable!!!");
+ Credentials credentials = new Credentials();
+ String tokenRenewer = yarnConf.get(YarnConfiguration.RM_PRINCIPAL);
+ if (tokenRenewer == null || tokenRenewer.length() == 0) {
+ throw new IOException(
+ "Can't get Master Kerberos principal for the RM to use as renewer");
+ }
+
+ // For now, only getting tokens for the default file-system.
+ final Token> tokens[] =
+ fs.addDelegationTokens(tokenRenewer, credentials);
+ if (tokens != null) {
+ for (Token> token : tokens) {
+ LOG.info("Got dt for " + fs.getUri() + "; " + token);
+ }
+ }
+ DataOutputBuffer dob = new DataOutputBuffer();
+ credentials.writeTokenStorageToStream(dob);
+ ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ amContainer.setTokens(fsTokens);
+ }
+
+ appContext.setAMContainerSpec(amContainer);
+
+ // Create the request to send to the ApplicationsManager
+// SubmitApplicationRequest appRequest = Records.newRecord(SubmitApplicationRequest.class);
+// appRequest.setApplicationSubmissionContext(appContext);
+ //job.getApplicationsManager().submitApplication(appRequest);
+ ApplicationId appId = appContext.getApplicationId();
+ LOG.info("Submitting application " + appId);
+ yarnClient.submitApplication(appContext);
+
+ while (true) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ LOG.debug("Thread sleep in monitoring loop interrupted");
+ }
+
+ // Get application report for the appId we are interested in
+ ApplicationReport report = yarnClient.getApplicationReport(appId);
+
+ LOG.info("Got application report from ASM for" + ", appId="
+ + appId.getId() + ", clientToAMToken="
+ + report.getClientToAMToken() + ", appDiagnostics="
+ + report.getDiagnostics() + ", appMasterHost="
+ + report.getHost() + ", appQueue=" + report.getQueue()
+ + ", appMasterRpcPort=" + report.getRpcPort()
+ + ", appStartTime=" + report.getStartTime()
+ + ", yarnAppState="
+ + report.getYarnApplicationState().toString()
+ + ", distributedFinalState="
+ + report.getFinalApplicationStatus().toString()
+ + ", appTrackingUrl=" + report.getTrackingUrl()
+ + ", appUser=" + report.getUser());
+
+ YarnApplicationState state = report.getYarnApplicationState();
+ FinalApplicationStatus dsStatus = report
+ .getFinalApplicationStatus();
+ if (YarnApplicationState.FINISHED == state) {
+ if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
+ LOG.info("Application has completed successfully. Breaking monitoring loop");
+ return new NetworkedJob();
+ } else {
+ LOG.info("Application did finished unsuccessfully."
+ + " YarnState=" + state.toString()
+ + ", DSFinalStatus=" + dsStatus.toString()
+ + ". Breaking monitoring loop");
+ return null;
+ }
+ } else if (YarnApplicationState.KILLED == state
+ || YarnApplicationState.FAILED == state) {
+ LOG.info("Application did not finish." + " YarnState="
+ + state.toString() + ", DSFinalStatus="
+ + dsStatus.toString() + ". Breaking monitoring loop");
+ return null;
+ }
+ }
+// GetApplicationReportRequest reportRequest = Records.newRecord(GetApplicationReportRequest.class);
+// reportRequest.setApplicationId(id);
+// while (report == null || report.getHost().equals("N/A")) {
+// GetApplicationReportResponse reportResponse = job.getApplicationsManager().getApplicationReport(reportRequest);
+// report = reportResponse.getApplicationReport();
+// try {
+// Thread.sleep(1000L);
+// } catch (InterruptedException e) {
+// LOG.error("Got interrupted while waiting for a response report from AM.", e);
+// }
+// }
+// LOG.info("Got report: " + report.getApplicationId() + " " + report.getHost() + ":" + report.getRpcPort());
+ //return new NetworkedJob();
+ } catch (YarnException e) {
+ e.printStackTrace();
+ return null;
}
- LOG.info("Got report: " + report.getApplicationId() + " "
- + report.getHost() + ":" + report.getRpcPort());
- return new NetworkedJob();
}
- @Override
- protected int checkTaskLimits(BSPJob job, int limitTasks) throws IOException {
- return Math.max(1, limitTasks);
- }
+// @Override
+// protected int checkTaskLimits(BSPJob job, int limitTasks) throws IOException {
+// return Math.max(1, limitTasks);
+// }
@Override
public Path getSystemDir() {
Index: yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java
===================================================================
--- yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java (revision 1659147)
+++ yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java (working copy)
@@ -22,6 +22,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.sync.SyncException;
@@ -57,15 +58,25 @@
InterruptedException, ClassNotFoundException {
HamaConfiguration conf = new HamaConfiguration();
// TODO some keys that should be within a conf
- conf.set("yarn.resourcemanager.address", "0.0.0.0:8040");
- conf.set("bsp.local.dir", "/tmp/bsp-yarn/");
+ //conf.set("yarn.resourcemanager.address", "slave1.hama.com:8050");
+ conf.set("yarn.resourcemanager.address", "localhost:8032");
+ conf.set("bsp.user.name", "root");
+ conf.set("bsp.groom.report.address", "127.0.0.1:50001");
+ conf.set("hama.zookeeper.quorum", "localhost");
+ conf.set("hama.zookeeper.property.clientPort", "2181");
+ conf.setInt(Constants.MAX_TASKS, 10);
+
YARNBSPJob job = new YARNBSPJob(conf);
job.setBspClass(HelloBSP.class);
job.setJarByClass(HelloBSP.class);
- job.setJobName("Serialize Printing");
+ job.setJobName("Test");
+ job.setInputFormat(NullInputFormat.class);
+ job.setOutputFormat(NullOutputFormat.class);
job.setMemoryUsedPerTaskInMb(50);
job.setNumBspTask(2);
+ System.out.println("job conf ; " + job.getConfiguration().get("hama.zookeeper.quorum"));
+ System.out.println("job conf ; " + conf.get("hama.zookeeper.quorum"));
job.waitForCompletion(true);
}
}