diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index f4913cd..23f4ea1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -59,9 +59,12 @@ return client; } + private NMTokenCache nmTokenCache; + @Private protected AMRMClient(String name) { super(name); + nmTokenCache = NMTokenCache.getSingleton(); } /** @@ -297,4 +300,33 @@ public abstract void unregisterApplicationMaster(FinalApplicationStatus appStatu */ public abstract void updateBlacklist(List blacklistAdditions, List blacklistRemovals); + + /** + * Set the NM token cache for the AMRMClient. This cache must + * be shared with the {@link NMClient} used to manage containers for the + * AMRMClient + *

+ * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()} + * singleton instance will be used. + * + * @param nmTokenCache the NM token cache to use. + */ + public void setNMTokenCache(NMTokenCache nmTokenCache) { + this.nmTokenCache = nmTokenCache; + } + + /** + * Get the NM token cache of the AMRMClient. This cache must be + * shared with the {@link NMClient} used to manage containers for the + * AMRMClient. + *

+ * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()} + * singleton instance will be used. + * + * @return the NM token cache. + */ + public NMTokenCache getNMTokenCache() { + return nmTokenCache; + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java index 57e7db5..fbb2baf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java @@ -58,6 +58,8 @@ public static NMClient createNMClient(String name) { return client; } + private NMTokenCache nmTokenCache = NMTokenCache.getSingleton(); + @Private protected NMClient(String name) { super(name); @@ -118,4 +120,34 @@ public abstract ContainerStatus getContainerStatus(ContainerId containerId, * @param enabled whether the feature is enabled or not */ public abstract void cleanupRunningContainersOnStop(boolean enabled); + + + /** + * Set the NM Token cache of the NMClient. This cache must be + * shared with the {@link AMRMClient} that requested the containers managed + * by this NMClient + *

+ * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()} + * singleton instance will be used. + * + * @param nmTokenCache the NM token cache to use. + */ + public void setNMTokenCache(NMTokenCache nmTokenCache) { + this.nmTokenCache = nmTokenCache; + } + + /** + * Get the NM token cache of the NMClient. This cache must be + * shared with the {@link AMRMClient} that requested the containers managed + * by this NMClient + *

+ * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()} + * singleton instance will be used. + * + * @return the NM token cache + */ + public NMTokenCache getNMTokenCache() { + return nmTokenCache; + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMTokenCache.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMTokenCache.java index c14a12c..cb2dcac 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMTokenCache.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMTokenCache.java @@ -34,70 +34,106 @@ @Public @Evolving public class NMTokenCache { - private static ConcurrentHashMap nmTokens; - + private static final NMTokenCache NM_TOKEN_CACHE = new NMTokenCache(); + + + /** + * @deprecated Replaced by {@link #getSingleton()}.{@link #getToken(String)} + */ + @Public + @Deprecated + public static Token getNMToken(String nodeAddr) { + return NM_TOKEN_CACHE.getToken(nodeAddr); + } - static { + /** + * @deprecated Replaced by {@link #getSingleton()}.{@link #setToken(String, Token)} + */ + @Public + @Deprecated + public static void setNMToken(String nodeAddr, Token token) { + NM_TOKEN_CACHE.setToken(nodeAddr, token); + } + + /** + * Returns the singleton NM token cache. + * + * @return the singleton NM token cache. + */ + public static NMTokenCache getSingleton() { + return NM_TOKEN_CACHE; + } + + private ConcurrentHashMap nmTokens; + + /** + * Creates a NM token cache instance. + */ + public NMTokenCache() { nmTokens = new ConcurrentHashMap(); } - + /** * Returns NMToken, null if absent + * * @param nodeAddr * @return {@link Token} NMToken required for communicating with node - * manager + * manager */ @Public @Evolving - public static Token getNMToken(String nodeAddr) { + public Token getToken(String nodeAddr) { return nmTokens.get(nodeAddr); } - + /** * Sets the NMToken for node address + * * @param nodeAddr node address (host:port) * @param token NMToken */ @Public @Evolving - public static void setNMToken(String nodeAddr, Token token) { + public void setToken(String nodeAddr, Token token) { nmTokens.put(nodeAddr, token); } - + /** * Returns true if NMToken is present in cache. */ @Private @VisibleForTesting - public static boolean containsNMToken(String nodeAddr) { + public boolean containsToken(String nodeAddr) { return nmTokens.containsKey(nodeAddr); } - + /** * Returns the number of NMTokens present in cache. */ @Private @VisibleForTesting - public static int numberOfNMTokensInCache() { + public int numberOfTokensInCache() { return nmTokens.size(); } - + /** * Removes NMToken for specified node manager + * * @param nodeAddr node address (host:port) */ @Private @VisibleForTesting - public static void removeNMToken(String nodeAddr) { + public void removeToken(String nodeAddr) { nmTokens.remove(nodeAddr); } - + /** * It will remove all the nm tokens from its cache */ @Private @VisibleForTesting - public static void clearCache() { + public void clearCache() { nmTokens.clear(); } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java index e726b73..71586c4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -119,6 +120,15 @@ protected AMRMClientAsync(AMRMClient client, int intervalMs, this.heartbeatIntervalMs.set(intervalMs); this.handler = callbackHandler; } + + /** + * Returns the NM token cache of the AM. + * + * @return the NM token cache of the AM. + */ + public NMTokenCache getNMTokenCache() { + return client.getNMTokenCache(); + } public void setHeartbeatInterval(int interval) { heartbeatIntervalMs.set(interval); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java index 5cb504d..8067478 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java @@ -20,7 +20,6 @@ import java.nio.ByteBuffer; import java.util.Map; -import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -31,8 +30,8 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; import org.apache.hadoop.yarn.client.api.impl.NMClientImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -150,6 +149,23 @@ public void setClient(NMClient client) { this.client = client; } + /** + * Set the NM Token cache of the NMClient. This cache must be + * shared with the {@link org.apache.hadoop.yarn.client.api.AMRMClient} that + * requested the containers managed by this NMClientAsync + *

+ * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()} + * singleton instance will be used. + * + * @param nmTokenCache the NM token cache to use. + */ + public void setNMTokenCache(NMTokenCache nmTokenCache) { + if (client == null) { + throw new IllegalStateException("NMClient cannot be NULL"); + } + client.setNMTokenCache(nmTokenCache); + } + public CallbackHandler getCallbackHandler() { return callbackHandler; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 3922571..061c50b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -58,7 +58,6 @@ import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException; -import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -288,12 +287,12 @@ public AllocateResponse allocate(float progressIndicator) protected void populateNMTokens(AllocateResponse allocateResponse) { for (NMToken token : allocateResponse.getNMTokens()) { String nodeId = token.getNodeId().toString(); - if (NMTokenCache.containsNMToken(nodeId)) { + if (getNMTokenCache().containsToken(nodeId)) { LOG.debug("Replacing token for : " + nodeId); } else { LOG.debug("Received new token for : " + nodeId); } - NMTokenCache.setNMToken(nodeId, token.getToken()); + getNMTokenCache().setToken(nodeId, token.getToken()); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java index 4ca44e1..7061119 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java @@ -56,6 +56,7 @@ private final LinkedHashMap cmProxy; private final Configuration conf; private final YarnRPC rpc; + private NMTokenCache nmTokenCache; public ContainerManagementProtocolProxy(Configuration conf) { this.conf = conf; @@ -75,7 +76,11 @@ public ContainerManagementProtocolProxy(Configuration conf) { new LinkedHashMap(); rpc = YarnRPC.create(conf); } - + + public synchronized void setNMTokenCache(NMTokenCache nmTokenCache) { + this.nmTokenCache = nmTokenCache; + } + public synchronized ContainerManagementProtocolProxyData getProxy( String containerManagerBindAddr, ContainerId containerId) throws InvalidToken { @@ -86,7 +91,7 @@ public synchronized ContainerManagementProtocolProxyData getProxy( while (proxy != null && !proxy.token.getIdentifier().equals( - NMTokenCache.getNMToken(containerManagerBindAddr).getIdentifier())) { + nmTokenCache.getToken(containerManagerBindAddr).getIdentifier())) { LOG.info("Refreshing proxy as NMToken got updated for node : " + containerManagerBindAddr); // Token is updated. check if anyone has already tried closing it. @@ -109,7 +114,7 @@ public synchronized ContainerManagementProtocolProxyData getProxy( if (proxy == null) { proxy = new ContainerManagementProtocolProxyData(rpc, containerManagerBindAddr, - containerId, NMTokenCache.getNMToken(containerManagerBindAddr)); + containerId, nmTokenCache.getToken(containerManagerBindAddr)); if (cmProxy.size() > maxConnectedNMs) { // Number of existing proxy exceed the limit. String cmAddr = cmProxy.keySet().iterator().next(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java index b5f0be1..2c896c4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java @@ -130,7 +130,11 @@ protected synchronized void cleanupRunningContainers() { @Override protected void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); + if (getNMTokenCache() == null) { + throw new IllegalStateException("NMTokenCache has not been set"); + } cmProxy = new ContainerManagementProtocolProxy(conf); + cmProxy.setNMTokenCache(getNMTokenCache()); } @Override @@ -188,6 +192,7 @@ private void addStartingContainer(StartedContainer startedContainer) proxy = cmProxy.getProxy(container.getNodeId().toString(), container.getId()); + cmProxy.setNMTokenCache(getNMTokenCache()); StartContainerRequest scRequest = StartContainerRequest.newInstance(containerLaunchContext, container.getContainerToken()); @@ -261,6 +266,7 @@ public ContainerStatus getContainerStatus(ContainerId containerId, containerIds.add(containerId); try { proxy = cmProxy.getProxy(nodeId.toString(), containerId); + cmProxy.setNMTokenCache(getNMTokenCache()); GetContainerStatusesResponse response = proxy.getContainerManagementProtocol().getContainerStatuses( GetContainerStatusesRequest.newInstance(containerIds)); @@ -286,6 +292,7 @@ private void stopContainerInternal(ContainerId containerId, NodeId nodeId) containerIds.add(containerId); try { proxy = cmProxy.getProxy(nodeId.toString(), containerId); + cmProxy.setNMTokenCache(getNMTokenCache()); StopContainersResponse response = proxy.getContainerManagementProtocol().stopContainers( StopContainersRequest.newInstance(containerIds)); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 58ef215..1f7565b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -626,6 +626,13 @@ public void testAMRMClient() throws YarnException, IOException { try { // start am rm client amClient = AMRMClient.createAMRMClient(); + + //setting an instance NMTokenCache + amClient.setNMTokenCache(new NMTokenCache()); + //asserting we are not using the singleton instance cache + Assert.assertNotSame(NMTokenCache.getSingleton(), + amClient.getNMTokenCache()); + amClient.init(conf); amClient.start(); @@ -681,8 +688,8 @@ private void testAllocation(final AMRMClientImpl amClient) int iterationsLeft = 3; Set releases = new TreeSet(); - NMTokenCache.clearCache(); - Assert.assertEquals(0, NMTokenCache.numberOfNMTokensInCache()); + amClient.getNMTokenCache().clearCache(); + Assert.assertEquals(0, amClient.getNMTokenCache().numberOfTokensInCache()); HashMap receivedNMTokens = new HashMap(); while (allocatedContainerCount < containersRequestedAny diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java index 76e87f5..126dfcb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java @@ -78,6 +78,7 @@ List nodeReports = null; ApplicationAttemptId attemptId = null; int nodeCount = 3; + NMTokenCache nmTokenCache = null; @Before public void setup() throws YarnException, IOException { @@ -155,10 +156,16 @@ public void setup() throws YarnException, IOException { .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName())); UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken()); + //creating an instance NMTokenCase + nmTokenCache = new NMTokenCache(); + // start am rm client rmClient = (AMRMClientImpl) AMRMClient . createAMRMClient(); + + //setting an instance NMTokenCase + rmClient.setNMTokenCache(nmTokenCache); rmClient.init(conf); rmClient.start(); assertNotNull(rmClient); @@ -166,6 +173,9 @@ public void setup() throws YarnException, IOException { // start am nm client nmClient = (NMClientImpl) NMClient.createNMClient(); + + //propagating the AMRMClient NMTokenCache instance + nmClient.setNMTokenCache(rmClient.getNMTokenCache()); nmClient.init(conf); nmClient.start(); assertNotNull(nmClient); @@ -258,7 +268,7 @@ public void testNMClient() } if (!allocResponse.getNMTokens().isEmpty()) { for (NMToken token : allocResponse.getNMTokens()) { - NMTokenCache.setNMToken(token.getNodeId().toString(), + rmClient.getNMTokenCache().setToken(token.getNodeId().toString(), token.getToken()); } }