diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
index 8503f5e..f7a9d17 100644
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
@@ -19,9 +19,7 @@
package org.apache.hadoop.mapreduce.v2.app.launcher;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
@@ -35,7 +33,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.mapred.ShuffleHandler;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -44,21 +41,15 @@
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnRuntimeException;
-import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.client.ContainerManagerProxy;
import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.util.ProtoUtils;
import org.apache.hadoop.yarn.util.Records;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -73,22 +64,22 @@
private ConcurrentHashMap containers =
new ConcurrentHashMap();
- private AppContext context;
+ private final AppContext context;
protected ThreadPoolExecutor launcherPool;
protected static final int INITIAL_POOL_SIZE = 10;
private int limitOnPoolSize;
private Thread eventHandlingThread;
protected BlockingQueue eventQueue =
new LinkedBlockingQueue();
- YarnRPC rpc;
private final AtomicBoolean stopped;
+ private ContainerManagerProxy cmProxy;
private Container getContainer(ContainerLauncherEvent event) {
ContainerId id = event.getContainerID();
Container c = containers.get(id);
if(c == null) {
c = new Container(event.getTaskAttemptID(), event.getContainerID(),
- event.getContainerMgrAddress(), event.getContainerToken());
+ event.getContainerMgrAddress());
Container old = containers.putIfAbsent(id, c);
if(old != null) {
c = old;
@@ -114,16 +105,13 @@ private void removeContainerIfDone(ContainerId id) {
private TaskAttemptId taskAttemptID;
private ContainerId containerID;
final private String containerMgrAddress;
- private org.apache.hadoop.yarn.api.records.Token containerToken;
public Container(TaskAttemptId taId, ContainerId containerID,
- String containerMgrAddress,
- org.apache.hadoop.yarn.api.records.Token containerToken) {
+ String containerMgrAddress) {
this.state = ContainerState.PREP;
this.taskAttemptID = taId;
this.containerMgrAddress = containerMgrAddress;
this.containerID = containerID;
- this.containerToken = containerToken;
}
public synchronized boolean isCompletelyDone() {
@@ -140,11 +128,10 @@ public synchronized void launch(ContainerRemoteLaunchEvent event) {
return;
}
- ContainerManager proxy = null;
+ ContainerManagerProxy.ContainerManagerProxyData proxy = null;
try {
- proxy = getCMProxy(containerID, containerMgrAddress,
- containerToken);
+ proxy = getCMProxy(containerMgrAddress, containerID);
// Construct the actual Container
ContainerLaunchContext containerLaunchContext =
@@ -155,7 +142,8 @@ public synchronized void launch(ContainerRemoteLaunchEvent event) {
.newRecord(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
startRequest.setContainerToken(event.getContainerToken());
- StartContainerResponse response = proxy.startContainer(startRequest);
+ StartContainerResponse response =
+ proxy.getContainerManager().startContainer(startRequest);
ByteBuffer portInfo =
response.getAllServiceResponse().get(
@@ -185,7 +173,7 @@ public synchronized void launch(ContainerRemoteLaunchEvent event) {
sendContainerLaunchFailedMsg(taskAttemptID, message);
} finally {
if (proxy != null) {
- ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
+ cmProxy.closeProxy(proxy);
}
}
}
@@ -198,16 +186,15 @@ public synchronized void kill() {
} else if (!isCompletelyDone()) {
LOG.info("KILLING " + taskAttemptID);
- ContainerManager proxy = null;
+ ContainerManagerProxy.ContainerManagerProxyData proxy = null;
try {
- proxy = getCMProxy(this.containerID, this.containerMgrAddress,
- this.containerToken);
+ proxy = getCMProxy(containerMgrAddress, containerID);
// kill the remote container if already launched
StopContainerRequest stopRequest = Records
.newRecord(StopContainerRequest.class);
stopRequest.setContainerId(this.containerID);
- proxy.stopContainer(stopRequest);
+ proxy.getContainerManager().stopContainer(stopRequest);
} catch (Throwable t) {
@@ -220,7 +207,7 @@ public synchronized void kill() {
LOG.warn(message);
} finally {
if (proxy != null) {
- ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
+ cmProxy.closeProxy(proxy);
}
}
this.state = ContainerState.DONE;
@@ -239,22 +226,13 @@ public ContainerLauncherImpl(AppContext context) {
}
@Override
- protected void serviceInit(Configuration config) throws Exception {
- Configuration conf = new Configuration(config);
- conf.setInt(
- CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
- 0);
+ protected void serviceInit(Configuration conf) throws Exception {
this.limitOnPoolSize = conf.getInt(
MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT,
MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT);
LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize);
- this.rpc = createYarnRPC(conf);
super.serviceInit(conf);
}
-
- protected YarnRPC createYarnRPC(Configuration conf) {
- return YarnRPC.create(conf);
- }
protected void serviceStart() throws Exception {
@@ -348,34 +326,6 @@ protected EventProcessor createEventProcessor(ContainerLauncherEvent event) {
return new EventProcessor(event);
}
- protected ContainerManager getCMProxy(ContainerId containerID,
- final String containerManagerBindAddr,
- org.apache.hadoop.yarn.api.records.Token containerToken)
- throws IOException {
-
- final InetSocketAddress cmAddr =
- NetUtils.createSocketAddr(containerManagerBindAddr);
-
- // the user in createRemoteUser in this context has to be ContainerID
- UserGroupInformation user =
- UserGroupInformation.createRemoteUser(containerID.toString());
-
- Token token =
- ProtoUtils.convertFromProtoFormat(containerToken, cmAddr);
- user.addToken(token);
-
- ContainerManager proxy = user
- .doAs(new PrivilegedAction() {
- @Override
- public ContainerManager run() {
- return (ContainerManager) rpc.getProxy(ContainerManager.class,
- cmAddr, getConfig());
- }
- });
- return proxy;
- }
-
-
/**
* Setup and start the container on remote nodemanager.
*/
@@ -410,7 +360,7 @@ public void run() {
removeContainerIfDone(containerID);
}
}
-
+
@SuppressWarnings("unchecked")
void sendContainerLaunchFailedMsg(TaskAttemptId taskAttemptID,
String message) {
@@ -430,4 +380,9 @@ public void handle(ContainerLauncherEvent event) {
throw new YarnRuntimeException(e);
}
}
+
+ public ContainerManagerProxy.ContainerManagerProxyData getCMProxy(
+ String containerMgrBindAddr, ContainerId containerId) throws IOException {
+ return cmProxy.getProxy(containerMgrBindAddr, containerId);
+ }
}
diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
index 2eddbbf..05f4f1a 100644
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.mapreduce.v2.app;
-import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.Map;
@@ -44,9 +43,8 @@
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.ContainerManagerProxy;
import org.junit.Test;
/**
@@ -225,9 +223,8 @@ public void handle(ContainerLauncherEvent event) {
}
@Override
- protected ContainerManager getCMProxy(ContainerId contianerID,
- String containerManagerBindAddr, Token containerToken)
- throws IOException {
+ public ContainerManagerProxy.ContainerManagerProxyData getCMProxy(
+ String containerManagerBindAddr, ContainerId contianerID) {
try {
synchronized (this) {
wait(); // Just hang the thread simulating a very slow NM.
diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
index 3089997..87c5eb1 100644
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
@@ -62,12 +62,15 @@
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.ContainerManagerProxy;
+import org.apache.hadoop.yarn.client.ContainerManagerProxy.ContainerManagerProxyData;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.junit.Test;
public class TestContainerLauncher {
@@ -342,16 +345,25 @@ public MRAppWithSlowNM() {
}
@Override
- protected ContainerLauncher createContainerLauncher(AppContext context) {
+ protected ContainerLauncher
+ createContainerLauncher(final AppContext context) {
return new ContainerLauncherImpl(context) {
+
@Override
- protected ContainerManager getCMProxy(ContainerId containerID,
- String containerManagerBindAddr, Token containerToken)
+ public ContainerManagerProxyData getCMProxy(
+ String containerMgrBindAddr, ContainerId containerId)
throws IOException {
- // make proxy connect to our local containerManager server
- ContainerManager proxy = (ContainerManager) rpc.getProxy(
- ContainerManager.class,
- NetUtils.getConnectAddress(server), conf);
+ Token dummyToken =
+ Token.newInstance("NMTokenIdentifier".getBytes(),
+ NMTokenIdentifier.KIND.toString(), "password".getBytes(),
+ "NMToken");
+ ContainerManagerProxy cmProxy =
+ new ContainerManagerProxy(conf, context.getNMTokens());
+ InetSocketAddress addr = NetUtils.getConnectAddress(server);
+ ContainerManagerProxyData proxy =
+ cmProxy.new ContainerManagerProxyData(YarnRPC.create(conf),
+ addr.getHostName() + ":" + addr.getPort(), containerId,
+ dummyToken);
return proxy;
}
};
diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
index 141e0b0..00cb76f 100644
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
@@ -95,11 +95,6 @@ public ContainerLauncherImplUnderTest(AppContext context, YarnRPC rpc) {
this.rpc = rpc;
}
- @Override
- protected YarnRPC createYarnRPC(Configuration conf) {
- return rpc;
- }
-
public void waitForPoolToIdle() throws InterruptedException {
//I wish that we did not need the sleep, but it is here so that we are sure
// That the other thread had time to insert the event into the queue and
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 6853534..8a1689d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -729,6 +729,14 @@
YARN_PREFIX + "client.nodemanager-client-async.thread-pool-max-size";
public static final int DEFAULT_NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE = 500;
+ /**
+ * Maximum number of proxy connections for node manager. It should always be
+ * more than 1.
+ */
+ public static final String NM_CLIENT_MAX_NM_PROXY_CONNECTIONS =
+ YARN_PREFIX + "client.maximum-number-nm-proxy-connections";
+ public static final int DEFAULT_NM_CLIENT_MAX_NM_PROXY_CONNECTIONS = 10;
+
public YarnConfiguration() {
super();
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ContainerManagerProxy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ContainerManagerProxy.java
new file mode 100644
index 0000000..d7b4046
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ContainerManagerProxy.java
@@ -0,0 +1,209 @@
+package org.apache.hadoop.yarn.client;
+
+import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.yarn.YarnRuntimeException;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.util.ProtoUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+
+/**
+ * Helper class to manage container manager proxies
+ */
+public class ContainerManagerProxy {
+ static final Log LOG = LogFactory.getLog(ContainerManagerProxy.class);
+
+ private final int maxNumOfProxies;
+ private final LinkedHashMap cmProxy;
+ private Map nmTokens;
+ private final Configuration conf;
+ private final YarnRPC rpc;
+
+ public ContainerManagerProxy(Configuration conf,
+ Map nmTokens) {
+ this.nmTokens = nmTokens;
+ this.conf = conf;
+
+ maxNumOfProxies =
+ conf.getInt(YarnConfiguration.NM_CLIENT_MAX_NM_PROXY_CONNECTIONS,
+ YarnConfiguration.DEFAULT_NM_CLIENT_MAX_NM_PROXY_CONNECTIONS);
+ if (maxNumOfProxies < 1) {
+ throw new YarnRuntimeException(
+ YarnConfiguration.NM_CLIENT_MAX_NM_PROXY_CONNECTIONS
+ + " (" + maxNumOfProxies + ") can not be less than 1.");
+ }
+ LOG.info(YarnConfiguration.NM_CLIENT_MAX_NM_PROXY_CONNECTIONS + " : "
+ + maxNumOfProxies);
+
+ // Using default capacity and load factor.
+ // We need LRU so setting accessOrder = True
+ cmProxy =
+ new LinkedHashMap(16, 0.75f, true);
+
+ rpc = YarnRPC.create(conf);
+ }
+
+ public synchronized ContainerManagerProxyData getProxy(
+ String containerManagerBindAddr, ContainerId containerId)
+ throws InvalidToken {
+
+ // This get call will update the map which is working as LRU cache.
+ ContainerManagerProxyData proxy = cmProxy.get(containerManagerBindAddr);
+
+ while (proxy != null
+ && !proxy.token.getIdentifier().equals(
+ nmTokens.get(containerManagerBindAddr))) {
+ LOG.info("Refreshing proxy as NMToken got updated for node : "
+ + containerManagerBindAddr);
+ // Token is updated. check if anyone has already tried closing it.
+ if (!proxy.isClosed) {
+ // try closing the proxy. Here if someone is already using it
+ // then we might not close it. In which case we will wait.
+ removeProxy(proxy);
+ } else {
+ try {
+ this.wait();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ if (proxy.count < 0) {
+ // This get call is avoided as it will update LRU.
+ proxy = cmProxy.get(containerManagerBindAddr);
+ }
+ }
+
+ if (proxy == null) {
+ proxy =
+ new ContainerManagerProxyData(rpc, containerManagerBindAddr,
+ containerId, nmTokens.get(containerManagerBindAddr));
+ if (cmProxy.size() > maxNumOfProxies) {
+ // Number of existing proxy exceed the limit.
+ String cmAddr = cmProxy.keySet().iterator().next();
+ removeProxy(cmProxy.get(cmAddr));
+ }
+
+ cmProxy.put(containerManagerBindAddr, proxy);
+ }
+ // This is to track active users of this proxy.
+ proxy.count++;
+
+ return proxy;
+ }
+
+ public synchronized void closeProxy(ContainerManagerProxyData proxy) {
+ proxy.count--;
+ if (proxy.isClosed && proxy.count < 0) {
+ LOG.info("Closing proxy : " + proxy.containerManagerBindAddr);
+ cmProxy.remove(proxy.containerManagerBindAddr);
+ try {
+ rpc.stopProxy(proxy.getContainerManager(), conf);
+ } finally {
+ this.notifyAll();
+ }
+ }
+ }
+
+ private synchronized void removeProxy(ContainerManagerProxyData proxy) {
+ proxy.isClosed = true;
+ closeProxy(proxy);
+ }
+
+ public synchronized void stopAllProxy() {
+ List nodeIds = new ArrayList();
+ nodeIds.addAll(this.cmProxy.keySet());
+ for (String nodeId : nodeIds) {
+ ContainerManagerProxyData proxy = cmProxy.get(nodeId);
+ // Explicitly reducing the proxy count to allow stopping proxy.
+ proxy.count = 0;
+ try {
+ removeProxy(proxy);
+ } catch (Throwable t) {
+ LOG.error("Error closing connection", t);
+ }
+ }
+ cmProxy.clear();
+ }
+
+ public void setNMTokens(Map nmTokens) {
+ this.nmTokens = nmTokens;
+ }
+
+ public class ContainerManagerProxyData {
+ private final String containerManagerBindAddr;
+ private final ContainerManager proxy;
+ private int count;
+ private boolean isClosed;
+ private final Token token;
+
+ @Private
+ @VisibleForTesting
+ public ContainerManagerProxyData(YarnRPC rpc,
+ String containerManagerBindAddr,
+ ContainerId containerId, Token token) throws InvalidToken {
+ this.containerManagerBindAddr = containerManagerBindAddr;
+ ;
+ this.count = 0;
+ this.isClosed = false;
+ this.token = token;
+ this.proxy = newProxy(rpc, containerManagerBindAddr, containerId, token);
+ }
+
+ @Private
+ @VisibleForTesting
+ protected ContainerManager newProxy(final YarnRPC rpc,
+ String containerManagerBindAddr, ContainerId containerId, Token token)
+ throws InvalidToken {
+ if (token == null) {
+ throw new InvalidToken("No NMToken sent for "
+ + containerManagerBindAddr);
+ }
+ final InetSocketAddress cmAddr =
+ NetUtils.createSocketAddr(containerManagerBindAddr);
+ LOG.info("Opening proxy : " + containerManagerBindAddr);
+ // the user in createRemoteUser in this context has to be ContainerID
+ UserGroupInformation user =
+ UserGroupInformation.createRemoteUser(containerId
+ .getApplicationAttemptId().toString());
+
+ org.apache.hadoop.security.token.Token nmToken =
+ ProtoUtils.convertFromProtoFormat(token, cmAddr);
+ user.addToken(nmToken);
+
+ ContainerManager proxy = user
+ .doAs(new PrivilegedAction() {
+
+ @Override
+ public ContainerManager run() {
+ return (ContainerManager) rpc.getProxy(ContainerManager.class,
+ cmAddr, conf);
+ }
+ });
+ return proxy;
+ }
+
+ public ContainerManager getContainerManager() {
+ return proxy;
+ }
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClient.java
index 0e45aa6..1b57be7 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClient.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClient.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -63,35 +64,44 @@
*
* @param containerId the Id of the started container
* @param nodeId the Id of the NodeManager
- * @param containerToken the security token to verify authenticity of the
- * started container
+ *
* @throws YarnException
* @throws IOException
*/
- void stopContainer(ContainerId containerId, NodeId nodeId,
- Token containerToken) throws YarnException, IOException;
+ void stopContainer(ContainerId containerId, NodeId nodeId)
+ throws YarnException, IOException;
/**
* Query the status of a container.
*
* @param containerId the Id of the started container
* @param nodeId the Id of the NodeManager
- * @param containerToken the security token to verify authenticity of the
- * started container
+ *
* @return the status of a container
* @throws YarnException
* @throws IOException
*/
- ContainerStatus getContainerStatus(ContainerId containerId, NodeId nodeId,
- Token containerToken) throws YarnException, IOException;
+ ContainerStatus getContainerStatus(ContainerId containerId, NodeId nodeId)
+ throws YarnException, IOException;
/**
* Set whether the containers that are started by this client, and are
* still running should be stopped when the client stops. By default, the
- * feature should be enabled.
- *
+ * feature should be enabled.
However, containers will be stopped only
+ * when service is stopped. i.e. after {@link NMClient#stop()}.
+ *
* @param enabled whether the feature is enabled or not
*/
void cleanupRunningContainersOnStop(boolean enabled);
+ /**
+ * The map of NMTokens. New NMToken is issued on
+ * {@link AMRMClient#allocate(float)} response. Client is expected to add
+ * this token into the map passed here. This Token is required for
+ * communicating with Node Manager for start / stop / get container status.
+ * key :- Node host address (:)
+ * Value :- Token issued on {@link AMRMClient#allocate(float)} call.
+ * @param nmTokens NMToken map.
+ */
+ void setNMTokenMap(ConcurrentHashMap nmTokens);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientAsync.java
index 653a2dc..e461ed6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientAsync.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientAsync.java
@@ -488,7 +488,7 @@ public ContainerState transition(
ContainerId containerId = event.getContainerId();
try {
container.nmClientAsync.client.stopContainer(
- containerId, event.getNodeId(), event.getContainerToken());
+ containerId, event.getNodeId());
try {
container.nmClientAsync.callbackHandler.onContainerStopped(
event.getContainerId());
@@ -600,7 +600,7 @@ public void run() {
if (event.getType() == ContainerEventType.QUERY_CONTAINER) {
try {
ContainerStatus containerStatus = client.getContainerStatus(
- containerId, event.getNodeId(), event.getContainerToken());
+ containerId, event.getNodeId());
try {
callbackHandler.onContainerStatusReceived(
containerId, containerStatus);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientImpl.java
index b1be764..ab17e1a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientImpl.java
@@ -19,9 +19,7 @@
package org.apache.hadoop.yarn.client;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -29,27 +27,20 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.ContainerManagerProxy.ContainerManagerProxyData;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.util.ProtoUtils;
import org.apache.hadoop.yarn.util.Records;
/**
@@ -87,6 +78,7 @@
//enabled by default
protected final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true);
+ private ContainerManagerProxy cmProxy;
public NMClientImpl() {
super(NMClientImpl.class.getName());
@@ -103,6 +95,7 @@ protected void serviceStop() throws Exception {
if (cleanupRunningContainers.get()) {
cleanupRunningContainers();
}
+ cmProxy.stopAllProxy();
super.serviceStop();
}
@@ -110,8 +103,7 @@ protected synchronized void cleanupRunningContainers() {
for (StartedContainer startedContainer : startedContainers.values()) {
try {
stopContainer(startedContainer.getContainerId(),
- startedContainer.getNodeId(),
- startedContainer.getContainerToken());
+ startedContainer.getNodeId());
} catch (YarnException e) {
LOG.error("Failed to stop Container " +
startedContainer.getContainerId() +
@@ -125,22 +117,41 @@ protected synchronized void cleanupRunningContainers() {
}
@Override
+ public synchronized void setNMTokenMap(
+ ConcurrentHashMap nmTokens) {
+ cmProxy.setNMTokens(nmTokens);
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ cmProxy =
+ new ContainerManagerProxy(conf, new ConcurrentHashMap());
+ }
+
+ @Override
public void cleanupRunningContainersOnStop(boolean enabled) {
cleanupRunningContainers.set(enabled);
}
+ public enum ContainerState {
+ // Initial state
+ INIT,
+ // It is when container start request was successful.
+ STARTED,
+ // Container needs to be stopped
+ STOPPED
+ }
protected static class StartedContainer {
private ContainerId containerId;
private NodeId nodeId;
- private Token containerToken;
- private boolean stopped;
-
+ private ContainerState state;
+
+
public StartedContainer(ContainerId containerId, NodeId nodeId,
Token containerToken) {
this.containerId = containerId;
this.nodeId = nodeId;
- this.containerToken = containerToken;
- stopped = false;
+ state = ContainerState.INIT;
}
public ContainerId getContainerId() {
@@ -151,172 +162,6 @@ public NodeId getNodeId() {
return nodeId;
}
- public Token getContainerToken() {
- return containerToken;
- }
- }
-
- protected static final class NMCommunicator extends AbstractService {
- private ContainerId containerId;
- private NodeId nodeId;
- private Token containerToken;
- private ContainerManager containerManager;
-
- public NMCommunicator(ContainerId containerId, NodeId nodeId,
- Token containerToken) {
- super(NMCommunicator.class.getName());
- this.containerId = containerId;
- this.nodeId = nodeId;
- this.containerToken = containerToken;
- }
-
- @Override
- protected void serviceStart() throws Exception {
- final YarnRPC rpc = YarnRPC.create(getConfig());
-
- final InetSocketAddress containerAddress =
- NetUtils.createSocketAddr(nodeId.toString());
-
- // the user in createRemoteUser in this context has to be ContainerId
- UserGroupInformation currentUser =
- UserGroupInformation.createRemoteUser(containerId.toString());
-
- org.apache.hadoop.security.token.Token token =
- ProtoUtils.convertFromProtoFormat(containerToken, containerAddress);
- currentUser.addToken(token);
-
- containerManager = currentUser
- .doAs(new PrivilegedAction() {
- @Override
- public ContainerManager run() {
- return (ContainerManager) rpc.getProxy(ContainerManager.class,
- containerAddress, getConfig());
- }
- });
-
- LOG.debug("Connecting to ContainerManager at " + containerAddress);
- super.serviceStart();
- }
-
- @Override
- protected void serviceStop() throws Exception {
- if (this.containerManager != null) {
- RPC.stopProxy(this.containerManager);
-
- if (LOG.isDebugEnabled()) {
- InetSocketAddress containerAddress =
- NetUtils.createSocketAddr(nodeId.toString());
- LOG.debug("Disconnecting from ContainerManager at " +
- containerAddress);
- }
- }
- super.serviceStop();
- }
-
- public synchronized Map startContainer(
- Container container, ContainerLaunchContext containerLaunchContext)
- throws YarnException, IOException {
- if (!container.getId().equals(containerId)) {
- throw new IllegalArgumentException(
- "NMCommunicator's containerId mismatches the given Container's");
- }
- StartContainerResponse startResponse = null;
- try {
- StartContainerRequest startRequest =
- Records.newRecord(StartContainerRequest.class);
- startRequest.setContainerToken(container.getContainerToken());
- startRequest.setContainerLaunchContext(containerLaunchContext);
- startResponse = containerManager.startContainer(startRequest);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Started Container " + containerId);
- }
- } catch (YarnException e) {
- LOG.warn("Container " + containerId + " failed to start", e);
- throw e;
- } catch (IOException e) {
- LOG.warn("Container " + containerId + " failed to start", e);
- throw e;
- }
- return startResponse.getAllServiceResponse();
- }
-
- public synchronized void stopContainer() throws YarnException,
- IOException {
- try {
- StopContainerRequest stopRequest =
- Records.newRecord(StopContainerRequest.class);
- stopRequest.setContainerId(containerId);
- containerManager.stopContainer(stopRequest);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Stopped Container " + containerId);
- }
- } catch (YarnException e) {
- LOG.warn("Container " + containerId + " failed to stop", e);
- throw e;
- } catch (IOException e) {
- LOG.warn("Container " + containerId + " failed to stop", e);
- throw e;
- }
- }
-
- public synchronized ContainerStatus getContainerStatus()
- throws YarnException, IOException {
- GetContainerStatusResponse statusResponse = null;
- try {
- GetContainerStatusRequest statusRequest =
- Records.newRecord(GetContainerStatusRequest.class);
- statusRequest.setContainerId(containerId);
- statusResponse = containerManager.getContainerStatus(statusRequest);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Got the status of Container " + containerId);
- }
- } catch (YarnException e) {
- LOG.warn(
- "Unable to get the status of Container " + containerId, e);
- throw e;
- } catch (IOException e) {
- LOG.warn(
- "Unable to get the status of Container " + containerId, e);
- throw e;
- }
- return statusResponse.getStatus();
- }
- }
-
- @Override
- public Map startContainer(
- Container container, ContainerLaunchContext containerLaunchContext)
- throws YarnException, IOException {
- // Do synchronization on StartedContainer to prevent race condition
- // between startContainer and stopContainer
- synchronized (addStartedContainer(container)) {
- Map allServiceResponse;
- NMCommunicator nmCommunicator = null;
- try {
- nmCommunicator = new NMCommunicator(container.getId(),
- container.getNodeId(), container.getContainerToken());
- nmCommunicator.init(getConfig());
- nmCommunicator.start();
- allServiceResponse =
- nmCommunicator.startContainer(container, containerLaunchContext);
- } catch (YarnException e) {
- // Remove the started container if it failed to start
- removeStartedContainer(container.getId());
- throw e;
- } catch (IOException e) {
- removeStartedContainer(container.getId());
- throw e;
- } catch (Throwable t) {
- removeStartedContainer(container.getId());
- throw RPCUtil.getRemoteException(t);
- } finally {
- if (nmCommunicator != null) {
- nmCommunicator.stop();
- }
- }
- return allServiceResponse;
- }
-
// Three choices:
// 1. starting and releasing the proxy before and after each interaction
// 2. starting the proxy when starting the container and releasing it when
@@ -326,70 +171,109 @@ public synchronized ContainerStatus getContainerStatus()
// Adopt 1 currently
}
+ private void addStartingContainer(StartedContainer startedContainer)
+ throws YarnException {
+ if (startedContainers.putIfAbsent(startedContainer.containerId,
+ startedContainer) != null) {
+ throw RPCUtil.getRemoteException("Container "
+ + startedContainer.containerId.toString() + " is already started");
+ }
+ startedContainers
+ .put(startedContainer.getContainerId(), startedContainer);
+ }
+
+ private StartContainerRequest
+ createStartContainerRequest(Container container,
+ ContainerLaunchContext containerLaunchContext) {
+ StartContainerRequest request =
+ Records.newRecord(StartContainerRequest.class);
+ request.setContainerLaunchContext(containerLaunchContext);
+ request.setContainerToken(container.getContainerToken());
+ return request;
+ }
+
@Override
- public void stopContainer(ContainerId containerId, NodeId nodeId,
- Token containerToken) throws YarnException, IOException {
+ public void stopContainer(ContainerId containerId, NodeId nodeId)
+ throws YarnException, IOException {
StartedContainer startedContainer = getStartedContainer(containerId);
- if (startedContainer == null) {
- throw RPCUtil.getRemoteException("Container " + containerId +
- " is either not started yet or already stopped");
- }
+
// Only allow one request of stopping the container to move forward
// When entering the block, check whether the precursor has already stopped
// the container
- synchronized (startedContainer) {
- if (startedContainer.stopped) {
- return;
- }
- NMCommunicator nmCommunicator = null;
- try {
- nmCommunicator =
- new NMCommunicator(containerId, nodeId, containerToken);
- nmCommunicator.init(getConfig());
- nmCommunicator.start();
- nmCommunicator.stopContainer();
- } finally {
- if (nmCommunicator != null) {
- nmCommunicator.stop();
+ if (startedContainer != null) {
+ synchronized (startedContainer) {
+ if (startedContainer.state != ContainerState.STARTED) {
+ return;
}
- startedContainer.stopped = true;
- removeStartedContainer(containerId);
+ stopContainerInternal(containerId, nodeId);
+ // Only after successful
+ startedContainer.state = ContainerState.STOPPED;
+ removeStartedContainer(startedContainer);
+ }
+ } else {
+ stopContainerInternal(containerId, nodeId);
+ }
+
+ }
+
+ private void stopContainerInternal(ContainerId containerId, NodeId nodeId)
+ throws IOException, YarnException {
+ ContainerManagerProxyData proxy = null;
+ try {
+ proxy = cmProxy.getProxy(nodeId.toString(), containerId);
+ proxy.getContainerManager().stopContainer(
+ createStopContainerRequest(containerId));
+ } finally {
+ if (proxy != null) {
+ cmProxy.closeProxy(proxy);
}
}
}
+ private StopContainerRequest createStopContainerRequest(
+ ContainerId containerId) {
+ StopContainerRequest request =
+ Records.newRecord(StopContainerRequest.class);
+ request.setContainerId(containerId);
+ return request;
+ }
+
@Override
public ContainerStatus getContainerStatus(ContainerId containerId,
- NodeId nodeId, Token containerToken)
- throws YarnException, IOException {
- NMCommunicator nmCommunicator = null;
+ NodeId nodeId) throws YarnException, IOException {
+
+ ContainerManagerProxyData proxy = null;
try {
- nmCommunicator = new NMCommunicator(containerId, nodeId, containerToken);
- nmCommunicator.init(getConfig());
- nmCommunicator.start();
- ContainerStatus containerStatus = nmCommunicator.getContainerStatus();
+ proxy = cmProxy.getProxy(nodeId.toString(), containerId);
+ ContainerStatus containerStatus =
+ proxy.getContainerManager().getContainerStatus(
+ createGetContainerRequest(containerId)).getStatus();
return containerStatus;
} finally {
- if (nmCommunicator != null) {
- nmCommunicator.stop();
+ if (proxy != null) {
+ cmProxy.closeProxy(proxy);
}
}
}
- protected synchronized StartedContainer addStartedContainer(
+ private GetContainerStatusRequest createGetContainerRequest(
+ ContainerId containerId) {
+ GetContainerStatusRequest request =
+ Records.newRecord(GetContainerStatusRequest.class);
+ request.setContainerId(containerId);
+ return request;
+ }
+
+ protected synchronized StartedContainer createStartedContainer(
Container container) throws YarnException, IOException {
- if (startedContainers.containsKey(container.getId())) {
- throw RPCUtil.getRemoteException("Container " + container.getId() +
- " is already started");
- }
StartedContainer startedContainer = new StartedContainer(container.getId(),
container.getNodeId(), container.getContainerToken());
- startedContainers.put(startedContainer.getContainerId(), startedContainer);
return startedContainer;
}
- protected synchronized void removeStartedContainer(ContainerId containerId) {
- startedContainers.remove(containerId);
+ protected synchronized void
+ removeStartedContainer(StartedContainer container) {
+ startedContainers.remove(container.containerId);
}
protected synchronized StartedContainer getStartedContainer(
@@ -397,4 +281,44 @@ protected synchronized StartedContainer getStartedContainer(
return startedContainers.get(containerId);
}
+ @Override
+ public Map startContainer(
+ Container container, ContainerLaunchContext containerLaunchContext)
+ throws YarnException, IOException {
+ // Do synchronization on StartedContainer to prevent race condition
+ // between startContainer and stopContainer only when startContainer is
+ // in progress for a given container.
+ StartedContainer startingContainer = createStartedContainer(container);
+ synchronized (startingContainer) {
+ addStartingContainer(startingContainer);
+
+ Map allServiceResponse;
+ ContainerManagerProxyData proxy = null;
+ try {
+ proxy =
+ cmProxy.getProxy(container.getNodeId().toString(),
+ container.getId());
+ allServiceResponse =
+ proxy.getContainerManager().startContainer(
+ createStartContainerRequest(container,
+ containerLaunchContext)).getAllServiceResponse();
+ startingContainer.state = ContainerState.STARTED;
+ } catch (YarnException e) {
+ // Remove the started container if it failed to start
+ removeStartedContainer(startingContainer);
+ throw e;
+ } catch (IOException e) {
+ removeStartedContainer(startingContainer);
+ throw e;
+ } catch (Throwable t) {
+ removeStartedContainer(startingContainer);
+ throw RPCUtil.getRemoteException(t);
+ } finally {
+ if (proxy != null) {
+ cmProxy.closeProxy(proxy);
+ }
+ }
+ return allServiceResponse;
+ }
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java
index 86a3eb6..7516de0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java
@@ -29,12 +29,14 @@
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.NMToken;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -49,6 +51,7 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -69,7 +72,8 @@
List nodeReports = null;
ApplicationAttemptId attemptId = null;
int nodeCount = 3;
-
+ ConcurrentHashMap nmTokens;
+
@Before
public void setup() throws YarnException, IOException {
// start minicluster
@@ -134,6 +138,7 @@ public void setup() throws YarnException, IOException {
if (iterationsLeft == 0) {
fail("Application hasn't bee started");
}
+ nmTokens = new ConcurrentHashMap();
// start am rm client
rmClient = new AMRMClientImpl(attemptId);
@@ -146,6 +151,7 @@ public void setup() throws YarnException, IOException {
nmClient = new NMClientImpl();
nmClient.init(conf);
nmClient.start();
+ nmClient.setNMTokenMap(nmTokens);
assertNotNull(nmClient);
assertEquals(STATE.STARTED, nmClient.getServiceState());
}
@@ -186,14 +192,13 @@ public void testNMClientNoCleanupOnStop()
assertEquals(0, nmClient.startedContainers.size());
}
- @Test (timeout = 60000)
+ @Test
public void testNMClient()
throws YarnException, IOException {
-
rmClient.registerApplicationMaster("Host", 10000, "");
testContainerManagement(nmClient, allocateContainers(rmClient, 5));
-
+
rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
null, null);
// stop the running containers on close
@@ -235,6 +240,11 @@ public void testNMClient()
for(Container container : allocResponse.getAllocatedContainers()) {
containers.add(container);
}
+ if (!allocResponse.getNMTokens().isEmpty()) {
+ for (NMToken token : allocResponse.getNMTokens()) {
+ nmTokens.put(token.getNodeId().toString(), token.getToken());
+ }
+ }
if(allocatedContainerCount < containersRequestedAny) {
// sleep to let NM's heartbeat to RM and trigger allocations
sleep(1000);
@@ -253,8 +263,7 @@ private void testContainerManagement(NMClientImpl nmClient,
// getContainerStatus shouldn't be called before startContainer,
// otherwise, NodeManager cannot find the container
try {
- nmClient.getContainerStatus(container.getId(), container.getNodeId(),
- container.getContainerToken());
+ nmClient.getContainerStatus(container.getId(), container.getNodeId());
fail("Exception is expected");
} catch (YarnException e) {
assertTrue("The thrown exception is not expected",
@@ -264,12 +273,11 @@ private void testContainerManagement(NMClientImpl nmClient,
// stopContainer shouldn't be called before startContainer,
// otherwise, an exception will be thrown
try {
- nmClient.stopContainer(container.getId(), container.getNodeId(),
- container.getContainerToken());
+ nmClient.stopContainer(container.getId(), container.getNodeId());
fail("Exception is expected");
} catch (YarnException e) {
if (!e.getMessage()
- .contains("is either not started yet or already stopped")) {
+ .contains("is not handled by this NodeManager")) {
throw (AssertionError)
(new AssertionError("Exception is not expected: " + e).initCause(
e));
@@ -298,8 +306,7 @@ private void testContainerManagement(NMClientImpl nmClient,
-1000);
try {
- nmClient.stopContainer(container.getId(), container.getNodeId(),
- container.getContainerToken());
+ nmClient.stopContainer(container.getId(), container.getNodeId());
} catch (YarnException e) {
throw (AssertionError)
(new AssertionError("Exception is not expected: " + e)
@@ -327,8 +334,7 @@ private void testGetContainerStatus(Container container, int index,
while (true) {
try {
ContainerStatus status = nmClient.getContainerStatus(
- container.getId(), container.getNodeId(),
- container.getContainerToken());
+ container.getId(), container.getNodeId());
// NodeManager may still need some time to get the stable
// container status
if (status.getState() == state) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClientAsync.java
index 939072d..8823095 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClientAsync.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClientAsync.java
@@ -378,33 +378,30 @@ private NMClient mockNMClient(int mode)
when(client.startContainer(any(Container.class),
any(ContainerLaunchContext.class))).thenReturn(
Collections.emptyMap());
- when(client.getContainerStatus(any(ContainerId.class), any(NodeId.class),
- any(Token.class))).thenReturn(
+ when(client.getContainerStatus(any(ContainerId.class),
+ any(NodeId.class))).thenReturn(
recordFactory.newRecordInstance(ContainerStatus.class));
doNothing().when(client).stopContainer(any(ContainerId.class),
- any(NodeId.class), any(Token.class));
+ any(NodeId.class));
break;
case 1:
doThrow(RPCUtil.getRemoteException("Start Exception")).when(client)
.startContainer(any(Container.class),
any(ContainerLaunchContext.class));
doThrow(RPCUtil.getRemoteException("Query Exception")).when(client)
- .getContainerStatus(any(ContainerId.class), any(NodeId.class),
- any(Token.class));
+ .getContainerStatus(any(ContainerId.class), any(NodeId.class));
doThrow(RPCUtil.getRemoteException("Stop Exception")).when(client)
- .stopContainer(any(ContainerId.class), any(NodeId.class),
- any(Token.class));
+ .stopContainer(any(ContainerId.class), any(NodeId.class));
break;
case 2:
when(client.startContainer(any(Container.class),
any(ContainerLaunchContext.class))).thenReturn(
Collections.emptyMap());
- when(client.getContainerStatus(any(ContainerId.class), any(NodeId.class),
- any(Token.class))).thenReturn(
+ when(client.getContainerStatus(any(ContainerId.class),
+ any(NodeId.class))).thenReturn(
recordFactory.newRecordInstance(ContainerStatus.class));
doThrow(RPCUtil.getRemoteException("Stop Exception")).when(client)
- .stopContainer(any(ContainerId.class), any(NodeId.class),
- any(Token.class));
+ .stopContainer(any(ContainerId.class), any(NodeId.class));
}
return client;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerManagerSecurityInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerManagerSecurityInfo.java
index c7112e3..f79539a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerManagerSecurityInfo.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerManagerSecurityInfo.java
@@ -51,7 +51,7 @@ public TokenInfo getTokenInfo(Class> protocol, Configuration conf) {
@Override
public Class extends TokenSelector extends TokenIdentifier>>
value() {
- return ContainerTokenSelector.class;
+ return NMTokenSelector.class;
}
};
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenSelector.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenSelector.java
new file mode 100644
index 0000000..c57e4a2
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenSelector.java
@@ -0,0 +1,56 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.security;
+
+import java.util.Collection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+
+public class NMTokenSelector implements
+ TokenSelector {
+
+ private static final Log LOG = LogFactory
+ .getLog(NMTokenSelector.class);
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Token selectToken(Text service,
+ Collection> tokens) {
+ if (service == null) {
+ return null;
+ }
+ for (Token extends TokenIdentifier> token : tokens) {
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Looking for service: " + service + ". Current token is "
+ + token);
+ }
+ if (NMTokenIdentifier.KIND.equals(token.getKind()) &&
+ service.equals(token.getService())) {
+ return (Token) token;
+ }
+ }
+ return null;
+ }
+
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
index a2edf0b..c17d095 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
@@ -152,7 +152,8 @@ public static Token newContainerToken(ContainerId cId, String host,
int port, String user, Resource r, long expiryTime, int masterKeyId,
byte[] password, long rmIdentifier) throws IOException {
ContainerTokenIdentifier identifier =
- new ContainerTokenIdentifier(cId, host, user, r, expiryTime,
+ new ContainerTokenIdentifier(cId, host + ":" + port, user, r,
+ expiryTime,
masterKeyId, rmIdentifier);
return newContainerToken(BuilderUtils.newNodeId(host, port), password,
identifier);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 8a34ad5..a63bd82 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -23,6 +23,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -38,6 +39,7 @@
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.StringUtils;
@@ -62,6 +64,7 @@
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
@@ -291,91 +294,74 @@ private UserGroupInformation getRemoteUgi()
// Obtain the needed ContainerTokenIdentifier from the remote-UGI. RPC layer
// currently sets only the required id, but iterate through anyways just to
// be sure.
- private ContainerTokenIdentifier selectContainerTokenIdentifier(
+ @Private
+ @VisibleForTesting
+ protected NMTokenIdentifier selectNMTokenIdentifier(
UserGroupInformation remoteUgi) {
Set tokenIdentifiers = remoteUgi.getTokenIdentifiers();
- ContainerTokenIdentifier resultId = null;
+ NMTokenIdentifier resultId = null;
for (TokenIdentifier id : tokenIdentifiers) {
- if (id instanceof ContainerTokenIdentifier) {
- resultId = (ContainerTokenIdentifier) id;
+ if (id instanceof NMTokenIdentifier) {
+ resultId = (NMTokenIdentifier) id;
break;
}
}
return resultId;
}
- @Private
- @VisibleForTesting
- protected ContainerTokenIdentifier getContainerTokenIdentifier(
- UserGroupInformation remoteUgi,
- ContainerTokenIdentifier containerTokenIdentifier)
- throws YarnException {
- if (UserGroupInformation.isSecurityEnabled()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Number of TokenIdentifiers in the UGI from RPC: "
- + remoteUgi.getTokenIdentifiers().size());
- }
- // Get the tokenId from the remote user ugi
- return selectContainerTokenIdentifier(remoteUgi);
- } else {
- return containerTokenIdentifier;
- }
- }
-
/**
- * Authorize the request.
- *
- * @param containerIDStr
- * of the container
- * @param launchContext
- * passed if verifying the startContainer, null otherwise.
- * @param remoteUgi
+ * @param containerTokenIdentifier
+ * of the container to be started
+ * @param ugi
* ugi corresponding to the remote end making the api-call
* @throws YarnException
*/
@Private
@VisibleForTesting
- protected void authorizeRequest(String containerIDStr,
- ContainerLaunchContext launchContext,
- UserGroupInformation remoteUgi, ContainerTokenIdentifier tokenId)
- throws YarnException {
+ protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier,
+ ContainerTokenIdentifier containerTokenIdentifier,
+ UserGroupInformation ugi) throws YarnException {
+ ContainerId containerId = containerTokenIdentifier.getContainerID();
+ String containerIDStr = containerId.toString();
boolean unauthorized = false;
StringBuilder messageBuilder =
new StringBuilder("Unauthorized request to start container. ");
-
- if (!remoteUgi.getUserName().equals(containerIDStr)) {
+ if (nmTokenIdentifier.getApplicationAttemptId().equals(
+ containerId.getApplicationAttemptId())) {
+ messageBuilder.append("\nNMToken for application attempt : ")
+ .append(nmTokenIdentifier.getApplicationAttemptId())
+ .append(" was used for start container with container token")
+ .append(" for application attempt : ")
+ .append(containerId.getApplicationAttemptId())
+ .append(" for container Id : ")
+ .append(containerId);
+ } else if (!ugi.getUserName().equals(
+ containerId.getApplicationAttemptId().toString())) {
unauthorized = true;
- messageBuilder.append("\nExpected containerId: "
- + remoteUgi.getUserName() + " Found: " + containerIDStr);
- } else if (launchContext != null) {
- // Verify other things also for startContainer() request.
-
-
- if (tokenId == null) {
- unauthorized = true;
- messageBuilder
- .append("\nNo ContainerToken found for " + containerIDStr);
- } else {
-
- // Is the container being relaunched? Or RPC layer let startCall with
- // tokens generated off old-secret through?
- if (!this.context.getContainerTokenSecretManager()
- .isValidStartContainerRequest(tokenId.getContainerID())) {
- unauthorized = true;
- messageBuilder.append("\n Attempt to relaunch the same "
- + "container with id " + containerIDStr + ".");
- }
-
- // Ensure the token is not expired.
- // Token expiry is not checked for stopContainer/getContainerStatus
- if (tokenId.getExpiryTimeStamp() < System.currentTimeMillis()) {
- unauthorized = true;
- messageBuilder.append("\nThis token is expired. current time is "
- + System.currentTimeMillis() + " found "
- + tokenId.getExpiryTimeStamp());
- }
- }
+ messageBuilder.append("\nExpected applicationAttemptId: ")
+ .append(ugi.getUserName()).append(" Found: ")
+ .append(containerId.getApplicationAttemptId().toString());
+ } else if (!this.context.getContainerTokenSecretManager()
+ .isValidStartContainerRequest(containerId)) {
+ // Is the container being relaunched? Or RPC layer let startCall with
+ // tokens generated off old-secret through?
+ unauthorized = true;
+ messageBuilder.append("\n Attempt to relaunch the same ")
+ .append("container with id ").append(containerIDStr).append(".");
+ } else if (containerTokenIdentifier.getExpiryTimeStamp() < System
+ .currentTimeMillis()) {
+ // Ensure the token is not expired.
+ unauthorized = true;
+ messageBuilder.append("\nThis token is expired. current time is ")
+ .append(System.currentTimeMillis()).append(" found ")
+ .append(containerTokenIdentifier.getExpiryTimeStamp());
+ } else if (containerTokenIdentifier.getRMIdentifer() != nodeStatusUpdater
+ .getRMIdentifier()) {
+ // Is the container coming from unknown RM
+ unauthorized = true;
+ messageBuilder.append("\nContainer ").append(containerIDStr)
+ .append(" rejected as it is allocated by a previous RM");
}
if (unauthorized) {
@@ -384,7 +370,7 @@ protected void authorizeRequest(String containerIDStr,
throw RPCUtil.getRemoteException(msg);
}
}
-
+
/**
* Start a container on this NodeManager.
*/
@@ -394,45 +380,124 @@ public StartContainerResponse startContainer(StartContainerRequest request)
throws YarnException, IOException {
if (blockNewContainerRequests.get()) {
- throw new NMNotYetReadyException(
- "Rejecting new containers as NodeManager has not" +
- " yet connected with ResourceManager");
+ throw RPCUtil.getRemoteException(new NMNotYetReadyException(
+ "Rejecting new containers as NodeManager has not"
+ + " yet connected with ResourceManager"));
}
+ /*
+ * 1) It should save the NMToken into NMTokenSecretManager. This is done
+ * here instead of RPC layer because at the time of opening/authenticating
+ * the connection it doesn't know what all RPC calls user will make on it.
+ * Also new NMToken is issued only at startContainer (once it gets renewed).
+ *
+ * 2) It should validate containerToken. Need to check below things. a) It
+ * is signed by correct master key (part of retrieve password). b) It
+ * belongs to correct Node Manager (part of retrieve password). c) It has
+ * correct RMIdentifier. d) It is not expired.
+ */
+ // update NMToken
+
+ UserGroupInformation remoteUgi = getRemoteUgi();
+ NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
+ updateNMTokenIdentifier(nmTokenIdentifier);
+
+ // Validate containerToken
+ ContainerTokenIdentifier containerTokenIdentifier =
+ verifyAndGetContainerTokenIdentifier(request.getContainerToken());
+
+ authorizeStartRequest(nmTokenIdentifier, containerTokenIdentifier,
+ remoteUgi);
+
+ ContainerId containerId = containerTokenIdentifier.getContainerID();
+ String containerIdStr = containerId.toString();
+ String user = containerTokenIdentifier.getApplicationSubmitter();
+
+ LOG.info("Start request for " + containerIdStr + " by user " + user);
ContainerLaunchContext launchContext = request.getContainerLaunchContext();
- org.apache.hadoop.yarn.api.records.Token token = request.getContainerToken();
- ContainerTokenIdentifier tokenIdentifier = null;
- try {
- tokenIdentifier = BuilderUtils.newContainerTokenIdentifier(token);
- } catch (IOException e) {
- throw RPCUtil.getRemoteException(e);
+ Credentials credentials = parseCredentials(launchContext);
+
+ Container container =
+ new ContainerImpl(getConfig(), this.dispatcher, launchContext,
+ credentials, metrics, containerTokenIdentifier);
+ ApplicationId applicationID =
+ containerId.getApplicationAttemptId().getApplicationId();
+ if (context.getContainers().putIfAbsent(containerId, container) != null) {
+ NMAuditLogger.logFailure(user, AuditConstants.START_CONTAINER,
+ "ContainerManagerImpl", "Container already running on this node!",
+ applicationID, containerId);
+ throw RPCUtil.getRemoteException("Container " + containerIdStr
+ + " already is running on this node!!");
}
- UserGroupInformation remoteUgi = getRemoteUgi();
- ContainerTokenIdentifier tokenId =
- getContainerTokenIdentifier(remoteUgi, tokenIdentifier);
+ // Create the application
+ Application application =
+ new ApplicationImpl(dispatcher, this.aclsManager, user, applicationID,
+ credentials, context);
+ if (null == context.getApplications().putIfAbsent(applicationID,
+ application)) {
+ LOG.info("Creating a new application reference for app " + applicationID);
- ContainerId containerID = tokenId.getContainerID();
- String containerIDStr = containerID.toString();
+ dispatcher.getEventHandler().handle(
+ new ApplicationInitEvent(applicationID, container.getLaunchContext()
+ .getApplicationACLs()));
+ }
- authorizeRequest(containerIDStr, launchContext, remoteUgi, tokenId);
+ dispatcher.getEventHandler().handle(
+ new ApplicationContainerInitEvent(container));
- // Is the container coming from unknown RM
- if (tokenId.getRMIdentifer() != nodeStatusUpdater
- .getRMIdentifier()) {
- String msg = "\nContainer "+ containerIDStr
- + " rejected as it is allocated by a previous RM";
- LOG.error(msg);
- throw new InvalidContainerException(msg);
+ this.context.getContainerTokenSecretManager().startContainerSuccessful(
+ containerTokenIdentifier);
+ NMAuditLogger.logSuccess(user, AuditConstants.START_CONTAINER,
+ "ContainerManageImpl", applicationID, containerId);
+ StartContainerResponse response =
+ recordFactory.newRecordInstance(StartContainerResponse.class);
+ response.setAllServiceResponse(auxiliaryServices.getMeta());
+ // TODO launchedContainer misplaced -> doesn't necessarily mean a container
+ // launch. A finished Application will not launch containers.
+ metrics.launchedContainer();
+ metrics.allocateContainer(containerTokenIdentifier.getResource());
+ return response;
+ }
+
+ protected ContainerTokenIdentifier verifyAndGetContainerTokenIdentifier(
+ org.apache.hadoop.yarn.api.records.Token token) throws YarnException,
+ InvalidToken {
+ ContainerTokenIdentifier containerTokenIdentifier = null;
+ try {
+ containerTokenIdentifier =
+ BuilderUtils.newContainerTokenIdentifier(token);
+ } catch (IOException e) {
+ throw RPCUtil.getRemoteException(e);
}
+ byte[] password =
+ context.getContainerTokenSecretManager().retrievePassword(
+ containerTokenIdentifier);
+ byte[] tokenPass = token.getPassword().array();
+ if (password == null || tokenPass == null
+ || !Arrays.equals(password, tokenPass)) {
+ throw new InvalidToken(
+ "Invalid container token used for starting container on : "
+ + context.getNodeId().toString());
+ }
+ return containerTokenIdentifier;
+ }
- LOG.info("Start request for " + containerIDStr + " by user "
- + tokenId.getApplicationSubmitter());
+ @Private
+ @VisibleForTesting
+ protected void updateNMTokenIdentifier(NMTokenIdentifier nmTokenIdentifier)
+ throws InvalidToken {
+ context.getNMTokenSecretManager().appAttemptStartContainer(
+ nmTokenIdentifier);
+ }
+ private Credentials parseCredentials(ContainerLaunchContext launchContext)
+ throws YarnException {
+ Credentials credentials = new Credentials();
// //////////// Parse credentials
ByteBuffer tokens = launchContext.getTokens();
- Credentials credentials = new Credentials();
+
if (tokens != null) {
DataInputByteBuffer buf = new DataInputByteBuffer();
tokens.rewind();
@@ -440,8 +505,7 @@ public StartContainerResponse startContainer(StartContainerRequest request)
try {
credentials.readTokenStorageStream(buf);
if (LOG.isDebugEnabled()) {
- for (Token extends TokenIdentifier> tk : credentials
- .getAllTokens()) {
+ for (Token extends TokenIdentifier> tk : credentials.getAllTokens()) {
LOG.debug(tk.getService() + " = " + tk.toString());
}
}
@@ -450,54 +514,7 @@ public StartContainerResponse startContainer(StartContainerRequest request)
}
}
// //////////// End of parsing credentials
- String user = tokenId.getApplicationSubmitter();
-
- Container container =
- new ContainerImpl(getConfig(), this.dispatcher, launchContext,
- credentials, metrics, tokenId);
- ApplicationId applicationID =
- containerID.getApplicationAttemptId().getApplicationId();
- if (context.getContainers().putIfAbsent(containerID, container) != null) {
- NMAuditLogger.logFailure(user,
- AuditConstants.START_CONTAINER, "ContainerManagerImpl",
- "Container already running on this node!",
- applicationID, containerID);
- throw RPCUtil.getRemoteException("Container " + containerIDStr
- + " already is running on this node!!");
- }
-
- // Create the application
- Application application =
- new ApplicationImpl(dispatcher, this.aclsManager,
- user, applicationID, credentials,
- context);
- if (null ==
- context.getApplications().putIfAbsent(applicationID, application)) {
- LOG.info("Creating a new application reference for app "
- + applicationID);
- dispatcher.getEventHandler().handle(
- new ApplicationInitEvent(applicationID, container
- .getLaunchContext().getApplicationACLs()));
- }
-
- // TODO: Validate the request
- dispatcher.getEventHandler().handle(
- new ApplicationContainerInitEvent(container));
-
- this.context.getContainerTokenSecretManager().startContainerSuccessful(
- tokenId);
- NMAuditLogger.logSuccess(user,
- AuditConstants.START_CONTAINER, "ContainerManageImpl",
- applicationID, containerID);
-
- StartContainerResponse response =
- recordFactory.newRecordInstance(StartContainerResponse.class);
- response.setAllServiceResponse(auxiliaryServices.getMeta());
- // TODO launchedContainer misplaced -> doesn't necessarily mean a container
- // launch. A finished Application will not launch containers.
- metrics.launchedContainer();
- metrics.allocateContainer(tokenId.getResource());
- return response;
+ return credentials;
}
/**
@@ -510,34 +527,20 @@ public StopContainerResponse stopContainer(StopContainerRequest request)
ContainerId containerID = request.getContainerId();
String containerIDStr = containerID.toString();
-
- // TODO: Only the container's owner can kill containers today.
-
- UserGroupInformation remoteUgi = getRemoteUgi();
Container container = this.context.getContainers().get(containerID);
+ LOG.info("Getting container-status for " + containerIDStr);
+ authorizeGetStopRequest(containerID, container, true);
+
StopContainerResponse response =
recordFactory.newRecordInstance(StopContainerResponse.class);
- if (container == null) {
- LOG.warn("Trying to stop unknown container " + containerID);
- NMAuditLogger.logFailure("UnknownUser",
- AuditConstants.STOP_CONTAINER, "ContainerManagerImpl",
- "Trying to stop unknown container!",
- containerID.getApplicationAttemptId().getApplicationId(),
- containerID);
- return response; // Return immediately.
- }
- authorizeRequest(containerIDStr, null, remoteUgi,
- getContainerTokenIdentifier(remoteUgi, container.getContainerTokenIdentifier()));
-
dispatcher.getEventHandler().handle(
- new ContainerKillEvent(containerID,
- "Container killed by the ApplicationMaster."));
-
- NMAuditLogger.logSuccess(container.getUser(),
- AuditConstants.STOP_CONTAINER, "ContainerManageImpl",
- containerID.getApplicationAttemptId().getApplicationId(),
- containerID);
+ new ContainerKillEvent(containerID,
+ "Container killed by the ApplicationMaster."));
+
+ NMAuditLogger.logSuccess(container.getUser(),
+ AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID
+ .getApplicationAttemptId().getApplicationId(), containerID);
// TODO: Move this code to appropriate place once kill_container is
// implemented.
@@ -548,23 +551,14 @@ public StopContainerResponse stopContainer(StopContainerRequest request)
@Override
public GetContainerStatusResponse getContainerStatus(
- GetContainerStatusRequest request) throws YarnException,
- IOException {
+ GetContainerStatusRequest request) throws YarnException, IOException {
ContainerId containerID = request.getContainerId();
String containerIDStr = containerID.toString();
+ Container container = this.context.getContainers().get(containerID);
- // TODO: Only the container's owner can get containers' status today.
-
- UserGroupInformation remoteUgi = getRemoteUgi();
LOG.info("Getting container-status for " + containerIDStr);
- Container container = this.context.getContainers().get(containerID);
- if (container == null) {
- throw RPCUtil.getRemoteException("Container " + containerIDStr
- + " is not handled by this NodeManager");
- }
- authorizeRequest(containerIDStr, null, remoteUgi,
- getContainerTokenIdentifier(remoteUgi, container.getContainerTokenIdentifier()));
+ authorizeGetStopRequest(containerID, container, false);
ContainerStatus containerStatus = container.cloneAndGetContainerStatus();
LOG.info("Returning " + containerStatus);
@@ -574,6 +568,48 @@ public GetContainerStatusResponse getContainerStatus(
return response;
}
+ @Private
+ @VisibleForTesting
+ protected void authorizeGetStopRequest(ContainerId containerId,
+ Container container, boolean stopRequest) throws YarnException {
+
+ UserGroupInformation remoteUgi = getRemoteUgi();
+ NMTokenIdentifier identifier = selectNMTokenIdentifier(remoteUgi);
+
+ /*
+ * For get/stop container status; we need to verify that 1) User (NMToken)
+ * application attempt only has started container. 2) Requested containerId
+ * belongs to the same application attempt (NMToken) which was used. (Note:-
+ * This will prevent user in knowing another application's containers).
+ */
+
+ if ((!identifier.getApplicationAttemptId().equals(
+ containerId.getApplicationAttemptId()))
+ || (container != null && !identifier.getApplicationAttemptId().equals(
+ container.getContainerId().getApplicationAttemptId()))) {
+ if (stopRequest) {
+ LOG.warn(identifier.getApplicationAttemptId()
+ + " attempted to stop non-application container : "
+ + container.getContainerId().toString());
+ NMAuditLogger.logFailure("UnknownUser", AuditConstants.STOP_CONTAINER,
+ "ContainerManagerImpl", "Trying to stop unknown container!",
+ identifier.getApplicationAttemptId().getApplicationId(),
+ container.getContainerId());
+ } else {
+ LOG.warn(identifier.getApplicationAttemptId()
+ + " attempted to get get status for non-application container : "
+ + container.getContainerId().toString());
+ }
+ throw RPCUtil.getRemoteException("Container " + containerId.toString()
+ + " is not started by this application attempt.");
+ }
+
+ if (container == null) {
+ throw RPCUtil.getRemoteException("Container " + containerId.toString()
+ + " is not handled by this NodeManager");
+ }
+ }
+
class ContainerEventDispatcher implements EventHandler {
@Override
public void handle(ContainerEvent event) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java
index c5e00f2..bc349f4 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java
@@ -30,13 +30,12 @@
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
-import com.google.common.annotations.VisibleForTesting;
-
/**
* The NM maintains only two master-keys. The current key that RM knows and the
* key from the previous rolling-interval.
@@ -51,6 +50,7 @@
private MasterKeyData previousMasterKey;
private final Map> oldMasterKeys;
+ private String nodeHostAddr;
public NMContainerTokenSecretManager(Configuration conf) {
super(conf);
@@ -122,6 +122,15 @@ public synchronized void setMasterKey(MasterKey masterKeyRecord) {
masterKeyToUse = this.oldMasterKeys.get(appId).get(containerId);
}
+ if (nodeHostAddr != null
+ && !identifier.getNmHostAddress().equals(nodeHostAddr)) {
+ // Valid container token used for incorrect node.
+ throw new SecretManager.InvalidToken("Given Container "
+ + identifier.getContainerID().toString()
+ + " identifier is not valid for current Node manager. Expected : "
+ + nodeHostAddr + " Found : " + identifier.getNmHostAddress());
+ }
+
if (masterKeyToUse != null) {
return retrievePasswordInternal(identifier, masterKeyToUse);
}
@@ -186,4 +195,9 @@ private synchronized void addKeyForContainerId(ContainerId containerId,
public synchronized void appFinished(ApplicationId appId) {
this.oldMasterKeys.remove(appId);
}
+
+ public synchronized void setNodeId(NodeId nodeId) {
+ nodeHostAddr = nodeId.toString();
+ LOG.info("Updating node address : " + nodeHostAddr);
+ }
}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
index 28c40e6..89ac742 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
@@ -18,13 +18,17 @@
package org.apache.hadoop.yarn.server.nodemanager.security;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
@@ -38,10 +42,15 @@
private MasterKeyData previousMasterKey;
private final Map oldMasterKeys;
+ private final Map> appToAppAttemptMap;
+ private NodeId nodeId;
+
public NMTokenSecretManagerInNM() {
this.oldMasterKeys =
new HashMap();
+ appToAppAttemptMap =
+ new HashMap>();
}
/**
@@ -69,46 +78,104 @@ public synchronized void setMasterKey(MasterKey masterKey) {
}
/**
- * This method will be used to verify NMTokens generated by different
- * master keys.
+ * This method will be used to verify NMTokens generated by different master
+ * keys.
*/
@Override
- public synchronized byte[] retrievePassword(
- NMTokenIdentifier identifier) throws InvalidToken {
+ public synchronized byte[] retrievePassword(NMTokenIdentifier identifier)
+ throws InvalidToken {
int keyId = identifier.getMastKeyId();
ApplicationAttemptId appAttemptId = identifier.getApplicationAttemptId();
-
+
/*
- * MasterKey used for retrieving password will be as follows.
- * 1) By default older saved master key will be used.
- * 2) If identifier's master key id matches that of previous master key
- * id then previous key will be used.
- * 3) If identifier's master key id matches that of current master key
- * id then current key will be used.
+ * MasterKey used for retrieving password will be as follows. 1) By default
+ * older saved master key will be used. 2) If identifier's master key id
+ * matches that of previous master key id then previous key will be used. 3)
+ * If identifier's master key id matches that of current master key id then
+ * current key will be used.
*/
MasterKeyData oldMasterKey = oldMasterKeys.get(appAttemptId);
MasterKeyData masterKeyToUse = oldMasterKey;
if (previousMasterKey != null
&& keyId == previousMasterKey.getMasterKey().getKeyId()) {
masterKeyToUse = previousMasterKey;
- } else if ( keyId == currentMasterKey.getMasterKey().getKeyId()) {
+ } else if (keyId == currentMasterKey.getMasterKey().getKeyId()) {
masterKeyToUse = currentMasterKey;
}
+ if (nodeId != null && !identifier.getNodeId().equals(nodeId)) {
+ throw new InvalidToken("Given NMToken for application : "
+ + appAttemptId.toString() + " is not valid for current node manager."
+ + "expected : " + nodeId.toString() + " found : "
+ + identifier.getNodeId().toString());
+ }
+
if (masterKeyToUse != null) {
byte[] password = retrivePasswordInternal(identifier, masterKeyToUse);
- if (masterKeyToUse.getMasterKey().getKeyId() != oldMasterKey
- .getMasterKey().getKeyId()) {
- oldMasterKeys.put(appAttemptId, masterKeyToUse);
- }
+ LOG.debug("NMToken password retrieved successfully!!");
return password;
}
-
+
throw new InvalidToken("Given NMToken for application : "
+ appAttemptId.toString() + " seems to have been generated illegally.");
}
+
+ public synchronized void appFinished(ApplicationId appId) {
+ List appAttemptList = appToAppAttemptMap.get(appId);
+ if (appAttemptList != null) {
+ LOG.debug("Removing application attempts NMToken keys for application "
+ + appId);
+ for (ApplicationAttemptId appAttemptId : appAttemptList) {
+ this.oldMasterKeys.remove(appAttemptId);
+ }
+ appToAppAttemptMap.remove(appId);
+ } else {
+ LOG.error("No application Attempt for application : " + appId
+ + " started on this NM.");
+ }
+ }
+
+ /**
+ * This will be called by startContainer. It will add the master key into
+ * the cache used for starting this container. This should be called before
+ * validating the startContainer request.
+ */
+ public synchronized void appAttemptStartContainer(
+ NMTokenIdentifier identifier)
+ throws org.apache.hadoop.security.token.SecretManager.InvalidToken {
+ ApplicationAttemptId appAttemptId = identifier.getApplicationAttemptId();
+ if (!appToAppAttemptMap.containsKey(appAttemptId.getApplicationId())) {
+ // First application attempt for the given application
+ appToAppAttemptMap.put(appAttemptId.getApplicationId(),
+ new ArrayList());
+ }
+ MasterKeyData oldKey = oldMasterKeys.get(appAttemptId);
+
+ if (oldKey == null) {
+ // This is a new application attempt.
+ appToAppAttemptMap.get(appAttemptId.getApplicationId()).add(appAttemptId);
+ }
+ if (oldKey == null
+ || oldKey.getMasterKey().getKeyId() != identifier.getMastKeyId()) {
+ // Update key only if it is modified.
+ LOG.debug("NMToken key updated for application attempt : "
+ + identifier.getApplicationAttemptId().toString());
+ if (identifier.getMastKeyId() == currentMasterKey.getMasterKey()
+ .getKeyId()) {
+ oldMasterKeys.put(appAttemptId, currentMasterKey);
+ } else if (previousMasterKey != null
+ && identifier.getMastKeyId() == previousMasterKey.getMasterKey()
+ .getKeyId()) {
+ oldMasterKeys.put(appAttemptId, previousMasterKey);
+ } else {
+ throw new InvalidToken(
+ "Older NMToken should not be used while starting the container.");
+ }
+ }
+ }
- public synchronized void appFinished(ApplicationAttemptId appAttemptId) {
- this.oldMasterKeys.remove(appAttemptId);
+ public void setNodeId(NodeId nodeId) {
+ LOG.debug("updating nodeId : " + nodeId);
+ this.nodeId = nodeId;
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
index d83f9b6..5261781 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
@@ -27,10 +27,12 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
@@ -126,6 +128,19 @@ public void handle(LocalizationEvent event) {
}
@Override
+ protected UserGroupInformation getRemoteUgi() throws YarnException {
+ ApplicationId appId = ApplicationId.newInstance(0, 0);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ UserGroupInformation ugi =
+ UserGroupInformation.createRemoteUser(appAttemptId.toString());
+ ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, getContext()
+ .getNodeId(), "testuser", getContext().getNMTokenSecretManager().getCurrentKey()
+ .getKeyId()));
+ return ugi;
+ }
+
+ @Override
@SuppressWarnings("unchecked")
protected ContainersLauncher createContainersLauncher(Context context,
ContainerExecutor exec) {
@@ -179,17 +194,16 @@ public void setBlockNewContainerRequests(boolean blockNewContainerRequests) {
}
@Override
- protected void authorizeRequest(String containerIDStr,
- ContainerLaunchContext launchContext,
- UserGroupInformation remoteUgi, ContainerTokenIdentifier tokenId)
- throws YarnException {
- // do Nothing
+ protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier,
+ ContainerTokenIdentifier containerTokenIdentifier,
+ UserGroupInformation ugi) throws YarnException {
+ // do nothing
}
-
+
@Override
- protected ContainerTokenIdentifier
- getContainerTokenIdentifier(UserGroupInformation remoteUgi,
- ContainerTokenIdentifier containerTokenId) throws YarnException {
- return containerTokenId;
+ protected void authorizeGetStopRequest(ContainerId containerId,
+ Container container, boolean stopRequest) throws YarnException {
+ // do nothing
}
+
}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
index 9946704..668b85b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
@@ -41,6 +41,7 @@
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
@@ -133,18 +134,13 @@ public long getRMIdentifier() {
ApplicationAttemptId.newInstance(applicationId, 0);
ContainerId cID = ContainerId.newInstance(applicationAttemptId, 0);
- Resource r = BuilderUtils.newResource(1024, 1);
String user = "testing";
- String host = "127.0.0.1";
- int port = 1234;
- Token containerToken =
- BuilderUtils.newContainerToken(cID, host, port, user, r,
- System.currentTimeMillis() + 10000L, 123, "password".getBytes(),
- SIMULATED_RM_IDENTIFIER);
StartContainerRequest request =
recordFactory.newRecordInstance(StartContainerRequest.class);
request.setContainerLaunchContext(launchContext);
- request.setContainerToken(containerToken);
+ request.setContainerToken(TestContainerManager.createContainerToken(cID,
+ SIMULATED_RM_IDENTIFIER, context.getNodeId(), user,
+ context.getContainerTokenSecretManager()));
containerManager.startContainer(request);
BaseContainerManagerTest.waitForContainerState(containerManager, cID,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java
index 3cf4601..e3acb91 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java
@@ -50,17 +50,16 @@
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
@@ -131,24 +130,23 @@ public void testClearLocalDirWhenNodeReboot() throws IOException,
containerLaunchContext.setLocalResources(localResources);
List commands = new ArrayList();
containerLaunchContext.setCommands(commands);
- Resource resource = Records.newRecord(Resource.class);
- resource.setMemory(1024);
- NodeId nodeId = BuilderUtils.newNodeId("127.0.0.1", 12345);
- Token containerToken =
- BuilderUtils.newContainerToken(cId, nodeId.getHost(), nodeId.getPort(),
- user, resource, System.currentTimeMillis() + 10000L, 123,
- "password".getBytes(), 0);
final StartContainerRequest startRequest =
Records.newRecord(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
- startRequest.setContainerToken(containerToken);
+ NodeId nodeId = nm.getNMContext().getNodeId();
+ startRequest.setContainerToken(TestContainerManager.createContainerToken(
+ cId, 0, nodeId, destinationFile, nm.getNMContext()
+ .getContainerTokenSecretManager()));
final UserGroupInformation currentUser = UserGroupInformation
- .createRemoteUser(cId.toString());
+ .createRemoteUser(cId.getApplicationAttemptId().toString());
+ NMTokenIdentifier nmIdentifier =
+ new NMTokenIdentifier(cId.getApplicationAttemptId(), nodeId, user, 123);
+ currentUser.addTokenIdentifier(nmIdentifier);
currentUser.doAs(new PrivilegedExceptionAction() {
@Override
public Void run() throws YarnException, IOException {
- containerManager.startContainer(startRequest);
+ nm.getContainerManager().startContainer(startRequest);
return null;
}
});
@@ -208,8 +206,6 @@ public Void run() throws YarnException, IOException {
ContainerLocalizer.FILECACHE) == 0 && numOfLocalDirs(nmLocalDir
.getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR)
== 0);
- verify(delService, times(1)).delete(eq(user),
- argThat(new PathInclude(user)));
verify(delService, times(1)).delete(
(String) isNull(),
argThat(new PathInclude(ResourceLocalizationService.NM_PRIVATE_DIR
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java
index 37568a3..92deff8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java
@@ -61,9 +61,12 @@
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.ProtoUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -161,7 +164,7 @@ public static void startContainer(NodeManager nm, ContainerId cId,
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
- NodeId nodeId = BuilderUtils.newNodeId("localhost", 1234);
+ NodeId nodeId = BuilderUtils.newNodeId("localhost", 12345);
URL localResourceUri =
ConverterUtils.getYarnUrlFromPath(localFS
@@ -180,17 +183,22 @@ public static void startContainer(NodeManager nm, ContainerId cId,
containerLaunchContext.setLocalResources(localResources);
List commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
containerLaunchContext.setCommands(commands);
- Resource resource = BuilderUtils.newResource(1024, 1);
- Token containerToken =
- BuilderUtils.newContainerToken(cId, nodeId.getHost(), nodeId.getPort(),
- user, resource, System.currentTimeMillis() + 10000L, 123,
- "password".getBytes(), 0);
StartContainerRequest startRequest =
recordFactory.newRecordInstance(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
- startRequest.setContainerToken(containerToken);
+ startRequest
+ .setContainerToken(TestContainerManager.createContainerToken(cId, 0,
+ nodeId, user, nm.getNMContext().getContainerTokenSecretManager()));
+ final InetSocketAddress containerManagerBindAddress =
+ NetUtils.createSocketAddrForHost("127.0.0.1", 12345);
UserGroupInformation currentUser = UserGroupInformation
.createRemoteUser(cId.toString());
+ org.apache.hadoop.security.token.Token nmToken =
+ ProtoUtils.convertFromProtoFormat(
+ nm.getNMContext().getNMTokenSecretManager()
+ .createNMToken(cId.getApplicationAttemptId(), nodeId, user),
+ containerManagerBindAddress);
+ currentUser.addToken(nmToken);
ContainerManager containerManager =
currentUser.doAs(new PrivilegedAction() {
@@ -198,8 +206,6 @@ public static void startContainer(NodeManager nm, ContainerId cId,
public ContainerManager run() {
Configuration conf = new Configuration();
YarnRPC rpc = YarnRPC.create(conf);
- InetSocketAddress containerManagerBindAddress =
- NetUtils.createSocketAddrForHost("127.0.0.1", 12345);
return (ContainerManager) rpc.getProxy(ContainerManager.class,
containerManagerBindAddress, conf);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index 46cbff6..acef7c0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -201,7 +201,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
String user = "testUser";
ContainerTokenIdentifier containerToken =
BuilderUtils.newContainerTokenIdentifier(BuilderUtils
- .newContainerToken(firstContainerID, "127.0.0.1", 1234, user,
+ .newContainerToken(firstContainerID, "localhost", 1234, user,
resource, currentTime + 10000, 123, "password".getBytes(),
currentTime));
Container container =
@@ -232,7 +232,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
Resource resource = BuilderUtils.newResource(3, 1);
ContainerTokenIdentifier containerToken =
BuilderUtils.newContainerTokenIdentifier(BuilderUtils
- .newContainerToken(secondContainerID, "127.0.0.1", 1234, user,
+ .newContainerToken(secondContainerID, "localhost", 1234, user,
resource, currentTime + 10000, 123,
"password".getBytes(), currentTime));
Container container =
@@ -1168,8 +1168,8 @@ private void verifyNodeStartFailure(String errMessage) throws Exception {
private YarnConfiguration createNMConfig() {
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.NM_PMEM_MB, 5*1024); // 5GB
- conf.set(YarnConfiguration.NM_ADDRESS, "127.0.0.1:12345");
- conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "127.0.0.1:12346");
+ conf.set(YarnConfiguration.NM_ADDRESS, "localhost:12345");
+ conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "localhost:12346");
conf.set(YarnConfiguration.NM_LOG_DIRS, new Path(basedir, "logs").toUri()
.getPath());
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, new Path(basedir,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
index e1806c1..1fd044d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
@@ -30,20 +30,20 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
@@ -57,6 +57,7 @@
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
@@ -150,7 +151,7 @@ public void setup() throws IOException {
LOG.info("Created localDir in " + localDir.getAbsolutePath());
LOG.info("Created tmpDir in " + tmpDir.getAbsolutePath());
- String bindAddress = "0.0.0.0:5555";
+ String bindAddress = "0.0.0.0:12345";
conf.set(YarnConfiguration.NM_ADDRESS, bindAddress);
conf.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
@@ -173,6 +174,7 @@ public void setup() throws IOException {
protected ContainerManagerImpl
createContainerManager(DeletionService delSrvc) {
+
return new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
metrics, new ApplicationACLsManager(conf), dirsHandler) {
@Override
@@ -182,11 +184,24 @@ public void setup() throws IOException {
}
@Override
- protected void authorizeRequest(String containerIDStr,
- ContainerLaunchContext launchContext, UserGroupInformation remoteUgi,
- ContainerTokenIdentifier tokenId) throws YarnException {
- // do nothing
- }
+ protected void authorizeGetStopRequest(ContainerId containerId,
+ Container container, boolean stopRequest) throws YarnException {
+ // do nothing
+ }
+
+ @Override
+ protected void authorizeStartRequest(
+ NMTokenIdentifier nmTokenIdentifier,
+ ContainerTokenIdentifier containerTokenIdentifier,
+ UserGroupInformation ugi) throws YarnException {
+ // do nothing
+ }
+
+ @Override
+ protected void updateNMTokenIdentifier(
+ NMTokenIdentifier nmTokenIdentifier) throws InvalidToken {
+ // Do nothing
+ }
};
}
@@ -242,7 +257,7 @@ static void waitForApplicationState(ContainerManagerImpl containerManager,
throws InterruptedException {
// Wait for app-finish
Application app =
- containerManager.context.getApplications().get(appID);
+ containerManager.getContext().getApplications().get(appID);
int timeout = 0;
while (!(app.getApplicationState().equals(finalState))
&& timeout++ < 15) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
index 5fb8812..70a5907 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
@@ -34,6 +34,7 @@
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -47,10 +48,13 @@
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
@@ -59,8 +63,11 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.Before;
import org.junit.Test;
public class TestContainerManager extends BaseContainerManagerTest {
@@ -72,6 +79,12 @@ public TestContainerManager() throws UnsupportedFileSystemException {
static {
LOG = LogFactory.getLog(TestContainerManager.class);
}
+
+ @Override
+ @Before
+ public void setup() throws IOException {
+ super.setup();
+ }
private ContainerId createContainerId() {
ApplicationId appId = ApplicationId.newInstance(0, 0);
@@ -81,6 +94,32 @@ private ContainerId createContainerId() {
return containerId;
}
+ @Override
+ protected ContainerManagerImpl
+ createContainerManager(DeletionService delSrvc) {
+ return new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
+ metrics, new ApplicationACLsManager(conf), dirsHandler) {
+ @Override
+ public void
+ setBlockNewContainerRequests(boolean blockNewContainerRequests) {
+ // do nothing
+ }
+
+ @Override
+ protected UserGroupInformation getRemoteUgi() throws YarnException {
+ ApplicationId appId = ApplicationId.newInstance(0, 0);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ UserGroupInformation ugi =
+ UserGroupInformation.createRemoteUser(appAttemptId.toString());
+ ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context
+ .getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey()
+ .getKeyId()));
+ return ugi;
+ }
+ };
+ }
+
@Test
public void testContainerManagerInitialization() throws IOException {
@@ -134,16 +173,12 @@ public void testContainerSetup() throws IOException, InterruptedException,
new HashMap();
localResources.put(destinationFile, rsrc_alpha);
containerLaunchContext.setLocalResources(localResources);
- Resource r = BuilderUtils.newResource(512, 1);
- int port = 12345;
- Token containerToken =
- BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
- port, user, r, System.currentTimeMillis() + 10000L, 123,
- "password".getBytes(), super.DUMMY_RM_IDENTIFIER);
StartContainerRequest startRequest =
recordFactory.newRecordInstance(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
- startRequest.setContainerToken(containerToken);
+ startRequest.setContainerToken(createContainerToken(cId,
+ DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
+ context.getContainerTokenSecretManager()));
containerManager.startContainer(startRequest);
@@ -227,16 +262,12 @@ public void testContainerLaunchAndStop() throws IOException,
containerLaunchContext.setLocalResources(localResources);
List commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
containerLaunchContext.setCommands(commands);
- Resource r = BuilderUtils.newResource(100, 1);
- int port = 12345;
- Token containerToken =
- BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
- port, user, r, System.currentTimeMillis() + 10000L, 123,
- "password".getBytes(), super.DUMMY_RM_IDENTIFIER);
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
- startRequest.setContainerToken(containerToken);
+ startRequest.setContainerToken(createContainerToken(cId,
+ DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
+ context.getContainerTokenSecretManager()));
containerManager.startContainer(startRequest);
int timeoutSecs = 0;
@@ -335,15 +366,12 @@ private void testContainerLaunchAndExit(int exitCode) throws IOException,
containerLaunchContext.setLocalResources(localResources);
List commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
containerLaunchContext.setCommands(commands);
- Resource r = BuilderUtils.newResource(100, 1);
- int port = 12345;
- Token containerToken =
- BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
- port, user, r, System.currentTimeMillis() + 10000L, 123,
- "password".getBytes(), super.DUMMY_RM_IDENTIFIER);
+
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
- startRequest.setContainerToken(containerToken);
+ startRequest.setContainerToken(createContainerToken(cId,
+ DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
+ context.getContainerTokenSecretManager()));
containerManager.startContainer(startRequest);
BaseContainerManagerTest.waitForContainerState(containerManager, cId,
@@ -423,16 +451,10 @@ public void testLocalFilesCleanup() throws InterruptedException,
new HashMap();
localResources.put(destinationFile, rsrc_alpha);
containerLaunchContext.setLocalResources(localResources);
- Resource r = BuilderUtils.newResource(100, 1);
- int port = 12345;
-
- Token containerToken =
- BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
- port, user, r, System.currentTimeMillis() + 10000L, 123,
- "password".getBytes(), super.DUMMY_RM_IDENTIFIER);
StartContainerRequest request = recordFactory.newRecordInstance(StartContainerRequest.class);
request.setContainerLaunchContext(containerLaunchContext);
- request.setContainerToken(containerToken);
+ request.setContainerToken(createContainerToken(cId, DUMMY_RM_IDENTIFIER,
+ context.getNodeId(), user, context.getContainerTokenSecretManager()));
containerManager.startContainer(request);
BaseContainerManagerTest.waitForContainerState(containerManager, cId,
@@ -503,24 +525,19 @@ public void testContainerLaunchFromPreviousRM() throws IOException,
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
- String host = "127.0.0.1";
- int port = 1234;
ContainerId cId1 = createContainerId();
ContainerId cId2 = createContainerId();
containerLaunchContext
.setLocalResources(new HashMap());
- Resource mockResource = BuilderUtils.newResource(1024, 1);
// Construct the Container with Invalid RMIdentifier
StartContainerRequest startRequest1 =
recordFactory.newRecordInstance(StartContainerRequest.class);
startRequest1.setContainerLaunchContext(containerLaunchContext);
- Token containerToken1 =
- BuilderUtils.newContainerToken(cId1, host, port, user, mockResource,
- System.currentTimeMillis() + 10000, 123, "password".getBytes(),
- (long) ResourceManagerConstants.RM_INVALID_IDENTIFIER);
- startRequest1.setContainerToken(containerToken1);
+ startRequest1.setContainerToken(createContainerToken(cId1,
+ ResourceManagerConstants.RM_INVALID_IDENTIFIER, context.getNodeId(),
+ user, context.getContainerTokenSecretManager()));
boolean catchException = false;
try {
containerManager.startContainer(startRequest1);
@@ -528,8 +545,8 @@ public void testContainerLaunchFromPreviousRM() throws IOException,
catchException = true;
Assert.assertTrue(e.getMessage().contains(
"Container " + cId1 + " rejected as it is allocated by a previous RM"));
- Assert.assertEquals(InvalidContainerException.class.getName(), e
- .getClass().getName());
+ Assert.assertTrue(e.getMessage().contains(
+ "Unauthorized request to start container."));
}
// Verify that startContainer fail because of invalid container request
@@ -539,11 +556,9 @@ public void testContainerLaunchFromPreviousRM() throws IOException,
StartContainerRequest startRequest2 =
recordFactory.newRecordInstance(StartContainerRequest.class);
startRequest2.setContainerLaunchContext(containerLaunchContext);
- Token containerToken2 =
- BuilderUtils.newContainerToken(cId1, host, port, user, mockResource,
- System.currentTimeMillis() + 10000, 123, "password".getBytes(),
- super.DUMMY_RM_IDENTIFIER);
- startRequest2.setContainerToken(containerToken2);
+ startRequest2.setContainerToken(createContainerToken(cId2,
+ DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
+ context.getContainerTokenSecretManager()));
boolean noException = true;
try {
containerManager.startContainer(startRequest2);
@@ -553,4 +568,20 @@ public void testContainerLaunchFromPreviousRM() throws IOException,
// Verify that startContainer get no YarnException
Assert.assertTrue(noException);
}
+
+ public static Token createContainerToken(ContainerId cId, long rmIdentifier,
+ NodeId nodeId, String user,
+ NMContainerTokenSecretManager containerTokenSecretManager)
+ throws IOException {
+ Resource r = BuilderUtils.newResource(1024, 1);
+ ContainerTokenIdentifier containerTokenIdentifier =
+ new ContainerTokenIdentifier(cId, nodeId.toString(), user, r,
+ System.currentTimeMillis() + 100000L, 123, rmIdentifier);
+ Token containerToken =
+ BuilderUtils
+ .newContainerToken(nodeId, containerTokenSecretManager
+ .retrievePassword(containerTokenIdentifier),
+ containerTokenIdentifier);
+ return containerToken;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
index 35630e6..aec5286 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
@@ -37,6 +37,7 @@
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
@@ -56,6 +57,7 @@
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
@@ -228,14 +230,9 @@ public void testContainerEnvVariables() throws Exception {
// set up the rest of the container
List commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
containerLaunchContext.setCommands(commands);
- Resource r = BuilderUtils.newResource(1024, 1);
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
- Token containerToken =
- BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
- port, user, r, System.currentTimeMillis() + 10000L, 1234,
- "password".getBytes(), super.DUMMY_RM_IDENTIFIER);
- startRequest.setContainerToken(containerToken);
+ startRequest.setContainerToken(createContainerToken(cId));
containerManager.startContainer(startRequest);
int timeoutSecs = 0;
@@ -366,12 +363,9 @@ public void testDelayedKill() throws Exception {
// set up the rest of the container
List commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
containerLaunchContext.setCommands(commands);
- Resource r = BuilderUtils.newResource(1024, 1);
- Token containerToken =
- BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
- port, user, r, System.currentTimeMillis() + 10000L, 123,
- "password".getBytes(), super.DUMMY_RM_IDENTIFIER);
- StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
+ Token containerToken = createContainerToken(cId);
+ StartContainerRequest startRequest =
+ recordFactory.newRecordInstance(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
startRequest.setContainerToken(containerToken);
containerManager.startContainer(startRequest);
@@ -429,4 +423,17 @@ public void testDelayedKill() throws Exception {
}
}
+ protected Token createContainerToken(ContainerId cId) throws InvalidToken {
+ Resource r = BuilderUtils.newResource(1024, 1);
+ ContainerTokenIdentifier containerTokenIdentifier =
+ new ContainerTokenIdentifier(cId, context.getNodeId().toString(), user,
+ r, System.currentTimeMillis() + 10000L, 123, DUMMY_RM_IDENTIFIER);
+ Token containerToken =
+ BuilderUtils.newContainerToken(
+ context.getNodeId(),
+ context.getContainerTokenSecretManager().retrievePassword(
+ containerTokenIdentifier), containerTokenIdentifier);
+ return containerToken;
+ }
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index dd47703..39aa8d6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -23,9 +23,9 @@
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyMap;
import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
@@ -74,8 +74,6 @@
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.DrainDispatcher;
@@ -94,6 +92,7 @@
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
@@ -810,15 +809,12 @@ public void testLogAggregationForRealContainerLaunch() throws IOException,
commands.add("/bin/bash");
commands.add(scriptFile.getAbsolutePath());
containerLaunchContext.setCommands(commands);
- Resource r = BuilderUtils.newResource(100 * 1024 * 1024, 1);
- Token containerToken =
- BuilderUtils.newContainerToken(cId, "127.0.0.1", 1234, user, r,
- System.currentTimeMillis() + 10000L, 123, "password".getBytes(),
- super.DUMMY_RM_IDENTIFIER);
StartContainerRequest startRequest =
recordFactory.newRecordInstance(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
- startRequest.setContainerToken(containerToken);
+ startRequest.setContainerToken(TestContainerManager.createContainerToken(
+ cId, DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
+ context.getContainerTokenSecretManager()));
this.containerManager.startContainer(startRequest);
BaseContainerManagerTest.waitForContainerState(this.containerManager,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/resources/core-site.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/resources/core-site.xml
new file mode 100644
index 0000000..f0d3085
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/resources/core-site.xml
@@ -0,0 +1,25 @@
+
+
+
+
+
+
+
+
+ hadoop.security.token.service.use_ip
+ false
+
+
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
index 4ec82e4..4f031c5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
@@ -53,7 +53,6 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
@@ -135,13 +134,14 @@ protected ContainerManager getContainerMgrProxy(
final YarnRPC rpc = YarnRPC.create(conf); // TODO: Don't create again and again.
UserGroupInformation currentUser = UserGroupInformation
- .createRemoteUser(containerId.toString());
- if (UserGroupInformation.isSecurityEnabled()) {
- Token token =
- ProtoUtils.convertFromProtoFormat(masterContainer
- .getContainerToken(), containerManagerBindAddress);
- currentUser.addToken(token);
- }
+ .createRemoteUser(containerId.getApplicationAttemptId().toString());
+ String user = rmContext.getRMApps().get(
+ containerId.getApplicationAttemptId().getApplicationId()).getUser();
+ org.apache.hadoop.yarn.api.records.Token token = rmContext.
+ getNMTokenSecretManager().createNMToken(
+ containerId.getApplicationAttemptId(), node, user);
+ currentUser.addToken(ProtoUtils.convertFromProtoFormat(token,
+ containerManagerBindAddress));
return currentUser.doAs(new PrivilegedAction() {
@Override
public ContainerManager run() {
@@ -246,7 +246,13 @@ public void run() {
} catch(IOException ie) {
LOG.info("Error cleaning master ", ie);
} catch (YarnException e) {
- LOG.info("Error cleaning master ", e);
+ StringBuilder sb = new StringBuilder("Container ");
+ sb.append(masterContainer.getId().toString());
+ sb.append(" is not handled by this NodeManager");
+ if (!e.getMessage().contains(sb.toString())) {
+ // Ignoring if container is already killed by Node Manager.
+ LOG.info("Error cleaning master ", e);
+ }
}
break;
default:
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java
index a1ecddb..3dfd767 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java
@@ -189,7 +189,8 @@ public void run() {
for (Container container : containers) {
if (!nodeSet.contains(container.getNodeId())) {
LOG.debug("Sending NMToken for nodeId : "
- + container.getNodeId().toString());
+ + container.getNodeId().toString()
+ + " for application attempt : " + appAttemptId.toString());
Token token = createNMToken(appAttemptId, container.getNodeId(),
applicationSubmitter);
NMToken nmToken = new NMTokenPBImpl();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index c9dd8a2..623ee52 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -59,9 +59,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
-import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
@@ -310,6 +308,7 @@ protected ResourceTrackerService createResourceTrackerService() {
Configuration conf = new Configuration();
containerTokenSecretManager.rollMasterKey();
+ nmTokenSecretManager.rollMasterKey();
return new ResourceTrackerService(getRMContext(), nodesListManager,
this.nmLivelinessMonitor, containerTokenSecretManager,
nmTokenSecretManager) {