diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index 6335a8b..7bd7126 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -82,24 +82,24 @@ public void run() { } @Override - public synchronized void init(Configuration conf) { + protected void innerInit(Configuration conf) throws Exception { this.exitOnDispatchException = conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR); - super.init(conf); + super.innerInit(conf); } @Override - public void start() { + protected void innerStart() throws Exception { //start all the components - super.start(); + super.innerStart(); eventHandlingThread = new Thread(createThread()); eventHandlingThread.setName("AsyncDispatcher event handler"); eventHandlingThread.start(); } @Override - public void stop() { + protected void innerStop() throws Exception { stopped = true; if (eventHandlingThread != null) { eventHandlingThread.interrupt(); @@ -111,7 +111,7 @@ public void stop() { } // stop all the components - super.stop(); + super.innerStop(); } @SuppressWarnings("unchecked") diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java index c8603ab..bf84125 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java @@ -125,8 +125,9 @@ private static void logIOException(String comment, IOException e) { public AggregatedLogDeletionService() { super(AggregatedLogDeletionService.class.getName()); } - - public void start() { + + @Override + protected void innerStart() throws Exception { Configuration conf = getConfig(); if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) { @@ -150,14 +151,14 @@ public void start() { TimerTask task = new LogDeletionTask(conf, retentionSecs); timer = new Timer(); timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs); - super.start(); + super.innerStart(); } @Override - public void stop() { + protected void innerStop() throws Exception { if(timer != null) { timer.cancel(); } - super.stop(); + super.innerStop(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/AbstractService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/AbstractService.java index eeea1e1..51b657c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/AbstractService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/AbstractService.java @@ -18,26 +18,33 @@ package org.apache.hadoop.yarn.service; +import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +/** + * This is the base implementation class for YARN services. + */ public abstract class AbstractService implements Service { private static final Log LOG = LogFactory.getLog(AbstractService.class); /** - * Service state: initially {@link STATE#NOTINITED}. - */ - private STATE state = STATE.NOTINITED; - - /** * Service name. */ private final String name; + + /** service state */ + private final ServiceStateModel stateModel; + /** * Service start time. Will be zero until the service is started. */ @@ -46,14 +53,49 @@ /** * The configuration. Will be null until the service is initialized. */ - private Configuration config; + private volatile Configuration config; /** * List of state change listeners; it is final to ensure * that it will never be null. */ - private List listeners = - new ArrayList(); + private final ServiceOperations.ServiceListeners listeners + = new ServiceOperations.ServiceListeners(); + /** + * Static listeners to all events across all services + */ + private static ServiceOperations.ServiceListeners globalListeners + = new ServiceOperations.ServiceListeners(); + + /** + * The cause of any failure -will be null. + * if a service did not stop due to a failure. + */ + private Exception failureCause; + + /** + * the state in which the service was when it failed. + * Only valid when the service is stopped due to a failure + */ + private STATE failureState = null; + + /** + * object used to co-ordinate {@link #waitForServiceToStop(long)} + * across threads. + */ + private final AtomicBoolean terminationNotification = + new AtomicBoolean(false); + + /** + * History of lifecycle transitions + */ + private final List lifecycleHistory + = new ArrayList(5); + + /** + * Map of blocking dependencies + */ + private final Map blockerMap = new HashMap(); /** * Construct the service. @@ -61,68 +103,290 @@ */ public AbstractService(String name) { this.name = name; + stateModel = new ServiceStateModel(name); } @Override - public synchronized STATE getServiceState() { - return state; + public final STATE getServiceState() { + return stateModel.getState(); + } + + @Override + public final synchronized Throwable getFailureCause() { + return failureCause; + } + + @Override + public synchronized STATE getFailureState() { + return failureState; + } + + + /** + * Set the configuration for this service. + * This method is called during {@link #init(Configuration)} + * and should only be needed if for some reason a service implementation + * needs to override that initial setting -for example replacing + * it with a new subclass of {@link Configuration} + * @param conf new configuration. + */ + protected void setConfig(Configuration conf) { + this.config = conf; } /** * {@inheritDoc} - * @throws IllegalStateException if the current service state does not permit - * this action + * This invokes {@link #innerInit} + * @param conf the configuration of the service. This must not be null + * @throws ServiceStateException if the configuration was null, + * the state change not permitted, or something else went wrong */ @Override - public synchronized void init(Configuration conf) { - ensureCurrentState(STATE.NOTINITED); - this.config = conf; - changeState(STATE.INITED); - LOG.info("Service:" + getName() + " is inited."); + public void init(Configuration conf) { + /* ISSUE: should null configuration be allowed or rejected? + if (conf == null) { + throw new ServiceStateException("Cannot initialize service " + + getName() + ": null configuration"); + } + */ + enterState(STATE.INITED); + setConfig(conf); + try { + innerInit(config); + notifyListeners(); + } catch (Exception e) { + noteFailure(e); + stopQuietly(this); + throw ServiceStateException.convert(e); + } } /** * {@inheritDoc} - * @throws IllegalStateException if the current service state does not permit + * @throws ServiceStateException if the current service state does not permit * this action */ @Override - public synchronized void start() { - startTime = System.currentTimeMillis(); - ensureCurrentState(STATE.INITED); - changeState(STATE.STARTED); - LOG.info("Service:" + getName() + " is started."); + public void start() { + //enter the started state + stateModel.enterState(STATE.STARTED); + try { + startTime = System.currentTimeMillis(); + innerStart(); + LOG.info("Service " + getName() + " is started"); + notifyListeners(); + } catch (Exception e) { + noteFailure(e); + stopQuietly(this); + throw ServiceStateException.convert(e); + } } /** * {@inheritDoc} - * @throws IllegalStateException if the current service state does not permit - * this action */ @Override - public synchronized void stop() { - if (state == STATE.STOPPED || - state == STATE.INITED || - state == STATE.NOTINITED) { - // already stopped, or else it was never - // started (eg another service failing canceled startup) + public void stop() { + //this operation is only invoked if the service is not already stopped; + // it is not an error + //to go STOPPED->STOPPED -it is just a no-op + if (enterState(STATE.STOPPED) != STATE.STOPPED) { + try { + innerStop(); + } catch (Exception e) { + //stop-time exceptions are logged if they are the first one, + noteFailure(e); + throw ServiceStateException.convert(e); + } finally { + //report that the service has terminated + synchronized (terminationNotification) { + terminationNotification.set(true); + terminationNotification.notifyAll(); + } + //notify anything listening for events + notifyListeners(); + } + } else { + //already stopped: note it + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring re-entrant call to stop()"); + } + } + } + + /** + * Relay to {@link #stop()} + * @throws IOException + */ + @Override + public final void close() throws IOException { + stop(); + } + + /** + * Stop a service, logging problems to this + * service's log. + * + * @param service service to stop -can be null + * @see ServiceOperations#stopQuietly(Log, Service) + */ + protected void stopQuietly(Service service) { + ServiceOperations.stopQuietly(LOG, service); + } + + /** + * Stop a service + * + * @param service service to stop -can be null + * @see ServiceOperations#stop(Service) + */ + protected void stopService(Service service) { + ServiceOperations.stop(service); + } + + /** + * Failure handling: record the exception + * that triggered it -if there was not one already. + * Services are free to call this themselves. + * @param exception the exception + */ + protected final void noteFailure(Exception exception) { + if (LOG.isDebugEnabled()) { + LOG.debug("noteFailure " + exception, null); + } + if (exception == null) { + //make sure failure logic doesn't itself cause problems return; } - ensureCurrentState(STATE.STARTED); - changeState(STATE.STOPPED); - LOG.info("Service:" + getName() + " is stopped."); + //record the failure details, and log it + synchronized (this) { + if (failureCause == null) { + failureCause = exception; + failureState = getServiceState(); + LOG.info("Service " + getName() + + "failed in state " + failureState + + "; cause: " + exception, + exception); + } + } } @Override - public synchronized void register(ServiceStateChangeListener l) { + public final boolean waitForServiceToStop(long timeout) { + synchronized (terminationNotification) { + boolean completed = terminationNotification.get(); + while (!completed) { + try { + terminationNotification.wait(timeout); + //here there has been a timeout, the object has terminated, + //or there has been a spurious wakeup (which we ignore) + completed = true; + } catch (InterruptedException e) { + //interrupted; have another look at the flag + completed = terminationNotification.get(); + } + } + return terminationNotification.get(); + } + } + + /* ===================================================================== */ + /* Override Points */ + /* ===================================================================== */ + + /** + * All initialization code needed by a service. + * + * This method will only ever be called once during the lifecycle of + * a specific service instance. + * + * Implementations do not need to be synchronized as the logic + * in {@link #init(Configuration)} prevents re-entrancy. + * + * The base implementation checks to see if the subclass has created + * a new configuration instance, and if so, updates the base class value + * @param conf configuration + * @throws Exception on a failure -these will be caught, + * possibly wrapped, and wil; trigger a service stop + */ + protected void innerInit(Configuration conf) throws Exception { + if (conf != config) { + LOG.debug("Config has been overridden during init"); + setConfig(conf); + } + } + + /** + * Actions called during the INITED to STARTED transition. + * + * This method will only ever be called once during the lifecycle of + * a specific service instance. + * + * Implementations do not need to be synchronized as the logic + * in {@link #start()} prevents re-entrancy. + * + * @throws Exception if needed -these will be caught, + * wrapped, and trigger a service stop + */ + protected void innerStart() throws Exception { + + } + + /** + * Actions called during the transition to the STOPPED state. + * + * This method will only ever be called once during the lifecycle of + * a specific service instance. + * + * Implementations do not need to be synchronized as the logic + * in {@link #stop()} prevents re-entrancy. + * + * Implementations MUST write this to be robust against failures, including + * checks for null references -and for the first failure to not stop other + * attempts to shut down parts of the service. + * + * @throws Exception if needed -these will be caught and logged. + */ + protected void innerStop() throws Exception { + + } + + @Override + public void register(ServiceStateChangeListener l) { listeners.add(l); } @Override - public synchronized void unregister(ServiceStateChangeListener l) { + public void unregister(ServiceStateChangeListener l) { listeners.remove(l); } + /** + * Register a global listener, which receives notifications + * from the state change events of all services in the JVM + * @param l listener + */ + public static void registerGlobalListener(ServiceStateChangeListener l) { + globalListeners.add(l); + } + + /** + * unregister a global listener. + * @param l listener to unregister + * @return true if the listener was found (and then deleted) + */ + public static boolean unregisterGlobalListener(ServiceStateChangeListener l) { + return globalListeners.remove(l); + } + + /** + * Package-scoped method for testing -resets the global listener list + */ + @VisibleForTesting + static void resetGlobalListeners() { + globalListeners.reset(); + } + @Override public String getName() { return name; @@ -139,28 +403,89 @@ public long getStartTime() { } /** - * Verify that that a service is in a given state. - * @param currentState the desired state - * @throws IllegalStateException if the service state is different from - * the desired state + * Notify local and global listeners of state changes. + * Exceptions raised by listeners are NOT passed up. + */ + private void notifyListeners() { + try { + listeners.notifyListeners(this); + globalListeners.notifyListeners(this); + } catch (Throwable e) { + LOG.warn("Exception while notifying listeners of " + this + ": " + e, + e); + } + } + + /** + * Add a state change event to the lifecycle history + */ + private void recordLifecycleEvent() { + LifecycleEvent event = new LifecycleEvent(); + event.time = System.currentTimeMillis(); + event.state = getServiceState(); + lifecycleHistory.add(event); + } + + @Override + public synchronized List getLifecycleHistory() { + return new ArrayList(lifecycleHistory); + } + + /** + * Enter a state; record this via {@link #recordLifecycleEvent} + * and log at the info level. + * @param newState the proposed new state + * @return the original state + * it wasn't already in that state, and the state model permits state re-entrancy. + */ + private STATE enterState(STATE newState) { + assert stateModel!=null: "null state in "+name + " " + this.getClass(); + STATE original = stateModel.enterState(newState); + if (original != newState) { + LOG.info("Service:" + getName() + " entered state " + getServiceState()); + recordLifecycleEvent(); + } + return original; + } + + @Override + public final boolean inState(Service.STATE expected) { + return stateModel.inState(expected); + } + + @Override + public String toString() { + return "Service " + name + " in state " + stateModel; + } + + /** + * Put a blocker to the blocker map -replacing any + * with the same name. + * @param name blocker name + * @param details any specifics on the block. This must be non-null. */ - private void ensureCurrentState(STATE currentState) { - ServiceOperations.ensureCurrentState(state, currentState); + protected void putBlocker(String name, String details) { + synchronized (blockerMap) { + blockerMap.put(name, details); + } } /** - * Change to a new state and notify all listeners. - * This is a private method that is only invoked from synchronized methods, - * which avoid having to clone the listener list. It does imply that - * the state change listener methods should be short lived, as they - * will delay the state transition. - * @param newState new service state + * Remove a blocker from the blocker map - + * this is a no-op if the blocker is not present + * @param name the name of the blocker */ - private void changeState(STATE newState) { - state = newState; - //notify listeners - for (ServiceStateChangeListener l : listeners) { - l.stateChanged(this); + public void removeBlocker(String name) { + synchronized (blockerMap) { + blockerMap.remove(name); + } + } + + @Override + public Map getBlockers() { + synchronized (blockerMap) { + Map map = new HashMap(blockerMap); + return map; } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/CompositeService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/CompositeService.java index cd4e523..4eadd1f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/CompositeService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/CompositeService.java @@ -26,7 +26,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.YarnException; /** * Composition of services. @@ -35,6 +34,16 @@ private static final Log LOG = LogFactory.getLog(CompositeService.class); + /** + * Policy on shutdown: attempt to close everything (purest) or + * only try to close started services (which assumes + * that the service implementations may not handle the stop() operation + * except when started. + * Irrespective of this policy, if a child service fails during + * its init() or start() operations, it will have stop() called on it. + */ + protected static final boolean STOP_ONLY_STARTED_SERVICES = true; + private List serviceList = new ArrayList(); public CompositeService(String name) { @@ -53,54 +62,54 @@ protected synchronized boolean removeService(Service service) { return serviceList.remove(service); } - public synchronized void init(Configuration conf) { + protected void innerInit(Configuration conf) throws Exception { for (Service service : serviceList) { service.init(conf); } - super.init(conf); + super.innerInit(conf); } - public synchronized void start() { - int i = 0; - try { - for (int n = serviceList.size(); i < n; i++) { - Service service = serviceList.get(i); - service.start(); - } - super.start(); - } catch (Throwable e) { - LOG.error("Error starting services " + getName(), e); - // Note that the state of the failed service is still INITED and not - // STARTED. Even though the last service is not started completely, still - // call stop() on all services including failed service to make sure cleanup - // happens. - stop(i); - throw new YarnException("Failed to Start " + getName(), e); + protected void innerStart() throws Exception { + for (Service service : serviceList) { + // start the service. If this fails that service + // will be stopped and an exception raised + service.start(); } - + super.innerStart(); } - public synchronized void stop() { - if (this.getServiceState() == STATE.STOPPED) { - // The base composite-service is already stopped, don't do anything again. - return; - } - if (serviceList.size() > 0) { - stop(serviceList.size() - 1); - } - super.stop(); + protected void innerStop() throws Exception{ + //stop all services in reverse order + stop(serviceList.size(), STOP_ONLY_STARTED_SERVICES); + super.innerStop(); } - private synchronized void stop(int numOfServicesStarted) { + /** + * Stop the services in reverse order + * + * @param numOfServicesStarted index from where the stop should work + * @param stopOnlyStartedServices + * @throws RuntimeException the first exception raised during the + * stop process -after all services are stopped + */ + private synchronized void stop(int numOfServicesStarted, + boolean stopOnlyStartedServices) { // stop in reserve order of start - for (int i = numOfServicesStarted; i >= 0; i--) { + Exception firstException = null; + for (int i = numOfServicesStarted - 1; i >= 0; i--) { Service service = serviceList.get(i); - try { - service.stop(); - } catch (Throwable t) { - LOG.info("Error stopping " + service.getName(), t); + STATE state = service.getServiceState(); + if (!stopOnlyStartedServices || state == STATE.STARTED) { + Exception ex = ServiceOperations.stopQuietly(LOG, service); + if (ex != null && firstException == null) { + firstException = ex; + } } } + //after stopping all services, rethrow the first exception raised + if (firstException != null) { + throw ServiceStateException.convert(firstException); + } } /** @@ -109,7 +118,7 @@ private synchronized void stop(int numOfServicesStarted) { */ public static class CompositeServiceShutdownHook implements Runnable { - private CompositeService compositeService; + private final CompositeService compositeService; public CompositeServiceShutdownHook(CompositeService compositeService) { this.compositeService = compositeService; @@ -117,13 +126,8 @@ public CompositeServiceShutdownHook(CompositeService compositeService) { @Override public void run() { - try { - // Stop the Composite Service - compositeService.stop(); - } catch (Throwable t) { - LOG.info("Error stopping " + compositeService.getName(), t); - } + ServiceOperations.stopQuietly(compositeService); } } - + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/FilterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/FilterService.java index 314d664..a2778c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/FilterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/FilterService.java @@ -20,6 +20,10 @@ import org.apache.hadoop.conf.Configuration; +import java.io.IOException; +import java.util.List; +import java.util.Map; + public class FilterService implements Service { private final Service service; @@ -45,6 +49,11 @@ public void stop() { } @Override + public void close() throws IOException { + service.close(); + } + + @Override public void register(ServiceStateChangeListener listener) { service.register(listener); } @@ -73,4 +82,34 @@ public STATE getServiceState() { public long getStartTime() { return startTime; } + + @Override + public boolean inState(STATE state) { + return service.inState(state); + } + + @Override + public Throwable getFailureCause() { + return service.getFailureCause(); + } + + @Override + public STATE getFailureState() { + return service.getFailureState(); + } + + @Override + public boolean waitForServiceToStop(long timeout) { + return service.waitForServiceToStop(timeout); + } + + @Override + public List getLifecycleHistory() { + return service.getLifecycleHistory(); + } + + @Override + public Map getBlockers() { + return service.getBlockers(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/LoggingStateChangeListener.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/LoggingStateChangeListener.java new file mode 100644 index 0000000..b0c3af1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/LoggingStateChangeListener.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * This is a state change listener that logs events at INFO level + */ +public class LoggingStateChangeListener implements ServiceStateChangeListener { + + private static final Log LOG = LogFactory.getLog(LoggingStateChangeListener.class); + + private final Log log; + + /** + * Log events to the given log + * @param log destination for events + */ + public LoggingStateChangeListener(Log log) { + //force an NPE if a null log came in + log.isDebugEnabled(); + this.log = log; + } + + /** + * Log events to the static log for this class + */ + public LoggingStateChangeListener() { + this(LOG); + } + + /** + * Callback for a state change event: log it + * @param service the service that has changed. + */ + @Override + public void stateChanged(Service service) { + log.info("Entry to state " + service.getServiceState() + + " for " + service.getName()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/Service.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/Service.java index 8b8b183..be7352d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/Service.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/Service.java @@ -20,34 +20,78 @@ import org.apache.hadoop.conf.Configuration; +import java.io.Closeable; +import java.io.IOException; +import java.io.Serializable; +import java.util.List; +import java.util.Map; + /** * Service LifeCycle. */ -public interface Service { +public interface Service extends Closeable { /** * Service states */ public enum STATE { /** Constructed but not initialized */ - NOTINITED, + NOTINITED(0, "NOTINITED"), /** Initialized but not started or stopped */ - INITED, + INITED(1, "INITED"), /** started and not stopped */ - STARTED, + STARTED(2, "STARTED"), /** stopped. No further state transitions are permitted */ - STOPPED + STOPPED(3, "STOPPED"); + + /** + * An integer value for use in array lookup and JMX interfaces. + * Although {@link Enum#ordinal()} could do this, explicitly + * identify the numbers gives more stability guarantees over time. + */ + private final int value; + + /** + * A name of the state that can be used in messages + */ + private final String statename; + + private STATE(int value, String name) { + this.value = value; + this.statename = name; + } + + /** + * Get the integer value of a state + * @return the numeric value of the state + */ + public int getValue() { + return value; + } + + /** + * Get the name of a state + * @return the state's name + */ + @Override + public String toString() { + return statename; + } } /** * Initialize the service. * - * The transition must be from {@link STATE#NOTINITED} to {@link STATE#INITED} - * unless the operation failed and an exception was raised. + * The transition MUST be from {@link STATE#NOTINITED} to {@link STATE#INITED} + * unless the operation failed and an exception was raised, in which case + * {@link #stop()} MUST be invoked and the service enter the state + * {@link STATE#STOPPED}. * @param config the configuration of the service + * @throws RuntimeException on any failure during the operation + */ void init(Configuration config); @@ -55,21 +99,37 @@ /** * Start the service. * - * The transition should be from {@link STATE#INITED} to {@link STATE#STARTED} - * unless the operation failed and an exception was raised. + * The transition MUST be from {@link STATE#INITED} to {@link STATE#STARTED} + * unless the operation failed and an exception was raised, in which case + * {@link #stop()} MUST be invoked and the service enter the state + * {@link STATE#STOPPED}. + * @throws RuntimeException on any failure during the operation */ void start(); /** - * Stop the service. + * Stop the service. This MUST be a no-op if the service is already + * in the {@link STATE#STOPPED} state. It SHOULD be a best-effort attempt + * to stop all parts of the service. * - * This operation must be designed to complete regardless of the initial state - * of the service, including the state of all its internal fields. + * The implementation must be designed to complete regardless of the service + * state, including the initialized/uninitialized state of all its internal + * fields. + * @throws RuntimeException on any failure during the stop operation */ void stop(); /** + * A version of stop() that is designed to be usable in Java7 closure + * clauses. + * Implementation classes MUST relay this directly to {@link #stop()} + * @throws IOException never + * @throws RuntimeException on any failure during the stop operation + */ + void close() throws IOException; + + /** * Register an instance of the service state change events. * @param listener a new listener */ @@ -108,4 +168,67 @@ * has not yet been started. */ long getStartTime(); + + /** + * Query to see if the service is in a specific state. + * In a multi-threaded system, the state may not hold for very long. + * @param state the expected state + * @return true if, at the time of invocation, the service was in that state. + */ + boolean inState(STATE state); + + /** + * Get the first exception raised during the service failure. If null, + * no exception was logged + * @return the failure logged during a transition to the stopped state + */ + Throwable getFailureCause(); + + /** + * Get the state in which the failure in {@link #getFailureCause()} occurred. + * @return the state or null if there was no failure + */ + STATE getFailureState(); + + /** + * Block waiting for the service to stop; uses the termination notification + * object to do so. + * + * This method will only return after all the service stop actions + * have been executed (to success or failure), or the timeout elapsed + * This method can be called before the service is inited or started; this is + * to eliminate any race condition with the service stopping before + * this event occurs. + * @param timeout timeout in milliseconds. A value of zero means "forever" + * @return true iff the service stopped in the time period + */ + boolean waitForServiceToStop(long timeout); + + /** + * Get a snapshot of the lifecycle history; it is a static list + * @return a possibly empty but never null list of lifecycle events. + */ + public List getLifecycleHistory(); + + /** + * A serializable lifecycle event: the time a state + * transition occurred, and what state was entered. + */ + public class LifecycleEvent implements Serializable { + /** + * Local time in milliseconds when the event occurred + */ + public long time; + /** + * new state + */ + public Service.STATE state; + } + + /** + * Get the blockers on a service -remote dependencies + * that are stopping the service from being live. + * @return a (snapshotted) map of blocker name->description values + */ + public Map getBlockers(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceOperations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceOperations.java index 151caa9..6f0f17f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceOperations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceOperations.java @@ -21,6 +21,11 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ShutdownHookManager; + +import java.lang.ref.WeakReference; +import java.util.ArrayList; +import java.util.List; /** * This class contains a set of methods to work with services, especially @@ -50,6 +55,28 @@ public static void ensureCurrentState(Service.STATE state, } /** + * Check that a state tansition is valid and + * throw an exception if not + * @param state current state + * @param proposed proposed new state + */ + public static void checkStateTransition(Service.STATE state, Service.STATE proposed) { + ServiceStateModel.checkStateTransition("", state, proposed); + } + + /** + * Check that a state tansition is valid and + * throw an exception if not + * @param service the service to probe + * @param proposed proposed new state + */ + public static void checkStateTransition(Service service, Service.STATE proposed) { + ServiceStateModel.checkStateTransition(service.getName(), + service.getServiceState(), + proposed); + } + + /** * Initialize a service. *

* The service state is checked before the operation begins. @@ -62,8 +89,7 @@ public static void ensureCurrentState(Service.STATE state, */ public static void init(Service service, Configuration configuration) { - Service.STATE state = service.getServiceState(); - ensureCurrentState(state, Service.STATE.NOTINITED); + checkStateTransition(service, Service.STATE.INITED); service.init(configuration); } @@ -79,8 +105,7 @@ public static void init(Service service, Configuration configuration) { */ public static void start(Service service) { - Service.STATE state = service.getServiceState(); - ensureCurrentState(state, Service.STATE.INITED); + checkStateTransition(service, Service.STATE.STARTED); service.start(); } @@ -111,10 +136,7 @@ public static void deploy(Service service, Configuration configuration) { */ public static void stop(Service service) { if (service != null) { - Service.STATE state = service.getServiceState(); - if (state == Service.STATE.STARTED) { - service.stop(); - } + service.stop(); } } @@ -127,14 +149,140 @@ public static void stop(Service service) { * @return any exception that was caught; null if none was. */ public static Exception stopQuietly(Service service) { + return stopQuietly(LOG, service); + } + + /** + * Stop a service; if it is null do nothing. Exceptions are caught and + * logged at warn level. (but not Throwables). This operation is intended to + * be used in cleanup operations + * + * @param log the log to warn at + * @param service a service; may be null + * @return any exception that was caught; null if none was. + * @see ServiceOperations#stopQuietly(Service) + */ + public static Exception stopQuietly(Log log, Service service) { try { stop(service); } catch (Exception e) { - LOG.warn("When stopping the service " + service.getName() - + " : " + e, + log.warn("When stopping the service " + service.getName() + + " : " + e, e); return e; } return null; } + + + /** + * Class to manage a list of {@link ServiceStateChangeListener} instances, + * including a notification loop that is robust against changes to the list + * during the notification process. + */ + public static class ServiceListeners { + /** + * List of state change listeners; it is final to guarantee + * that it will never be null. + */ + private final List listeners = + new ArrayList(); + + /** + * Thread-safe addition of a new listener to the end of a list. + * Attempts to re-register a listener that is already registered + * will be ignored. + * @param l listener + */ + public synchronized void add(ServiceStateChangeListener l) { + if(!listeners.contains(l)) { + listeners.add(l); + } + } + + /** + * Remove any registration of a listener from the listener list. + * @param l listener + * @return true if the listener was found (and then removed) + */ + public synchronized boolean remove(ServiceStateChangeListener l) { + return listeners.remove(l); + } + + /** + * Reset the listener list + */ + public synchronized void reset() { + listeners.clear(); + } + + /** + * Change to a new state and notify all listeners. + * This method will block until all notifications have been issued. + * It caches the list of listeners before the notification begins, + * so additions or removal of listeners will not be visible. + * @param service the service that has changed state + */ + public void notifyListeners(Service service) { + //take a very fast snapshot of the callback list + //very much like CopyOnWriteArrayList, only more minimal + ServiceStateChangeListener[] callbacks; + synchronized (this) { + callbacks = listeners.toArray(new ServiceStateChangeListener[listeners.size()]); + } + //iterate through the listeners outside the synchronized method, + //ensuring that listener registration/unregistration doesn't break anything + for (ServiceStateChangeListener l : callbacks) { + l.stateChanged(service); + } + } + } + + /** + * JVM Shutdown hook for Service which will stop the + * Service gracefully in case of JVM shutdown. + * This hook uses a weak reference to the service, so + * does not cause services to be retained after they have + * been stopped and deferenced elsewhere. + */ + public static class ServiceShutdownHook implements Runnable { + + private WeakReference serviceRef; + private Thread hook; + + public ServiceShutdownHook(Service service) { + serviceRef = new WeakReference(service); + } + + public void register(int priority) { + unregister(); + hook = new Thread(this); + ShutdownHookManager.get().addShutdownHook(hook, priority); + } + + public void unregister() { + if (hook != null) { + try { + ShutdownHookManager.get().removeShutdownHook(hook); + } catch (IllegalStateException e) { + LOG.info("Failed to unregister shutdown hook",e); + } + hook = null; + } + } + + @Override + public void run() { + Service service = serviceRef.get(); + if (service == null) { + return; + } + try { + // Stop the Service + service.stop(); + } catch (Throwable t) { + LOG.info("Error stopping " + service.getName(), t); + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceStateException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceStateException.java new file mode 100644 index 0000000..fc30376 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceStateException.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service; + +import org.apache.hadoop.yarn.YarnException; + +/** + * Subclass of {@link YarnException} that is raised on state change operations. + */ +public class ServiceStateException extends YarnException { + + public ServiceStateException(String message) { + super(message); + } + + public ServiceStateException(String message, Throwable cause) { + super(message, cause); + } + + public ServiceStateException(Throwable cause) { + super(cause); + } + + /** + * Convert any exception into a {@link RuntimeException}. + * If the caught exception already is of that type -including + * a {@link YarnException} it is typecast to a {@link RuntimeException} + * and returned. + * + * All other exception types are wrapped in a new instance of + * ServiceStateException + * @param fault exception or throwable + * @return a ServiceStateException to rethrow + */ + public static RuntimeException convert(Throwable fault) { + if (fault instanceof RuntimeException) { + return (RuntimeException) fault; + } else { + return new ServiceStateException(fault); + } + } + + /** + * Convert any exception into a {@link RuntimeException}. + * If the caught exception already is of that type -including + * a {@link YarnException} it is typecast to a {@link RuntimeException} + * and returned. + * + * All other exception types are wrapped in a new instance of + * ServiceStateException + * @param text text to use if a new exception is created + * @param fault exception or throwable + * @return a ServiceStateException to rethrow + */ + public static RuntimeException convert(String text, Throwable fault) { + if (fault instanceof RuntimeException) { + return (RuntimeException) fault; + } else { + return new ServiceStateException(text, fault); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceStateModel.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceStateModel.java new file mode 100644 index 0000000..d77c9ab --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceStateModel.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service; + +/** + * Implements the service state model for YARN. + */ +public class ServiceStateModel { + + /** + * Map of all valid state transitions + * [current] [proposed1, proposed2, ...] + */ + private static final boolean[][] statemap = + { + // uninited inited started stopped + /* uninited */ {false, true, false, true}, + /* inited */ {false, false, true, true}, + /* started */ {false, false, false, true}, + /* stopped */ {false, false, false, true}, + }; + + /** + * The state of the service + */ + private volatile Service.STATE state; + + /** + * The name of the service: used in exceptions + */ + private String name; + + /** + * Create the service state model in the {@link Service.STATE#NOTINITED} + * state. + */ + public ServiceStateModel(String name) { + this(name, Service.STATE.NOTINITED); + } + + /** + * Create a service state model instance in the chosen state + * @param state the starting state + */ + public ServiceStateModel(String name, Service.STATE state) { + this.state = state; + this.name = name; + } + + /** + * Query the service state. This is a non-blocking operation. + * @return the state + */ + public Service.STATE getState() { + return state; + } + + /** + * Query that the state is in a specific state + * @param proposed proposed new state + * @return the state + */ + public boolean inState(Service.STATE proposed) { + return state.equals(proposed); + } + + /** + * Verify that that a service is in a given state. + * @param expectedState the desired state + * @throws ServiceStateException if the service state is different from + * the desired state + */ + public void ensureCurrentState(Service.STATE expectedState) { + if (state != expectedState) { + throw new ServiceStateException(name+ ": for this operation, the " + + "current service state must be " + + expectedState + + " instead of " + state); + } + } + + /** + * Enter a state -thread safe. + * + * @param proposed proposed new state + * @return the original state + * @throws ServiceStateException if the transition is not permitted + */ + public synchronized Service.STATE enterState(Service.STATE proposed) { + checkStateTransition(name, state, proposed); + Service.STATE original = state; + //atomic write of the new state + state = proposed; + return original; + } + + /** + * Check that a state tansition is valid and + * throw an exception if not + * @param name name of the service (can be null) + * @param state current state + * @param proposed proposed new state + */ + public static void checkStateTransition(String name, + Service.STATE state, + Service.STATE proposed) { + if (!isValidStateTransition(state, proposed)) { + throw new ServiceStateException(name + " cannot enter state " + + proposed + " from state " + state); + } + } + + /** + * Is a state transition valid? + * There are no checks for current==proposed + * as that is considered a non-transition. + * + * using an array kills off all branch misprediction costs, at the expense + * of cache line misses. + * + * @param current current state + * @param proposed proposed new state + * @return true if the transition to a new state is valid + */ + public static boolean isValidStateTransition(Service.STATE current, + Service.STATE proposed) { + boolean[] row = statemap[current.getValue()]; + return row[proposed.getValue()]; + } + + /** + * return the state text as the toString() value + * @return the current state's description + */ + @Override + public String toString() { + return (name.isEmpty() ? "" : ((name) + ": ")) + + state.toString(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java index b46ad3e..e1da355 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java @@ -49,21 +49,21 @@ public AbstractLivelinessMonitor(String name, Clock clock) { } @Override - public void start() { + protected void innerStart() throws Exception { assert !stopped : "starting when already stopped"; checkerThread = new Thread(new PingChecker()); checkerThread.setName("Ping Checker"); checkerThread.start(); - super.start(); + super.innerStart(); } @Override - public void stop() { + protected void innerStop() throws Exception { stopped = true; if (checkerThread != null) { checkerThread.interrupt(); } - super.stop(); + super.innerStop(); } protected abstract void expire(O ob); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableService.java index 5907f39..0b69202 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableService.java @@ -55,13 +55,7 @@ public BreakableService(boolean failOnInit, } private int convert(STATE state) { - switch (state) { - case NOTINITED: return 0; - case INITED: return 1; - case STARTED: return 2; - case STOPPED: return 3; - default: return 0; - } + return state.getValue(); } private void inc(STATE state) { @@ -75,29 +69,27 @@ public int getCount(STATE state) { private void maybeFail(boolean fail, String action) { if (fail) { - throw new BrokenLifecycleEvent(action); + throw new BrokenLifecycleEvent(this, action); } } @Override - public void init(Configuration conf) { + protected void innerInit(Configuration conf) throws Exception { inc(STATE.INITED); maybeFail(failOnInit, "init"); - super.init(conf); + super.innerInit(conf); } @Override - public void start() { + protected void innerStart() { inc(STATE.STARTED); maybeFail(failOnStart, "start"); - super.start(); } @Override - public void stop() { + protected void innerStop() { inc(STATE.STOPPED); maybeFail(failOnStop, "stop"); - super.stop(); } public void setFailOnInit(boolean failOnInit) { @@ -116,8 +108,13 @@ public void setFailOnStop(boolean failOnStop) { * The exception explicitly raised on a failure */ public static class BrokenLifecycleEvent extends RuntimeException { - BrokenLifecycleEvent(String action) { - super("Lifecycle Failure during " + action); + + final STATE state; + + public BrokenLifecycleEvent(Service service, String action) { + super("Lifecycle Failure during " + action + " state is " + + service.getServiceState()); + state = service.getServiceState(); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableStateChangeListener.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableStateChangeListener.java new file mode 100644 index 0000000..65c84f0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableStateChangeListener.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service; + +/** + * A state change listener that logs the number of state change events received, + * and the last state invoked. + * + * It can be configured to fail during a state change event + */ +public class BreakableStateChangeListener + implements ServiceStateChangeListener { + + private final String name; + + public BreakableStateChangeListener() { + this( "BreakableStateChangeListener"); + } + + public BreakableStateChangeListener(String name) { + this.name = name; + } + + private int eventCount; + private int failureCount; + private Service lastService; + private Service.STATE lastState = Service.STATE.NOTINITED; + //no callbacks are ever received for this event, so it + //can be used as an 'undefined'. + private Service.STATE failingState = Service.STATE.NOTINITED; + + @Override + public synchronized void stateChanged(Service service) { + eventCount++; + lastService = service; + lastState = service.getServiceState(); + if (lastState == failingState) { + failureCount++; + throw new BreakableService.BrokenLifecycleEvent(service, + "Failure entering " + + lastState + + " for " + + service.getName()); + } + } + + public synchronized int getEventCount() { + return eventCount; + } + + public synchronized Service getLastService() { + return lastService; + } + + public synchronized Service.STATE getLastState() { + return lastState; + } + + public synchronized void setFailingState(Service.STATE failingState) { + this.failingState = failingState; + } + + public synchronized int getFailureCount() { + return failureCount; + } + + @Override + public synchronized String toString() { + return name + " - event count = " + eventCount + " last state " + lastState; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestGlobalStateChangeListener.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestGlobalStateChangeListener.java new file mode 100644 index 0000000..c29be42 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestGlobalStateChangeListener.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service; + +import org.apache.hadoop.conf.Configuration; +import org.junit.After; +import org.junit.Test; + +/** + * Test global state changes. It is critical for all tests to clean up the + * global listener afterwards to avoid interfering with follow-on tests. + * + * One listener, {@link #listener} is defined which is automatically + * unregistered on cleanup. All other listeners must be unregistered in the + * finally clauses of the tests. + */ +public class TestGlobalStateChangeListener extends ServiceAssert { + + BreakableStateChangeListener listener = new BreakableStateChangeListener("listener"); + + + private void register() { + register(listener); + } + + private boolean unregister() { + return unregister(listener); + } + + private void register(ServiceStateChangeListener l) { + AbstractService.registerGlobalListener(l); + } + + private boolean unregister(ServiceStateChangeListener l) { + return AbstractService.unregisterGlobalListener(l); + } + + /** + * After every test case reset the list of global listeners. + */ + @After + public void cleanup() { + AbstractService.resetGlobalListeners(); + } + + /** + * Assert that the last state of the listener is that the test expected. + * @param breakable a breakable listener + * @param state the expected state + */ + public void assertListenerState(BreakableStateChangeListener breakable, + Service.STATE state) { + assertEquals("Wrong state in " + breakable, state, breakable.getLastState()); + } + + /** + * Assert that the number of state change notifications matches expectations. + * @param breakable the listener + * @param count the expected count. + */ + public void assertListenerEventCount(BreakableStateChangeListener breakable, + int count) { + assertEquals("Wrong event count in " + breakable, count, + breakable.getEventCount()); + } + + /** + * Test that register/unregister works + */ + @Test + public void testRegisterListener() { + register(); + assertTrue("listener not registered", unregister()); + } + + /** + * Test that double registration results in one registration only. + */ + @Test + public void testRegisterListenerTwice() { + register(); + register(); + assertTrue("listener not registered", unregister()); + //there should be no listener to unregister the second time + assertFalse("listener double registered", unregister()); + } + + /** + * Test that the {@link BreakableStateChangeListener} is picking up + * the state changes and that its last event field is as expected. + */ + @Test + public void testEventHistory() { + register(); + BreakableService service = new BreakableService(); + assertListenerState(listener, Service.STATE.NOTINITED); + assertEquals(0, listener.getEventCount()); + service.init(new Configuration()); + assertListenerState(listener, Service.STATE.INITED); + assertSame(service, listener.getLastService()); + assertListenerEventCount(listener, 1); + + service.start(); + assertListenerState(listener, Service.STATE.STARTED); + assertListenerEventCount(listener, 2); + + service.stop(); + assertListenerState(listener, Service.STATE.STOPPED); + assertListenerEventCount(listener, 3); + } + + /** + * This test triggers a failure in the listener - the expectation is that the + * service has already reached it's desired state, purely because the + * notifications take place afterwards. + * + */ + @Test + public void testListenerFailure() { + listener.setFailingState(Service.STATE.INITED); + register(); + BreakableStateChangeListener l2 = new BreakableStateChangeListener(); + register(l2); + BreakableService service = new BreakableService(); + service.init(new Configuration()); + //expected notifications to fail + + //still should record its invocation + assertListenerState(listener, Service.STATE.INITED); + assertListenerEventCount(listener, 1); + + //and second listener didn't get notified of anything + assertListenerEventCount(l2, 0); + + //service should still consider itself started + assertServiceStateInited(service); + service.start(); + service.stop(); + } + + /** + * Create a chain of listeners and set one in the middle to fail; verify that + * those in front got called, and those after did not. + */ + @Test + public void testListenerChain() { + + //create and register the listeners + LoggingStateChangeListener logListener = new LoggingStateChangeListener(); + register(logListener); + BreakableStateChangeListener l0 = new BreakableStateChangeListener("l0"); + register(l0); + listener.setFailingState(Service.STATE.STARTED); + register(); + BreakableStateChangeListener l3 = new BreakableStateChangeListener("l3"); + register(l3); + + //create and init a service. + BreakableService service = new BreakableService(); + service.init(new Configuration()); + assertServiceStateInited(service); + assertListenerState(l0, Service.STATE.INITED); + assertListenerState(listener, Service.STATE.INITED); + assertListenerState(l3, Service.STATE.INITED); + + service.start(); + //expect that listener l1 and the failing listener are in start, but + //not the final one + assertServiceStateStarted(service); + assertListenerState(l0, Service.STATE.STARTED); + assertListenerEventCount(l0, 2); + assertListenerState(listener, Service.STATE.STARTED); + assertListenerEventCount(listener, 2); + //this is the listener that is not expected to have been invoked + assertListenerState(l3, Service.STATE.INITED); + assertListenerEventCount(l3, 1); + + //stop the service + service.stop(); + //listeners are all updated + assertListenerEventCount(l0, 3); + assertListenerEventCount(listener, 3); + assertListenerEventCount(l3, 2); + //can all be unregistered in any order + unregister(logListener); + unregister(l0); + unregister(l3); + + //check that the listeners are all unregistered, even + //though they were registered in a different order. + //rather than do this by doing unregister checks, a new service is created + service = new BreakableService(); + //this service is initialized + service.init(new Configuration()); + //it is asserted that the event count has not changed for the unregistered + //listeners + assertListenerEventCount(l0, 3); + assertListenerEventCount(l3, 2); + //except for the one listener that was not unregistered, which + //has incremented by one + assertListenerEventCount(listener, 4); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestServiceLifecycle.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestServiceLifecycle.java index c69b7b7..0637f6d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestServiceLifecycle.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestServiceLifecycle.java @@ -19,10 +19,13 @@ package org.apache.hadoop.yarn.service; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.junit.Test; public class TestServiceLifecycle extends ServiceAssert { + private static Log LOG = LogFactory.getLog(TestServiceLifecycle.class); /** * Walk the {@link BreakableService} through it's lifecycle, @@ -62,10 +65,10 @@ public void testInitTwice() throws Throwable { try { svc.init(new Configuration()); fail("Expected a failure, got " + svc); - } catch (IllegalStateException e) { + } catch (ServiceStateException e) { //expected } - assertStateCount(svc, Service.STATE.INITED, 2); + assertStateCount(svc, Service.STATE.INITED, 1); assertServiceConfigurationContains(svc, "test.init"); } @@ -81,18 +84,16 @@ public void testStartTwice() throws Throwable { try { svc.start(); fail("Expected a failure, got " + svc); - } catch (IllegalStateException e) { + } catch (ServiceStateException e) { //expected } - assertStateCount(svc, Service.STATE.STARTED, 2); + assertStateCount(svc, Service.STATE.STARTED, 1); } /** * Verify that when a service is stopped more than once, no exception - * is thrown, and the counter is incremented. - * This is because the state change operations happen after the counter in - * the subclass is incremented, even though stop is meant to be a no-op + * is thrown. * @throws Throwable if necessary */ @Test @@ -103,7 +104,7 @@ public void testStopTwice() throws Throwable { svc.stop(); assertStateCount(svc, Service.STATE.STOPPED, 1); svc.stop(); - assertStateCount(svc, Service.STATE.STOPPED, 2); + assertStateCount(svc, Service.STATE.STOPPED, 1); } @@ -124,12 +125,12 @@ public void testStopFailedInit() throws Throwable { //expected } //the service state wasn't passed - assertServiceStateCreated(svc); + assertServiceStateStopped(svc); assertStateCount(svc, Service.STATE.INITED, 1); + assertStateCount(svc, Service.STATE.STOPPED, 1); //now try to stop svc.stop(); - //even after the stop operation, we haven't entered the state - assertServiceStateCreated(svc); + assertStateCount(svc, Service.STATE.STOPPED, 1); } @@ -151,18 +152,12 @@ public void testStopFailedStart() throws Throwable { //expected } //the service state wasn't passed - assertServiceStateInited(svc); - assertStateCount(svc, Service.STATE.INITED, 1); - //now try to stop - svc.stop(); - //even after the stop operation, we haven't entered the state - assertServiceStateInited(svc); + assertServiceStateStopped(svc); } /** * verify that when a service fails during its stop operation, - * its state does not change, and the subclass invocation counter - * increments. + * its state does not change. * @throws Throwable if necessary */ @Test @@ -177,42 +172,93 @@ public void testFailingStop() throws Throwable { //expected } assertStateCount(svc, Service.STATE.STOPPED, 1); - assertServiceStateStarted(svc); - //now try again, and expect it to happen again - try { - svc.stop(); - fail("Expected a failure, got " + svc); - } catch (BreakableService.BrokenLifecycleEvent e) { - //expected - } - assertStateCount(svc, Service.STATE.STOPPED, 2); } /** - * verify that when a service that is not started is stopped, its counter - * of stop calls is still incremented-and the service remains in its - * original state.. + * verify that when a service that is not started is stopped, the + * service enters the stopped state * @throws Throwable on a failure */ @Test public void testStopUnstarted() throws Throwable { BreakableService svc = new BreakableService(); svc.stop(); - assertServiceStateCreated(svc); + assertServiceStateStopped(svc); + assertStateCount(svc, Service.STATE.INITED, 0); assertStateCount(svc, Service.STATE.STOPPED, 1); + } - //stop failed, now it can be initialised - svc.init(new Configuration()); + /** + * Show that if the service failed during an init + * operation, stop was called. + */ - //and try to stop again, with no state change but an increment + @Test + public void testStopFailingInitAndStop() throws Throwable { + BreakableService svc = new BreakableService(true, false, true); + svc.register(new LoggingStateChangeListener()); + try { + svc.init(new Configuration()); + fail("Expected a failure, got " + svc); + } catch (BreakableService.BrokenLifecycleEvent e) { + assertEquals(Service.STATE.INITED, e.state); + } + //the service state is stopped + assertServiceStateStopped(svc); + assertEquals(Service.STATE.INITED, svc.getFailureState()); + + Throwable failureCause = svc.getFailureCause(); + assertNotNull("Null failure cause in " + svc, failureCause); + BreakableService.BrokenLifecycleEvent cause = + (BreakableService.BrokenLifecycleEvent) failureCause; + assertNotNull("null state in " + cause + " raised by " + svc, cause.state); + assertEquals(Service.STATE.INITED, cause.state); + } + + @Test + public void testInitNullConf() throws Throwable { + BreakableService svc = new BreakableService(false, false, false); + try { + svc.init(null); + LOG.warn("Null Configurations are permitted "); + } catch (ServiceStateException e) { + //expected + } + } + + @Test + public void testServiceNotifications() throws Throwable { + BreakableService svc = new BreakableService(false, false, false); + BreakableStateChangeListener listener = new BreakableStateChangeListener(); + svc.register(listener); + svc.init(new Configuration()); + assertEquals(1, listener.getEventCount()); + svc.start(); + assertEquals(2, listener.getEventCount()); svc.stop(); - assertServiceStateInited(svc); - assertStateCount(svc, Service.STATE.STOPPED, 2); + assertEquals(3, listener.getEventCount()); + svc.stop(); + assertEquals(3, listener.getEventCount()); + } - //once started, the service can be stopped reliably + @Test + public void testServiceFailingNotifications() throws Throwable { + BreakableService svc = new BreakableService(false, false, false); + BreakableStateChangeListener listener = new BreakableStateChangeListener(); + listener.setFailingState(Service.STATE.STARTED); + svc.register(listener); + svc.init(new Configuration()); + assertEquals(1, listener.getEventCount()); + //start this; the listener failed but this won't show svc.start(); - ServiceOperations.stop(svc); - assertServiceStateStopped(svc); - assertStateCount(svc, Service.STATE.STOPPED, 3); + //counter went up + assertEquals(2, listener.getEventCount()); + assertEquals(1, listener.getFailureCount()); + //stop the service -this doesn't fail + svc.stop(); + assertEquals(3, listener.getEventCount()); + assertEquals(1, listener.getFailureCount()); + svc.stop(); } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestServiceOperations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestServiceOperations.java index 14aa1f5..55f53d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestServiceOperations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestServiceOperations.java @@ -65,7 +65,7 @@ public void testInitTwice() throws Throwable { try { ServiceOperations.init(svc, new Configuration()); fail("Expected a failure, got " + svc); - } catch (IllegalStateException e) { + } catch (ServiceStateException e) { //expected } assertStateCount(svc, Service.STATE.INITED, 1); @@ -85,7 +85,7 @@ public void testStartTwice() throws Throwable { try { ServiceOperations.start(svc); fail("Expected a failure, got " + svc); - } catch (IllegalStateException e) { + } catch (ServiceStateException e) { //expected } assertStateCount(svc, Service.STATE.STARTED, 1); @@ -121,7 +121,7 @@ public void testDeployNotIdempotent() throws Throwable { try { ServiceOperations.deploy(svc, new Configuration()); fail("Expected a failure, got " + svc); - } catch (IllegalStateException e) { + } catch (ServiceStateException e) { //expected } //verify state and values are unchanged @@ -133,12 +133,11 @@ public void testDeployNotIdempotent() throws Throwable { /** * Test that the deploy operation can fail part way through, in which - * case the service is in the state that it was in before the failing - * state method was called. + * case the service ends up stopped * @throws Throwable on any failure. */ @Test - public void testDeployNotAtomic() throws Throwable { + public void testDeployFailure() throws Throwable { //this instance is set to fail in the start() call. BreakableService svc = new BreakableService(false, true, false); try { @@ -148,14 +147,14 @@ public void testDeployNotAtomic() throws Throwable { //expected } //now in the inited state - assertServiceStateInited(svc); + assertServiceStateStopped(svc); assertStateCount(svc, Service.STATE.INITED, 1); assertStateCount(svc, Service.STATE.STARTED, 1); - //try again -expect a failure as the service is now inited. + //try again -expect a failure as the service is now stopped. try { ServiceOperations.deploy(svc, new Configuration()); fail("Expected a failure, got " + svc); - } catch (IllegalStateException e) { + } catch (ServiceStateException e) { //expected } } @@ -187,10 +186,10 @@ public void testStopTwice() throws Throwable { public void testStopInit() throws Throwable { BreakableService svc = new BreakableService(); ServiceOperations.stop(svc); - assertServiceStateCreated(svc); - assertStateCount(svc, Service.STATE.STOPPED, 0); + assertServiceStateStopped(svc); + assertStateCount(svc, Service.STATE.STOPPED, 1); ServiceOperations.stop(svc); - assertStateCount(svc, Service.STATE.STOPPED, 0); + assertStateCount(svc, Service.STATE.STOPPED, 1); } @@ -210,14 +209,10 @@ public void testStopFailedInit() throws Throwable { } catch (BreakableService.BrokenLifecycleEvent e) { //expected } - //the service state wasn't passed - assertServiceStateCreated(svc); + //the service ended up stopped + assertServiceStateStopped(svc); //the init state got invoked once assertStateCount(svc, Service.STATE.INITED, 1); - //now try to stop - ServiceOperations.stop(svc); - //even after the stop operation, we haven't entered the state - assertServiceStateCreated(svc); } @@ -238,13 +233,11 @@ public void testStopFailedStart() throws Throwable { } catch (BreakableService.BrokenLifecycleEvent e) { //expected } - //the service state wasn't passed - assertServiceStateInited(svc); + //the service state stopped + assertServiceStateStopped(svc); assertStateCount(svc, Service.STATE.INITED, 1); //now try to stop ServiceOperations.stop(svc); - //even after the stop operation, we haven't entered the state - assertServiceStateInited(svc); } /** @@ -270,9 +263,7 @@ public void testFailingStop() throws Throwable { assertStateCount(svc, Service.STATE.STOPPED, 1); //now try to stop, this time doing it quietly Exception exception = ServiceOperations.stopQuietly(svc); - assertTrue("Wrong exception type : " + exception, - exception instanceof BreakableService.BrokenLifecycleEvent); - assertStateCount(svc, Service.STATE.STOPPED, 2); + assertNull(exception); } @@ -286,27 +277,10 @@ public void testFailingStop() throws Throwable { public void testStopUnstarted() throws Throwable { BreakableService svc = new BreakableService(); - //invocation in NOTINITED state should be no-op - ServiceOperations.stop(svc); - assertServiceStateCreated(svc); - assertStateCount(svc, Service.STATE.STOPPED, 0); - - //stop failed, now it can be initialised - ServiceOperations.init(svc, new Configuration()); - - //again, no-op - ServiceOperations.stop(svc); - assertServiceStateInited(svc); - assertStateCount(svc, Service.STATE.STOPPED, 0); - - //once started, the service can be stopped reliably - ServiceOperations.start(svc); + //NOTINITED.stop()->STOP ServiceOperations.stop(svc); assertServiceStateStopped(svc); assertStateCount(svc, Service.STATE.STOPPED, 1); - - //now stop one more time - ServiceOperations.stop(svc); assertStateCount(svc, Service.STATE.STOPPED, 1); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java index 67c2de1..d69c748 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java @@ -21,10 +21,14 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.service.CompositeService; +import org.apache.hadoop.yarn.service.Service; import org.apache.hadoop.yarn.service.Service.STATE; +import org.apache.hadoop.yarn.service.ServiceStateException; import org.junit.Before; import org.junit.Test; @@ -34,6 +38,16 @@ private static final int FAILED_SERVICE_SEQ_NUMBER = 2; + private static final Log LOG = LogFactory.getLog(TestCompositeService.class); + + /** + * flag to state policy of CompositeService, and hence + * what to look for after trying to stop a service from another state + * (e.g inited) + */ + private static final boolean STOP_ONLY_STARTED_SERVICES = + CompositeServiceImpl.isPolicyToStopOnlyStartedServices(); + @Before public void setup() { CompositeServiceImpl.resetCounter(); @@ -59,6 +73,9 @@ public void testCallSequence() { // Initialise the composite service serviceManager.init(conf); + //verify they were all inited + assertInState(STATE.INITED, services); + // Verify the init() call sequence numbers for every service for (int i = 0; i < NUM_OF_SERVICES; i++) { assertEquals("For " + services[i] @@ -67,11 +84,11 @@ public void testCallSequence() { } // Reset the call sequence numbers - for (int i = 0; i < NUM_OF_SERVICES; i++) { - services[i].reset(); - } + resetServices(services); serviceManager.start(); + //verify they were all started + assertInState(STATE.STARTED, services); // Verify the start() call sequence numbers for every service for (int i = 0; i < NUM_OF_SERVICES; i++) { @@ -79,13 +96,12 @@ public void testCallSequence() { + " service, start() call sequence number should have been ", i, services[i].getCallSequenceNumber()); } + resetServices(services); - // Reset the call sequence numbers - for (int i = 0; i < NUM_OF_SERVICES; i++) { - services[i].reset(); - } serviceManager.stop(); + //verify they were all stopped + assertInState(STATE.STOPPED, services); // Verify the stop() call sequence numbers for every service for (int i = 0; i < NUM_OF_SERVICES; i++) { @@ -104,6 +120,13 @@ public void testCallSequence() { } } + private void resetServices(CompositeServiceImpl[] services) { + // Reset the call sequence numbers + for (int i = 0; i < NUM_OF_SERVICES; i++) { + services[i].reset(); + } + } + @Test public void testServiceStartup() { ServiceManager serviceManager = new ServiceManager("ServiceManager"); @@ -128,20 +151,25 @@ public void testServiceStartup() { // Start the composite service try { serviceManager.start(); - fail("Exception should have been thrown due to startup failure of last service"); + fail( + "Exception should have been thrown due to startup failure of last service"); } catch (YarnException e) { - for (int i = 0; i < NUM_OF_SERVICES - 1; i++) { - if (i >= FAILED_SERVICE_SEQ_NUMBER) { - // Failed service state should be INITED - assertEquals("Service state should have been ", STATE.INITED, - services[NUM_OF_SERVICES - 1].getServiceState()); - } else { - assertEquals("Service state should have been ", STATE.STOPPED, - services[i].getServiceState()); + if (STOP_ONLY_STARTED_SERVICES) { + for (int i = 0; i < NUM_OF_SERVICES - 1; i++) { + //everything already started is stopped + assertInState(STATE.STOPPED, services,0,FAILED_SERVICE_SEQ_NUMBER); + //the service that failed has also been stopped + assertInState(STATE.STOPPED, services[FAILED_SERVICE_SEQ_NUMBER]); + //the not-yet started services are left alone + assertInState(STATE.INITED, services,FAILED_SERVICE_SEQ_NUMBER+1, + NUM_OF_SERVICES); } + } else { + //all services have been stopped + assertInState(STATE.STOPPED, services); } - } + } @Test @@ -171,14 +199,136 @@ public void testServiceStop() { try { serviceManager.stop(); } catch (YarnException e) { - for (int i = 0; i < NUM_OF_SERVICES - 1; i++) { - assertEquals("Service state should have been ", STATE.STOPPED, - services[NUM_OF_SERVICES].getServiceState()); + + } + assertInState(STATE.STOPPED, services); + } + + /** + * Assert that all services are in the same expected state + * @param expected expected state value + * @param services services to examine + */ + private void assertInState(STATE expected, CompositeServiceImpl[] services) { + assertInState(expected, services,0, services.length); + } + + /** + * Assert that all services are in the same expected state + * @param expected expected state value + * @param services services to examine + * @param start start offset + * @param finish finish offset: the count stops before this number + */ + private void assertInState(STATE expected, + CompositeServiceImpl[] services, + int start, int finish) { + for (int i = start; i < finish; i++) { + Service service = services[i]; + assertInState(expected, service); + } + } + + private void assertInState(STATE expected, Service service) { + assertEquals("Service state should have been " + expected + " in " + + service, + expected, + service.getServiceState()); + } + + /** + * Shut down from not-inited + */ + @Test + public void testServiceStopFromNotInited() { + ServiceManager serviceManager = new ServiceManager("ServiceManager"); + + // Add services + for (int i = 0; i < NUM_OF_SERVICES; i++) { + CompositeServiceImpl service = new CompositeServiceImpl(i); + serviceManager.addTestService(service); + } + + CompositeServiceImpl[] services = serviceManager.getServices().toArray( + new CompositeServiceImpl[0]); + serviceManager.stop(); + if (STOP_ONLY_STARTED_SERVICES) { + //this policy => no services were stopped + assertInState(STATE.NOTINITED, services); + } else { + //this policy => all services were stopped in reverse order + assertInState(STATE.STOPPED, services); + // Verify the stop() call sequence numbers for every service + for (int i = 0; i < NUM_OF_SERVICES; i++) { + assertEquals("For " + services[i] + + + " service, stop() call sequence number should have been ", + ((NUM_OF_SERVICES - 1) - i), + services[i].getCallSequenceNumber()); } } } - public static class CompositeServiceImpl extends CompositeService { + /** + * Shut down from inited + */ + @Test + public void testServiceStopFromInited() { + ServiceManager serviceManager = new ServiceManager("ServiceManager"); + + // Add services + for (int i = 0; i < NUM_OF_SERVICES; i++) { + CompositeServiceImpl service = new CompositeServiceImpl(i); + serviceManager.addTestService(service); + } + + CompositeServiceImpl[] services = serviceManager.getServices().toArray( + new CompositeServiceImpl[0]); + serviceManager.init(new Configuration()); + serviceManager.stop(); + if (STOP_ONLY_STARTED_SERVICES) { + //this policy => no services were stopped + assertInState(STATE.INITED, services); + } else { + assertInState(STATE.STOPPED, services); + } + } + + /** + * Use a null configuration & expect a failure + * @throws Throwable + */ + @Test + public void testInitNullConf() throws Throwable { + ServiceManager serviceManager = new ServiceManager("testInitNullConf"); + + CompositeServiceImpl service = new CompositeServiceImpl(0); + serviceManager.addTestService(service); + try { + serviceManager.init(null); + LOG.warn("Null Configurations are permitted " + serviceManager); + } catch (ServiceStateException e) { + //expected + } + } + + /** + * Walk the service through their lifecycle without any children; + * verify that it all works. + */ + @Test + public void testServiceLifecycleNoChildrenl() { + ServiceManager serviceManager = new ServiceManager("ServiceManager"); + serviceManager.init(new Configuration()); + serviceManager.start(); + serviceManager.stop(); + } + + public static class CompositeServiceImpl extends CompositeService { + + public static boolean isPolicyToStopOnlyStartedServices() { + return STOP_ONLY_STARTED_SERVICES; + } private static int counter = -1; @@ -193,30 +343,30 @@ public CompositeServiceImpl(int sequenceNumber) { } @Override - public synchronized void init(Configuration conf) { + protected void innerInit(Configuration conf) throws Exception { counter++; callSequenceNumber = counter; - super.init(conf); + super.innerInit(conf); } @Override - public synchronized void start() { + protected void innerStart() throws Exception { if (throwExceptionOnStart) { throw new YarnException("Fake service start exception"); } counter++; callSequenceNumber = counter; - super.start(); + super.innerStart(); } @Override - public synchronized void stop() { + protected void innerStop() throws Exception { counter++; callSequenceNumber = counter; if (throwExceptionOnStop) { throw new YarnException("Fake service stop exception"); } - super.stop(); + super.innerStop(); } public static int getCounter() {