diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java index cbb8e72..946d9c6 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java @@ -26,7 +26,6 @@ import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager; import org.apache.hadoop.yarn.util.Clock; @@ -62,6 +61,4 @@ Set getBlacklistedNodes(); ClientToAMTokenSecretManager getClientToAMTokenSecretManager(); - - Map getNMTokens(); } diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 71d5b52..8abd58d 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -886,8 +886,6 @@ protected void serviceStop() throws Exception { private final Configuration conf; private final ClusterInfo clusterInfo = new ClusterInfo(); private final ClientToAMTokenSecretManager clientToAMTokenSecretManager; - private final ConcurrentHashMap nmTokens = - new ConcurrentHashMap(); public RunningAppContext(Configuration config) { this.conf = config; @@ -954,11 +952,6 @@ public ClusterInfo getClusterInfo() { public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() { return clientToAMTokenSecretManager; } - - @Override - public Map getNMTokens() { - return this.nmTokens; - } } @SuppressWarnings("unchecked") diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java index b2732c6..28508a9 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java @@ -235,8 +235,7 @@ protected void serviceInit(Configuration conf) throws Exception { MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT); LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize); super.serviceInit(conf); - cmProxy = - new ContainerManagementProtocolProxy(conf, context.getNMTokens()); + cmProxy = new ContainerManagementProtocolProxy(conf); } protected void serviceStart() throws Exception { diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 1263582..dc134eb 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.util.RackResolver; @@ -588,7 +589,7 @@ public void rampDownReduces(int rampDown) { // Setting NMTokens if (response.getNMTokens() != null) { for (NMToken nmToken : response.getNMTokens()) { - getContext().getNMTokens().put(nmToken.getNodeId().toString(), + NMTokenCache.setNMToken(nmToken.getNodeId().toString(), nmToken.getToken()); } } diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java index 521e282..4b07236 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java @@ -26,10 +26,9 @@ import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager; +import org.apache.hadoop.yarn.util.Clock; import com.google.common.collect.Maps; @@ -131,10 +130,4 @@ public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() { // Not implemented return null; } - - @Override - public Map getNMTokens() { - // Not Implemented - return null; - } } diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java index 1742d90..762dd57 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java @@ -862,11 +862,5 @@ public ClusterInfo getClusterInfo() { public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() { return null; } - - @Override - public Map getNMTokens() { - // Not Implemented - return null; - } } } diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java index 0033490..563c31b 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java @@ -376,7 +376,7 @@ public ContainerManagementProtocolProxyData getCMProxy( containerId.getApplicationAttemptId(), NodeId.newInstance(addr.getHostName(), addr.getPort()), "user"); ContainerManagementProtocolProxy cmProxy = - new ContainerManagementProtocolProxy(conf, context.getNMTokens()); + new ContainerManagementProtocolProxy(conf); ContainerManagementProtocolProxyData proxy = cmProxy.new ContainerManagementProtocolProxyData( YarnRPC.create(conf), containerManagerBindAddr, containerId, diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java index 8605bb4..2c1f3a2 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java @@ -44,7 +44,6 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -316,10 +315,4 @@ public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() { // Not implemented. return null; } - - @Override - public Map getNMTokens() { - // Not Implemented. - return null; - } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java index 0517486..9c817b3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java @@ -36,8 +36,9 @@ * *

The response contains critical details such as: *

    - *
  • Minimum capability for allocated resources in the cluster.
  • *
  • Maximum capability for allocated resources in the cluster.
  • + *
  • ApplicationACLs for the application.
  • + *
  • ClientToAMToken master key.
  • *
*

* @@ -50,11 +51,12 @@ @Unstable public static RegisterApplicationMasterResponse newInstance( Resource minCapability, Resource maxCapability, - Map acls) { + Map acls, ByteBuffer key) { RegisterApplicationMasterResponse response = Records.newRecord(RegisterApplicationMasterResponse.class); response.setMaximumResourceCapability(maxCapability); response.setApplicationACLs(acls); + response.setClientToAMTokenMasterKey(key); return response; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java index cb8d04b..5cff2ec 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java @@ -44,12 +44,10 @@ *
  • HTTP uri of the node.
  • *
  • {@link Resource} allocated to the container.
  • *
  • {@link Priority} at which the container was allocated.
  • - *
  • {@link ContainerState} of the container.
  • *
  • * Container {@link Token} of the container, used to securely verify * authenticity of the allocation. *
  • - *
  • {@link ContainerStatus} of the container.
  • * *

    * diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 638ce13..012af3f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -448,8 +448,7 @@ public boolean run() throws YarnException, IOException { resourceManager.start(); containerListener = new NMCallbackHandler(); - nmClientAsync = - new NMClientAsyncImpl(containerListener, resourceManager.getNMTokens()); + nmClientAsync = new NMClientAsyncImpl(containerListener); nmClientAsync.init(conf); nmClientAsync.start(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index df333d2..8b05aa1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -100,7 +100,7 @@ public static void tearDown() throws IOException { } } - @Test(timeout=30000) + @Test(timeout=90000) public void testDSShell() throws Exception { String[] args = { @@ -128,7 +128,7 @@ public void testDSShell() throws Exception { } - @Test(timeout=30000) + @Test(timeout=90000) public void testDSShellWithInvalidArgs() throws Exception { Client client = new Client(new Configuration(yarnCluster.getConfig())); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index f1890a8..bd0f16b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.Collection; import java.util.List; -import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -35,7 +34,6 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -249,14 +247,4 @@ public abstract void unregisterApplicationMaster(FinalApplicationStatus appStatu Priority priority, String resourceName, Resource capability); - - /** - * It returns the NMToken received on allocate call. It will not communicate - * with RM to get NMTokens. On allocate call whenever we receive new token - * along with container AMRMClient will cache this NMToken per node manager. - * This map returned should be shared with any application which is - * communicating with NodeManager (ex. NMClient) using NMTokens. If a new - * NMToken is received for the same node manager then it will be replaced. - */ - public abstract ConcurrentMap getNMTokens(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java index 00e513d..57e7db5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java @@ -22,21 +22,17 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Map; -import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.impl.NMClientImpl; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -46,30 +42,19 @@ /** * Create a new instance of NMClient. - * @param nmTokens need to pass map of NMTokens which are received on - * {@link AMRMClient#allocate(float)} call as a part of - * {@link AllocateResponse}. - * key :- NodeAddr (host:port) - * Value :- Token {@link NMToken#getToken()} */ @Public - public static NMClient createNMClient(ConcurrentMap nmTokens) { - NMClient client = new NMClientImpl(nmTokens); + public static NMClient createNMClient() { + NMClient client = new NMClientImpl(); return client; } /** * Create a new instance of NMClient. - * @param nmTokens need to pass map of NMTokens which are received on - * {@link AMRMClient#allocate(float)} call as a part of - * {@link AllocateResponse}. - * key :- NodeAddr (host:port) - * Value :- Token {@link NMToken#getToken()} */ @Public - public static NMClient createNMClient(String name, - ConcurrentMap nmTokens) { - NMClient client = new NMClientImpl(name, nmTokens); + public static NMClient createNMClient(String name) { + NMClient client = new NMClientImpl(name); return client; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMTokenCache.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMTokenCache.java new file mode 100644 index 0000000..433d546 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMTokenCache.java @@ -0,0 +1,100 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.client.api; + +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.yarn.api.records.Token; + +/** + * It manages NMTokens required for communicating with Node manager. Its a + * static token cache. + */ +@Public +@Evolving +public class NMTokenCache { + private static ConcurrentHashMap nmTokens; + + + static { + nmTokens = new ConcurrentHashMap(); + } + + /** + * Returns NMToken, null if absent + * @param nodeAddr + * @return {@link Token} NMToken required for communicating with node + * manager + */ + @Public + @Evolving + public static Token getNMToken(String nodeAddr) { + return nmTokens.get(nodeAddr); + } + + /** + * Sets the NMToken for node address + * @param nodeAddr node address (host:port) + * @param token NMToken + */ + @Public + @Evolving + public static void setNMToken(String nodeAddr, Token token) { + nmTokens.put(nodeAddr, token); + } + + /** + * Returns true if NMToken is present in cache. + */ + @Public + @Evolving + public static boolean containsNMToken(String nodeAddr) { + return nmTokens.containsKey(nodeAddr); + } + + /** + * Returns the number of NMTokens present in cache. + */ + @Public + @Evolving + public static int numberOfNMTokensInCache() { + return nmTokens.size(); + } + + /** + * Removes NMToken for specified node manager + * @param nodeAddr node address (host:port) + */ + @Public + @Evolving + public static void removeNMToken(String nodeAddr) { + nmTokens.remove(nodeAddr); + } + + /** + * It will remove all the nm tokens from its cache + */ + @Public + @Evolving + public static void clearCache() { + nmTokens.clear(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java index 8d551dc..ae781b6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.Collection; import java.util.List; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -37,7 +36,6 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; @@ -198,17 +196,6 @@ public abstract void unregisterApplicationMaster( */ public abstract int getClusterNodeCount(); - /** - * It returns the NMToken received on allocate call. It will not communicate - * with RM to get NMTokens. On allocate call whenever we receive new token - * along with new container AMRMClientAsync will cache this NMToken per node - * manager. This map returned should be shared with any application which is - * communicating with NodeManager (ex. NMClient / NMClientAsync) using - * NMTokens. If a new NMToken is received for the same node manager - * then it will be replaced. - */ - public abstract ConcurrentMap getNMTokens(); - public interface CallbackHandler { /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java index 507f8d9..5cb504d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java @@ -112,18 +112,16 @@ protected CallbackHandler callbackHandler; public static NMClientAsync createNMClientAsync( - CallbackHandler callbackHandler, ConcurrentMap nmTokens) { - return new NMClientAsyncImpl(callbackHandler, nmTokens); + CallbackHandler callbackHandler) { + return new NMClientAsyncImpl(callbackHandler); } - protected NMClientAsync(CallbackHandler callbackHandler, - ConcurrentMap nmTokens) { - this (NMClientAsync.class.getName(), callbackHandler, nmTokens); + protected NMClientAsync(CallbackHandler callbackHandler) { + this (NMClientAsync.class.getName(), callbackHandler); } - protected NMClientAsync(String name, CallbackHandler callbackHandler, - ConcurrentMap nmTokens) { - this (name, new NMClientImpl(nmTokens), callbackHandler); + protected NMClientAsync(String name, CallbackHandler callbackHandler) { + this (name, new NMClientImpl(), callbackHandler); } @Private diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index e667e37..cc3969d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -22,7 +22,6 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.logging.Log; @@ -40,7 +39,6 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; @@ -215,19 +213,6 @@ public Resource getAvailableResources() { public int getClusterNodeCount() { return client.getClusterNodeCount(); } - - /** - * It returns the NMToken received on allocate call. It will not communicate - * with RM to get NMTokens. On allocate call whenever we receive new token - * along with new container AMRMClientAsync will cache this NMToken per node - * manager. This map returned should be shared with any application which is - * communicating with NodeManager (ex. NMClient / NMClientAsync) using - * NMTokens. If a new NMToken is received for the same node manager - * then it will be replaced. - */ - public ConcurrentMap getNMTokens() { - return client.getNMTokens(); - } private class HeartbeatThread extends Thread { public HeartbeatThread() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java index 7f7df1a..700a509 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java @@ -82,14 +82,12 @@ protected ConcurrentMap containers = new ConcurrentHashMap(); - public NMClientAsyncImpl(CallbackHandler callbackHandler, - ConcurrentMap nmTokens) { - this(NMClientAsync.class.getName(), callbackHandler, nmTokens); + public NMClientAsyncImpl(CallbackHandler callbackHandler) { + this(NMClientAsync.class.getName(), callbackHandler); } - public NMClientAsyncImpl(String name, CallbackHandler callbackHandler, - ConcurrentMap nmTokens) { - this(name, new NMClientImpl(nmTokens), callbackHandler); + public NMClientAsyncImpl(String name, CallbackHandler callbackHandler) { + this(name, new NMClientImpl(), callbackHandler); } @Private diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 74c86b9..68cc287 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -34,7 +34,6 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -56,8 +55,8 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -82,7 +81,6 @@ RecordFactoryProvider.getRecordFactory(null); private int lastResponseId = 0; - private ConcurrentHashMap nmTokens; protected ApplicationMasterProtocol rmClient; protected final ApplicationAttemptId appAttemptId; @@ -158,7 +156,6 @@ static boolean canFit(Resource arg0, Resource arg1) { public AMRMClientImpl(ApplicationAttemptId appAttemptId) { super(AMRMClientImpl.class.getName()); this.appAttemptId = appAttemptId; - this.nmTokens = new ConcurrentHashMap(); } @Override @@ -285,12 +282,12 @@ public AllocateResponse allocate(float progressIndicator) protected void populateNMTokens(AllocateResponse allocateResponse) { for (NMToken token : allocateResponse.getNMTokens()) { String nodeId = token.getNodeId().toString(); - if (nmTokens.containsKey(nodeId)) { + if (NMTokenCache.containsNMToken(nodeId)) { LOG.debug("Replacing token for : " + nodeId); } else { LOG.debug("Received new token for : " + nodeId); } - nmTokens.put(nodeId, token.getToken()); + NMTokenCache.setNMToken(nodeId, token.getToken()); } } @@ -577,9 +574,4 @@ private void decResourceRequest(Priority priority, + " #asks=" + ask.size()); } } - - @Override - public ConcurrentHashMap getNMTokens() { - return nmTokens; - } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java index a22e200..4ca44e1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -36,6 +35,7 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.ipc.YarnRPC; @@ -54,13 +54,10 @@ private final int maxConnectedNMs; private final LinkedHashMap cmProxy; - private Map nmTokens; private final Configuration conf; private final YarnRPC rpc; - public ContainerManagementProtocolProxy(Configuration conf, - Map nmTokens) { - this.nmTokens = nmTokens; + public ContainerManagementProtocolProxy(Configuration conf) { this.conf = conf; maxConnectedNMs = @@ -86,10 +83,10 @@ public synchronized ContainerManagementProtocolProxyData getProxy( // This get call will update the map which is working as LRU cache. ContainerManagementProtocolProxyData proxy = cmProxy.get(containerManagerBindAddr); - + while (proxy != null && !proxy.token.getIdentifier().equals( - nmTokens.get(containerManagerBindAddr).getIdentifier())) { + NMTokenCache.getNMToken(containerManagerBindAddr).getIdentifier())) { LOG.info("Refreshing proxy as NMToken got updated for node : " + containerManagerBindAddr); // Token is updated. check if anyone has already tried closing it. @@ -112,7 +109,7 @@ public synchronized ContainerManagementProtocolProxyData getProxy( if (proxy == null) { proxy = new ContainerManagementProtocolProxyData(rpc, containerManagerBindAddr, - containerId, nmTokens.get(containerManagerBindAddr)); + containerId, NMTokenCache.getNMToken(containerManagerBindAddr)); if (cmProxy.size() > maxConnectedNMs) { // Number of existing proxy exceed the limit. String cmAddr = cmProxy.keySet().iterator().next(); @@ -172,10 +169,6 @@ public synchronized void stopAllProxies() { cmProxy.clear(); } - public synchronized void setNMTokens(Map nmTokens) { - this.nmTokens = nmTokens; - } - public class ContainerManagementProtocolProxyData { private final String containerManagerBindAddr; private final ContainerManagementProtocol proxy; @@ -201,10 +194,12 @@ public ContainerManagementProtocolProxyData(YarnRPC rpc, protected ContainerManagementProtocol newProxy(final YarnRPC rpc, String containerManagerBindAddr, ContainerId containerId, Token token) throws InvalidToken { + if (token == null) { throw new InvalidToken("No NMToken sent for " + containerManagerBindAddr); } + final InetSocketAddress cmAddr = NetUtils.createSocketAddr(containerManagerBindAddr); LOG.info("Opening proxy : " + containerManagerBindAddr); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java index 02cfbfb..54a73fa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java @@ -81,18 +81,15 @@ new ConcurrentHashMap(); //enabled by default - private final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true); - private ContainerManagementProtocolProxy cmProxy; - private ConcurrentMap nmTokens; + private final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true); + private ContainerManagementProtocolProxy cmProxy; - public NMClientImpl(ConcurrentMap nmTokens) { + public NMClientImpl() { super(NMClientImpl.class.getName()); - this.nmTokens = nmTokens; } - public NMClientImpl(String name, ConcurrentMap nmTokens) { + public NMClientImpl(String name) { super(name); - this.nmTokens = nmTokens; } @Override @@ -126,8 +123,7 @@ protected synchronized void cleanupRunningContainers() { @Override protected void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); - cmProxy = - new ContainerManagementProtocolProxy(conf, nmTokens); + cmProxy = new ContainerManagementProtocolProxy(conf); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 5955f26..4c034ae 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -26,11 +26,9 @@ import java.io.IOException; import java.util.Collection; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; import junit.framework.Assert; @@ -50,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -57,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.AMRMClient.StoredContainerRequest; @@ -488,8 +488,8 @@ private void testAllocation(final AMRMClientImpl amClient) int iterationsLeft = 2; Set releases = new TreeSet(); - ConcurrentHashMap nmTokens = amClient.getNMTokens(); - Assert.assertEquals(0, nmTokens.size()); + NMTokenCache.clearCache(); + Assert.assertEquals(0, NMTokenCache.numberOfNMTokensInCache()); HashMap receivedNMTokens = new HashMap(); while (allocatedContainerCount < containersRequestedAny @@ -505,19 +505,13 @@ private void testAllocation(final AMRMClientImpl amClient) releases.add(rejectContainerId); amClient.releaseAssignedContainer(rejectContainerId); } - Assert.assertEquals(nmTokens.size(), amClient.getNMTokens().size()); - Iterator nodeI = nmTokens.keySet().iterator(); - while (nodeI.hasNext()) { - String nodeId = nodeI.next(); - if (!receivedNMTokens.containsKey(nodeId)) { - receivedNMTokens.put(nodeId, nmTokens.get(nodeId)); - } else { - Assert.fail("Received token again for : " + nodeId); + + for (NMToken token : allocResponse.getNMTokens()) { + String nodeID = token.getNodeId().toString(); + if (receivedNMTokens.containsKey(nodeID)) { + Assert.fail("Received token again for : " + nodeID); } - } - nodeI = receivedNMTokens.keySet().iterator(); - while (nodeI.hasNext()) { - nmTokens.remove(nodeI.next()); + receivedNMTokens.put(nodeID, token.getToken()); } if(allocatedContainerCount < containersRequestedAny) { @@ -526,7 +520,6 @@ private void testAllocation(final AMRMClientImpl amClient) } } - Assert.assertEquals(0, amClient.getNMTokens().size()); // Should receive atleast 1 token Assert.assertTrue(receivedNMTokens.size() > 0 && receivedNMTokens.size() <= nodeCount); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java index 5bcb428..dc6367b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -74,7 +75,6 @@ List nodeReports = null; ApplicationAttemptId attemptId = null; int nodeCount = 3; - ConcurrentHashMap nmTokens; @Before public void setup() throws YarnException, IOException { @@ -136,7 +136,6 @@ public void setup() throws YarnException, IOException { if (iterationsLeft == 0) { fail("Application hasn't bee started"); } - nmTokens = new ConcurrentHashMap(); // start am rm client rmClient = @@ -148,7 +147,7 @@ public void setup() throws YarnException, IOException { assertEquals(STATE.STARTED, rmClient.getServiceState()); // start am nm client - nmClient = (NMClientImpl) NMClient.createNMClient(nmTokens); + nmClient = (NMClientImpl) NMClient.createNMClient(); nmClient.init(conf); nmClient.start(); assertNotNull(nmClient); @@ -173,7 +172,7 @@ private void stopNmClient(boolean stopContainers) { nmClient.stop(); } - @Test (timeout = 60000) + @Test (timeout = 180000) public void testNMClientNoCleanupOnStop() throws YarnException, IOException { @@ -241,7 +240,8 @@ public void testNMClient() } if (!allocResponse.getNMTokens().isEmpty()) { for (NMToken token : allocResponse.getNMTokens()) { - nmTokens.put(token.getNodeId().toString(), token.getToken()); + NMTokenCache.setNMToken(token.getNodeId().toString(), + token.getToken()); } } if(allocatedContainerCount < containersRequestedAny) {