diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index a8a9be4..1f29e03 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -82,24 +82,24 @@ public void run() { } @Override - public synchronized void init(Configuration conf) { + protected void serviceInit(Configuration conf) throws Exception { this.exitOnDispatchException = conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR); - super.init(conf); + super.serviceInit(conf); } @Override - public void start() { + protected void serviceStart() throws Exception { //start all the components - super.start(); + super.serviceStart(); eventHandlingThread = new Thread(createThread()); eventHandlingThread.setName("AsyncDispatcher event handler"); eventHandlingThread.start(); } @Override - public void stop() { + protected void serviceStop() throws Exception { stopped = true; if (eventHandlingThread != null) { eventHandlingThread.interrupt(); @@ -111,7 +111,7 @@ public void stop() { } // stop all the components - super.stop(); + super.serviceStop(); } @SuppressWarnings("unchecked") diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java index c8603ab..c9dc580 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java @@ -125,8 +125,9 @@ private static void logIOException(String comment, IOException e) { public AggregatedLogDeletionService() { super(AggregatedLogDeletionService.class.getName()); } - - public void start() { + + @Override + protected void serviceStart() throws Exception { Configuration conf = getConfig(); if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) { @@ -150,14 +151,14 @@ public void start() { TimerTask task = new LogDeletionTask(conf, retentionSecs); timer = new Timer(); timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs); - super.start(); + super.serviceStart(); } @Override - public void stop() { + protected void serviceStop() throws Exception { if(timer != null) { timer.cancel(); } - super.stop(); + super.serviceStop(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/AbstractService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/AbstractService.java index eeea1e1..4678e26 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/AbstractService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/AbstractService.java @@ -18,26 +18,33 @@ package org.apache.hadoop.yarn.service; +import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +/** + * This is the base implementation class for YARN services. + */ public abstract class AbstractService implements Service { private static final Log LOG = LogFactory.getLog(AbstractService.class); /** - * Service state: initially {@link STATE#NOTINITED}. - */ - private STATE state = STATE.NOTINITED; - - /** * Service name. */ private final String name; + + /** service state */ + private final ServiceStateModel stateModel; + /** * Service start time. Will be zero until the service is started. */ @@ -46,14 +53,49 @@ /** * The configuration. Will be null until the service is initialized. */ - private Configuration config; + private volatile Configuration config; /** * List of state change listeners; it is final to ensure * that it will never be null. */ - private List listeners = - new ArrayList(); + private final ServiceOperations.ServiceListeners listeners + = new ServiceOperations.ServiceListeners(); + /** + * Static listeners to all events across all services + */ + private static ServiceOperations.ServiceListeners globalListeners + = new ServiceOperations.ServiceListeners(); + + /** + * The cause of any failure -will be null. + * if a service did not stop due to a failure. + */ + private Exception failureCause; + + /** + * the state in which the service was when it failed. + * Only valid when the service is stopped due to a failure + */ + private STATE failureState = null; + + /** + * object used to co-ordinate {@link #waitForServiceToStop(long)} + * across threads. + */ + private final AtomicBoolean terminationNotification = + new AtomicBoolean(false); + + /** + * History of lifecycle transitions + */ + private final List lifecycleHistory + = new ArrayList(5); + + /** + * Map of blocking dependencies + */ + private final Map blockerMap = new HashMap(); /** * Construct the service. @@ -61,68 +103,277 @@ */ public AbstractService(String name) { this.name = name; + stateModel = new ServiceStateModel(name); + } + + @Override + public final STATE getServiceState() { + return stateModel.getState(); + } + + @Override + public final synchronized Throwable getFailureCause() { + return failureCause; } @Override - public synchronized STATE getServiceState() { - return state; + public synchronized STATE getFailureState() { + return failureState; + } + + /** + * Set the configuration for this service. + * This method is called during {@link #init(Configuration)} + * and should only be needed if for some reason a service implementation + * needs to override that initial setting -for example replacing + * it with a new subclass of {@link Configuration} + * @param conf new configuration. + */ + protected void setConfig(Configuration conf) { + this.config = conf; } /** * {@inheritDoc} - * @throws IllegalStateException if the current service state does not permit - * this action + * This invokes {@link #serviceInit} + * @param conf the configuration of the service. This must not be null + * @throws ServiceStateException if the configuration was null, + * the state change not permitted, or something else went wrong */ @Override public synchronized void init(Configuration conf) { - ensureCurrentState(STATE.NOTINITED); - this.config = conf; - changeState(STATE.INITED); - LOG.info("Service:" + getName() + " is inited."); + if (conf == null) { + throw new ServiceStateException("Cannot initialize service " + + getName() + ": null configuration"); + } + if (enterState(STATE.INITED) != STATE.INITED) { + setConfig(conf); + try { + serviceInit(config); + if (isInState(STATE.INITED)) { + //if the service ended up here during init, + //notify the listeners + notifyListeners(); + } + } catch (Exception e) { + noteFailure(e); + ServiceOperations.stopQuietly(LOG, this); + throw ServiceStateException.convert(e); + } + } } /** * {@inheritDoc} - * @throws IllegalStateException if the current service state does not permit + * @throws ServiceStateException if the current service state does not permit * this action */ @Override public synchronized void start() { - startTime = System.currentTimeMillis(); - ensureCurrentState(STATE.INITED); - changeState(STATE.STARTED); - LOG.info("Service:" + getName() + " is started."); + //enter the started state + if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) { + try { + startTime = System.currentTimeMillis(); + serviceStart(); + if (isInState(STATE.STARTED)) { + //if the service started (and isn't now in a later state), notify + if (LOG.isDebugEnabled()) { + LOG.debug("Service " + getName() + " is started"); + } + notifyListeners(); + } + } catch (Exception e) { + noteFailure(e); + ServiceOperations.stopQuietly(LOG, this); + throw ServiceStateException.convert(e); + } + } } /** * {@inheritDoc} - * @throws IllegalStateException if the current service state does not permit - * this action */ @Override public synchronized void stop() { - if (state == STATE.STOPPED || - state == STATE.INITED || - state == STATE.NOTINITED) { - // already stopped, or else it was never - // started (eg another service failing canceled startup) + //this operation is only invoked if the service is not already stopped; + // it is not an error + //to go STOPPED->STOPPED -it is just a no-op + if (enterState(STATE.STOPPED) != STATE.STOPPED) { + try { + serviceStop(); + } catch (Exception e) { + //stop-time exceptions are logged if they are the first one, + noteFailure(e); + throw ServiceStateException.convert(e); + } finally { + //report that the service has terminated + terminationNotification.set(true); + synchronized (terminationNotification) { + terminationNotification.notifyAll(); + } + //notify anything listening for events + notifyListeners(); + } + } else { + //already stopped: note it + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring re-entrant call to stop()"); + } + } + } + + /** + * Relay to {@link #stop()} + * @throws IOException + */ + @Override + public final void close() throws IOException { + stop(); + } + + /** + * Failure handling: record the exception + * that triggered it -if there was not one already. + * Services are free to call this themselves. + * @param exception the exception + */ + protected final void noteFailure(Exception exception) { + if (LOG.isDebugEnabled()) { + LOG.debug("noteFailure " + exception, null); + } + if (exception == null) { + //make sure failure logic doesn't itself cause problems return; } - ensureCurrentState(STATE.STARTED); - changeState(STATE.STOPPED); - LOG.info("Service:" + getName() + " is stopped."); + //record the failure details, and log it + synchronized (this) { + if (failureCause == null) { + failureCause = exception; + failureState = getServiceState(); + LOG.info("Service " + getName() + + " failed in state " + failureState + + "; cause: " + exception, + exception); + } + } + } + + @Override + public final boolean waitForServiceToStop(long timeout) { + boolean completed = terminationNotification.get(); + while (!completed) { + try { + synchronized(terminationNotification) { + terminationNotification.wait(timeout); + } + // here there has been a timeout, the object has terminated, + // or there has been a spurious wakeup (which we ignore) + completed = true; + } catch (InterruptedException e) { + // interrupted; have another look at the flag + completed = terminationNotification.get(); + } + } + return terminationNotification.get(); + } + + /* ===================================================================== */ + /* Override Points */ + /* ===================================================================== */ + + /** + * All initialization code needed by a service. + * + * This method will only ever be called once during the lifecycle of + * a specific service instance. + * + * Implementations do not need to be synchronized as the logic + * in {@link #init(Configuration)} prevents re-entrancy. + * + * The base implementation checks to see if the subclass has created + * a new configuration instance, and if so, updates the base class value + * @param conf configuration + * @throws Exception on a failure -these will be caught, + * possibly wrapped, and wil; trigger a service stop + */ + protected void serviceInit(Configuration conf) throws Exception { + if (conf != config) { + LOG.debug("Config has been overridden during init"); + setConfig(conf); + } + } + + /** + * Actions called during the INITED to STARTED transition. + * + * This method will only ever be called once during the lifecycle of + * a specific service instance. + * + * Implementations do not need to be synchronized as the logic + * in {@link #start()} prevents re-entrancy. + * + * @throws Exception if needed -these will be caught, + * wrapped, and trigger a service stop + */ + protected void serviceStart() throws Exception { + + } + + /** + * Actions called during the transition to the STOPPED state. + * + * This method will only ever be called once during the lifecycle of + * a specific service instance. + * + * Implementations do not need to be synchronized as the logic + * in {@link #stop()} prevents re-entrancy. + * + * Implementations MUST write this to be robust against failures, including + * checks for null references -and for the first failure to not stop other + * attempts to shut down parts of the service. + * + * @throws Exception if needed -these will be caught and logged. + */ + protected void serviceStop() throws Exception { + } @Override - public synchronized void register(ServiceStateChangeListener l) { + public void register(ServiceStateChangeListener l) { listeners.add(l); } @Override - public synchronized void unregister(ServiceStateChangeListener l) { + public void unregister(ServiceStateChangeListener l) { listeners.remove(l); } + /** + * Register a global listener, which receives notifications + * from the state change events of all services in the JVM + * @param l listener + */ + public static void registerGlobalListener(ServiceStateChangeListener l) { + globalListeners.add(l); + } + + /** + * unregister a global listener. + * @param l listener to unregister + * @return true if the listener was found (and then deleted) + */ + public static boolean unregisterGlobalListener(ServiceStateChangeListener l) { + return globalListeners.remove(l); + } + + /** + * Package-scoped method for testing -resets the global listener list + */ + @VisibleForTesting + static void resetGlobalListeners() { + globalListeners.reset(); + } + @Override public String getName() { return name; @@ -139,28 +390,92 @@ public long getStartTime() { } /** - * Verify that that a service is in a given state. - * @param currentState the desired state - * @throws IllegalStateException if the service state is different from - * the desired state + * Notify local and global listeners of state changes. + * Exceptions raised by listeners are NOT passed up. */ - private void ensureCurrentState(STATE currentState) { - ServiceOperations.ensureCurrentState(state, currentState); + private void notifyListeners() { + try { + listeners.notifyListeners(this); + globalListeners.notifyListeners(this); + } catch (Throwable e) { + LOG.warn("Exception while notifying listeners of " + this + ": " + e, + e); + } + } + + /** + * Add a state change event to the lifecycle history + */ + private void recordLifecycleEvent() { + LifecycleEvent event = new LifecycleEvent(); + event.time = System.currentTimeMillis(); + event.state = getServiceState(); + lifecycleHistory.add(event); + } + + @Override + public synchronized List getLifecycleHistory() { + return new ArrayList(lifecycleHistory); + } + + /** + * Enter a state; record this via {@link #recordLifecycleEvent} + * and log at the info level. + * @param newState the proposed new state + * @return the original state + * it wasn't already in that state, and the state model permits state re-entrancy. + */ + private STATE enterState(STATE newState) { + assert stateModel != null : "null state in " + name + " " + this.getClass(); + STATE oldState = stateModel.enterState(newState); + if (oldState != newState) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Service: " + getName() + " entered state " + getServiceState()); + } + recordLifecycleEvent(); + } + return oldState; + } + + @Override + public final boolean isInState(Service.STATE expected) { + return stateModel.isInState(expected); + } + + @Override + public String toString() { + return "Service " + name + " in state " + stateModel; + } + + /** + * Put a blocker to the blocker map -replacing any + * with the same name. + * @param name blocker name + * @param details any specifics on the block. This must be non-null. + */ + protected void putBlocker(String name, String details) { + synchronized (blockerMap) { + blockerMap.put(name, details); + } } /** - * Change to a new state and notify all listeners. - * This is a private method that is only invoked from synchronized methods, - * which avoid having to clone the listener list. It does imply that - * the state change listener methods should be short lived, as they - * will delay the state transition. - * @param newState new service state + * Remove a blocker from the blocker map - + * this is a no-op if the blocker is not present + * @param name the name of the blocker */ - private void changeState(STATE newState) { - state = newState; - //notify listeners - for (ServiceStateChangeListener l : listeners) { - l.stateChanged(this); + public void removeBlocker(String name) { + synchronized (blockerMap) { + blockerMap.remove(name); + } + } + + @Override + public Map getBlockers() { + synchronized (blockerMap) { + Map map = new HashMap(blockerMap); + return map; } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/CompositeService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/CompositeService.java index 26a091d..0e6603b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/CompositeService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/CompositeService.java @@ -19,14 +19,12 @@ package org.apache.hadoop.yarn.service; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.YarnRuntimeException; /** * Composition of services. @@ -35,72 +33,115 @@ private static final Log LOG = LogFactory.getLog(CompositeService.class); - private List serviceList = new ArrayList(); + /** + * Policy on shutdown: attempt to close everything (purest) or + * only try to close started services (which assumes + * that the service implementations may not handle the stop() operation + * except when started. + * Irrespective of this policy, if a child service fails during + * its init() or start() operations, it will have stop() called on it. + */ + protected static final boolean STOP_ONLY_STARTED_SERVICES = false; + + private final List serviceList = new ArrayList(); public CompositeService(String name) { super(name); } - public Collection getServices() { - return Collections.unmodifiableList(serviceList); + /** + * Get an unmodifiable list of services + * @return a list of child services at the time of invocation - + * added services will not be picked up. + */ + public List getServices() { + synchronized (serviceList) { + return Collections.unmodifiableList(serviceList); + } } - protected synchronized void addService(Service service) { - serviceList.add(service); + protected void addService(Service service) { + if (LOG.isDebugEnabled()) { + LOG.debug("Adding service " + service.getName()); + } + synchronized (serviceList) { + serviceList.add(service); + } } protected synchronized boolean removeService(Service service) { - return serviceList.remove(service); + synchronized (serviceList) { + return serviceList.add(service); + } } - public synchronized void init(Configuration conf) { - for (Service service : serviceList) { + protected void serviceInit(Configuration conf) throws Exception { + List services = getServices(); + if (LOG.isDebugEnabled()) { + LOG.debug(getName() + ": initing services, size=" + services.size()); + } + for (Service service : services) { service.init(conf); } - super.init(conf); + super.serviceInit(conf); } - public synchronized void start() { - int i = 0; - try { - for (int n = serviceList.size(); i < n; i++) { - Service service = serviceList.get(i); - service.start(); - } - super.start(); - } catch (Throwable e) { - LOG.error("Error starting services " + getName(), e); - // Note that the state of the failed service is still INITED and not - // STARTED. Even though the last service is not started completely, still - // call stop() on all services including failed service to make sure cleanup - // happens. - stop(i); - throw new YarnRuntimeException("Failed to Start " + getName(), e); + protected void serviceStart() throws Exception { + List services = getServices(); + if (LOG.isDebugEnabled()) { + LOG.debug(getName() + ": starting services, size=" + services.size()); } - + for (Service service : services) { + // start the service. If this fails that service + // will be stopped and an exception raised + service.start(); + } + super.serviceStart(); } - public synchronized void stop() { - if (this.getServiceState() == STATE.STOPPED) { - // The base composite-service is already stopped, don't do anything again. - return; + protected void serviceStop() throws Exception { + //stop all services that were started + int numOfServicesToStop = serviceList.size(); + if (LOG.isDebugEnabled()) { + LOG.debug(getName() + ": stopping services, size=" + numOfServicesToStop); } - if (serviceList.size() > 0) { - stop(serviceList.size() - 1); - } - super.stop(); + stop(numOfServicesToStop, STOP_ONLY_STARTED_SERVICES); + super.serviceStop(); } - private synchronized void stop(int numOfServicesStarted) { - // stop in reserve order of start - for (int i = numOfServicesStarted; i >= 0; i--) { - Service service = serviceList.get(i); - try { - service.stop(); - } catch (Throwable t) { - LOG.info("Error stopping " + service.getName(), t); + /** + * Stop the services in reverse order + * + * @param numOfServicesStarted index from where the stop should work + * @param stopOnlyStartedServices flag to say "only start services that are + * started, not those that are NOTINITED or INITED. + * @throws RuntimeException the first exception raised during the + * stop process -after all services are stopped + */ + private synchronized void stop(int numOfServicesStarted, + boolean stopOnlyStartedServices) { + // stop in reverse order of start + Exception firstException = null; + List services = getServices(); + for (int i = numOfServicesStarted - 1; i >= 0; i--) { + Service service = services.get(i); + if (LOG.isDebugEnabled()) { + LOG.debug("Stopping service #" + i + ": " + service); + } + STATE state = service.getServiceState(); + //depending on the stop police + if (state == STATE.STARTED + || (!stopOnlyStartedServices && state == STATE.INITED)) { + Exception ex = ServiceOperations.stopQuietly(LOG, service); + if (ex != null && firstException == null) { + firstException = ex; + } } } + //after stopping all services, rethrow the first exception raised + if (firstException != null) { + throw ServiceStateException.convert(firstException); + } } /** @@ -117,13 +158,8 @@ public CompositeServiceShutdownHook(CompositeService compositeService) { @Override public void run() { - try { - // Stop the Composite Service - compositeService.stop(); - } catch (Throwable t) { - LOG.info("Error stopping " + compositeService.getName(), t); - } + ServiceOperations.stopQuietly(compositeService); } } - + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/FilterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/FilterService.java index 314d664..07aafaf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/FilterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/FilterService.java @@ -20,6 +20,10 @@ import org.apache.hadoop.conf.Configuration; +import java.io.IOException; +import java.util.List; +import java.util.Map; + public class FilterService implements Service { private final Service service; @@ -45,6 +49,11 @@ public void stop() { } @Override + public void close() throws IOException { + service.close(); + } + + @Override public void register(ServiceStateChangeListener listener) { service.register(listener); } @@ -73,4 +82,34 @@ public STATE getServiceState() { public long getStartTime() { return startTime; } + + @Override + public boolean isInState(STATE state) { + return service.isInState(state); + } + + @Override + public Throwable getFailureCause() { + return service.getFailureCause(); + } + + @Override + public STATE getFailureState() { + return service.getFailureState(); + } + + @Override + public boolean waitForServiceToStop(long timeout) { + return service.waitForServiceToStop(timeout); + } + + @Override + public List getLifecycleHistory() { + return service.getLifecycleHistory(); + } + + @Override + public Map getBlockers() { + return service.getBlockers(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/LifecycleEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/LifecycleEvent.java new file mode 100644 index 0000000..4acd4a1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/LifecycleEvent.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.service; + +import java.io.Serializable; + +/** + * A serializable lifecycle event: the time a state + * transition occurred, and what state was entered. + */ +public class LifecycleEvent implements Serializable { + /** + * Local time in milliseconds when the event occurred + */ + public long time; + /** + * new state + */ + public Service.STATE state; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/LoggingStateChangeListener.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/LoggingStateChangeListener.java new file mode 100644 index 0000000..b0c3af1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/LoggingStateChangeListener.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * This is a state change listener that logs events at INFO level + */ +public class LoggingStateChangeListener implements ServiceStateChangeListener { + + private static final Log LOG = LogFactory.getLog(LoggingStateChangeListener.class); + + private final Log log; + + /** + * Log events to the given log + * @param log destination for events + */ + public LoggingStateChangeListener(Log log) { + //force an NPE if a null log came in + log.isDebugEnabled(); + this.log = log; + } + + /** + * Log events to the static log for this class + */ + public LoggingStateChangeListener() { + this(LOG); + } + + /** + * Callback for a state change event: log it + * @param service the service that has changed. + */ + @Override + public void stateChanged(Service service) { + log.info("Entry to state " + service.getServiceState() + + " for " + service.getName()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/Service.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/Service.java index 8b8b183..1749bf9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/Service.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/Service.java @@ -20,34 +20,77 @@ import org.apache.hadoop.conf.Configuration; +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Map; + /** * Service LifeCycle. */ -public interface Service { +public interface Service extends Closeable { /** * Service states */ public enum STATE { /** Constructed but not initialized */ - NOTINITED, + NOTINITED(0, "NOTINITED"), /** Initialized but not started or stopped */ - INITED, + INITED(1, "INITED"), /** started and not stopped */ - STARTED, + STARTED(2, "STARTED"), /** stopped. No further state transitions are permitted */ - STOPPED + STOPPED(3, "STOPPED"); + + /** + * An integer value for use in array lookup and JMX interfaces. + * Although {@link Enum#ordinal()} could do this, explicitly + * identify the numbers gives more stability guarantees over time. + */ + private final int value; + + /** + * A name of the state that can be used in messages + */ + private final String statename; + + private STATE(int value, String name) { + this.value = value; + this.statename = name; + } + + /** + * Get the integer value of a state + * @return the numeric value of the state + */ + public int getValue() { + return value; + } + + /** + * Get the name of a state + * @return the state's name + */ + @Override + public String toString() { + return statename; + } } /** * Initialize the service. * - * The transition must be from {@link STATE#NOTINITED} to {@link STATE#INITED} - * unless the operation failed and an exception was raised. + * The transition MUST be from {@link STATE#NOTINITED} to {@link STATE#INITED} + * unless the operation failed and an exception was raised, in which case + * {@link #stop()} MUST be invoked and the service enter the state + * {@link STATE#STOPPED}. * @param config the configuration of the service + * @throws RuntimeException on any failure during the operation + */ void init(Configuration config); @@ -55,21 +98,37 @@ /** * Start the service. * - * The transition should be from {@link STATE#INITED} to {@link STATE#STARTED} - * unless the operation failed and an exception was raised. + * The transition MUST be from {@link STATE#INITED} to {@link STATE#STARTED} + * unless the operation failed and an exception was raised, in which case + * {@link #stop()} MUST be invoked and the service enter the state + * {@link STATE#STOPPED}. + * @throws RuntimeException on any failure during the operation */ void start(); /** - * Stop the service. + * Stop the service. This MUST be a no-op if the service is already + * in the {@link STATE#STOPPED} state. It SHOULD be a best-effort attempt + * to stop all parts of the service. * - * This operation must be designed to complete regardless of the initial state - * of the service, including the state of all its internal fields. + * The implementation must be designed to complete regardless of the service + * state, including the initialized/uninitialized state of all its internal + * fields. + * @throws RuntimeException on any failure during the stop operation */ void stop(); /** + * A version of stop() that is designed to be usable in Java7 closure + * clauses. + * Implementation classes MUST relay this directly to {@link #stop()} + * @throws IOException never + * @throws RuntimeException on any failure during the stop operation + */ + void close() throws IOException; + + /** * Register an instance of the service state change events. * @param listener a new listener */ @@ -108,4 +167,52 @@ * has not yet been started. */ long getStartTime(); + + /** + * Query to see if the service is in a specific state. + * In a multi-threaded system, the state may not hold for very long. + * @param state the expected state + * @return true if, at the time of invocation, the service was in that state. + */ + boolean isInState(STATE state); + + /** + * Get the first exception raised during the service failure. If null, + * no exception was logged + * @return the failure logged during a transition to the stopped state + */ + Throwable getFailureCause(); + + /** + * Get the state in which the failure in {@link #getFailureCause()} occurred. + * @return the state or null if there was no failure + */ + STATE getFailureState(); + + /** + * Block waiting for the service to stop; uses the termination notification + * object to do so. + * + * This method will only return after all the service stop actions + * have been executed (to success or failure), or the timeout elapsed + * This method can be called before the service is inited or started; this is + * to eliminate any race condition with the service stopping before + * this event occurs. + * @param timeout timeout in milliseconds. A value of zero means "forever" + * @return true iff the service stopped in the time period + */ + boolean waitForServiceToStop(long timeout); + + /** + * Get a snapshot of the lifecycle history; it is a static list + * @return a possibly empty but never null list of lifecycle events. + */ + public List getLifecycleHistory(); + + /** + * Get the blockers on a service -remote dependencies + * that are stopping the service from being live. + * @return a (snapshotted) map of blocker name->description values + */ + public Map getBlockers(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceOperations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceOperations.java index 151caa9..24f9e85 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceOperations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceOperations.java @@ -21,6 +21,11 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ShutdownHookManager; + +import java.lang.ref.WeakReference; +import java.util.ArrayList; +import java.util.List; /** * This class contains a set of methods to work with services, especially @@ -33,74 +38,6 @@ private ServiceOperations() { } /** - * Verify that that a service is in a given state. - * @param state the actual state a service is in - * @param expectedState the desired state - * @throws IllegalStateException if the service state is different from - * the desired state - */ - public static void ensureCurrentState(Service.STATE state, - Service.STATE expectedState) { - if (state != expectedState) { - throw new IllegalStateException("For this operation, the " + - "current service state must be " - + expectedState - + " instead of " + state); - } - } - - /** - * Initialize a service. - *

- * The service state is checked before the operation begins. - * This process is not thread safe. - * @param service a service that must be in the state - * {@link Service.STATE#NOTINITED} - * @param configuration the configuration to initialize the service with - * @throws RuntimeException on a state change failure - * @throws IllegalStateException if the service is in the wrong state - */ - - public static void init(Service service, Configuration configuration) { - Service.STATE state = service.getServiceState(); - ensureCurrentState(state, Service.STATE.NOTINITED); - service.init(configuration); - } - - /** - * Start a service. - *

- * The service state is checked before the operation begins. - * This process is not thread safe. - * @param service a service that must be in the state - * {@link Service.STATE#INITED} - * @throws RuntimeException on a state change failure - * @throws IllegalStateException if the service is in the wrong state - */ - - public static void start(Service service) { - Service.STATE state = service.getServiceState(); - ensureCurrentState(state, Service.STATE.INITED); - service.start(); - } - - /** - * Initialize then start a service. - *

- * The service state is checked before the operation begins. - * This process is not thread safe. - * @param service a service that must be in the state - * {@link Service.STATE#NOTINITED} - * @param configuration the configuration to initialize the service with - * @throws RuntimeException on a state change failure - * @throws IllegalStateException if the service is in the wrong state - */ - public static void deploy(Service service, Configuration configuration) { - init(service, configuration); - start(service); - } - - /** * Stop a service. *

Do nothing if the service is null or not * in a state in which it can be/needs to be stopped. @@ -111,10 +48,7 @@ public static void deploy(Service service, Configuration configuration) { */ public static void stop(Service service) { if (service != null) { - Service.STATE state = service.getServiceState(); - if (state == Service.STATE.STARTED) { - service.stop(); - } + service.stop(); } } @@ -127,14 +61,93 @@ public static void stop(Service service) { * @return any exception that was caught; null if none was. */ public static Exception stopQuietly(Service service) { + return stopQuietly(LOG, service); + } + + /** + * Stop a service; if it is null do nothing. Exceptions are caught and + * logged at warn level. (but not Throwables). This operation is intended to + * be used in cleanup operations + * + * @param log the log to warn at + * @param service a service; may be null + * @return any exception that was caught; null if none was. + * @see ServiceOperations#stopQuietly(Service) + */ + public static Exception stopQuietly(Log log, Service service) { try { stop(service); } catch (Exception e) { - LOG.warn("When stopping the service " + service.getName() - + " : " + e, + log.warn("When stopping the service " + service.getName() + + " : " + e, e); return e; } return null; } + + + /** + * Class to manage a list of {@link ServiceStateChangeListener} instances, + * including a notification loop that is robust against changes to the list + * during the notification process. + */ + public static class ServiceListeners { + /** + * List of state change listeners; it is final to guarantee + * that it will never be null. + */ + private final List listeners = + new ArrayList(); + + /** + * Thread-safe addition of a new listener to the end of a list. + * Attempts to re-register a listener that is already registered + * will be ignored. + * @param l listener + */ + public synchronized void add(ServiceStateChangeListener l) { + if(!listeners.contains(l)) { + listeners.add(l); + } + } + + /** + * Remove any registration of a listener from the listener list. + * @param l listener + * @return true if the listener was found (and then removed) + */ + public synchronized boolean remove(ServiceStateChangeListener l) { + return listeners.remove(l); + } + + /** + * Reset the listener list + */ + public synchronized void reset() { + listeners.clear(); + } + + /** + * Change to a new state and notify all listeners. + * This method will block until all notifications have been issued. + * It caches the list of listeners before the notification begins, + * so additions or removal of listeners will not be visible. + * @param service the service that has changed state + */ + public void notifyListeners(Service service) { + //take a very fast snapshot of the callback list + //very much like CopyOnWriteArrayList, only more minimal + ServiceStateChangeListener[] callbacks; + synchronized (this) { + callbacks = listeners.toArray(new ServiceStateChangeListener[listeners.size()]); + } + //iterate through the listeners outside the synchronized method, + //ensuring that listener registration/unregistration doesn't break anything + for (ServiceStateChangeListener l : callbacks) { + l.stateChanged(service); + } + } + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceStateException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceStateException.java new file mode 100644 index 0000000..124b5d1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceStateException.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service; + +import org.apache.hadoop.yarn.YarnRuntimeException; + +/** + * Exception that is raised on state change operations. + */ +public class ServiceStateException extends YarnRuntimeException { + + public ServiceStateException(String message) { + super(message); + } + + public ServiceStateException(String message, Throwable cause) { + super(message, cause); + } + + public ServiceStateException(Throwable cause) { + super(cause); + } + + /** + * Convert any exception into a {@link RuntimeException}. + * If the caught exception already is of that type -including + * a {@link YarnException} it is typecast to a {@link RuntimeException} + * and returned. + * + * All other exception types are wrapped in a new instance of + * ServiceStateException + * @param fault exception or throwable + * @return a ServiceStateException to rethrow + */ + public static RuntimeException convert(Throwable fault) { + if (fault instanceof RuntimeException) { + return (RuntimeException) fault; + } else { + return new ServiceStateException(fault); + } + } + + /** + * Convert any exception into a {@link RuntimeException}. + * If the caught exception already is of that type -including + * a {@link YarnException} it is typecast to a {@link RuntimeException} + * and returned. + * + * All other exception types are wrapped in a new instance of + * ServiceStateException + * @param text text to use if a new exception is created + * @param fault exception or throwable + * @return a ServiceStateException to rethrow + */ + public static RuntimeException convert(String text, Throwable fault) { + if (fault instanceof RuntimeException) { + return (RuntimeException) fault; + } else { + return new ServiceStateException(text, fault); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceStateModel.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceStateModel.java new file mode 100644 index 0000000..a9dd138 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceStateModel.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service; + +/** + * Implements the service state model for YARN. + */ +public class ServiceStateModel { + + /** + * Map of all valid state transitions + * [current] [proposed1, proposed2, ...] + */ + private static final boolean[][] statemap = + { + // uninited inited started stopped + /* uninited */ {false, true, false, true}, + /* inited */ {false, true, true, true}, + /* started */ {false, false, true, true}, + /* stopped */ {false, false, false, true}, + }; + + /** + * The state of the service + */ + private volatile Service.STATE state; + + /** + * The name of the service: used in exceptions + */ + private String name; + + /** + * Create the service state model in the {@link Service.STATE#NOTINITED} + * state. + */ + public ServiceStateModel(String name) { + this(name, Service.STATE.NOTINITED); + } + + /** + * Create a service state model instance in the chosen state + * @param state the starting state + */ + public ServiceStateModel(String name, Service.STATE state) { + this.state = state; + this.name = name; + } + + /** + * Query the service state. This is a non-blocking operation. + * @return the state + */ + public Service.STATE getState() { + return state; + } + + /** + * Query that the state is in a specific state + * @param proposed proposed new state + * @return the state + */ + public boolean isInState(Service.STATE proposed) { + return state.equals(proposed); + } + + /** + * Verify that that a service is in a given state. + * @param expectedState the desired state + * @throws ServiceStateException if the service state is different from + * the desired state + */ + public void ensureCurrentState(Service.STATE expectedState) { + if (state != expectedState) { + throw new ServiceStateException(name+ ": for this operation, the " + + "current service state must be " + + expectedState + + " instead of " + state); + } + } + + /** + * Enter a state -thread safe. + * + * @param proposed proposed new state + * @return the original state + * @throws ServiceStateException if the transition is not permitted + */ + public synchronized Service.STATE enterState(Service.STATE proposed) { + checkStateTransition(name, state, proposed); + Service.STATE oldState = state; + //atomic write of the new state + state = proposed; + return oldState; + } + + /** + * Check that a state tansition is valid and + * throw an exception if not + * @param name name of the service (can be null) + * @param state current state + * @param proposed proposed new state + */ + public static void checkStateTransition(String name, + Service.STATE state, + Service.STATE proposed) { + if (!isValidStateTransition(state, proposed)) { + throw new ServiceStateException(name + " cannot enter state " + + proposed + " from state " + state); + } + } + + /** + * Is a state transition valid? + * There are no checks for current==proposed + * as that is considered a non-transition. + * + * using an array kills off all branch misprediction costs, at the expense + * of cache line misses. + * + * @param current current state + * @param proposed proposed new state + * @return true if the transition to a new state is valid + */ + public static boolean isValidStateTransition(Service.STATE current, + Service.STATE proposed) { + boolean[] row = statemap[current.getValue()]; + return row[proposed.getValue()]; + } + + /** + * return the state text as the toString() value + * @return the current state's description + */ + @Override + public String toString() { + return (name.isEmpty() ? "" : ((name) + ": ")) + + state.toString(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java index b46ad3e..c6e4d22 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java @@ -49,21 +49,21 @@ public AbstractLivelinessMonitor(String name, Clock clock) { } @Override - public void start() { + protected void serviceStart() throws Exception { assert !stopped : "starting when already stopped"; checkerThread = new Thread(new PingChecker()); checkerThread.setName("Ping Checker"); checkerThread.start(); - super.start(); + super.serviceStart(); } @Override - public void stop() { + protected void serviceStop() throws Exception { stopped = true; if (checkerThread != null) { checkerThread.interrupt(); } - super.stop(); + super.serviceStop(); } protected abstract void expire(O ob); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableService.java index 5907f39..12c9649 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableService.java @@ -55,13 +55,7 @@ public BreakableService(boolean failOnInit, } private int convert(STATE state) { - switch (state) { - case NOTINITED: return 0; - case INITED: return 1; - case STARTED: return 2; - case STOPPED: return 3; - default: return 0; - } + return state.getValue(); } private void inc(STATE state) { @@ -75,29 +69,27 @@ public int getCount(STATE state) { private void maybeFail(boolean fail, String action) { if (fail) { - throw new BrokenLifecycleEvent(action); + throw new BrokenLifecycleEvent(this, action); } } @Override - public void init(Configuration conf) { + protected void serviceInit(Configuration conf) throws Exception { inc(STATE.INITED); maybeFail(failOnInit, "init"); - super.init(conf); + super.serviceInit(conf); } @Override - public void start() { + protected void serviceStart() { inc(STATE.STARTED); maybeFail(failOnStart, "start"); - super.start(); } @Override - public void stop() { + protected void serviceStop() { inc(STATE.STOPPED); maybeFail(failOnStop, "stop"); - super.stop(); } public void setFailOnInit(boolean failOnInit) { @@ -116,8 +108,13 @@ public void setFailOnStop(boolean failOnStop) { * The exception explicitly raised on a failure */ public static class BrokenLifecycleEvent extends RuntimeException { - BrokenLifecycleEvent(String action) { - super("Lifecycle Failure during " + action); + + final STATE state; + + public BrokenLifecycleEvent(Service service, String action) { + super("Lifecycle Failure during " + action + " state is " + + service.getServiceState()); + state = service.getServiceState(); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableStateChangeListener.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableStateChangeListener.java new file mode 100644 index 0000000..7b88c90 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableStateChangeListener.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service; + +import java.util.ArrayList; +import java.util.List; + +/** + * A state change listener that logs the number of state change events received, + * and the last state invoked. + * + * It can be configured to fail during a state change event + */ +public class BreakableStateChangeListener + implements ServiceStateChangeListener { + + private final String name; + + private int eventCount; + private int failureCount; + private Service lastService; + private Service.STATE lastState = Service.STATE.NOTINITED; + //no callbacks are ever received for this event, so it + //can be used as an 'undefined'. + private Service.STATE failingState = Service.STATE.NOTINITED; + private List stateEventList = new ArrayList(4); + + public BreakableStateChangeListener() { + this( "BreakableStateChangeListener"); + } + + public BreakableStateChangeListener(String name) { + this.name = name; + } + + @Override + public synchronized void stateChanged(Service service) { + eventCount++; + lastService = service; + lastState = service.getServiceState(); + stateEventList.add(lastState); + if (lastState == failingState) { + failureCount++; + throw new BreakableService.BrokenLifecycleEvent(service, + "Failure entering " + + lastState + + " for " + + service.getName()); + } + } + + public synchronized int getEventCount() { + return eventCount; + } + + public synchronized Service getLastService() { + return lastService; + } + + public synchronized Service.STATE getLastState() { + return lastState; + } + + public synchronized void setFailingState(Service.STATE failingState) { + this.failingState = failingState; + } + + public synchronized int getFailureCount() { + return failureCount; + } + + public List getStateEventList() { + return stateEventList; + } + + @Override + public synchronized String toString() { + String s = + name + " - event count = " + eventCount + " last state " + lastState; + StringBuilder history = new StringBuilder(stateEventList.size()*10); + for (Service.STATE state: stateEventList) { + history.append(state).append(" "); + } + return s + " [ " + history + "]"; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestGlobalStateChangeListener.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestGlobalStateChangeListener.java new file mode 100644 index 0000000..5a44bd8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestGlobalStateChangeListener.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service; + +import org.apache.hadoop.conf.Configuration; +import org.junit.After; +import org.junit.Test; + +/** + * Test global state changes. It is critical for all tests to clean up the + * global listener afterwards to avoid interfering with follow-on tests. + * + * One listener, {@link #listener} is defined which is automatically + * unregistered on cleanup. All other listeners must be unregistered in the + * finally clauses of the tests. + */ +public class TestGlobalStateChangeListener extends ServiceAssert { + + BreakableStateChangeListener listener = new BreakableStateChangeListener("listener"); + + + private void register() { + register(listener); + } + + private boolean unregister() { + return unregister(listener); + } + + private void register(ServiceStateChangeListener l) { + AbstractService.registerGlobalListener(l); + } + + private boolean unregister(ServiceStateChangeListener l) { + return AbstractService.unregisterGlobalListener(l); + } + + /** + * After every test case reset the list of global listeners. + */ + @After + public void cleanup() { + AbstractService.resetGlobalListeners(); + } + + /** + * Assert that the last state of the listener is that the test expected. + * @param breakable a breakable listener + * @param state the expected state + */ + public void assertListenerState(BreakableStateChangeListener breakable, + Service.STATE state) { + assertEquals("Wrong state in " + breakable, state, breakable.getLastState()); + } + + /** + * Assert that the number of state change notifications matches expectations. + * @param breakable the listener + * @param count the expected count. + */ + public void assertListenerEventCount(BreakableStateChangeListener breakable, + int count) { + assertEquals("Wrong event count in " + breakable, count, + breakable.getEventCount()); + } + + /** + * Test that register/unregister works + */ + @Test + public void testRegisterListener() { + register(); + assertTrue("listener not registered", unregister()); + } + + /** + * Test that double registration results in one registration only. + */ + @Test + public void testRegisterListenerTwice() { + register(); + register(); + assertTrue("listener not registered", unregister()); + //there should be no listener to unregister the second time + assertFalse("listener double registered", unregister()); + } + + /** + * Test that the {@link BreakableStateChangeListener} is picking up + * the state changes and that its last event field is as expected. + */ + @Test + public void testEventHistory() { + register(); + BreakableService service = new BreakableService(); + assertListenerState(listener, Service.STATE.NOTINITED); + assertEquals(0, listener.getEventCount()); + service.init(new Configuration()); + assertListenerState(listener, Service.STATE.INITED); + assertSame(service, listener.getLastService()); + assertListenerEventCount(listener, 1); + + service.start(); + assertListenerState(listener, Service.STATE.STARTED); + assertListenerEventCount(listener, 2); + + service.stop(); + assertListenerState(listener, Service.STATE.STOPPED); + assertListenerEventCount(listener, 3); + } + + /** + * This test triggers a failure in the listener - the expectation is that the + * service has already reached it's desired state, purely because the + * notifications take place afterwards. + * + */ + @Test + public void testListenerFailure() { + listener.setFailingState(Service.STATE.INITED); + register(); + BreakableStateChangeListener l2 = new BreakableStateChangeListener(); + register(l2); + BreakableService service = new BreakableService(); + service.init(new Configuration()); + //expected notifications to fail + + //still should record its invocation + assertListenerState(listener, Service.STATE.INITED); + assertListenerEventCount(listener, 1); + + //and second listener didn't get notified of anything + assertListenerEventCount(l2, 0); + + //service should still consider itself started + assertServiceStateInited(service); + service.start(); + service.stop(); + } + + /** + * Create a chain of listeners and set one in the middle to fail; verify that + * those in front got called, and those after did not. + */ + @Test + public void testListenerChain() { + + //create and register the listeners + LoggingStateChangeListener logListener = new LoggingStateChangeListener(); + register(logListener); + BreakableStateChangeListener l0 = new BreakableStateChangeListener("l0"); + register(l0); + listener.setFailingState(Service.STATE.STARTED); + register(); + BreakableStateChangeListener l3 = new BreakableStateChangeListener("l3"); + register(l3); + + //create and init a service. + BreakableService service = new BreakableService(); + service.init(new Configuration()); + assertServiceStateInited(service); + assertListenerState(l0, Service.STATE.INITED); + assertListenerState(listener, Service.STATE.INITED); + assertListenerState(l3, Service.STATE.INITED); + + service.start(); + //expect that listener l1 and the failing listener are in start, but + //not the final one + assertServiceStateStarted(service); + assertListenerState(l0, Service.STATE.STARTED); + assertListenerEventCount(l0, 2); + assertListenerState(listener, Service.STATE.STARTED); + assertListenerEventCount(listener, 2); + //this is the listener that is not expected to have been invoked + assertListenerState(l3, Service.STATE.INITED); + assertListenerEventCount(l3, 1); + + //stop the service + service.stop(); + //listeners are all updated + assertListenerEventCount(l0, 3); + assertListenerEventCount(listener, 3); + assertListenerEventCount(l3, 2); + //can all be unregistered in any order + unregister(logListener); + unregister(l0); + unregister(l3); + + //check that the listeners are all unregistered, even + //though they were registered in a different order. + //rather than do this by doing unregister checks, a new service is created + service = new BreakableService(); + //this service is initialized + service.init(new Configuration()); + //it is asserted that the event count has not changed for the unregistered + //listeners + assertListenerEventCount(l0, 3); + assertListenerEventCount(l3, 2); + //except for the one listener that was not unregistered, which + //has incremented by one + assertListenerEventCount(listener, 4); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestServiceLifecycle.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestServiceLifecycle.java index c69b7b7..bd73a41 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestServiceLifecycle.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestServiceLifecycle.java @@ -19,10 +19,13 @@ package org.apache.hadoop.yarn.service; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.junit.Test; public class TestServiceLifecycle extends ServiceAssert { + private static Log LOG = LogFactory.getLog(TestServiceLifecycle.class); /** * Walk the {@link BreakableService} through it's lifecycle, @@ -59,13 +62,8 @@ public void testInitTwice() throws Throwable { Configuration conf = new Configuration(); conf.set("test.init","t"); svc.init(conf); - try { - svc.init(new Configuration()); - fail("Expected a failure, got " + svc); - } catch (IllegalStateException e) { - //expected - } - assertStateCount(svc, Service.STATE.INITED, 2); + svc.init(new Configuration()); + assertStateCount(svc, Service.STATE.INITED, 1); assertServiceConfigurationContains(svc, "test.init"); } @@ -78,21 +76,14 @@ public void testStartTwice() throws Throwable { BreakableService svc = new BreakableService(); svc.init(new Configuration()); svc.start(); - try { - svc.start(); - fail("Expected a failure, got " + svc); - } catch (IllegalStateException e) { - //expected - } - assertStateCount(svc, Service.STATE.STARTED, 2); + svc.start(); + assertStateCount(svc, Service.STATE.STARTED, 1); } /** * Verify that when a service is stopped more than once, no exception - * is thrown, and the counter is incremented. - * This is because the state change operations happen after the counter in - * the subclass is incremented, even though stop is meant to be a no-op + * is thrown. * @throws Throwable if necessary */ @Test @@ -103,7 +94,7 @@ public void testStopTwice() throws Throwable { svc.stop(); assertStateCount(svc, Service.STATE.STOPPED, 1); svc.stop(); - assertStateCount(svc, Service.STATE.STOPPED, 2); + assertStateCount(svc, Service.STATE.STOPPED, 1); } @@ -124,12 +115,12 @@ public void testStopFailedInit() throws Throwable { //expected } //the service state wasn't passed - assertServiceStateCreated(svc); + assertServiceStateStopped(svc); assertStateCount(svc, Service.STATE.INITED, 1); + assertStateCount(svc, Service.STATE.STOPPED, 1); //now try to stop svc.stop(); - //even after the stop operation, we haven't entered the state - assertServiceStateCreated(svc); + assertStateCount(svc, Service.STATE.STOPPED, 1); } @@ -151,18 +142,12 @@ public void testStopFailedStart() throws Throwable { //expected } //the service state wasn't passed - assertServiceStateInited(svc); - assertStateCount(svc, Service.STATE.INITED, 1); - //now try to stop - svc.stop(); - //even after the stop operation, we haven't entered the state - assertServiceStateInited(svc); + assertServiceStateStopped(svc); } /** * verify that when a service fails during its stop operation, - * its state does not change, and the subclass invocation counter - * increments. + * its state does not change. * @throws Throwable if necessary */ @Test @@ -177,42 +162,302 @@ public void testFailingStop() throws Throwable { //expected } assertStateCount(svc, Service.STATE.STOPPED, 1); - assertServiceStateStarted(svc); - //now try again, and expect it to happen again + } + + /** + * verify that when a service that is not started is stopped, the + * service enters the stopped state + * @throws Throwable on a failure + */ + @Test + public void testStopUnstarted() throws Throwable { + BreakableService svc = new BreakableService(); + svc.stop(); + assertServiceStateStopped(svc); + assertStateCount(svc, Service.STATE.INITED, 0); + assertStateCount(svc, Service.STATE.STOPPED, 1); + } + + /** + * Show that if the service failed during an init + * operation, stop was called. + */ + + @Test + public void testStopFailingInitAndStop() throws Throwable { + BreakableService svc = new BreakableService(true, false, true); + svc.register(new LoggingStateChangeListener()); try { - svc.stop(); + svc.init(new Configuration()); fail("Expected a failure, got " + svc); } catch (BreakableService.BrokenLifecycleEvent e) { + assertEquals(Service.STATE.INITED, e.state); + } + //the service state is stopped + assertServiceStateStopped(svc); + assertEquals(Service.STATE.INITED, svc.getFailureState()); + + Throwable failureCause = svc.getFailureCause(); + assertNotNull("Null failure cause in " + svc, failureCause); + BreakableService.BrokenLifecycleEvent cause = + (BreakableService.BrokenLifecycleEvent) failureCause; + assertNotNull("null state in " + cause + " raised by " + svc, cause.state); + assertEquals(Service.STATE.INITED, cause.state); + } + + @Test + public void testInitNullConf() throws Throwable { + BreakableService svc = new BreakableService(false, false, false); + try { + svc.init(null); + LOG.warn("Null Configurations are permitted "); + } catch (ServiceStateException e) { //expected } - assertStateCount(svc, Service.STATE.STOPPED, 2); + } + + @Test + public void testServiceNotifications() throws Throwable { + BreakableService svc = new BreakableService(false, false, false); + BreakableStateChangeListener listener = new BreakableStateChangeListener(); + svc.register(listener); + svc.init(new Configuration()); + assertEventCount(listener, 1); + svc.start(); + assertEventCount(listener, 2); + svc.stop(); + assertEventCount(listener, 3); + svc.stop(); + assertEventCount(listener, 3); } /** - * verify that when a service that is not started is stopped, its counter - * of stop calls is still incremented-and the service remains in its - * original state.. + * Test that when a service listener is unregistered, it stops being invoked * @throws Throwable on a failure */ @Test - public void testStopUnstarted() throws Throwable { - BreakableService svc = new BreakableService(); + public void testServiceNotificationsStopOnceUnregistered() throws Throwable { + BreakableService svc = new BreakableService(false, false, false); + BreakableStateChangeListener listener = new BreakableStateChangeListener(); + svc.register(listener); + svc.init(new Configuration()); + assertEventCount(listener, 1); + svc.unregister(listener); + svc.start(); + assertEventCount(listener, 1); svc.stop(); - assertServiceStateCreated(svc); - assertStateCount(svc, Service.STATE.STOPPED, 1); + assertEventCount(listener, 1); + svc.stop(); + } - //stop failed, now it can be initialised + /** + * This test uses a service listener that unregisters itself during the callbacks. + * This a test that verifies the concurrency logic on the listener management + * code, that it doesn't throw any immutable state change exceptions + * if you change list membership during the notifications. + * The standard AbstractService implementation copies the list + * to an array in a synchronized block then iterates through + * the copy precisely to prevent this problem. + * @throws Throwable on a failure + */ + @Test + public void testServiceNotificationsUnregisterDuringCallback() throws Throwable { + BreakableService svc = new BreakableService(false, false, false); + BreakableStateChangeListener listener = + new SelfUnregisteringBreakableStateChangeListener(); + BreakableStateChangeListener l2 = + new BreakableStateChangeListener(); + svc.register(listener); + svc.register(l2); svc.init(new Configuration()); - - //and try to stop again, with no state change but an increment + assertEventCount(listener, 1); + assertEventCount(l2, 1); + svc.unregister(listener); + svc.start(); + assertEventCount(listener, 1); + assertEventCount(l2, 2); svc.stop(); - assertServiceStateInited(svc); - assertStateCount(svc, Service.STATE.STOPPED, 2); + assertEventCount(listener, 1); + svc.stop(); + } + + private static class SelfUnregisteringBreakableStateChangeListener + extends BreakableStateChangeListener { + + @Override + public synchronized void stateChanged(Service service) { + super.stateChanged(service); + service.unregister(this); + } + } - //once started, the service can be stopped reliably + private void assertEventCount(BreakableStateChangeListener listener, + int expected) { + assertEquals(listener.toString(), expected, listener.getEventCount()); + } + + @Test + public void testServiceFailingNotifications() throws Throwable { + BreakableService svc = new BreakableService(false, false, false); + BreakableStateChangeListener listener = new BreakableStateChangeListener(); + listener.setFailingState(Service.STATE.STARTED); + svc.register(listener); + svc.init(new Configuration()); + assertEventCount(listener, 1); + //start this; the listener failed but this won't show svc.start(); - ServiceOperations.stop(svc); - assertServiceStateStopped(svc); - assertStateCount(svc, Service.STATE.STOPPED, 3); + //counter went up + assertEventCount(listener, 2); + assertEquals(1, listener.getFailureCount()); + //stop the service -this doesn't fail + svc.stop(); + assertEventCount(listener, 3); + assertEquals(1, listener.getFailureCount()); + svc.stop(); + } + + /** + * This test verifies that you can block waiting for something to happen + * and use notifications to manage it + * @throws Throwable on a failure + */ + @Test + public void testListenerWithNotifications() throws Throwable { + //this tests that a listener can get notified when a service is stopped + AsyncSelfTerminatingService service = new AsyncSelfTerminatingService(2000); + NotifyingListener listener = new NotifyingListener(); + service.register(listener); + service.init(new Configuration()); + service.start(); + assertServiceInState(service, Service.STATE.STARTED); + long start = System.currentTimeMillis(); + synchronized (listener) { + listener.wait(20000); + } + long duration = System.currentTimeMillis() - start; + assertEquals(Service.STATE.STOPPED, listener.notifyingState); + assertServiceInState(service, Service.STATE.STOPPED); + assertTrue("Duration of " + duration + " too long", duration < 10000); + } + + @Test + public void testSelfTerminatingService() throws Throwable { + SelfTerminatingService service = new SelfTerminatingService(); + BreakableStateChangeListener listener = new BreakableStateChangeListener(); + service.register(listener); + service.init(new Configuration()); + assertEventCount(listener, 1); + //start the service + service.start(); + //and expect an event count of exactly two + assertEventCount(listener, 2); + } + + @Test + public void testStartInInitService() throws Throwable { + Service service = new StartInInitService(); + BreakableStateChangeListener listener = new BreakableStateChangeListener(); + service.register(listener); + service.init(new Configuration()); + assertServiceInState(service, Service.STATE.STARTED); + assertEventCount(listener, 1); + } + + @Test + public void testStopInInitService() throws Throwable { + Service service = new StopInInitService(); + BreakableStateChangeListener listener = new BreakableStateChangeListener(); + service.register(listener); + service.init(new Configuration()); + assertServiceInState(service, Service.STATE.STOPPED); + assertEventCount(listener, 1); + } + + /** + * Listener that wakes up all threads waiting on it + */ + private static class NotifyingListener implements ServiceStateChangeListener { + public Service.STATE notifyingState = Service.STATE.NOTINITED; + + public synchronized void stateChanged(Service service) { + notifyingState = service.getServiceState(); + this.notifyAll(); + } + } + + /** + * Service that terminates itself after starting and sleeping for a while + */ + private static class AsyncSelfTerminatingService extends AbstractService + implements Runnable { + final int timeout; + private AsyncSelfTerminatingService(int timeout) { + super("AsyncSelfTerminatingService"); + this.timeout = timeout; + } + + @Override + protected void serviceStart() throws Exception { + new Thread(this).start(); + super.serviceStart(); + } + + @Override + public void run() { + try { + Thread.sleep(timeout); + } catch (InterruptedException ignored) { + + } + this.stop(); + } + } + + /** + * Service that terminates itself in startup + */ + private static class SelfTerminatingService extends AbstractService { + private SelfTerminatingService() { + super("SelfTerminatingService"); + } + + @Override + protected void serviceStart() throws Exception { + //start + super.serviceStart(); + //then stop + stop(); + } + } + + /** + * Service that starts itself in init + */ + private static class StartInInitService extends AbstractService { + private StartInInitService() { + super("StartInInitService"); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + start(); + } } + + /** + * Service that starts itself in init + */ + private static class StopInInitService extends AbstractService { + private StopInInitService() { + super("StopInInitService"); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + stop(); + } + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestServiceOperations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestServiceOperations.java deleted file mode 100644 index 14aa1f5..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestServiceOperations.java +++ /dev/null @@ -1,312 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.hadoop.yarn.service; - -import org.apache.hadoop.conf.Configuration; -import org.junit.Test; - -/** - * These tests verify that the {@link ServiceOperations} methods - * do a best-effort attempt to make the service state change operations - * idempotent. That is still best effort -there is no thread safety, and - * a failure during a state change does not prevent the operation - * being called again. - */ -public class TestServiceOperations extends ServiceAssert { - - @Test - public void testWalkthrough() throws Throwable { - BreakableService svc = new BreakableService(); - assertServiceStateCreated(svc); - Configuration conf = new Configuration(); - conf.set("test.walkthrough","t"); - ServiceOperations.init(svc, conf); - assertServiceStateInited(svc); - assertStateCount(svc, Service.STATE.INITED, 1); - //check the configuration made it all the way through. - assertServiceConfigurationContains(svc, "test.walkthrough"); - ServiceOperations.start(svc); - assertServiceStateStarted(svc); - assertStateCount(svc, Service.STATE.STARTED, 1); - ServiceOperations.stop(svc); - assertServiceStateStopped(svc); - assertStateCount(svc, Service.STATE.STOPPED, 1); - } - - /** - * Call init twice -expect a failure, and expect the count - * of initialization attempts to still be 1: the state - * check was made before the subclass method was called. - * @throws Throwable if need be - */ - @Test - public void testInitTwice() throws Throwable { - BreakableService svc = new BreakableService(); - Configuration conf = new Configuration(); - conf.set("test.init", "t"); - ServiceOperations.init(svc, conf); - try { - ServiceOperations.init(svc, new Configuration()); - fail("Expected a failure, got " + svc); - } catch (IllegalStateException e) { - //expected - } - assertStateCount(svc, Service.STATE.INITED, 1); - assertServiceConfigurationContains(svc, "test.init"); - } - - /** - * call start twice; expect failures and the start invoke count to - * be exactly 1. - * @throws Throwable if necessary - */ - @Test - public void testStartTwice() throws Throwable { - BreakableService svc = new BreakableService(); - ServiceOperations.init(svc, new Configuration()); - ServiceOperations.start(svc); - try { - ServiceOperations.start(svc); - fail("Expected a failure, got " + svc); - } catch (IllegalStateException e) { - //expected - } - assertStateCount(svc, Service.STATE.STARTED, 1); - } - - /** - * Test that the deploy operation pushes a service into its started state - * @throws Throwable on any failure. - */ - @Test - public void testDeploy() throws Throwable { - BreakableService svc = new BreakableService(); - assertServiceStateCreated(svc); - ServiceOperations.deploy(svc, new Configuration()); - assertServiceStateStarted(svc); - assertStateCount(svc, Service.STATE.INITED, 1); - assertStateCount(svc, Service.STATE.STARTED, 1); - ServiceOperations.stop(svc); - assertServiceStateStopped(svc); - assertStateCount(svc, Service.STATE.STOPPED, 1); - } - - /** - * Demonstrate that the deploy operation fails when invoked twice, - * but the service method call counts are unchanged after the second call. - * @throws Throwable on any failure. - */ - @Test - public void testDeployNotIdempotent() throws Throwable { - BreakableService svc = new BreakableService(); - assertServiceStateCreated(svc); - ServiceOperations.deploy(svc, new Configuration()); - try { - ServiceOperations.deploy(svc, new Configuration()); - fail("Expected a failure, got " + svc); - } catch (IllegalStateException e) { - //expected - } - //verify state and values are unchanged - assertServiceStateStarted(svc); - assertStateCount(svc, Service.STATE.INITED, 1); - assertStateCount(svc, Service.STATE.STARTED, 1); - ServiceOperations.stop(svc); - } - - /** - * Test that the deploy operation can fail part way through, in which - * case the service is in the state that it was in before the failing - * state method was called. - * @throws Throwable on any failure. - */ - @Test - public void testDeployNotAtomic() throws Throwable { - //this instance is set to fail in the start() call. - BreakableService svc = new BreakableService(false, true, false); - try { - ServiceOperations.deploy(svc, new Configuration()); - fail("Expected a failure, got " + svc); - } catch (BreakableService.BrokenLifecycleEvent expected) { - //expected - } - //now in the inited state - assertServiceStateInited(svc); - assertStateCount(svc, Service.STATE.INITED, 1); - assertStateCount(svc, Service.STATE.STARTED, 1); - //try again -expect a failure as the service is now inited. - try { - ServiceOperations.deploy(svc, new Configuration()); - fail("Expected a failure, got " + svc); - } catch (IllegalStateException e) { - //expected - } - } - - /** - * verify that when a service is stopped more than once, no exception - * is thrown, and the counter is not incremented - * this is because the state change operations happen after the counter in - * the subclass is incremented, even though stop is meant to be a no-op - * @throws Throwable on a failure - */ - @Test - public void testStopTwice() throws Throwable { - BreakableService svc = new BreakableService(); - ServiceOperations.deploy(svc, new Configuration()); - ServiceOperations.stop(svc); - assertStateCount(svc, Service.STATE.STOPPED, 1); - assertServiceStateStopped(svc); - ServiceOperations.stop(svc); - assertStateCount(svc, Service.STATE.STOPPED, 1); - } - - /** - * verify that when a service that is not started is stopped, it's counter - * is not incremented -the stop() method was not invoked. - * @throws Throwable on a failure - */ - @Test - public void testStopInit() throws Throwable { - BreakableService svc = new BreakableService(); - ServiceOperations.stop(svc); - assertServiceStateCreated(svc); - assertStateCount(svc, Service.STATE.STOPPED, 0); - ServiceOperations.stop(svc); - assertStateCount(svc, Service.STATE.STOPPED, 0); - } - - - /** - * Show that if the service failed during an init - * operation, it stays in the created state, even after stopping it - * @throws Throwable - */ - - @Test - public void testStopFailedInit() throws Throwable { - BreakableService svc = new BreakableService(true, false, false); - assertServiceStateCreated(svc); - try { - ServiceOperations.init(svc, new Configuration()); - fail("Expected a failure, got " + svc); - } catch (BreakableService.BrokenLifecycleEvent e) { - //expected - } - //the service state wasn't passed - assertServiceStateCreated(svc); - //the init state got invoked once - assertStateCount(svc, Service.STATE.INITED, 1); - //now try to stop - ServiceOperations.stop(svc); - //even after the stop operation, we haven't entered the state - assertServiceStateCreated(svc); - } - - - /** - * Show that if the service failed during an init - * operation, it stays in the created state, even after stopping it - * @throws Throwable - */ - - @Test - public void testStopFailedStart() throws Throwable { - BreakableService svc = new BreakableService(false, true, false); - ServiceOperations.init(svc, new Configuration()); - assertServiceStateInited(svc); - try { - ServiceOperations.start(svc); - fail("Expected a failure, got " + svc); - } catch (BreakableService.BrokenLifecycleEvent e) { - //expected - } - //the service state wasn't passed - assertServiceStateInited(svc); - assertStateCount(svc, Service.STATE.INITED, 1); - //now try to stop - ServiceOperations.stop(svc); - //even after the stop operation, we haven't entered the state - assertServiceStateInited(svc); - } - - /** - * verify that when a service is stopped more than once, no exception - * is thrown, and the counter is incremented - * this is because the state change operations happen after the counter in - * the subclass is incremented, even though stop is meant to be a no-op. - * - * The {@link ServiceOperations#stop(Service)} operation does not prevent - * this from happening - * @throws Throwable - */ - @Test - public void testFailingStop() throws Throwable { - BreakableService svc = new BreakableService(false, false, true); - ServiceOperations.deploy(svc, new Configuration()); - try { - ServiceOperations.stop(svc); - fail("Expected a failure, got " + svc); - } catch (BreakableService.BrokenLifecycleEvent e) { - //expected - } - assertStateCount(svc, Service.STATE.STOPPED, 1); - //now try to stop, this time doing it quietly - Exception exception = ServiceOperations.stopQuietly(svc); - assertTrue("Wrong exception type : " + exception, - exception instanceof BreakableService.BrokenLifecycleEvent); - assertStateCount(svc, Service.STATE.STOPPED, 2); - } - - - /** - * verify that when a service that is not started is stopped, its counter - * of stop calls is still incremented-and the service remains in its - * original state.. - * @throws Throwable on a failure - */ - @Test - public void testStopUnstarted() throws Throwable { - BreakableService svc = new BreakableService(); - - //invocation in NOTINITED state should be no-op - ServiceOperations.stop(svc); - assertServiceStateCreated(svc); - assertStateCount(svc, Service.STATE.STOPPED, 0); - - //stop failed, now it can be initialised - ServiceOperations.init(svc, new Configuration()); - - //again, no-op - ServiceOperations.stop(svc); - assertServiceStateInited(svc); - assertStateCount(svc, Service.STATE.STOPPED, 0); - - //once started, the service can be stopped reliably - ServiceOperations.start(svc); - ServiceOperations.stop(svc); - assertServiceStateStopped(svc); - assertStateCount(svc, Service.STATE.STOPPED, 1); - - //now stop one more time - ServiceOperations.stop(svc); - assertStateCount(svc, Service.STATE.STOPPED, 1); - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java index 0fc598a..753d35f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java @@ -21,10 +21,15 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.YarnRuntimeException; +import org.apache.hadoop.yarn.service.BreakableService; import org.apache.hadoop.yarn.service.CompositeService; +import org.apache.hadoop.yarn.service.Service; import org.apache.hadoop.yarn.service.Service.STATE; +import org.apache.hadoop.yarn.service.ServiceStateException; import org.junit.Before; import org.junit.Test; @@ -34,6 +39,16 @@ private static final int FAILED_SERVICE_SEQ_NUMBER = 2; + private static final Log LOG = LogFactory.getLog(TestCompositeService.class); + + /** + * flag to state policy of CompositeService, and hence + * what to look for after trying to stop a service from another state + * (e.g inited) + */ + private static final boolean STOP_ONLY_STARTED_SERVICES = + CompositeServiceImpl.isPolicyToStopOnlyStartedServices(); + @Before public void setup() { CompositeServiceImpl.resetCounter(); @@ -59,6 +74,9 @@ public void testCallSequence() { // Initialise the composite service serviceManager.init(conf); + //verify they were all inited + assertInState(STATE.INITED, services); + // Verify the init() call sequence numbers for every service for (int i = 0; i < NUM_OF_SERVICES; i++) { assertEquals("For " + services[i] @@ -67,11 +85,11 @@ public void testCallSequence() { } // Reset the call sequence numbers - for (int i = 0; i < NUM_OF_SERVICES; i++) { - services[i].reset(); - } + resetServices(services); serviceManager.start(); + //verify they were all started + assertInState(STATE.STARTED, services); // Verify the start() call sequence numbers for every service for (int i = 0; i < NUM_OF_SERVICES; i++) { @@ -79,13 +97,12 @@ public void testCallSequence() { + " service, start() call sequence number should have been ", i, services[i].getCallSequenceNumber()); } + resetServices(services); - // Reset the call sequence numbers - for (int i = 0; i < NUM_OF_SERVICES; i++) { - services[i].reset(); - } serviceManager.stop(); + //verify they were all stopped + assertInState(STATE.STOPPED, services); // Verify the stop() call sequence numbers for every service for (int i = 0; i < NUM_OF_SERVICES; i++) { @@ -104,6 +121,13 @@ public void testCallSequence() { } } + private void resetServices(CompositeServiceImpl[] services) { + // Reset the call sequence numbers + for (int i = 0; i < NUM_OF_SERVICES; i++) { + services[i].reset(); + } + } + @Test public void testServiceStartup() { ServiceManager serviceManager = new ServiceManager("ServiceManager"); @@ -131,7 +155,7 @@ public void testServiceStartup() { fail("Exception should have been thrown due to startup failure of last service"); } catch (YarnRuntimeException e) { for (int i = 0; i < NUM_OF_SERVICES - 1; i++) { - if (i >= FAILED_SERVICE_SEQ_NUMBER) { + if (i >= FAILED_SERVICE_SEQ_NUMBER && STOP_ONLY_STARTED_SERVICES) { // Failed service state should be INITED assertEquals("Service state should have been ", STATE.INITED, services[NUM_OF_SERVICES - 1].getServiceState()); @@ -171,15 +195,147 @@ public void testServiceStop() { try { serviceManager.stop(); } catch (YarnRuntimeException e) { - for (int i = 0; i < NUM_OF_SERVICES - 1; i++) { - assertEquals("Service state should have been ", STATE.STOPPED, - services[NUM_OF_SERVICES].getServiceState()); - } + } + assertInState(STATE.STOPPED, services); + } + + /** + * Assert that all services are in the same expected state + * @param expected expected state value + * @param services services to examine + */ + private void assertInState(STATE expected, CompositeServiceImpl[] services) { + assertInState(expected, services,0, services.length); + } + + /** + * Assert that all services are in the same expected state + * @param expected expected state value + * @param services services to examine + * @param start start offset + * @param finish finish offset: the count stops before this number + */ + private void assertInState(STATE expected, + CompositeServiceImpl[] services, + int start, int finish) { + for (int i = start; i < finish; i++) { + Service service = services[i]; + assertInState(expected, service); + } + } + + private void assertInState(STATE expected, Service service) { + assertEquals("Service state should have been " + expected + " in " + + service, + expected, + service.getServiceState()); + } + + /** + * Shut down from not-inited: expect nothing to have happened + */ + @Test + public void testServiceStopFromNotInited() { + ServiceManager serviceManager = new ServiceManager("ServiceManager"); + + // Add services + for (int i = 0; i < NUM_OF_SERVICES; i++) { + CompositeServiceImpl service = new CompositeServiceImpl(i); + serviceManager.addTestService(service); + } + + CompositeServiceImpl[] services = serviceManager.getServices().toArray( + new CompositeServiceImpl[0]); + serviceManager.stop(); + assertInState(STATE.NOTINITED, services); + } + + /** + * Shut down from inited + */ + @Test + public void testServiceStopFromInited() { + ServiceManager serviceManager = new ServiceManager("ServiceManager"); + + // Add services + for (int i = 0; i < NUM_OF_SERVICES; i++) { + CompositeServiceImpl service = new CompositeServiceImpl(i); + serviceManager.addTestService(service); + } + + CompositeServiceImpl[] services = serviceManager.getServices().toArray( + new CompositeServiceImpl[0]); + serviceManager.init(new Configuration()); + serviceManager.stop(); + if (STOP_ONLY_STARTED_SERVICES) { + //this policy => no services were stopped + assertInState(STATE.INITED, services); + } else { + assertInState(STATE.STOPPED, services); } } + /** + * Use a null configuration & expect a failure + * @throws Throwable + */ + @Test + public void testInitNullConf() throws Throwable { + ServiceManager serviceManager = new ServiceManager("testInitNullConf"); + + CompositeServiceImpl service = new CompositeServiceImpl(0); + serviceManager.addTestService(service); + try { + serviceManager.init(null); + LOG.warn("Null Configurations are permitted " + serviceManager); + } catch (ServiceStateException e) { + //expected + } + } + + /** + * Walk the service through their lifecycle without any children; + * verify that it all works. + */ + @Test + public void testServiceLifecycleNoChildren() { + ServiceManager serviceManager = new ServiceManager("ServiceManager"); + serviceManager.init(new Configuration()); + serviceManager.start(); + serviceManager.stop(); + } + + @Test + public void testAddServiceInInit() throws Throwable { + BreakableService child = new BreakableService(); + assertInState(STATE.NOTINITED, child); + CompositeServiceAddingAChild composite = + new CompositeServiceAddingAChild(child); + composite.init(new Configuration()); + assertInState(STATE.INITED, child); + } + + public static class CompositeServiceAddingAChild extends CompositeService{ + Service child; + + public CompositeServiceAddingAChild(Service child) { + super("CompositeServiceAddingAChild"); + this.child = child; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + addService(child); + super.serviceInit(conf); + } + } + public static class CompositeServiceImpl extends CompositeService { + public static boolean isPolicyToStopOnlyStartedServices() { + return STOP_ONLY_STARTED_SERVICES; + } + private static int counter = -1; private int callSequenceNumber = -1; @@ -193,30 +349,30 @@ public CompositeServiceImpl(int sequenceNumber) { } @Override - public synchronized void init(Configuration conf) { + protected void serviceInit(Configuration conf) throws Exception { counter++; callSequenceNumber = counter; - super.init(conf); + super.serviceInit(conf); } @Override - public synchronized void start() { + protected void serviceStart() throws Exception { if (throwExceptionOnStart) { throw new YarnRuntimeException("Fake service start exception"); } counter++; callSequenceNumber = counter; - super.start(); + super.serviceStart(); } @Override - public synchronized void stop() { + protected void serviceStop() throws Exception { counter++; callSequenceNumber = counter; if (throwExceptionOnStop) { throw new YarnRuntimeException("Fake service stop exception"); } - super.stop(); + super.serviceStop(); } public static int getCounter() {