diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 16c7ac7..7d122e2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -37,6 +37,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.service.Service; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ShutdownHookManager; @@ -180,13 +181,13 @@ protected void serviceInit(Configuration conf) throws Exception { this.conf = conf; this.rmContext = new RMContextImpl(); - rmDispatcher = createDispatcher(); + /* + * Register the handlers for all AlwaysOn services using setupDispatcher(). + */ + rmDispatcher = setupDispatcher(); addIfService(rmDispatcher); rmContext.setDispatcher(rmDispatcher); - rmDispatcher.register(RMFatalEventType.class, - new ResourceManager.RMFatalEventDispatcher(this.rmContext, this)); - adminService = createAdminService(); addService(adminService); rmContext.setRMAdminService(adminService); @@ -832,6 +833,7 @@ synchronized void transitionToStandby(boolean initialize) HAServiceProtocol.HAServiceState.ACTIVE) { stopActiveServices(); if (initialize) { + resetDispatcher(); createAndInitActiveServices(); } } @@ -994,4 +996,22 @@ private static void setHttpPolicy(Configuration conf) { YarnConfiguration.YARN_HTTP_POLICY_KEY, YarnConfiguration.YARN_HTTP_POLICY_DEFAULT))); } + + /* + * Register the handlers for alwaysOn services + */ + private Dispatcher setupDispatcher() { + Dispatcher dispatcher = createDispatcher(); + dispatcher.register(RMFatalEventType.class, + new ResourceManager.RMFatalEventDispatcher(this.rmContext, this)); + return dispatcher; + } + + private void resetDispatcher() { + Dispatcher dispatcher = setupDispatcher(); + ((Service)dispatcher).init(this.conf); + ((Service)dispatcher).start(); + rmDispatcher = dispatcher; + rmContext.setDispatcher(rmDispatcher); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java index dc98b7c..d7f3f42 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java @@ -26,13 +26,22 @@ import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; import org.apache.hadoop.ha.HealthCheckFailedException; import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; import org.junit.Before; import org.junit.Test; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -222,4 +231,120 @@ public void testTransitionsWhenAutomaticFailoverEnabled() throws IOException { checkMonitorHealth(); checkActiveRMFunctionality(); } + + @Test + public void testRMDispatcherForHA() throws IOException { + Configuration conf = new YarnConfiguration(configuration); + rm = new MockRM(conf) { + @Override + protected Dispatcher createDispatcher() { + return new MyDispatcher(); + } + }; + rm.init(conf); + int expectedNumOfEventDispatcher = + ((MyDispatcher) rm.getRMContext().getDispatcher()) + .getNumOfEventDispatchers(); + assertTrue(expectedNumOfEventDispatcher != 0); + + StateChangeRequestInfo requestInfo = new StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER); + + assertEquals(STATE_ERR, HAServiceState.INITIALIZING, + rm.adminService.getServiceStatus().getState()); + assertFalse("RM is ready to become active before being started", + rm.adminService.getServiceStatus().isReadyToBecomeActive()); + rm.start(); + + //call transitions to standby and active a couple of times + rm.adminService.transitionToStandby(requestInfo); + rm.adminService.transitionToActive(requestInfo); + rm.adminService.transitionToStandby(requestInfo); + rm.adminService.transitionToActive(requestInfo); + rm.adminService.transitionToStandby(requestInfo); + rm.adminService.transitionToActive(requestInfo); + rm.adminService.transitionToStandby(requestInfo); + + assertTrue(expectedNumOfEventDispatcher == + ((MyDispatcher) rm.getRMContext().getDispatcher()) + .getNumOfEventDispatchers()); + + rm.stop(); + } + + @SuppressWarnings("rawtypes") + class MyDispatcher extends AbstractService implements Dispatcher { + + protected final Map, EventHandler> eventDispatchers; + + public MyDispatcher() { + super("MyDispatcher"); + this.eventDispatchers = + new HashMap, EventHandler>(); + } + + @Override + public EventHandler getEventHandler() { + return null; + } + + @SuppressWarnings("unchecked") + @Override + public void register(Class eventType, EventHandler handler) { + EventHandler registeredHandler = (EventHandler) + eventDispatchers.get(eventType); + LOG.info("Registering " + eventType + " for " + handler.getClass()); + if (registeredHandler == null) { + eventDispatchers.put(eventType, handler); + } else if (!(registeredHandler instanceof MultiListenerHandler)) { + /* for multiple listeners of an event add the multiple listener handler */ + MultiListenerHandler multiHandler = new MultiListenerHandler(); + multiHandler.addHandler(registeredHandler); + multiHandler.addHandler(handler); + eventDispatchers.put(eventType, multiHandler); + } else { + /* already a multilistener, just add to it */ + MultiListenerHandler multiHandler = + (MultiListenerHandler) registeredHandler; + multiHandler.addHandler(handler); + } + } + + class MultiListenerHandler implements EventHandler { + List> listofHandlers; + + public MultiListenerHandler() { + listofHandlers = new ArrayList>(); + } + + @Override + public void handle(Event event) { + for (EventHandler handler: listofHandlers) { + handler.handle(event); + } + } + + void addHandler(EventHandler handler) { + listofHandlers.add(handler); + } + + public int numOfDispatchers() { + return listofHandlers.size(); + } + } + + public int getNumOfEventDispatchers() { + int num = 0; + for( Entry, EventHandler> dispatcher + : eventDispatchers.entrySet()) { + if (dispatcher.getValue() instanceof MultiListenerHandler) { + num += ((MultiListenerHandler)dispatcher.getValue()) + .numOfDispatchers(); + } else { + num ++; + } + } + return num; + } + } }