diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java index cbb8e72..946d9c6 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java @@ -26,7 +26,6 @@ import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager; import org.apache.hadoop.yarn.util.Clock; @@ -62,6 +61,4 @@ Set getBlacklistedNodes(); ClientToAMTokenSecretManager getClientToAMTokenSecretManager(); - - Map getNMTokens(); } diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 71d5b52..8abd58d 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -886,8 +886,6 @@ protected void serviceStop() throws Exception { private final Configuration conf; private final ClusterInfo clusterInfo = new ClusterInfo(); private final ClientToAMTokenSecretManager clientToAMTokenSecretManager; - private final ConcurrentHashMap nmTokens = - new ConcurrentHashMap(); public RunningAppContext(Configuration config) { this.conf = config; @@ -954,11 +952,6 @@ public ClusterInfo getClusterInfo() { public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() { return clientToAMTokenSecretManager; } - - @Override - public Map getNMTokens() { - return this.nmTokens; - } } @SuppressWarnings("unchecked") diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java index 9417181..ba0923b 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java @@ -242,8 +242,7 @@ protected void serviceInit(Configuration conf) throws Exception { MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT); LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize); super.serviceInit(conf); - cmProxy = - new ContainerManagementProtocolProxy(conf, context.getNMTokens()); + cmProxy = new ContainerManagementProtocolProxy(conf); } protected void serviceStart() throws Exception { diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 1263582..74f7290 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.client.api.impl.NMTokenCache; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.util.RackResolver; @@ -588,7 +589,7 @@ public void rampDownReduces(int rampDown) { // Setting NMTokens if (response.getNMTokens() != null) { for (NMToken nmToken : response.getNMTokens()) { - getContext().getNMTokens().put(nmToken.getNodeId().toString(), + NMTokenCache.setNMToken(nmToken.getNodeId().toString(), nmToken.getToken()); } } diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java index 521e282..4b07236 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java @@ -26,10 +26,9 @@ import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager; +import org.apache.hadoop.yarn.util.Clock; import com.google.common.collect.Maps; @@ -131,10 +130,4 @@ public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() { // Not implemented return null; } - - @Override - public Map getNMTokens() { - // Not Implemented - return null; - } } diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java index 1742d90..762dd57 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java @@ -862,11 +862,5 @@ public ClusterInfo getClusterInfo() { public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() { return null; } - - @Override - public Map getNMTokens() { - // Not Implemented - return null; - } } } diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java index 6ea0f67e..60cdf4c 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java @@ -358,7 +358,7 @@ public ContainerManagementProtocolProxyData getCMProxy( NMTokenIdentifier.KIND.toString(), "password".getBytes(), "NMToken"); ContainerManagementProtocolProxy cmProxy = - new ContainerManagementProtocolProxy(conf, context.getNMTokens()); + new ContainerManagementProtocolProxy(conf); InetSocketAddress addr = NetUtils.getConnectAddress(server); ContainerManagementProtocolProxyData proxy = cmProxy.new ContainerManagementProtocolProxyData( diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java index 8605bb4..2c1f3a2 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java @@ -44,7 +44,6 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -316,10 +315,4 @@ public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() { // Not implemented. return null; } - - @Override - public Map getNMTokens() { - // Not Implemented. - return null; - } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 638ce13..012af3f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -448,8 +448,7 @@ public boolean run() throws YarnException, IOException { resourceManager.start(); containerListener = new NMCallbackHandler(); - nmClientAsync = - new NMClientAsyncImpl(containerListener, resourceManager.getNMTokens()); + nmClientAsync = new NMClientAsyncImpl(containerListener); nmClientAsync.init(conf); nmClientAsync.start(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index f1890a8..bd0f16b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.Collection; import java.util.List; -import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -35,7 +34,6 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -249,14 +247,4 @@ public abstract void unregisterApplicationMaster(FinalApplicationStatus appStatu Priority priority, String resourceName, Resource capability); - - /** - * It returns the NMToken received on allocate call. It will not communicate - * with RM to get NMTokens. On allocate call whenever we receive new token - * along with container AMRMClient will cache this NMToken per node manager. - * This map returned should be shared with any application which is - * communicating with NodeManager (ex. NMClient) using NMTokens. If a new - * NMToken is received for the same node manager then it will be replaced. - */ - public abstract ConcurrentMap getNMTokens(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java index 00e513d..57e7db5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java @@ -22,21 +22,17 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Map; -import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.impl.NMClientImpl; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -46,30 +42,19 @@ /** * Create a new instance of NMClient. - * @param nmTokens need to pass map of NMTokens which are received on - * {@link AMRMClient#allocate(float)} call as a part of - * {@link AllocateResponse}. - * key :- NodeAddr (host:port) - * Value :- Token {@link NMToken#getToken()} */ @Public - public static NMClient createNMClient(ConcurrentMap nmTokens) { - NMClient client = new NMClientImpl(nmTokens); + public static NMClient createNMClient() { + NMClient client = new NMClientImpl(); return client; } /** * Create a new instance of NMClient. - * @param nmTokens need to pass map of NMTokens which are received on - * {@link AMRMClient#allocate(float)} call as a part of - * {@link AllocateResponse}. - * key :- NodeAddr (host:port) - * Value :- Token {@link NMToken#getToken()} */ @Public - public static NMClient createNMClient(String name, - ConcurrentMap nmTokens) { - NMClient client = new NMClientImpl(name, nmTokens); + public static NMClient createNMClient(String name) { + NMClient client = new NMClientImpl(name); return client; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java index 8d551dc..ae781b6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.Collection; import java.util.List; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -37,7 +36,6 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; @@ -198,17 +196,6 @@ public abstract void unregisterApplicationMaster( */ public abstract int getClusterNodeCount(); - /** - * It returns the NMToken received on allocate call. It will not communicate - * with RM to get NMTokens. On allocate call whenever we receive new token - * along with new container AMRMClientAsync will cache this NMToken per node - * manager. This map returned should be shared with any application which is - * communicating with NodeManager (ex. NMClient / NMClientAsync) using - * NMTokens. If a new NMToken is received for the same node manager - * then it will be replaced. - */ - public abstract ConcurrentMap getNMTokens(); - public interface CallbackHandler { /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java index 507f8d9..5cb504d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java @@ -112,18 +112,16 @@ protected CallbackHandler callbackHandler; public static NMClientAsync createNMClientAsync( - CallbackHandler callbackHandler, ConcurrentMap nmTokens) { - return new NMClientAsyncImpl(callbackHandler, nmTokens); + CallbackHandler callbackHandler) { + return new NMClientAsyncImpl(callbackHandler); } - protected NMClientAsync(CallbackHandler callbackHandler, - ConcurrentMap nmTokens) { - this (NMClientAsync.class.getName(), callbackHandler, nmTokens); + protected NMClientAsync(CallbackHandler callbackHandler) { + this (NMClientAsync.class.getName(), callbackHandler); } - protected NMClientAsync(String name, CallbackHandler callbackHandler, - ConcurrentMap nmTokens) { - this (name, new NMClientImpl(nmTokens), callbackHandler); + protected NMClientAsync(String name, CallbackHandler callbackHandler) { + this (name, new NMClientImpl(), callbackHandler); } @Private diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index e667e37..cc3969d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -22,7 +22,6 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.logging.Log; @@ -40,7 +39,6 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; @@ -215,19 +213,6 @@ public Resource getAvailableResources() { public int getClusterNodeCount() { return client.getClusterNodeCount(); } - - /** - * It returns the NMToken received on allocate call. It will not communicate - * with RM to get NMTokens. On allocate call whenever we receive new token - * along with new container AMRMClientAsync will cache this NMToken per node - * manager. This map returned should be shared with any application which is - * communicating with NodeManager (ex. NMClient / NMClientAsync) using - * NMTokens. If a new NMToken is received for the same node manager - * then it will be replaced. - */ - public ConcurrentMap getNMTokens() { - return client.getNMTokens(); - } private class HeartbeatThread extends Thread { public HeartbeatThread() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java index 7f7df1a..700a509 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java @@ -82,14 +82,12 @@ protected ConcurrentMap containers = new ConcurrentHashMap(); - public NMClientAsyncImpl(CallbackHandler callbackHandler, - ConcurrentMap nmTokens) { - this(NMClientAsync.class.getName(), callbackHandler, nmTokens); + public NMClientAsyncImpl(CallbackHandler callbackHandler) { + this(NMClientAsync.class.getName(), callbackHandler); } - public NMClientAsyncImpl(String name, CallbackHandler callbackHandler, - ConcurrentMap nmTokens) { - this(name, new NMClientImpl(nmTokens), callbackHandler); + public NMClientAsyncImpl(String name, CallbackHandler callbackHandler) { + this(name, new NMClientImpl(), callbackHandler); } @Private diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 74c86b9..acb65aa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -34,7 +34,6 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -56,7 +55,6 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -82,7 +80,6 @@ RecordFactoryProvider.getRecordFactory(null); private int lastResponseId = 0; - private ConcurrentHashMap nmTokens; protected ApplicationMasterProtocol rmClient; protected final ApplicationAttemptId appAttemptId; @@ -158,7 +155,6 @@ static boolean canFit(Resource arg0, Resource arg1) { public AMRMClientImpl(ApplicationAttemptId appAttemptId) { super(AMRMClientImpl.class.getName()); this.appAttemptId = appAttemptId; - this.nmTokens = new ConcurrentHashMap(); } @Override @@ -285,12 +281,12 @@ public AllocateResponse allocate(float progressIndicator) protected void populateNMTokens(AllocateResponse allocateResponse) { for (NMToken token : allocateResponse.getNMTokens()) { String nodeId = token.getNodeId().toString(); - if (nmTokens.containsKey(nodeId)) { + if (NMTokenCache.containsNMToken(nodeId)) { LOG.debug("Replacing token for : " + nodeId); } else { LOG.debug("Received new token for : " + nodeId); } - nmTokens.put(nodeId, token.getToken()); + NMTokenCache.setNMToken(nodeId, token.getToken()); } } @@ -577,9 +573,4 @@ private void decResourceRequest(Priority priority, + " #asks=" + ask.size()); } } - - @Override - public ConcurrentHashMap getNMTokens() { - return nmTokens; - } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java index a22e200..987248b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -54,13 +53,10 @@ private final int maxConnectedNMs; private final LinkedHashMap cmProxy; - private Map nmTokens; private final Configuration conf; private final YarnRPC rpc; - public ContainerManagementProtocolProxy(Configuration conf, - Map nmTokens) { - this.nmTokens = nmTokens; + public ContainerManagementProtocolProxy(Configuration conf) { this.conf = conf; maxConnectedNMs = @@ -86,10 +82,15 @@ public synchronized ContainerManagementProtocolProxyData getProxy( // This get call will update the map which is working as LRU cache. ContainerManagementProtocolProxyData proxy = cmProxy.get(containerManagerBindAddr); - + Token token = NMTokenCache.getNMToken(containerManagerBindAddr); + + if (token == null) { + throw new InvalidToken("No NMToken sent for " + + containerManagerBindAddr); + } + while (proxy != null - && !proxy.token.getIdentifier().equals( - nmTokens.get(containerManagerBindAddr).getIdentifier())) { + && !proxy.token.getIdentifier().equals(token.getIdentifier())) { LOG.info("Refreshing proxy as NMToken got updated for node : " + containerManagerBindAddr); // Token is updated. check if anyone has already tried closing it. @@ -112,7 +113,7 @@ public synchronized ContainerManagementProtocolProxyData getProxy( if (proxy == null) { proxy = new ContainerManagementProtocolProxyData(rpc, containerManagerBindAddr, - containerId, nmTokens.get(containerManagerBindAddr)); + containerId, token); if (cmProxy.size() > maxConnectedNMs) { // Number of existing proxy exceed the limit. String cmAddr = cmProxy.keySet().iterator().next(); @@ -172,10 +173,6 @@ public synchronized void stopAllProxies() { cmProxy.clear(); } - public synchronized void setNMTokens(Map nmTokens) { - this.nmTokens = nmTokens; - } - public class ContainerManagementProtocolProxyData { private final String containerManagerBindAddr; private final ContainerManagementProtocol proxy; @@ -201,10 +198,6 @@ public ContainerManagementProtocolProxyData(YarnRPC rpc, protected ContainerManagementProtocol newProxy(final YarnRPC rpc, String containerManagerBindAddr, ContainerId containerId, Token token) throws InvalidToken { - if (token == null) { - throw new InvalidToken("No NMToken sent for " - + containerManagerBindAddr); - } final InetSocketAddress cmAddr = NetUtils.createSocketAddr(containerManagerBindAddr); LOG.info("Opening proxy : " + containerManagerBindAddr); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java index 02cfbfb..54a73fa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java @@ -81,18 +81,15 @@ new ConcurrentHashMap(); //enabled by default - private final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true); - private ContainerManagementProtocolProxy cmProxy; - private ConcurrentMap nmTokens; + private final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true); + private ContainerManagementProtocolProxy cmProxy; - public NMClientImpl(ConcurrentMap nmTokens) { + public NMClientImpl() { super(NMClientImpl.class.getName()); - this.nmTokens = nmTokens; } - public NMClientImpl(String name, ConcurrentMap nmTokens) { + public NMClientImpl(String name) { super(name); - this.nmTokens = nmTokens; } @Override @@ -126,8 +123,7 @@ protected synchronized void cleanupRunningContainers() { @Override protected void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); - cmProxy = - new ContainerManagementProtocolProxy(conf, nmTokens); + cmProxy = new ContainerManagementProtocolProxy(conf); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMTokenCache.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMTokenCache.java new file mode 100644 index 0000000..8282a83 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMTokenCache.java @@ -0,0 +1,106 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.client.api.impl; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.api.records.Token; + +import com.google.common.annotations.VisibleForTesting; + +/** + * It manages NMTokens required for communicating with Node manager. Its a + * static token cache. + */ +public class NMTokenCache { + private static NMTokenCache instance; + private HashMap nmTokens; + + + static { + instance = new NMTokenCache(); + } + + private NMTokenCache() { + nmTokens = new HashMap(); + } + + /** + * Returns NMToken, null if absent + * @param nodeAddr + * @return {@link Token} NMToken required for communicating with node + * manager + */ + public static synchronized Token getNMToken(String nodeAddr) { + return instance.nmTokens.get(nodeAddr); + } + + public static synchronized void setNMToken(String nodeAddr, Token token) { + instance.nmTokens.put(nodeAddr, token); + } + + /** + * Returns true if NMToken is present in cache. + */ + public static synchronized boolean containsNMToken(String nodeAddr) { + return instance.nmTokens.containsKey(nodeAddr); + } + + /** + * Returns the number of NMTokens present in cache. + */ + @Private + @VisibleForTesting + public static synchronized int numberOfNMTokensInCache() { + return instance.nmTokens.size(); + } + + /** + * Removes NMToken for specified node manger + * @param nodeAddr node address (host:port) + */ + public static synchronized void removeNMToken(String nodeAddr) { + instance.nmTokens.remove(nodeAddr); + } + + /** + * It returns all NMTokens present in cache. + */ + public static synchronized Map getAllNMTokens() { + Map nmTokens = new HashMap(); + for (String key : instance.nmTokens.keySet()) { + nmTokens.put(key, instance.nmTokens.get(key)); + } + return nmTokens; + } + + /** + * It will remove all the nm tokens from its cache + */ + public static synchronized void clearCache() { + instance.nmTokens.clear(); + } + + @Override + protected Object clone() throws CloneNotSupportedException { + throw new CloneNotSupportedException("Its a singleton!!"); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 9398f98..9affeb6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -26,11 +26,9 @@ import java.io.IOException; import java.util.Collection; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; import junit.framework.Assert; @@ -51,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -58,10 +57,9 @@ import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.AMRMClient; -import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.AMRMClient.StoredContainerRequest; -import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; +import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.MiniYARNCluster; @@ -494,8 +492,8 @@ private void testAllocation(final AMRMClientImpl amClient) int iterationsLeft = 2; Set releases = new TreeSet(); - ConcurrentHashMap nmTokens = amClient.getNMTokens(); - Assert.assertEquals(0, nmTokens.size()); + NMTokenCache.clearCache(); + Assert.assertEquals(0, NMTokenCache.numberOfNMTokensInCache()); HashMap receivedNMTokens = new HashMap(); while (allocatedContainerCount < containersRequestedAny @@ -511,19 +509,14 @@ private void testAllocation(final AMRMClientImpl amClient) releases.add(rejectContainerId); amClient.releaseAssignedContainer(rejectContainerId); } - Assert.assertEquals(nmTokens.size(), amClient.getNMTokens().size()); - Iterator nodeI = nmTokens.keySet().iterator(); - while (nodeI.hasNext()) { - String nodeId = nodeI.next(); - if (!receivedNMTokens.containsKey(nodeId)) { - receivedNMTokens.put(nodeId, nmTokens.get(nodeId)); - } else { - Assert.fail("Received token again for : " + nodeId); + + for (NMToken token : allocResponse.getNMTokens()) { + String nodeID = token.getNodeId().toString(); + if (receivedNMTokens.containsKey(nodeID)) { + Assert.fail("Received token again for : " + nodeID); } - } - nodeI = receivedNMTokens.keySet().iterator(); - while (nodeI.hasNext()) { - nmTokens.remove(nodeI.next()); + NMTokenCache.removeNMToken(nodeID); + receivedNMTokens.put(nodeID, token.getToken()); } if(allocatedContainerCount < containersRequestedAny) { @@ -532,7 +525,7 @@ private void testAllocation(final AMRMClientImpl amClient) } } - Assert.assertEquals(0, amClient.getNMTokens().size()); + Assert.assertEquals(0, NMTokenCache.numberOfNMTokensInCache()); // Should receive atleast 1 token Assert.assertTrue(receivedNMTokens.size() > 0 && receivedNMTokens.size() <= nodeCount); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java index 309b5af..f2a304b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java @@ -78,7 +78,6 @@ List nodeReports = null; ApplicationAttemptId attemptId = null; int nodeCount = 3; - ConcurrentHashMap nmTokens; @Before public void setup() throws YarnException, IOException { @@ -144,7 +143,6 @@ public void setup() throws YarnException, IOException { if (iterationsLeft == 0) { fail("Application hasn't bee started"); } - nmTokens = new ConcurrentHashMap(); // start am rm client rmClient = @@ -156,7 +154,7 @@ public void setup() throws YarnException, IOException { assertEquals(STATE.STARTED, rmClient.getServiceState()); // start am nm client - nmClient = (NMClientImpl) NMClient.createNMClient(nmTokens); + nmClient = (NMClientImpl) NMClient.createNMClient(); nmClient.init(conf); nmClient.start(); assertNotNull(nmClient); @@ -249,7 +247,8 @@ public void testNMClient() } if (!allocResponse.getNMTokens().isEmpty()) { for (NMToken token : allocResponse.getNMTokens()) { - nmTokens.put(token.getNodeId().toString(), token.getToken()); + NMTokenCache.setNMToken(token.getNodeId().toString(), + token.getToken()); } } if(allocatedContainerCount < containersRequestedAny) {