diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 1281c24..b2e3366 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -22,6 +22,8 @@ import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -33,13 +35,16 @@ import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell.ShellCommandExecutor; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import 
org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -50,6 +55,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl; +import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; @@ -82,6 +88,18 @@ private ResourceManagerWrapper resourceManagerWrapper; + /** + * Tracking URLs of application masters that have registered with the RM but + * have not finished yet. The map doubles as the monitor that + * waitAppMastersToFinish waits on. + * + * NOTE(review): entries are keyed by tracking URL, so app masters sharing a + * URL collapse into one entry, and one that finishes with a different URL is + * never removed. TODO confirm whether a per-attempt key is available. + */ + private final ConcurrentMap<String, String> appMasters = + new ConcurrentHashMap<String, String>(); + private File testWorkDir; // Number of nm-local-dirs per nodemanager @@ -211,7 +229,48 @@ public synchronized void serviceStart() throws Exception { @Override protected void doSecureLogin() throws IOException { // Don't try to login using keytab in the testcase. + } + + /** + * Returns an ApplicationMasterService that records each app master's + * tracking URL on registration and drops it again on finish. + */ + @Override + protected ApplicationMasterService createApplicationMasterService() { + return new ApplicationMasterService(this.rmContext, scheduler) { + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) + throws YarnException, IOException { + RegisterApplicationMasterResponse response = super.registerApplicationMaster(request); + String trackingUrl = request.getTrackingUrl(); + /* ConcurrentHashMap rejects null keys and values, so skip a missing URL. */ + if (trackingUrl != null) { + appMasters.put(trackingUrl, trackingUrl); + } + return response; + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) + throws YarnException, IOException { + FinishApplicationMasterResponse response = super.finishApplicationMaster(request); + String trackingUrl = request.getTrackingUrl(); + if (trackingUrl != null) { + appMasters.remove(trackingUrl); + } + /* Wake the thread blocked in waitAppMastersToFinish so it re-checks the map. */ + synchronized (appMasters) { + appMasters.notify(); + } + return response; + } + + }; }; + }; resourceManager.init(getConfig()); new Thread() { @@ -241,9 +300,35 @@ public void run() { getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS)); } + /** + * Waits until the map of tracked app masters is empty or the timeout elapses, + * and logs a warning if some are still tracked afterwards. + * + * @param timeoutMillis maximum time to wait, in milliseconds + * @throws InterruptedException if the calling thread is interrupted while waiting + */ + private void 
waitAppMastersToFinish(long timeoutMillis) throws InterruptedException { + long started = System.currentTimeMillis(); + /* Check and wait under one monitor so a finish notification cannot slip in between. */ + synchronized (appMasters) { + while (!appMasters.isEmpty()) { + long remaining = timeoutMillis - (System.currentTimeMillis() - started); + if (remaining <= 0) { + break; + } + /* Cap each wait at the time left so the overall timeout is honoured. */ + appMasters.wait(Math.min(remaining, 1000L)); + } + } + if (!appMasters.isEmpty()) { + LOG.warn("Stopping RM while some app masters are still alive"); + } + } + @Override public synchronized void serviceStop() throws Exception { if (resourceManager != null) { + waitAppMastersToFinish(5000); resourceManager.stop(); } super.serviceStop();