diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 55b748d..caac1a5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -225,8 +225,9 @@ protected synchronized void checkAppNumCompletedLimit() { ApplicationId removeId = completedApps.remove(); LOG.info("Application should be expired, max # apps" + " met. Removing app: " + removeId); - rmContext.getRMApps().remove(removeId); + RMApp removeApp = rmContext.getRMApps().remove(removeId); this.applicationACLsManager.removeApplication(removeId); + rmContext.getStateStore().removeApplication(removeApp); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 46a58fc..2a82993 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -63,7 +63,7 @@ public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class); - private static final String ROOT_DIR_NAME = "FSRMStateRoot"; + protected static final String ROOT_DIR_NAME = "FSRMStateRoot"; protected FileSystem fs; @@ -321,7 +321,7 @@ public synchronized void updateApplicationAttemptStateInternal( } @Override - public synchronized void removeApplicationState(ApplicationState appState) + public synchronized void removeApplicationStateInternal(ApplicationState appState) throws Exception { String appId = appState.getAppId().toString(); Path nodeRemovePath = getAppDir(rmAppRoot, appId); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index 495c292..c7dc4e2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -167,8 +167,8 @@ public synchronized void updateApplicationAttemptStateInternal( } @Override - public synchronized void removeApplicationState(ApplicationState appState) - throws Exception { + public synchronized void removeApplicationStateInternal( + ApplicationState appState) throws Exception { ApplicationId appId = appState.getAppId(); ApplicationState removed = state.appState.remove(appId); if (removed == null) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index c8ad1c4..5fed191 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -62,7 +62,7 @@ protected void storeApplicationAttemptStateInternal(String attemptId, } @Override - protected void removeApplicationState(ApplicationState appState) + protected void removeApplicationStateInternal(ApplicationState appState) throws Exception { // Do nothing } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 911107d..f161fd4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -457,6 +457,7 @@ protected abstract void removeRMDTMasterKeyState(DelegationKey delegationKey) * This does not block the dispatcher threads * There is no notification of completion for this operation. */ + @SuppressWarnings("unchecked") public synchronized void removeApplication(RMApp app) { ApplicationState appState = new ApplicationState( app.getSubmitTime(), app.getStartTime(), @@ -470,14 +471,6 @@ public synchronized void removeApplication(RMApp app) { appState.attempts.put(attemptState.getAttemptId(), attemptState); } - removeApplication(appState); - } - - @SuppressWarnings("unchecked") - /** - * Non-Blocking API - */ - public synchronized void removeApplication(ApplicationState appState) { dispatcher.getEventHandler().handle(new RMStateStoreRemoveAppEvent(appState)); } @@ -486,8 +479,8 @@ public synchronized void removeApplication(ApplicationState appState) { * Derived classes must implement this method to remove the state of an * application and its attempts */ - protected abstract void removeApplicationState(ApplicationState appState) - throws Exception; + protected abstract void removeApplicationStateInternal( + ApplicationState appState) throws Exception; // TODO: This should eventually become cluster-Id + "AM_RM_TOKEN_SERVICE". See // YARN-986 @@ -615,7 +608,7 @@ protected void handleStoreEvent(RMStateStoreEvent event) { Exception removedException = null; LOG.info("Removing info for app: " + appId); try { - removeApplicationState(appState); + removeApplicationStateInternal(appState); } catch (Exception e) { LOG.error("Error removing app: " + appId, e); removedException = e; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 628d260..6992981 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -65,7 +65,7 @@ public static final Log LOG = LogFactory.getLog(ZKRMStateStore.class); - private static final String ROOT_ZNODE_NAME = "ZKRMStateRoot"; + protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot"; private int numRetries; private String zkHostPort = null; @@ -342,7 +342,7 @@ public synchronized void updateApplicationAttemptStateInternal( } @Override - public synchronized void removeApplicationState(ApplicationState appState) + public synchronized void removeApplicationStateInternal(ApplicationState appState) throws Exception { String appId = appState.getAppId().toString(); String nodeRemovePath = getNodePath(rmAppRoot, appId); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index 6698412..336390f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -19,8 +19,12 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import static org.mockito.Matchers.isA; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; import java.util.HashMap; import java.util.List; @@ -43,6 +47,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -99,7 +104,7 @@ public static RMContext mockRMContext(int n, long time) { rmDispatcher); AMLivelinessMonitor amFinishingMonitor = new AMLivelinessMonitor( rmDispatcher); - return new RMContextImpl(rmDispatcher, + return new RMContextImpl(rmDispatcher, mock(RMStateStore.class), containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, null, null, null, null, null) { @Override @@ -243,6 +248,8 @@ public void testRMAppRetireNone() throws Exception { rmContext.getRMApps().size()); Assert.assertEquals("Number of completed apps incorrect after check", 10, appMonitor.getCompletedAppsListSize()); + verify(rmContext.getStateStore(), never()).removeApplication( + isA(RMApp.class)); } @Test @@ -266,6 +273,8 @@ public void testRMAppRetireSome() throws Exception { rmContext.getRMApps().size()); Assert.assertEquals("Number of completed apps incorrect after check", 3, appMonitor.getCompletedAppsListSize()); + verify(rmContext.getStateStore(), times(7)).removeApplication( + isA(RMApp.class)); } @Test @@ -282,6 +291,7 @@ public void testRMAppRetireSomeDifferentStates() throws Exception { rmContext.getRMApps().clear(); Assert.assertEquals("map isn't empty", 0, rmContext.getRMApps().size()); + // 6 applications are in final state, 4 are not in final state. // / set with various finished states RMApp app = new MockRMApp(0, now - 20000, RMAppState.KILLED); rmContext.getRMApps().put(app.getApplicationId(), app); @@ -318,7 +328,9 @@ public void testRMAppRetireSomeDifferentStates() throws Exception { rmContext.getRMApps().size()); Assert.assertEquals("Number of completed apps incorrect after check", 2, appMonitor.getCompletedAppsListSize()); - + // 6 applications in final state, 4 of them are removed + verify(rmContext.getStateStore(), times(4)).removeApplication( + isA(RMApp.class)); } @Test @@ -360,6 +372,8 @@ public void testRMAppRetireZeroSetting() throws Exception { rmContext.getRMApps().size()); Assert.assertEquals("Number of completed apps incorrect after check", 0, appMonitor.getCompletedAppsListSize()); + verify(rmContext.getStateStore(), times(10)).removeApplication( + isA(RMApp.class)); } protected void setupDispatcher(RMContext rmContext, Configuration conf) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index f87f689..4255d25 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -396,6 +396,8 @@ public void testRMRestartAppRunningAMFailed() throws Exception { rm2.getRMContext().getRMApps().get(app0.getApplicationId()); Assert.assertEquals(RMAppAttemptState.FAILED, recoveredApp .getAppAttempts().get(am0.getApplicationAttemptId()).getAppAttemptState()); + rm1.stop(); + rm2.stop(); } @Test @@ -443,6 +445,8 @@ public void testRMRestartFailedApp() throws Exception { .contains("Failing the application.")); // failed diagnostics from attempt is lost because the diagnostics from // attempt is not yet available by the time app is saving the app state. + rm1.stop(); + rm2.stop(); } @Test @@ -489,6 +493,8 @@ public void testRMRestartKilledApp() throws Exception{ ApplicationReport appReport = verifyAppReportAfterRMRestart(app0, rm2); Assert.assertEquals(app0.getDiagnostics().toString(), appReport.getDiagnostics()); + rm1.stop(); + rm2.stop(); } @Test @@ -538,6 +544,9 @@ public void testRMRestartSucceededApp() throws Exception { Assert.assertEquals(FinalApplicationStatus.SUCCEEDED, appReport.getFinalApplicationStatus()); Assert.assertEquals("trackingUrl", appReport.getOriginalTrackingUrl()); + + rm1.stop(); + rm2.stop(); } @Test @@ -620,6 +629,9 @@ public void testRMRestartGetApplicationList() throws Exception { rm2.getClientRMService().getApplications(request2); List appList2 = response2.getApplicationList(); Assert.assertTrue(3 == appList2.size()); + + rm1.stop(); + rm2.stop(); } private MockAM launchAM(RMApp app, MockRM rm, MockNM nm) @@ -1122,6 +1134,52 @@ protected void handleStoreEvent(RMStateStoreEvent event) { Assert.assertTrue(rmAppState.size() == NUM_APPS); } + @Test + public void testFinishedAppRemovalAfterRMRestart() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore(); + conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 1); + memStore.init(conf); + RMState rmState = memStore.getState(); + + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create an app and finish the app. + RMApp app0 = rm1.submitApp(200); + MockAM am0 = launchAM(app0, rm1, nm1); + finishApplicationMaster(app0, rm1, nm1, am0); + + MockRM rm2 = new MockRM(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + nm1 = rm2.registerNode("127.0.0.1:1234", 15120); + + Map rmAppState = + rmState.getApplicationState(); + + // app0 exits in both state store and rmContext + Assert.assertEquals(RMAppState.FINISHED, + rmAppState.get(app0.getApplicationId()).getState()); + rm2.waitForState(app0.getApplicationId(), RMAppState.FINISHED); + + // create one more app and finish the app. + RMApp app1 = rm2.submitApp(200); + MockAM am1 = launchAM(app1, rm2, nm1); + finishApplicationMaster(app1, rm2, nm1, am1); + + // the first app0 get kicked out from both rmContext and state store + Assert.assertNull(rm2.getRMContext().getRMApps() + .get(app0.getApplicationId())); + Assert.assertNull(rmAppState.get(app0.getApplicationId())); + + rm1.stop(); + rm2.stop(); + } + public static class TestSecurityMockRM extends MockRM { public TestSecurityMockRM(Configuration conf, RMStateStore store) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 95c14bf..047895c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -26,6 +26,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -106,6 +107,7 @@ public EventHandler getEventHandler() { interface RMStateStoreHelper { RMStateStore getRMStateStore() throws Exception; boolean isFinalStateValid() throws Exception; + boolean appExists(RMApp app) throws Exception; } void waitNotify(TestDispatcher dispatcher) { @@ -125,7 +127,7 @@ void waitNotify(TestDispatcher dispatcher) { dispatcher.notified = false; } - void storeApp(RMStateStore store, ApplicationId appId, long submitTime, + RMApp storeApp(RMStateStore store, ApplicationId appId, long submitTime, long startTime) throws Exception { ApplicationSubmissionContext context = new ApplicationSubmissionContextPBImpl(); @@ -138,6 +140,7 @@ void storeApp(RMStateStore store, ApplicationId appId, long submitTime, when(mockApp.getApplicationSubmissionContext()).thenReturn(context); when(mockApp.getUser()).thenReturn("test"); store.storeNewApplication(mockApp); + return mockApp; } ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId, @@ -367,6 +370,7 @@ public void testRMDTSecretManagerStateStore( Assert.assertEquals(keySet, secretManagerState.getMasterKeyState()); Assert.assertEquals(sequenceNumber, secretManagerState.getDTSequenceNumber()); + store.close(); } private Token generateAMRMToken( @@ -379,4 +383,32 @@ public void testRMDTSecretManagerStateStore( appToken.setService(new Text("appToken service")); return appToken; } + + public void testAppDeletion(RMStateStoreHelper stateStoreHelper) + throws Exception { + RMStateStore store = stateStoreHelper.getRMStateStore(); + store.setRMDispatcher(new TestDispatcher()); + + // create and store apps + ArrayList appList = new ArrayList(); + int NUM_APPS = 5; + for (int i = 0; i < NUM_APPS; i++) { + ApplicationId appId = ApplicationId.newInstance(1383183338, i); + RMApp app = storeApp(store, appId, 123456789, 987654321); + appList.add(app); + } + + Assert.assertEquals(NUM_APPS, appList.size()); + for (RMApp app : appList) { + // wait for app to be stored. + while (!stateStoreHelper.appExists(app)); + } + + for (RMApp app : appList) { + // remove the app + store.removeApplication(app); + // wait for app to be removed. + while (stateStoreHelper.appExists(app)); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java index a1a6eab..bffb94b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java @@ -19,6 +19,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; import static org.junit.Assert.assertTrue; + +import java.io.IOException; + import junit.framework.Assert; import org.apache.commons.logging.Log; @@ -32,6 +35,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Test; @@ -42,7 +46,7 @@ class TestFSRMStateStoreTester implements RMStateStoreHelper { Path workingDirPathURI; - FileSystemRMStateStore store; + TestFileSystemRMStore store; MiniDFSCluster cluster; class TestFileSystemRMStore extends FileSystemRMStateStore { @@ -54,6 +58,13 @@ start(); Assert.assertNotNull(fs); } + + public Path getAppDir(String appId) { + Path rootDir = new Path(workingDirPathURI, ROOT_DIR_NAME); + Path appRootDir = new Path(rootDir, RM_APP_ROOT); + Path appDir = new Path(appRootDir, appId); + return appDir; + } } public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception { @@ -81,9 +92,17 @@ public boolean isFinalStateValid() throws Exception { FileStatus[] files = fs.listStatus(workingDirPathURI); return files.length == 1; } + + @Override + public boolean appExists(RMApp app) throws IOException { + FileSystem fs = cluster.getFileSystem(); + Path nodePath = + store.getAppDir(app.getApplicationId().toString()); + return fs.exists(nodePath); + } } - @Test + @Test(timeout = 60000) public void testFSRMStateStore() throws Exception { HdfsConfiguration conf = new HdfsConfiguration(); MiniDFSCluster cluster = @@ -98,11 +117,8 @@ public void testFSRMStateStore() throws Exception { String appAttemptIdStr3 = "appattempt_1352994193343_0001_000003"; ApplicationAttemptId attemptId3 = ConverterUtils.toApplicationAttemptId(appAttemptIdStr3); - Path rootDir = - new Path(fileSystemRMStateStore.fsWorkingPath, "FSRMStateRoot"); - Path appRootDir = new Path(rootDir, "RMAppRoot"); Path appDir = - new Path(appRootDir, attemptId3.getApplicationId().toString()); + fsTester.store.getAppDir(attemptId3.getApplicationId().toString()); Path tempAppAttemptFile = new Path(appDir, attemptId3.toString() + ".tmp"); fsOut = fileSystemRMStateStore.fs.create(tempAppAttemptFile, false); @@ -110,9 +126,10 @@ public void testFSRMStateStore() throws Exception { fsOut.close(); testRMAppStateStore(fsTester); - Assert.assertFalse(fileSystemRMStateStore.fsWorkingPath + Assert.assertFalse(fsTester.workingDirPathURI .getFileSystem(conf).exists(tempAppAttemptFile)); testRMDTSecretManagerStateStore(fsTester); + testAppDeletion(fsTester); } finally { cluster.shutdown(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index a6929a8..6f580e3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -27,7 +27,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; import org.junit.Test; public class TestZKRMStateStore extends RMStateStoreTestBase { @@ -37,7 +39,8 @@ class TestZKRMStateStoreTester implements RMStateStoreHelper { ZooKeeper client; - ZKRMStateStore store; + TestZKRMStateStoreInternal store; + String workingZnode; class TestZKRMStateStoreInternal extends ZKRMStateStore { @@ -52,11 +55,16 @@ public TestZKRMStateStoreInternal(Configuration conf, String workingZnode) public ZooKeeper getNewZooKeeper() throws IOException { return client; } + + public String getAppNode(String appId) { + return workingZnode + "/" + ROOT_ZNODE_NAME + "/" + RM_APP_ROOT + "/" + + appId; + } } public RMStateStore getRMStateStore() throws Exception { - String workingZnode = "/Test"; YarnConfiguration conf = new YarnConfiguration(); + workingZnode = "/Test"; conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort); conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode); this.client = createClient(); @@ -69,12 +77,21 @@ public boolean isFinalStateValid() throws Exception { List nodes = client.getChildren(store.znodeWorkingPath, false); return nodes.size() == 1; } + + @Override + public boolean appExists(RMApp app) throws Exception { + Stat node = + client.exists(store.getAppNode(app.getApplicationId().toString()), + false); + return node !=null; + } } - @Test + @Test (timeout = 60000) public void testZKRMStateStoreRealZK() throws Exception { TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester(); testRMAppStateStore(zkTester); testRMDTSecretManagerStateStore(zkTester); + testAppDeletion(zkTester); } }