diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index f670112..b29b87b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import com.google.common.annotations.VisibleForTesting; @@ -138,9 +139,14 @@ protected void serviceStop() throws Exception { if (drainEventsOnStop) { blockNewEvents = true; LOG.info("AsyncDispatcher is draining to stop, igonring any new events."); + long endTime = System.currentTimeMillis() + getConfig() + .getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS) / 2; + synchronized (waitForDrained) { while (!drained && eventHandlingThread != null - && eventHandlingThread.isAlive()) { + && eventHandlingThread.isAlive() + && System.currentTimeMillis() < endTime) { waitForDrained.wait(1000); LOG.info("Waiting for AsyncDispatcher to drain. 
Thread state is :" + eventHandlingThread.getState()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java index b5fd923..7b28659 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java @@ -18,18 +18,17 @@ package org.apache.hadoop.yarn.event; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; - import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.junit.Assert; import org.junit.Test; +import static org.mockito.Mockito.*; + public class TestAsyncDispatcher { /* This test checks whether dispatcher hangs on close if following two things @@ -58,5 +57,23 @@ public void testDispatcherOnCloseIfQueueEmpty() throws Exception { eventQueue.isEmpty()); disp.close(); } + + // Test dispatcher should timeout on draining events. + @Test(timeout=10000) + public void testDispatchStopOnTimeout() throws Exception { + BlockingQueue eventQueue = new LinkedBlockingQueue(); + eventQueue = spy(eventQueue); + // simulate dispatcher is not drained. 
+ when(eventQueue.isEmpty()).thenReturn(false); + + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 2000); + DrainDispatcher disp = new DrainDispatcher(eventQueue); + disp.init(conf); + disp.setDrainEventsOnStop(); + disp.start(); + disp.waitForEventThreadToWait(); + disp.close(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 1b606b4..9cf6bf6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -410,6 +410,18 @@ protected void serviceInit(Configuration configuration) throws Exception { conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); + // Add ATS writer first so that other services can be stopped first. 
+ // This is to avoid keeping other rpc servers open while the writer + // takes long time to flush the events on stop + RMApplicationHistoryWriter rmApplicationHistoryWriter = + createRMApplicationHistoryWriter(); + addService(rmApplicationHistoryWriter); + rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter); + + SystemMetricsPublisher systemMetricsPublisher = createSystemMetricsPublisher(); + addService(systemMetricsPublisher); + rmContext.setSystemMetricsPublisher(systemMetricsPublisher); + rmSecretManagerService = createRMSecretManagerService(); addService(rmSecretManagerService); @@ -466,15 +478,6 @@ protected void serviceInit(Configuration configuration) throws Exception { rmContext.setDelegationTokenRenewer(delegationTokenRenewer); } - RMApplicationHistoryWriter rmApplicationHistoryWriter = - createRMApplicationHistoryWriter(); - addService(rmApplicationHistoryWriter); - rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter); - - SystemMetricsPublisher systemMetricsPublisher = createSystemMetricsPublisher(); - addService(systemMetricsPublisher); - rmContext.setSystemMetricsPublisher(systemMetricsPublisher); - // Register event handler for NodesListManager nodesListManager = new NodesListManager(rmContext); rmDispatcher.register(NodesListManagerEventType.class, nodesListManager); @@ -589,9 +592,11 @@ protected void serviceStart() throws Exception { @Override protected void serviceStop() throws Exception { + super.serviceStop(); + // Stop other services first in case metrics system and state-store takes + // long time to flush the events. 
DefaultMetricsSystem.shutdown(); - if (rmContext != null) { RMStateStore store = rmContext.getStateStore(); try { @@ -600,8 +605,6 @@ protected void serviceStop() throws Exception { LOG.error("Error closing store.", e); } } - - super.serviceStop(); } protected void createPolicyMonitors() { @@ -1024,12 +1027,15 @@ synchronized void transitionToStandby(boolean initialize) } LOG.info("Transitioning to standby state"); - if (rmContext.getHAServiceState() == - HAServiceProtocol.HAServiceState.ACTIVE) { + // Capture the state before publishing STANDBY: checking it after the update + // would never observe ACTIVE, so the active services would never be stopped. + HAServiceProtocol.HAServiceState previousState = + rmContext.getHAServiceState(); + rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY); + if (previousState == HAServiceProtocol.HAServiceState.ACTIVE) { stopActiveServices(); reinitialize(initialize); } - rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY); LOG.info("Transitioned to standby state"); }