diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 1281c24..ecaa5b6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -22,6 +22,8 @@ import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -33,13 +35,14 @@ import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell.ShellCommandExecutor; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -52,6 +55,10 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; /** * Embedded Yarn minicluster for testcases that need to interact with a cluster. @@ -82,6 +89,9 @@ private ResourceManagerWrapper resourceManagerWrapper; + private ConcurrentMap appMasters = + new ConcurrentHashMap(16, 0.75f, 2); + private File testWorkDir; // Number of nm-local-dirs per nodemanager @@ -211,9 +221,19 @@ public synchronized void serviceStart() throws Exception { @Override protected void doSecureLogin() throws IOException { // Don't try to login using keytab in the testcase. - }; + } }; resourceManager.init(getConfig()); + resourceManager.getRMContext().getDispatcher().register(RMAppAttemptEventType.class, + new EventHandler() { + public void handle(RMAppAttemptEvent event) { + if (event instanceof RMAppAttemptRegistrationEvent) { + appMasters.put(event.getApplicationAttemptId(), event.getTimestamp()); + } else if (event instanceof RMAppAttemptUnregistrationEvent) { + appMasters.remove(event.getApplicationAttemptId()); + } + } + }); new Thread() { public void run() { resourceManager.start(); @@ -241,9 +261,22 @@ public void run() { getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS)); } + private void waitForAppMastersToFinish(long timeoutMillis) throws InterruptedException { + long started = System.currentTimeMillis(); + synchronized (appMasters) { + while (!appMasters.isEmpty() && System.currentTimeMillis() - started < timeoutMillis) { + appMasters.wait(1000); + } + } + if (!appMasters.isEmpty()) { + LOG.warn("Stopping RM while some app masters are still alive"); + } + } + @Override public synchronized void serviceStop() throws Exception { if (resourceManager != null) { + waitForAppMastersToFinish(5000); resourceManager.stop(); } super.serviceStop();