diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 0b3a364..6cd5edf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -27,12 +27,14 @@ import java.io.File; import java.io.FileOutputStream; +import java.io.FileWriter; import java.io.IOException; +import java.io.PrintWriter; import java.net.InetSocketAddress; -import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -98,6 +100,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -108,18 +114,35 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +@RunWith(Parameterized.class) public class TestRMRestart { private final static File TEMP_DIR = new File(System.getProperty( "test.build.data", "/tmp"), "decommision"); private File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt"); + private static final String FS_ALLOC_FILE = + new File(TEMP_DIR, "test-fs-queues.xml").getAbsolutePath(); private YarnConfiguration conf; // Fake rmAddr for token-renewal private static InetSocketAddress rmAddr; + private int schedulerType = 0; + + @Parameters + public static Collection configRunnings() { + return Arrays.asList(new Object[][]{{0}, {1}, {2}}); + } + + public TestRMRestart(int para) { + this.schedulerType = para; + } + @Before - public void setup() throws UnknownHostException { + public void setup() throws IOException { Logger rootLogger = LogManager.getRootLogger(); rootLogger.setLevel(Level.DEBUG); conf = new YarnConfiguration(); @@ -128,6 +151,38 @@ public void setup() throws UnknownHostException { conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); rmAddr = new InetSocketAddress("localhost", 8032); Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1); + + // Configure scheduler + switch (schedulerType) { + case 0: + configFifoScheduler(); + break; + case 1: + configCapacityScheduler(); + break; + case 2: + configFairScheduler(); + break; + } + } + + private void configFifoScheduler() { + conf.set(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class.getName()); + } + private void configCapacityScheduler() { + conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName()); + } + private void configFairScheduler() throws IOException { + // Disable queueMaxAMShare limitation for fair scheduler + PrintWriter out = new PrintWriter(new FileWriter(FS_ALLOC_FILE)); + out.println(""); + out.println(""); + out.println("-1.0"); + out.println(""); + out.close(); + + conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName()); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, FS_ALLOC_FILE); } @After @@ -605,6 +660,11 @@ public void testRMRestartWaitForPreviousAMToFinish() throws Exception { RMAppAttemptState.SCHEDULED); Assert.assertEquals(RMAppAttemptState.SCHEDULED, app2 .getCurrentAppAttempt().getAppAttemptState()); + + rm1.stop(); + rm2.stop(); + rm3.stop(); + rm4.stop(); } // Test RM restarts after previous attempt succeeded and was saved into state @@ -662,6 +722,9 @@ public void updateApplicationStateInternal(ApplicationId appId, // app final state is saved via the finish event from attempt. Assert.assertEquals(RMAppState.FINISHED, rmAppState.get(app0.getApplicationId()).getState()); + + rm1.stop(); + rm2.stop(); } @Test (timeout = 60000) @@ -799,6 +862,9 @@ public synchronized void updateApplicationAttemptStateInternal( rm2.getRMContext().getRMApps().get(app0.getApplicationId()); rm2.waitForState(loadedApp0.getApplicationId(), RMAppState.KILLED); Assert.assertTrue(loadedApp0.getAppAttempts().size() == 0); + + rm1.stop(); + rm2.stop(); } @Test (timeout = 60000) @@ -1443,6 +1509,10 @@ public void testAppSubmissionWithOldDelegationTokenAfterRMRestart() RMApp app = rm2.submitApp(200, "name", "user", new HashMap(), false, "default", 1, ts); rm2.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); + + // stop RM + rm1.stop(); + rm2.stop(); } @Test (timeout = 60000) @@ -1611,6 +1681,8 @@ public void testClientRetryOnKillingApplication() throws Exception { rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED); Assert.assertEquals(1, ((TestMemoryRMStateStore) memStore).updateAttempt); Assert.assertEquals(2, ((TestMemoryRMStateStore) memStore).updateApp); + + rm1.stop(); } // Test Application that fails on submission is saved in state store. @@ -1658,6 +1730,9 @@ protected Credentials parseCredentials( rm2.start(); // Restarted RM has the failed app info too. rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED); + + rm1.stop(); + rm2.stop(); } @Test (timeout = 20000) @@ -1928,6 +2003,9 @@ protected void serviceStart() throws Exception { } MockAM am1 = MockRM.launchAndRegisterAM(loadedApp0, rm2, nm1); MockRM.finishAMAndVerifyAppState(loadedApp0, rm2, nm1, am1); + + rm1.stop(); + rm2.stop(); } private void writeToHostsFile(String... hosts) throws IOException {