diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index 82464cf..38e9089 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -112,6 +112,8 @@ protected synchronized void serviceInit(Configuration conf) throws Exception { this.tokenRemovalDelayMs = conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); + + setLocalSecretManagerAndServiceAddr(); renewerService = createNewThreadPoolService(conf); pendingEventQueue = new LinkedBlockingQueue(); renewalTimer = new Timer(true); @@ -134,6 +136,13 @@ protected ThreadPoolExecutor createNewThreadPoolService(Configuration conf) { return pool; } + // enable RM to short-circuit token operations directly to itself + private void setLocalSecretManagerAndServiceAddr() { + RMDelegationTokenIdentifier.Renewer.setSecretManager(rmContext + .getRMDelegationTokenSecretManager(), rmContext.getClientRMService() + .getBindAddress()); + } + @Override protected void serviceStart() throws Exception { dtCancelThread.start(); @@ -143,10 +152,8 @@ protected void serviceStart() throws Exception { "DelayedTokenCanceller"); delayedRemovalThread.start(); } - // enable RM to short-circuit token operations directly to itself - RMDelegationTokenIdentifier.Renewer.setSecretManager( - rmContext.getRMDelegationTokenSecretManager(), - rmContext.getClientRMService().getBindAddress()); + + setLocalSecretManagerAndServiceAddr(); serviceStateLock.writeLock().lock(); isServiceStarted = true; serviceStateLock.writeLock().unlock(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index ffb6fd9..49eff8b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -394,7 +394,7 @@ public void testRMRestart() throws Exception { Assert.assertEquals(4, rmAppState.size()); } - @Test + @Test (timeout = 60000) public void testRMRestartAppRunningAMFailed() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); @@ -440,7 +440,7 @@ public void testRMRestartAppRunningAMFailed() throws Exception { rm2.stop(); } - @Test + @Test (timeout = 60000) public void testRMRestartWaitForPreviousAMToFinish() throws Exception { // testing 3 cases // After RM restarts @@ -607,7 +607,7 @@ public void testRMRestartWaitForPreviousAMToFinish() throws Exception { // store but before the RMAppAttempt notifies RMApp that it has succeeded. On // recovery, RMAppAttempt should send the AttemptFinished event to RMApp so // that RMApp can recover its state. - @Test + @Test (timeout = 60000) public void testRMRestartWaitForPreviousSucceededAttempt() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); MemoryRMStateStore memStore = new MemoryRMStateStore() { @@ -660,7 +660,7 @@ public void updateApplicationStateInternal(ApplicationId appId, rmAppState.get(app0.getApplicationId()).getState()); } - @Test + @Test (timeout = 60000) public void testRMRestartFailedApp() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); MemoryRMStateStore memStore = new MemoryRMStateStore(); @@ -709,7 +709,7 @@ public void testRMRestartFailedApp() throws Exception { rm2.stop(); } - @Test + @Test (timeout = 60000) public void testRMRestartKilledApp() throws Exception{ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); @@ -757,7 +757,7 @@ public void testRMRestartKilledApp() throws Exception{ rm2.stop(); } - @Test + @Test (timeout = 60000) public void testRMRestartKilledAppWithNoAttempts() throws Exception { MemoryRMStateStore memStore = new MemoryRMStateStore() { @Override @@ -797,7 +797,7 @@ public synchronized void updateApplicationAttemptStateInternal( Assert.assertTrue(loadedApp0.getAppAttempts().size() == 0); } - @Test + @Test (timeout = 60000) public void testRMRestartSucceededApp() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); @@ -849,7 +849,7 @@ public void testRMRestartSucceededApp() throws Exception { rm2.stop(); } - @Test + @Test (timeout = 60000) public void testRMRestartGetApplicationList() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); MemoryRMStateStore memStore = new MemoryRMStateStore(); @@ -997,7 +997,7 @@ private void finishApplicationMaster(RMApp rmApp, MockRM rm, MockNM nm, appState.getAttempt(am.getApplicationAttemptId()).getState()); } - @Test + @Test (timeout = 60000) public void testRMRestartOnMaxAppAttempts() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); @@ -1071,7 +1071,7 @@ public void testRMRestartOnMaxAppAttempts() throws Exception { rm2.stop(); } - @Test + @Test (timeout = 60000) public void testDelegationTokenRestoredInDelegationTokenRenewer() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); @@ -1171,7 +1171,7 @@ private void waitForTokensToBeRenewed(MockRM rm2) throws Exception { } } - @Test + @Test (timeout = 60000) public void testAppAttemptTokensRestoredOnRMRestart() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, @@ -1261,7 +1261,7 @@ public void testAppAttemptTokensRestoredOnRMRestart() throws Exception { rm2.stop(); } - @Test + @Test (timeout = 60000) public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); conf.set( @@ -1414,7 +1414,7 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { // This is to test submit an application to the new RM with the old delegation // token got from previous RM. - @Test + @Test (timeout = 60000) public void testAppSubmissionWithOldDelegationTokenAfterRMRestart() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); @@ -1449,7 +1449,7 @@ public void testAppSubmissionWithOldDelegationTokenAfterRMRestart() rm2.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); } - @Test + @Test (timeout = 60000) public void testRMStateStoreDispatcherDrainedOnRMStop() throws Exception { MemoryRMStateStore memStore = new MemoryRMStateStore() { volatile boolean wait = true; @@ -1508,7 +1508,7 @@ protected void handleStoreEvent(RMStateStoreEvent event) { Assert.assertTrue(rmAppState.size() == NUM_APPS); } - @Test + @Test (timeout = 60000) public void testFinishedAppRemovalAfterRMRestart() throws Exception { MemoryRMStateStore memStore = new MemoryRMStateStore(); conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 1); @@ -1580,7 +1580,7 @@ public synchronized void checkVersion() // This is to test Killing application should be able to wait until app // reaches killed state and also check that attempt state is saved before app // state is saved. - @Test + @Test (timeout = 60000) public void testClientRetryOnKillingApplication() throws Exception { MemoryRMStateStore memStore = new TestMemoryRMStateStore(); memStore.init(conf); @@ -1738,7 +1738,7 @@ private void assertQueueMetrics(QueueMetrics qm, int appsSubmitted, appsCompleted + appsCompletedCarryOn); } - @Test + @Test (timeout = 60000) public void testDecomissionedNMsMetricsOnRMRestart() throws Exception { YarnConfiguration conf = new YarnConfiguration(); conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, @@ -1876,6 +1876,13 @@ public TestSecurityMockRM(Configuration conf, RMStateStore store) { } @Override + public void init(Configuration conf) { + // reset localServiceAddress. + RMDelegationTokenIdentifier.Renewer.setSecretManager(null, null); + super.init(conf); + } + + @Override protected ClientRMService createClientRMService() { return new ClientRMService(getRMContext(), getResourceScheduler(), rmAppManager, applicationACLsManager, null, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index 203e716..31eaa71 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -43,8 +43,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -82,6 +80,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -179,7 +178,6 @@ public void setUp() throws Exception { dispatcher = new AsyncDispatcher(eventQueue); Renewer.reset(); delegationTokenRenewer = createNewDelegationTokenRenewer(conf, counter); - delegationTokenRenewer.init(conf); RMContext mockContext = mock(RMContext.class); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getDelegationTokenRenewer()).thenReturn( @@ -190,6 +188,7 @@ public void setUp() throws Exception { InetSocketAddress.createUnresolved("localhost", 1234); when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); delegationTokenRenewer.setRMContext(mockContext); + delegationTokenRenewer.init(conf); delegationTokenRenewer.start(); } @@ -515,7 +514,6 @@ public void testDTKeepAlive1 () throws Exception { 1000l); DelegationTokenRenewer localDtr = createNewDelegationTokenRenewer(lconf, counter); - localDtr.init(lconf); RMContext mockContext = mock(RMContext.class); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); @@ -526,6 +524,7 @@ public void testDTKeepAlive1 () throws Exception { InetSocketAddress.createUnresolved("localhost", 1234); when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); localDtr.setRMContext(mockContext); + localDtr.init(lconf); localDtr.start(); MyFS dfs = (MyFS)FileSystem.get(lconf); @@ -592,7 +591,6 @@ public void testDTKeepAlive2() throws Exception { 1000l); DelegationTokenRenewer localDtr = createNewDelegationTokenRenewer(conf, counter); - localDtr.init(lconf); RMContext mockContext = mock(RMContext.class); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); @@ -603,6 +601,7 @@ public void testDTKeepAlive2() throws Exception { InetSocketAddress.createUnresolved("localhost", 1234); when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); localDtr.setRMContext(mockContext); + localDtr.init(lconf); localDtr.start(); MyFS dfs = (MyFS)FileSystem.get(lconf); @@ -704,7 +703,6 @@ public Long answer(InvocationOnMock invocation) // fire up the renewer final DelegationTokenRenewer dtr = createNewDelegationTokenRenewer(conf, counter); - dtr.init(conf); RMContext mockContext = mock(RMContext.class); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); @@ -713,6 +711,7 @@ public Long answer(InvocationOnMock invocation) when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); dtr.setRMContext(mockContext); when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr); + dtr.init(conf); dtr.start(); // submit a job that blocks during renewal Thread submitThread = new Thread() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewerLifecycle.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewerLifecycle.java index d061d79..637bf37 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewerLifecycle.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewerLifecycle.java @@ -19,7 +19,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.security; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.junit.Test; /** @@ -32,9 +37,13 @@ @Test public void testStartupFailure() throws Exception { Configuration conf = new Configuration(); - DelegationTokenRenewer delegationTokenRenewer = new DelegationTokenRenewer(); + DelegationTokenRenewer delegationTokenRenewer = + new DelegationTokenRenewer(); + RMContext mockContext = mock(RMContext.class); + ClientRMService mockClientRMService = mock(ClientRMService.class); + when(mockContext.getClientRMService()).thenReturn(mockClientRMService); + delegationTokenRenewer.setRMContext(mockContext); delegationTokenRenewer.init(conf); delegationTokenRenewer.stop(); } - }