diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java index 6f036c4..cdc9eb0 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java @@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager; import org.apache.hadoop.yarn.util.Clock; @@ -66,4 +67,6 @@ boolean hasSuccessfullyUnregistered(); + NMTokenCache getNMTokenCache(); + } diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index b60b647..ef5744f 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -126,6 +126,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; @@ -908,11 +909,13 @@ protected void serviceStop() throws Exception { private final Configuration conf; private final ClusterInfo clusterInfo = new ClusterInfo(); private final ClientToAMTokenSecretManager clientToAMTokenSecretManager; + private final NMTokenCache nmTokenCache; public RunningAppContext(Configuration config) { this.conf = config; this.clientToAMTokenSecretManager = new ClientToAMTokenSecretManager(appAttemptID, null); + this.nmTokenCache = new NMTokenCache(); } @Override @@ -992,6 +995,11 @@ public void markSuccessfulUnregistration() { public void computeIsLastAMRetry() { isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts; } + + public NMTokenCache getNMTokenCache() { + return nmTokenCache; + } + } @SuppressWarnings("unchecked") diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java index 666f757..8073aba 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java @@ -248,6 +248,7 @@ protected void serviceInit(Configuration conf) throws Exception { LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize); super.serviceInit(conf); cmProxy = new ContainerManagementProtocolProxy(conf); + cmProxy.setNMTokenCache(context.getNMTokenCache()); } protected void serviceStart() throws Exception { diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java index f09ac74..c3128d2 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -117,6 +118,10 @@ protected void serviceStart() throws Exception { super.serviceStart(); } + protected NMTokenCache getNMTokenCache() { + return context.getNMTokenCache(); + } + protected AppContext getContext() { return context; } diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index d6e4593..035a48a 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -68,7 +68,6 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.util.RackResolver; @@ -589,7 +588,7 @@ public void rampDownReduces(int rampDown) { // Setting NMTokens if (response.getNMTokens() != null) { for (NMToken nmToken : response.getNMTokens()) { - NMTokenCache.setNMToken(nmToken.getNodeId().toString(), + getNMTokenCache().setNMToken(nmToken.getNodeId().toString(), nmToken.getToken()); } } diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java index d33e734..2b56419 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java @@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager; import org.apache.hadoop.yarn.util.Clock; @@ -39,7 +40,8 @@ final Map jobs; final long startTime = System.currentTimeMillis(); Set blacklistedNodes; - + final NMTokenCache nmTokenCache = NMTokenCache.getSingleton(); + public MockAppContext(int appid) { appID = MockJobs.newAppID(appid); appAttemptID = ApplicationAttemptId.newInstance(appID, 0); @@ -142,4 +144,8 @@ public boolean hasSuccessfullyUnregistered() { return true; } + @Override + public NMTokenCache getNMTokenCache() { + return nmTokenCache; + } } diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java index ee0544a..9449fa2 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java @@ -82,6 +82,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.Event; @@ -1397,6 +1398,8 @@ public void handle(Event event) { } } }); + NMTokenCache nmTokenCache = new NMTokenCache(); + when(context.getNMTokenCache()).thenReturn(nmTokenCache); return context; } diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java index 3d555f2..cd7fce6 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java @@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -789,6 +790,7 @@ public MyAppMaster(Clock clock) { private final ApplicationId myApplicationID; private final JobId myJobID; private final Map allJobs; + private final NMTokenCache nmTokenCache; MyAppContext(int numberMaps, int numberReduces) { myApplicationID = ApplicationId.newInstance(clock.getTime(), 1); @@ -801,6 +803,8 @@ public MyAppMaster(Clock clock) { = new MyJobImpl(myJobID, numberMaps, numberReduces); allJobs = Collections.singletonMap(myJobID, myJob); + + nmTokenCache = new NMTokenCache(); } @Override @@ -874,5 +878,9 @@ public boolean hasSuccessfullyUnregistered() { return true; } + @Override + public NMTokenCache getNMTokenCache() { + return nmTokenCache; + } } } diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java index b7823a0..e75c286 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java @@ -45,6 +45,7 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -394,4 +395,9 @@ public boolean hasSuccessfullyUnregistered() { return true; } + @Override + public NMTokenCache getNMTokenCache() { + // bogus - Not Required + return null; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 3644761..029c2d6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -479,6 +479,7 @@ public boolean run() throws YarnException, IOException { containerListener = createNMCallbackHandler(); nmClientAsync = new NMClientAsyncImpl(containerListener); + nmClientAsync.setNMTokenCache(amRMClient.getNMTokenCache()); nmClientAsync.init(conf); nmClientAsync.start(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index f4913cd..23f4ea1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -59,9 +59,12 @@ return client; } + private NMTokenCache nmTokenCache; + @Private protected AMRMClient(String name) { super(name); + nmTokenCache = NMTokenCache.getSingleton(); } /** @@ -297,4 +300,33 @@ public abstract void unregisterApplicationMaster(FinalApplicationStatus appStatu */ public abstract void updateBlacklist(List blacklistAdditions, List blacklistRemovals); + + /** + * Set the NM token cache for the AMRMClient. This cache must + * be shared with the {@link NMClient} used to manage containers for the + * AMRMClient + *

+ * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()} + * singleton instance will be used. + * + * @param nmTokenCache the NM token cache to use. + */ + public void setNMTokenCache(NMTokenCache nmTokenCache) { + this.nmTokenCache = nmTokenCache; + } + + /** + * Get the NM token cache of the AMRMClient. This cache must be + * shared with the {@link NMClient} used to manage containers for the + * AMRMClient. + *

+ * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()} + * singleton instance will be used. + * + * @return the NM token cache. + */ + public NMTokenCache getNMTokenCache() { + return nmTokenCache; + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java index 57e7db5..fbb2baf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java @@ -58,6 +58,8 @@ public static NMClient createNMClient(String name) { return client; } + private NMTokenCache nmTokenCache = NMTokenCache.getSingleton(); + @Private protected NMClient(String name) { super(name); @@ -118,4 +120,34 @@ public abstract ContainerStatus getContainerStatus(ContainerId containerId, * @param enabled whether the feature is enabled or not */ public abstract void cleanupRunningContainersOnStop(boolean enabled); + + + /** + * Set the NM Token cache of the NMClient. This cache must be + * shared with the {@link AMRMClient} that requested the containers managed + * by this NMClient + *

+ * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()} + * singleton instance will be used. + * + * @param nmTokenCache the NM token cache to use. + */ + public void setNMTokenCache(NMTokenCache nmTokenCache) { + this.nmTokenCache = nmTokenCache; + } + + /** + * Get the NM token cache of the NMClient. This cache must be + * shared with the {@link AMRMClient} that requested the containers managed + * by this NMClient + *

+ * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()} + * singleton instance will be used. + * + * @return the NM token cache + */ + public NMTokenCache getNMTokenCache() { + return nmTokenCache; + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMTokenCache.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMTokenCache.java index c14a12c..61525fe 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMTokenCache.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMTokenCache.java @@ -28,16 +28,27 @@ import com.google.common.annotations.VisibleForTesting; /** - * It manages NMTokens required for communicating with Node manager. Its a - * static token cache. + * It manages NMTokens required for communicating with Node manager. */ @Public @Evolving public class NMTokenCache { - private static ConcurrentHashMap nmTokens; - - - static { + private static final NMTokenCache NM_TOKEN_CACHE = new NMTokenCache(); + + /** + * Returns the singleton NM token cache. + * @return + */ + public static NMTokenCache getSingleton() { + return NM_TOKEN_CACHE; + } + + private ConcurrentHashMap nmTokens; + + /** + * Creates a NM token cache instance. + */ + public NMTokenCache() { nmTokens = new ConcurrentHashMap(); } @@ -49,7 +60,7 @@ */ @Public @Evolving - public static Token getNMToken(String nodeAddr) { + public Token getNMToken(String nodeAddr) { return nmTokens.get(nodeAddr); } @@ -60,7 +71,7 @@ public static Token getNMToken(String nodeAddr) { */ @Public @Evolving - public static void setNMToken(String nodeAddr, Token token) { + public void setNMToken(String nodeAddr, Token token) { nmTokens.put(nodeAddr, token); } @@ -69,7 +80,7 @@ public static void setNMToken(String nodeAddr, Token token) { */ @Private @VisibleForTesting - public static boolean containsNMToken(String nodeAddr) { + public boolean containsNMToken(String nodeAddr) { return nmTokens.containsKey(nodeAddr); } @@ -78,7 +89,7 @@ public static boolean containsNMToken(String nodeAddr) { */ @Private @VisibleForTesting - public static int numberOfNMTokensInCache() { + public int numberOfNMTokensInCache() { return nmTokens.size(); } @@ -88,7 +99,7 @@ public static int numberOfNMTokensInCache() { */ @Private @VisibleForTesting - public static void removeNMToken(String nodeAddr) { + public void removeNMToken(String nodeAddr) { nmTokens.remove(nodeAddr); } @@ -97,7 +108,7 @@ public static void removeNMToken(String nodeAddr) { */ @Private @VisibleForTesting - public static void clearCache() { + public void clearCache() { nmTokens.clear(); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java index e726b73..71586c4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -119,6 +120,15 @@ protected AMRMClientAsync(AMRMClient client, int intervalMs, this.heartbeatIntervalMs.set(intervalMs); this.handler = callbackHandler; } + + /** + * Returns the NM token cache of the AM. + * + * @return the NM token cache of the AM. + */ + public NMTokenCache getNMTokenCache() { + return client.getNMTokenCache(); + } public void setHeartbeatInterval(int interval) { heartbeatIntervalMs.set(interval); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java index 5cb504d..8067478 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java @@ -20,7 +20,6 @@ import java.nio.ByteBuffer; import java.util.Map; -import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -31,8 +30,8 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; import org.apache.hadoop.yarn.client.api.impl.NMClientImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -150,6 +149,23 @@ public void setClient(NMClient client) { this.client = client; } + /** + * Set the NM Token cache of the NMClient. This cache must be + * shared with the {@link org.apache.hadoop.yarn.client.api.AMRMClient} that + * requested the containers managed by this NMClientAsync + *

+ * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()} + * singleton instance will be used. + * + * @param nmTokenCache the NM token cache to use. + */ + public void setNMTokenCache(NMTokenCache nmTokenCache) { + if (client == null) { + throw new IllegalStateException("NMClient cannot be NULL"); + } + client.setNMTokenCache(nmTokenCache); + } + public CallbackHandler getCallbackHandler() { return callbackHandler; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 3922571..afcd97d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -288,12 +288,12 @@ public AllocateResponse allocate(float progressIndicator) protected void populateNMTokens(AllocateResponse allocateResponse) { for (NMToken token : allocateResponse.getNMTokens()) { String nodeId = token.getNodeId().toString(); - if (NMTokenCache.containsNMToken(nodeId)) { + if (getNMTokenCache().containsNMToken(nodeId)) { LOG.debug("Replacing token for : " + nodeId); } else { LOG.debug("Received new token for : " + nodeId); } - NMTokenCache.setNMToken(nodeId, token.getToken()); + getNMTokenCache().setNMToken(nodeId, token.getToken()); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java index 4ca44e1..295445f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java @@ -56,6 +56,7 @@ private final LinkedHashMap cmProxy; private final Configuration conf; private final YarnRPC rpc; + private NMTokenCache nmTokenCache; public ContainerManagementProtocolProxy(Configuration conf) { this.conf = conf; @@ -75,7 +76,11 @@ public ContainerManagementProtocolProxy(Configuration conf) { new LinkedHashMap(); rpc = YarnRPC.create(conf); } - + + public void setNMTokenCache(NMTokenCache nmTokenCache) { + this.nmTokenCache = nmTokenCache; + } + public synchronized ContainerManagementProtocolProxyData getProxy( String containerManagerBindAddr, ContainerId containerId) throws InvalidToken { @@ -86,7 +91,7 @@ public synchronized ContainerManagementProtocolProxyData getProxy( while (proxy != null && !proxy.token.getIdentifier().equals( - NMTokenCache.getNMToken(containerManagerBindAddr).getIdentifier())) { + nmTokenCache.getNMToken(containerManagerBindAddr).getIdentifier())) { LOG.info("Refreshing proxy as NMToken got updated for node : " + containerManagerBindAddr); // Token is updated. check if anyone has already tried closing it. @@ -109,7 +114,7 @@ public synchronized ContainerManagementProtocolProxyData getProxy( if (proxy == null) { proxy = new ContainerManagementProtocolProxyData(rpc, containerManagerBindAddr, - containerId, NMTokenCache.getNMToken(containerManagerBindAddr)); + containerId, nmTokenCache.getNMToken(containerManagerBindAddr)); if (cmProxy.size() > maxConnectedNMs) { // Number of existing proxy exceed the limit. String cmAddr = cmProxy.keySet().iterator().next(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java index b5f0be1..2c896c4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java @@ -130,7 +130,11 @@ protected synchronized void cleanupRunningContainers() { @Override protected void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); + if (getNMTokenCache() == null) { + throw new IllegalStateException("NMTokenCache has not been set"); + } cmProxy = new ContainerManagementProtocolProxy(conf); + cmProxy.setNMTokenCache(getNMTokenCache()); } @Override @@ -188,6 +192,7 @@ private void addStartingContainer(StartedContainer startedContainer) proxy = cmProxy.getProxy(container.getNodeId().toString(), container.getId()); + cmProxy.setNMTokenCache(getNMTokenCache()); StartContainerRequest scRequest = StartContainerRequest.newInstance(containerLaunchContext, container.getContainerToken()); @@ -261,6 +266,7 @@ public ContainerStatus getContainerStatus(ContainerId containerId, containerIds.add(containerId); try { proxy = cmProxy.getProxy(nodeId.toString(), containerId); + cmProxy.setNMTokenCache(getNMTokenCache()); GetContainerStatusesResponse response = proxy.getContainerManagementProtocol().getContainerStatuses( GetContainerStatusesRequest.newInstance(containerIds)); @@ -286,6 +292,7 @@ private void stopContainerInternal(ContainerId containerId, NodeId nodeId) containerIds.add(containerId); try { proxy = cmProxy.getProxy(nodeId.toString(), containerId); + cmProxy.setNMTokenCache(getNMTokenCache()); StopContainersResponse response = proxy.getContainerManagementProtocol().stopContainers( StopContainersRequest.newInstance(containerIds)); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 58ef215..b6dde38 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -681,8 +681,8 @@ private void testAllocation(final AMRMClientImpl amClient) int iterationsLeft = 3; Set releases = new TreeSet(); - NMTokenCache.clearCache(); - Assert.assertEquals(0, NMTokenCache.numberOfNMTokensInCache()); + amClient.getNMTokenCache().clearCache(); + Assert.assertEquals(0, amClient.getNMTokenCache().numberOfNMTokensInCache()); HashMap receivedNMTokens = new HashMap(); while (allocatedContainerCount < containersRequestedAny diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java index 76e87f5..ba86141 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java @@ -57,7 +57,6 @@ import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.NMClient; -import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -166,6 +165,7 @@ public void setup() throws YarnException, IOException { // start am nm client nmClient = (NMClientImpl) NMClient.createNMClient(); + nmClient.setNMTokenCache(rmClient.getNMTokenCache()); nmClient.init(conf); nmClient.start(); assertNotNull(nmClient); @@ -258,7 +258,7 @@ public void testNMClient() } if (!allocResponse.getNMTokens().isEmpty()) { for (NMToken token : allocResponse.getNMTokens()) { - NMTokenCache.setNMToken(token.getNodeId().toString(), + rmClient.getNMTokenCache().setNMToken(token.getNodeId().toString(), token.getToken()); } }