diff --git a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
index ab8673b..87e7255 100644
--- a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
@@ -392,4 +392,17 @@
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/CompositeService.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/CompositeService.java
index 51cb4a3..f036d87 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/CompositeService.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/CompositeService.java
@@ -21,32 +21,39 @@
import java.util.ArrayList;
import java.util.List;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Composition of services.
+ *
+ * Child services will have their {@link Service#init(Configuration)},
+ * {@link Service#start()} and {@link Service#stop()} lifecycle operations
+ * invoked to follow this parent service's lifecycle.
*/
@Public
@Evolving
-public class CompositeService extends AbstractService {
+public class CompositeService extends AbstractService implements ServiceParent {
- private static final Log LOG = LogFactory.getLog(CompositeService.class);
+ private static final Logger LOG =
+ LoggerFactory.getLogger(CompositeService.class);
/**
* Policy on shutdown: attempt to close everything (purest) or
- * only try to close started services (which assumes
- * that the service implementations may not handle the stop() operation
+ * only try to close started services (which assumes that the service
+ * implementations may not handle the {@code stop()} operation
* except when started.
+ *
* Irrespective of this policy, if a child service fails during
- * its init() or start() operations, it will have stop() called on it.
+ * its init() or start() operations,
+ * it will have stop() called on it.
*/
protected static final boolean STOP_ONLY_STARTED_SERVICES = false;
- private final List<Service> serviceList = new ArrayList<Service>();
+ private final List<Service> serviceList = new ArrayList<>();
public CompositeService(String name) {
super(name);
@@ -59,7 +66,7 @@ public CompositeService(String name) {
*/
public List<Service> getServices() {
synchronized (serviceList) {
- return new ArrayList<Service>(serviceList);
+ return new ArrayList<>(serviceList);
}
}
@@ -68,10 +75,8 @@ public CompositeService(String name) {
* {@link CompositeService}
* @param service the {@link Service} to be added
*/
- protected void addService(Service service) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding service " + service.getName());
- }
+ public void addService(Service service) {
+ LOG.debug("Adding service {}", service.getName());
synchronized (serviceList) {
serviceList.add(service);
}
@@ -92,17 +97,25 @@ protected boolean addIfService(Object object) {
}
}
+ /**
+ * Remove a service.
+ * @param service service to remove
+ * @return true if the service was on the list of child services
+ */
protected synchronized boolean removeService(Service service) {
synchronized (serviceList) {
return serviceList.remove(service);
}
}
+ /**
+ * Initialize all child services.
+ * @param conf configuration
+ * @throws Exception if a child service fails to initialize
+ */
protected void serviceInit(Configuration conf) throws Exception {
List<Service> services = getServices();
- if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + ": initing services, size=" + services.size());
- }
+ LOG.debug("{}: initing services, size={}", getName(), services.size());
for (Service service : services) {
service.init(conf);
}
@@ -111,9 +124,7 @@ protected void serviceInit(Configuration conf) throws Exception {
protected void serviceStart() throws Exception {
List<Service> services = getServices();
- if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + ": starting services, size=" + services.size());
- }
+ LOG.debug("{}: starting services, size={}", getName(), services.size());
for (Service service : services) {
// start the service. If this fails that service
// will be stopped and an exception raised
@@ -125,15 +136,13 @@ protected void serviceStart() throws Exception {
protected void serviceStop() throws Exception {
//stop all services that were started
int numOfServicesToStop = serviceList.size();
- if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + ": stopping services, size=" + numOfServicesToStop);
- }
+ LOG.debug("{}: stopping services, size={}", getName(), numOfServicesToStop);
stop(numOfServicesToStop, STOP_ONLY_STARTED_SERVICES);
super.serviceStop();
}
/**
- * Stop the services in reverse order
+ * Stop the services in reverse order.
*
* @param numOfServicesStarted index from where the stop should work
* @param stopOnlyStartedServices flag to say "only start services that are
@@ -147,9 +156,7 @@ private void stop(int numOfServicesStarted, boolean stopOnlyStartedServices) {
List<Service> services = getServices();
for (int i = numOfServicesStarted - 1; i >= 0; i--) {
Service service = services.get(i);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Stopping service #" + i + ": " + service);
- }
+ LOG.debug("Stopping service #{}: {}", i, service);
STATE state = service.getServiceState();
//depending on the stop police
if (state == STATE.STARTED
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/ServiceOperations.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/ServiceOperations.java
index 6c03e25..647e5a7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/ServiceOperations.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/ServiceOperations.java
@@ -25,6 +25,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.slf4j.Logger;
/**
* This class contains a set of methods to work with services, especially
@@ -87,6 +88,27 @@ public static Exception stopQuietly(Log log, Service service) {
return null;
}
+ /**
+ * Stop a service; if it is null do nothing. Exceptions are caught and
+ * logged at warn level (Throwables are not caught). This operation is
+ * intended to be used in cleanup operations.
+ *
+ * @param log the log to warn at
+ * @param service a service; may be null
+ * @return any exception that was caught; null if none was.
+ * @see ServiceOperations#stopQuietly(Service)
+ */
+ public static Exception stopQuietly(Logger log, Service service) {
+ try {
+ stop(service);
+ } catch (Exception e) {
+ log.warn("When stopping the service {}: {}", service.getName(), e, e);
+ return e;
+ }
+ return null;
+ }
+
+
/**
* Class to manage a list of {@link ServiceStateChangeListener} instances,
@@ -99,7 +121,7 @@ public static Exception stopQuietly(Log log, Service service) {
* that it will never be null.
*/
private final List<ServiceStateChangeListener> listeners =
- new ArrayList<ServiceStateChangeListener>();
+ new ArrayList<>();
/**
* Thread-safe addition of a new listener to the end of a list.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/ServiceParent.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/ServiceParent.java
new file mode 100644
index 0000000..277f191
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/ServiceParent.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.service;
+
+import java.util.List;
+
+/**
+ * Interface for accessing services that contain one or more child
+ * services.
+ */
+public interface ServiceParent extends Service {
+
+ /**
+ * Add a child service.
+ *
+ * It must be in a consistent state with the
+ * service to which it is being added.
+ * @param service the service to add.
+ */
+ void addService(Service service);
+
+ /**
+ * Get an unmodifiable list of services.
+ * @return a list of child services at the time of invocation -
+ * added services will not be picked up.
+ */
+ List<Service> getServices();
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/AbstractWorkflowExecutorService.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/AbstractWorkflowExecutorService.java
new file mode 100644
index 0000000..9533b25
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/AbstractWorkflowExecutorService.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.service.workflow;
+
+import org.apache.hadoop.service.AbstractService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+/**
+ * An abstract service that hosts an executor. When the service is stopped,
+ * {@link ExecutorService#shutdownNow()} is invoked.
+ *
+ * The executor itself is not created: it must be set in the constructor
+ * or in {@link #setExecutor(ExecutorService)}
+ */
+public abstract class AbstractWorkflowExecutorService extends AbstractService {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(AbstractWorkflowExecutorService.class);
+
+ /**
+ * The executor.
+ */
+ private ExecutorService executor;
+
+ /**
+ * Construct an instance with the given name -but no executor.
+ * @param name service name
+ */
+ protected AbstractWorkflowExecutorService(String name) {
+ this(name, null);
+ }
+
+ /**
+ * Construct an instance with the given name and executor.
+ * @param name service name
+ * @param executor executor
+ */
+ protected AbstractWorkflowExecutorService(String name,
+ ExecutorService executor) {
+ super(name);
+ this.executor = executor;
+ }
+
+ /**
+ * Get the executor.
+ * @return the executor
+ */
+ public synchronized ExecutorService getExecutor() {
+ return executor;
+ }
+
+ /**
+ * Set the executor.
+ *
+ * This is protected as it
+ * is intended to be restricted to subclasses
+ * @param ex executor
+ */
+ protected synchronized void setExecutor(ExecutorService ex) {
+ this.executor = ex;
+ }
+
+ /**
+ * Execute the runnable with the executor (which
+ * must have been created already).
+ * @param runnable runnable to execute
+ */
+ public void execute(Runnable runnable) {
+ getExecutor().execute(runnable);
+ }
+
+ /**
+ * Submit a callable.
+ * @param callable callable
+ * @param <V> type of the final get
+ * @return a future to wait on
+ */
+ public <V> Future<V> submit(Callable<V> callable) {
+ return getExecutor().submit(callable);
+ }
+
+ /**
+ * Stop the service: halt any executor.
+ * @throws Exception exception.
+ */
+ @Override
+ protected void serviceStop() throws Exception {
+ super.serviceStop();
+ stopExecutor();
+ }
+
+ /**
+ * Stop the executor if it is not null.
+ * This uses {@link ExecutorService#shutdownNow()}
+ * and so does not block until they have completed.
+ */
+ protected synchronized void stopExecutor() {
+ if (executor != null) {
+ LOG.debug("Stopping executor");
+ executor.shutdownNow();
+ }
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/LongLivedProcess.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/LongLivedProcess.java
new file mode 100644
index 0000000..1155dcf
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/LongLivedProcess.java
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.service.workflow;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Execute a long-lived process.
+ *
+ *
+ * Hadoop's {@link org.apache.hadoop.util.Shell} class assumes it is executing
+ * a short lived application; this class allows for the process to run for the
+ * life of the Java process that forked it.
+ * It is designed to be embedded inside a YARN service, though this is not
+ * the sole way that it can be used.
+ *
+ * Key Features:
+ *
+ * - Output is streamed to the output logger provided
+ * - The most recent lines of output are saved to a linked list.
+ * - A synchronous callback, {@link LongLivedProcessLifecycleEvent},
+ * is raised on the start and finish of a process.
+ *
+ *
+ */
+public class LongLivedProcess implements Runnable {
+ /**
+ * Limit on number of lines to retain in the "recent" line list: {@value}.
+ */
+ public static final int RECENT_LINE_LOG_LIMIT = 64;
+
+ /**
+ * Const defining the time in millis between polling for new text.
+ */
+ private static final int STREAM_READER_SLEEP_TIME = 200;
+
+ /**
+ * Limit on the length of a stream before it triggers an automatic newline.
+ */
+ private static final int LINE_LENGTH = 256;
+ private final ProcessBuilder processBuilder;
+ private Process process;
+ private Integer exitCode = null;
+ private final String description;
+ private final ExecutorService processExecutor;
+ private final ExecutorService logExecutor;
+
+ private ProcessStreamReader processStreamReader;
+ /** list of recent lines, recorded for extraction into reports. */
+ private final List<String> recentLines = new LinkedList<>();
+ // tagged as volatile to stop findbugs complaining
+ private volatile int recentLineLimit = RECENT_LINE_LOG_LIMIT;
+ private LongLivedProcessLifecycleEvent lifecycleCallback;
+ private final AtomicBoolean finalOutputProcessed = new AtomicBoolean(false);
+
+ /**
+ * Log supplied in the constructor for the spawned process -accessible
+ * to inner classes.
+ */
+ private Logger processLog;
+
+ /**
+ * Class log -accessible to inner classes.
+ */
+ private static final Logger LOG =
+ LoggerFactory.getLogger(LongLivedProcess.class);
+
+ /**
+ * Volatile flag to indicate that the process is done.
+ */
+ private volatile boolean finished;
+
+ /**
+ * Construct a service instance.
+ * @param name name of the process
+ * @param processLog log to log to
+ * @param commands list of commands
+ */
+ public LongLivedProcess(String name,
+ Logger processLog,
+ List<String> commands) {
+ Preconditions.checkArgument(processLog != null, "null processLog");
+ Preconditions.checkArgument(commands != null, "null command list");
+ Preconditions.checkArgument(!commands.isEmpty(), "empty command list");
+ String command = commands.get(0);
+ Preconditions.checkArgument(command != null && !command.isEmpty(),
+ "null or empty command list");
+
+ this.description = String.format("%s: \"%s\"", name, command);
+ this.processLog = processLog;
+ ServiceThreadFactory factory = new ServiceThreadFactory(name, true);
+ processExecutor = Executors.newSingleThreadExecutor(factory);
+ logExecutor = Executors.newSingleThreadExecutor(factory);
+ processBuilder = new ProcessBuilder(commands);
+ processBuilder.redirectErrorStream(false);
+ }
+
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(description);
+ sb.append(", running=").append(isRunning());
+ sb.append(", finished=").append(finished);
+ sb.append(", exitCode=").append(exitCode);
+ return sb.toString();
+ }
+
+ /**
+ * Set the limit on recent lines to retain.
+ * @param limit size of rolling list of recent lines.
+ */
+ public void setRecentLineLimit(int limit) {
+ this.recentLineLimit = limit;
+ }
+
+ /**
+ * Set an optional application exit callback.
+ * @param lifecycleCallback callback to notify on application exit
+ */
+ public void setLifecycleCallback(
+ LongLivedProcessLifecycleEvent lifecycleCallback) {
+ this.lifecycleCallback = lifecycleCallback;
+ }
+
+ /**
+ * Add an entry to the environment.
+ * @param envVar envVar -must not be null
+ * @param val value
+ */
+ public void setEnv(String envVar, String val) {
+ Preconditions.checkArgument(envVar != null, "envVar");
+ Preconditions.checkArgument(val != null, "val");
+ processBuilder.environment().put(envVar, val);
+ }
+
+ /**
+ * Bulk set the environment from a map.
+ * This does not replace the existing environment, just extends it
+ * and/or overwrites existing entries.
+ * @param map map to add
+ */
+ public void putEnvMap(Map<String, String> map) {
+ for (Map.Entry<String, String> entry : map.entrySet()) {
+ String val = entry.getValue();
+ String key = entry.getKey();
+ setEnv(key, val);
+ }
+ }
+
+ /**
+ * Get the process environment.
+ * @param variable environment variable
+ * @return the value or null if there is no match
+ */
+ public String getEnv(String variable) {
+ return processBuilder.environment().get(variable);
+ }
+
+ /**
+ * Set the process log.
+ *
+ * Ignored once the process starts
+ * @param log new log ... may be null
+ */
+ public void setProcessLog(Logger log) {
+ this.processLog = log;
+ }
+
+ /**
+ * Get the process reference.
+ * @return the process -null if the process is not started
+ */
+ public Process getProcess() {
+ return process;
+ }
+
+ /**
+ * Get the process builder -this can be manipulated
+ * up to the {@code start()} operation.
+ *
+ * As there is no synchronization
+ * around it, it must only be used in the same thread setting up the command.
+ * @return the process builder
+ */
+ public ProcessBuilder getProcessBuilder() {
+ return processBuilder;
+ }
+
+ /**
+ * Get the command list.
+ * @return the commands
+ */
+ public List<String> getCommands() {
+ return processBuilder.command();
+ }
+
+ /**
+ * Get the first command. Construction-time checks guarantee
+ * that this always returns a string.
+ * @return the command to execute
+ */
+ public String getCommand() {
+ return getCommands().get(0);
+ }
+
+ /**
+ * Probe to see if the process is running.
+ * @return true iff the process has been started and is not yet finished
+ */
+ public boolean isRunning() {
+ return process != null && !finished;
+ }
+
+ /**
+ * Get the exit code: null until the process has finished.
+ * @return the exit code or null
+ */
+ public Integer getExitCode() {
+ return exitCode;
+ }
+
+ /**
+ * Get the exit code sign corrected: null until the process has finished.
+ * @return the exit code or null
+ */
+ public Integer getExitCodeSignCorrected() {
+ Integer result;
+ if (exitCode != null) {
+ result = signCorrectExitCode(exitCode);
+ } else {
+ result = null;
+ }
+ return result;
+ }
+
+ /**
+ * Stop the process if it is running.
+ * This will trigger an application completion event with the given exit code
+ */
+ public void stop() {
+ if (!isRunning()) {
+ return;
+ }
+ process.destroy();
+ }
+
+ /**
+ * Get a text description of the builder suitable for log output.
+ * @return a multiline string
+ */
+ protected String describeBuilder() {
+ StringBuilder buffer = new StringBuilder();
+ for (String arg : processBuilder.command()) {
+ buffer.append('"').append(arg).append("\" ");
+ }
+ return buffer.toString();
+ }
+
+ /**
+ * Dump the environment to a string builder.
+ * @param buffer the buffer to append to
+ */
+ public void dumpEnv(StringBuilder buffer) {
+ buffer.append("\nEnvironment\n-----------");
+ Map<String, String> env = processBuilder.environment();
+ Set<String> keys = env.keySet();
+ List<String> sortedKeys = new ArrayList<>(keys);
+ Collections.sort(sortedKeys);
+ for (String key : sortedKeys) {
+ buffer.append(key).append("=").append(env.get(key)).append('\n');
+ }
+ }
+
+ /**
+ * Execute the process.
+ * @return the process
+ * @throws IOException on any problem
+ * @throws FileNotFoundException if the process could not be found
+ */
+ private Process spawnChildProcess() throws IOException {
+ if (process != null) {
+ throw new IOException("Process already started");
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Spawning process:\n {}", describeBuilder());
+ }
+ process = processBuilder.start();
+ return process;
+ }
+
+ /**
+ * Entry point for waiting for the program to finish.
+ */
+ @Override // Runnable
+ public void run() {
+ LOG.debug("Lifecycle callback thread running");
+ //notify the callback that the process has started
+ if (lifecycleCallback != null) {
+ lifecycleCallback.onProcessStarted(this);
+ }
+ try {
+ //close stdin for the process
+ IOUtils.closeStream(process.getOutputStream());
+ exitCode = process.waitFor();
+ } catch (InterruptedException e) {
+ LOG.debug("Process wait interrupted -exiting thread", e);
+ } finally {
+ //here the process has finished
+ LOG.debug("process {} has finished", description);
+ //tell the logger it has to finish too
+ finished = true;
+
+ // shut down the threads
+ logExecutor.shutdown();
+ try {
+ logExecutor.awaitTermination(60, TimeUnit.SECONDS);
+ } catch (InterruptedException ignored) {
+ //ignored
+ LOG.debug("Interrupted while waiting for logExecutor to stop");
+ }
+
+ //now call the callback if it is set
+ if (lifecycleCallback != null) {
+ lifecycleCallback.onProcessExited(this, exitCode,
+ getExitCodeSignCorrected());
+ }
+ }
+ }
+
+ /**
+ * Spawn the application.
+ * @throws IOException IO problems
+ */
+ public void start() throws IOException {
+
+ spawnChildProcess();
+ processStreamReader =
+ new ProcessStreamReader(processLog, STREAM_READER_SLEEP_TIME);
+ LOG.debug("Submitting process stream reader for execution on a thread");
+ logExecutor.submit(processStreamReader);
+ LOG.debug("Submitting self for execution on a thread");
+ processExecutor.submit(this);
+ }
+
+ /**
+ * Get the lines of recent output.
+ * @return the last few lines of output; an empty list if there are none
+ * or the process is not actually running
+ */
+ public synchronized List<String> getRecentOutput() {
+ return new ArrayList<>(recentLines);
+ }
+
+ /**
+ * @return whether lines of recent output are empty.
+ */
+ public synchronized boolean isRecentOutputEmpty() {
+ return recentLines.isEmpty();
+ }
+
+ /**
+ * Query to see if the final output has been processed.
+ * @return true if the final output has been processed
+ */
+ public boolean isFinalOutputProcessed() {
+ return finalOutputProcessed.get();
+ }
+
+ /**
+ * Get the recent output from the process, or an empty list if not defined.
+ *
+ * @param finalOutput flag to indicate
+ * "wait for the final output of the process"
+ * @param duration the duration, in milliseconds,
+ * to wait for recent output to become non-empty
+ * @return a possibly empty list
+ */
+ public List<String> getRecentOutput(boolean finalOutput, int duration) {
+ long start = System.currentTimeMillis();
+ while (System.currentTimeMillis() - start <= duration) {
+ boolean finishedOutput;
+ if (finalOutput) {
+ // final flag means block until all data is done
+ finishedOutput = isFinalOutputProcessed();
+ } else {
+ // there is some output
+ finishedOutput = !isRecentOutputEmpty();
+ }
+ if (finishedOutput) {
+ break;
+ }
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ return getRecentOutput();
+ }
+
+ /**
+ * Add the recent line to the list of recent lines; deleting
+ * an earlier on if the limit is reached.
+ *
+ * Implementation note: yes, a circular array would be more
+ * efficient, especially with some power of two as the modulo,
+ * but is it worth the complexity and risk of errors for
+ * something that is only called once per line of IO?
+ * @param line line to record
+ * @param isErrorStream is the line from the error stream
+ * @param logger logger to log to - null for no logging
+ */
+ private synchronized void recordRecentLine(String line,
+ boolean isErrorStream,
+ Logger logger) {
+ if (line == null) {
+ return;
+ }
+ String entry = (isErrorStream ? "[ERR] " : "[OUT] ") + line;
+ recentLines.add(entry);
+ if (recentLines.size() > recentLineLimit) {
+ recentLines.remove(0);
+ }
+ if (logger != null) {
+ if (isErrorStream) {
+ logger.warn(line);
+ } else {
+ logger.info(line);
+ }
+ }
+ }
+
+ /**
+ * Class to read data from the two process streams, and, when run in a thread
+ * to keep running until the done flag is set.
+ * Lines are fetched from stdout and stderr and logged at info and error
+ * respectively.
+ */
+ private final class ProcessStreamReader implements Runnable {
+ private final Logger streamLog;
+ private final int sleepTime;
+
+ private ProcessStreamReader(Logger streamLog, int sleepTime) {
+ this.streamLog = streamLog;
+ this.sleepTime = sleepTime;
+ }
+
+ /**
+ * Return a character if there is one, -1 if nothing is ready yet.
+ * @param reader reader
+ * @return the value from the reader, or -1 if it is not ready
+ * @throws IOException IO problems
+ */
+ private int readCharNonBlocking(BufferedReader reader) throws IOException {
+ if (reader.ready()) {
+ return reader.read();
+ } else {
+ return -1;
+ }
+ }
+
+ /**
+ * Read in a line, or, if the limit has been reached, the buffer
+ * so far.
+ * @param reader source of data
+ * @param line line to build
+ * @param limit limit of line length
+ * @return true if the line can be printed
+ * @throws IOException IO trouble
+ */
+ @SuppressWarnings("NestedAssignment")
+ private boolean readAnyLine(BufferedReader reader,
+ StringBuilder line,
+ int limit) throws IOException {
+ int next;
+ while ((-1 != (next = readCharNonBlocking(reader)))) {
+ if (next != '\n') {
+ line.append((char) next);
+ limit--;
+ if (line.length() > limit) {
+ //enough read in to print it anyway. NOTE(review): limit-- above also shrinks the cap — confirm intended
+ return true;
+ }
+ } else {
+ //line end return flag to say so
+ return true;
+ }
+ }
+ //here the end of the stream is hit, or the limit
+ return false;
+ }
+
+
+ @Override //Runnable
+ @SuppressWarnings("IOResourceOpenedButNotSafelyClosed")
+ public void run() {
+ StringBuilder outLine = new StringBuilder(LINE_LENGTH);
+ StringBuilder errorLine = new StringBuilder(LINE_LENGTH);
+ try(BufferedReader errReader =
+ new BufferedReader(
+ new InputStreamReader(process.getErrorStream()));
+ BufferedReader outReader =
+ new BufferedReader(
+ new InputStreamReader(process.getInputStream()))) {
+ while (!finished) {
+ boolean processed = false;
+ if (readAnyLine(errReader, errorLine, LINE_LENGTH)) {
+ recordRecentLine(errorLine.toString(), true, streamLog);
+ errorLine.setLength(0);
+ processed = true;
+ }
+ if (readAnyLine(outReader, outLine, LINE_LENGTH)) {
+ recordRecentLine(outLine.toString(), false, streamLog);
+ outLine.setLength(0);
+ processed |= true;
+ }
+ if (!processed && !finished) {
+ //nothing processed: wait a bit for data.
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ //ignore this, rely on the done flag
+ LOG.debug("Ignoring ", e);
+ }
+ }
+ }
+ // finished: cleanup
+
+ //print the current error line then stream through the rest
+ recordFinalOutput(errReader, errorLine, true, streamLog);
+ //now do the info line
+ recordFinalOutput(outReader, outLine, false, streamLog);
+
+
+ } catch (Exception ignored) {
+ LOG.warn("encountered {}", ignored, ignored);
+ //process connection has been torn down
+ } finally {
+ //mark output as done
+ finalOutputProcessed.set(true);
+ }
+ }
+ }
+
+ /**
+ * Record the final output of a process stream.
+ * @param reader reader of output
+ * @param lineBuilder string builder into which line is built
+ * @param isErrorStream flag to indicate whether or not this is the
+ * is the line from the error stream
+ * @param logger logger to log to
+ * @throws IOException problems reading the data
+ */
+ protected void recordFinalOutput(BufferedReader reader,
+ StringBuilder lineBuilder, boolean isErrorStream, Logger logger) throws
+ IOException {
+ String line = lineBuilder.toString();
+ recordRecentLine(line, isErrorStream, logger);
+ line = reader.readLine();
+ while (line != null) {
+ recordRecentLine(line, isErrorStream, logger);
+ line = reader.readLine();
+ if (Thread.interrupted()) {
+ break;
+ }
+ }
+ }
+
+ /**
+ * Sign correct an exit code.
+ * @param code an integer code value in the range 0-255.
+ * @return a code in the range -127 to +128.
+ */
+ public static int signCorrectExitCode(int code) {
+ return (code << 24) >> 24;
+ }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/LongLivedProcessLifecycleEvent.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/LongLivedProcessLifecycleEvent.java
new file mode 100644
index 0000000..16708fa
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/LongLivedProcessLifecycleEvent.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.service.workflow;
+
+/**
+ * Callback invoked when a long-lived application exits.
+ */
+public interface LongLivedProcessLifecycleEvent {
+
+ /**
+ * Callback when a process is started.
+ * @param process the process invoking the callback
+ */
+ void onProcessStarted(LongLivedProcess process);
+
+ /**
+ * Callback when a process has finished.
+ * @param process the process invoking the callback
+ * @param exitCode exit code from the process
+ * @param signCorrectedCode the code- as sign corrected
+ */
+ void onProcessExited(LongLivedProcess process,
+ int exitCode,
+ int signCorrectedCode);
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/ServiceTerminatingCallable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/ServiceTerminatingCallable.java
new file mode 100644
index 0000000..ff7afe2
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/ServiceTerminatingCallable.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.service.workflow;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.service.ServiceOperations;
+
+import java.util.concurrent.Callable;
+
+/**
+ * A runnable which terminates its owner; it also catches any
+ * exception raised and can serve it back.
+ *
+ */
+public class ServiceTerminatingCallable implements Callable {
+
+ /**
+ * Owning service: may be null.
+ */
+ private final Service owner;
+
+ /**
+ * Any exception raised.
+ */
+ private Exception exception;
+ /**
+ * This is the callback.
+ */
+ private final Callable callable;
+
+ /**
+ * Create an instance. If the owner is null, the owning service
+ * is not terminated.
+ * @param owner owning service -can be null
+ * @param callable callback.
+ */
+ public ServiceTerminatingCallable(Service owner,
+ Callable callable) {
+ Preconditions.checkArgument(callable != null, "null callable");
+ this.owner = owner;
+ this.callable = callable;
+ }
+
+ /**
+ * Get the owning service.
+ * @return the service to receive notification when
+ * the runnable completes.
+ */
+ public Service getOwner() {
+ return owner;
+ }
+
+ /**
+ * Any exception raised by inner action's run.
+ * @return an exception or null.
+ */
+ public Exception getException() {
+ return exception;
+ }
+
+ /**
+ * Delegates the call to the callable supplied in the constructor,
+ * then calls the stop() operation on its owner.
+ *
+ * Any exception is caught, noted and rethrown
+ * @return the outcome of the delegated call operation
+ * @throws Exception if one was raised.
+ */
+ @Override
+ public V call() throws Exception {
+ try {
+ return callable.call();
+ } catch (Exception e) {
+ exception = e;
+ throw e;
+ } finally {
+ // this operation is a no-op if owner==null
+ ServiceOperations.stop(owner);
+ }
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/ServiceTerminatingRunnable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/ServiceTerminatingRunnable.java
new file mode 100644
index 0000000..2b606cb
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/ServiceTerminatingRunnable.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.service.workflow;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.service.Service;
+
+/**
+ * A runnable which terminates its after running; it also catches any
+ * exception raised and can serve it back.
+ */
+public class ServiceTerminatingRunnable implements Runnable {
+
+ private final Service owner;
+ private final Runnable action;
+ private Exception exception;
+
+ /**
+ * Create an instance.
+ * @param owner owning service: may be null
+ * @param action action to execute before terminating the service
+ */
+ public ServiceTerminatingRunnable(Service owner, Runnable action) {
+ Preconditions.checkArgument(owner != null, "null owner");
+ Preconditions.checkArgument(action != null, "null action");
+ this.owner = owner;
+ this.action = action;
+ }
+
+ /**
+ * Get the owning service.
+ * @return the service to receive notification when
+ * the runnable completes.
+ */
+ public Service getOwner() {
+ return owner;
+ }
+
+ /**
+ * Any exception raised by inner action's run.
+ * @return an exception or null.
+ */
+ public Exception getException() {
+ return exception;
+ }
+
+ @Override
+ public void run() {
+ try {
+ action.run();
+ } catch (Exception e) {
+ exception = e;
+ }
+ owner.stop();
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/ServiceThreadFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/ServiceThreadFactory.java
new file mode 100644
index 0000000..10156b1
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/ServiceThreadFactory.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.service.workflow;
+
+import com.google.common.base.Preconditions;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A thread factory that creates threads (possibly daemon threads)
+ * using the name and naming policy supplied.
+ * The thread counter starts at 1, increments atomically,
+ * and is supplied as the second argument in the format string.
+ *
+ * A static method, {@link #singleThreadExecutor(String, boolean)},
+ * exists to simplify the construction of an executor with a single well-named
+ * threads.
+ *
+ * Example
+ *
+ * ExecutorService exec =
+ * ServiceThreadFactory.newSingleThreadExecutor("live", true)
+ *
+ */
+public class ServiceThreadFactory implements ThreadFactory {
+
+ private static AtomicInteger counter = new AtomicInteger(1);
+
+ /**
+ * Default format for thread names: {@value}.
+ */
+ public static final String DEFAULT_NAMING_FORMAT = "%s-%03d";
+ private final String name;
+ private final boolean daemons;
+ private final String namingFormat;
+
+ /**
+ * Create an instance.
+ * @param name base thread name
+ * @param daemons flag to indicate the threads should be marked as daemons
+ * @param namingFormat format string to generate thread names from
+ */
+ public ServiceThreadFactory(String name,
+ boolean daemons,
+ String namingFormat) {
+ Preconditions.checkArgument(name != null, "null name");
+ Preconditions.checkArgument(namingFormat != null, "null naming format");
+ this.name = name;
+ this.daemons = daemons;
+ this.namingFormat = namingFormat;
+ }
+
+ /**
+ *
+ * Create an instance with the default naming format.
+ * @param name base thread name
+ * @param daemons flag to indicate the threads should be marked as daemons
+ */
+ public ServiceThreadFactory(String name,
+ boolean daemons) {
+ this(name, daemons, DEFAULT_NAMING_FORMAT);
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Preconditions.checkArgument(r != null, "null runnable");
+ String threadName =
+ String.format(namingFormat, name, counter.getAndIncrement());
+ Thread thread = new Thread(r, threadName);
+ thread.setDaemon(daemons);
+ return thread;
+ }
+
+ /**
+ * Create a single thread executor using this naming policy.
+ * @param name base thread name
+ * @param daemons flag to indicate the threads should be marked as daemons
+ * @return an executor
+ */
+ public static ExecutorService singleThreadExecutor(String name,
+ boolean daemons) {
+ return Executors.newSingleThreadExecutor(
+ new ServiceThreadFactory(name, daemons));
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowCallbackService.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowCallbackService.java
new file mode 100644
index 0000000..602d0a3
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowCallbackService.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.service.workflow;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A service that calls the supplied callback when it is started -after the
+ * given delay.
+ *
+ * It can be configured to stop itself after the callback has
+ * completed, marking any exception raised as the exception of this service.
+ * The notifications come in on a callback thread -a thread that is only
+ * started in this service's start() operation.
+ */
+public class WorkflowCallbackService extends
+ AbstractWorkflowExecutorService {
+ protected static final Logger LOG =
+ LoggerFactory.getLogger(WorkflowCallbackService.class);
+
+ /**
+ * This is the callback.
+ */
+ private final Callable callback;
+ private final int delay;
+ private final ServiceTerminatingCallable command;
+
+ private ScheduledFuture scheduledFuture;
+
+ /**
+ * Create an instance of the service.
+ *
+ * @param name service name
+ * @param callback callback to invoke
+ * @param delay delay -or 0 for no delay
+ * @param terminate terminate this service after the callback?
+ */
+ public WorkflowCallbackService(String name,
+ Callable callback,
+ int delay,
+ boolean terminate) {
+ super(name);
+ Preconditions.checkNotNull(callback, "Null callback argument");
+ this.callback = callback;
+ this.delay = delay;
+ command = new ServiceTerminatingCallable(
+ terminate ? this : null,
+ callback);
+ }
+
+ public ScheduledFuture getScheduledFuture() {
+ return scheduledFuture;
+ }
+
+ /**
+ * Starting the service will schedule the callback to be invoked after the
+ * delay passed in. This will be in a separate thread from the one which
+ * has made the {@code Service.start()} call.
+ * @throws Exception
+ */
+ @Override
+ protected void serviceStart() throws Exception {
+ LOG.debug("Notifying {} after a delay of {} millis", callback, delay);
+ ScheduledExecutorService executorService =
+ Executors.newSingleThreadScheduledExecutor(
+ new ServiceThreadFactory(getName(), true));
+ setExecutor(executorService);
+ scheduledFuture =
+ executorService.schedule(command, delay, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Stop the service.
+ *
+ * If there is any exception noted from any executed notification,
+ * note the exception in this class
+ * @throws Exception exception.
+ */
+ @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+ @Override
+ protected void serviceStop() throws Exception {
+ super.serviceStop();
+ // propagate any failure
+ if (getCallbackException() != null) {
+ throw getCallbackException();
+ }
+ }
+
+ /**
+ * Get the exception raised by a callback. Will always be null if the
+ * callback has not been executed; will only be non-null after any success.
+ * @return a callback
+ */
+ public Exception getCallbackException() {
+ return command.getException();
+ }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowClosingService.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowClosingService.java
new file mode 100644
index 0000000..6dd2ceb
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowClosingService.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.service.workflow;
+
+import org.apache.hadoop.service.AbstractService;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Service that closes the closeable supplied during shutdown, if not null.
+ *
+ * As the Service interface itself extends Closeable, this service
+ * can be used to shut down other services if desired.
+ */
+public class WorkflowClosingService
+ extends AbstractService {
+
+ /**
+ * Entity to close.
+ */
+ private C closeable;
+
+ /**
+ * Construct an instance of the service.
+ * @param name service name
+ * @param closeable closeable to close (may be null)
+ */
+ public WorkflowClosingService(String name, C closeable) {
+ super(name);
+ this.closeable = closeable;
+ }
+
+ /**
+ * Construct an instance of the service, using the default name.
+ * @param closeable closeable to close (may be null)
+ */
+ public WorkflowClosingService(C closeable) {
+ this("WorkflowClosingService", closeable);
+ }
+
+ /**
+ * Get the closeable.
+ * @return the closeable
+ */
+ public synchronized C getCloseable() {
+ return closeable;
+ }
+
+ /**
+ * Set or update the closeable.
+ * @param c new closeable
+ */
+ public synchronized void setCloseable(C c) {
+ this.closeable = c;
+ }
+
+ /**
+ * Stop routine will close the closeable -if not null - and set the
+ * reference to null afterwards.
+ * This operation does raise any exception on the close, though it does
+ * record it.
+ */
+ @Override
+ protected void serviceStop() {
+ C target = getCloseable();
+ if (target != null) {
+ try {
+ target.close();
+ } catch (IOException ioe) {
+ noteFailure(ioe);
+ }
+ setCloseable(null);
+ }
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowCompositeService.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowCompositeService.java
new file mode 100644
index 0000000..b69d98f
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowCompositeService.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.service.workflow;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.service.ServiceParent;
+import org.apache.hadoop.service.ServiceStateChangeListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * An extended composite service which stops itself if any child service
+ * fails, or when all its children have successfully stopped without failure.
+ *
+ * Lifecycle
+ *
+ * - If any child exits with a failure: this service stops, propagating
+ * the exception.
+ * - When all child services has stopped, this service stops itself
+ *
+ *
+ */
+public class WorkflowCompositeService extends CompositeService
+ implements ServiceParent, ServiceStateChangeListener {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(WorkflowCompositeService.class);
+
+ /**
+ * Construct an instance.
+ * @param name name of this service instance
+ */
+ public WorkflowCompositeService(String name) {
+ super(name);
+ }
+
+ /**
+ * Construct an instance with the default name.
+ */
+ public WorkflowCompositeService() {
+ this("WorkflowCompositeService");
+ }
+
+ /**
+ * Varargs constructor.
+ * @param name name of this service instance
+ * @param children children
+ */
+ public WorkflowCompositeService(String name, Service... children) {
+ this(name);
+ for (Service child : children) {
+ addService(child);
+ }
+ }
+
+ /**
+ * Construct with a list of children.
+ * @param name name of this service instance
+ * @param children children to add
+ */
+ public WorkflowCompositeService(String name, List children) {
+ this(name);
+ for (Service child : children) {
+ addService(child);
+ }
+ }
+
+ /**
+ * Add a service, and register it.
+ * @param service the {@link Service} to be added.
+ */
+ @Override
+ public synchronized void addService(Service service) {
+ Preconditions.checkArgument(service != null, "null service argument");
+ service.registerServiceListener(this);
+ super.addService(service);
+ }
+
+ /**
+ * Handle state change events.
+ *
+ * When this service is started, any service stopping with a failure
+ * exception is converted immediately into a failure of this service,
+ * storing the failure and stopping ourselves.
+ * @param child the service that has changed.
+ */
+ @Override
+ public void stateChanged(Service child) {
+ //if that child stopped while we are running:
+ if (isInState(STATE.STARTED) && child.isInState(STATE.STOPPED)) {
+ // a child service has stopped
+ //did the child fail? if so: propagate
+ Throwable failureCause = child.getFailureCause();
+ if (failureCause != null) {
+ LOG.info("Child service {} failed: {}", child, failureCause.toString());
+ //failure. Convert to an exception
+ Exception e = (failureCause instanceof Exception) ?
+ (Exception) failureCause : new Exception(failureCause);
+ //flip ourselves into the failed state
+ noteFailure(e);
+ stop();
+ } else {
+ LOG.debug("Child service completed {}", child);
+ if (areAllChildrenStopped()) {
+ LOG.debug("All children are halted: stopping");
+ stop();
+ }
+ }
+ }
+ }
+
+ /**
+ * Probe to query if all children are stopped -simply
+ * by taking a snapshot of the child service list and enumerating
+ * their state.
+ * The state of the children may change during this operation -that will
+ * not get picked up.
+ * @return true if all the children are stopped.
+ */
+ private boolean areAllChildrenStopped() {
+ List children = getServices();
+ boolean stopped = true;
+ for (Service child : children) {
+ if (!child.isInState(STATE.STOPPED)) {
+ stopped = false;
+ break;
+ }
+ }
+ return stopped;
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowForkedProcessService.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowForkedProcessService.java
new file mode 100644
index 0000000..35f5ba2
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowForkedProcessService.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.service.workflow;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.service.ServiceStateException;
+import org.apache.hadoop.util.ExitUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Service wrapper for an external program that is launched and can/will
+ * terminate.
+ *
+ * This service is notified when the subprocess terminates, and stops itself
+ * and converts a non-zero exit code into a failure exception.
+ *
+ * Key Features:
+ *
+ * - The property {@link #executionTimeout} can be set to set a limit
+ * on the duration of a process.
+ * - Output is streamed to the output logger provided.
+ * - The most recent lines of output are saved to a linked list.
+ * - A synchronous callback, {@link LongLivedProcessLifecycleEvent},
+ * is raised on the start and finish of a process.
+ *
+ *
+ * Usage:
+ *
+ * The service can be built in the constructor,
+ * {@link #WorkflowForkedProcessService(String, Map, List)},
+ * or have its simple constructor used to instantiate the service, then the
+ * {@link #build(Map, List)} command used to define the environment variables
+ * and list of commands to execute. One of these two options MUST be exercised
+ * before calling the services's {@link #start()} method.
+ *
+ * The forked process is executed in the service's {@link #serviceStart()}
+ * method;
+ * if still running when the service is stopped, {@link #serviceStop()} will
+ * attempt to stop it.
+ *
+ *
+ * The service delegates process execution to {@link LongLivedProcess},
+ * receiving callbacks via the {@link LongLivedProcessLifecycleEvent}.
+ * When the service receives a callback notifying that the process has
+ * completed, it calls its {@link #stop()} method. If the error code was
+ * non-zero, the service is logged as having failed.
+ */
+public class WorkflowForkedProcessService
+ extends AbstractWorkflowExecutorService
+ implements LongLivedProcessLifecycleEvent, Runnable {
+
+ /**
+ * Log for the forked master process.
+ */
+ private static final Logger LOG =
+ LoggerFactory.getLogger(WorkflowForkedProcessService.class);
+ public static final String ERROR_PROCESS_NOT_SET =
+ "Process not yet configured";
+
+ private final AtomicBoolean processTerminated = new AtomicBoolean(false);
+ private boolean processStarted = false;
+ private LongLivedProcess process;
+ private int executionTimeout = -1;
+ private int timeoutCode = 1;
+
+ /**
+ * Log to log to; defaults to this service log.
+ */
+ private Logger processLog = LOG;
+
+ /**
+ * Exit code set when the spawned process exits.
+ */
+ private final AtomicInteger exitCode = new AtomicInteger(0);
+
+ /**
+ * Create an instance of the service.
+ * @param name a name
+ */
+ public WorkflowForkedProcessService(String name) {
+ super(name);
+ }
+
+ /**
+ * Create an instance of the service, set up the process.
+ * @param name a name
+ * @param commandList list of commands is inserted on the front
+ * @param env environment variables above those generated by
+ * @throws IOException IO problems
+ */
+ public WorkflowForkedProcessService(String name,
+ Map env,
+ List commandList) throws IOException {
+ super(name);
+ build(env, commandList);
+ }
+
+ /**
+ * Start the service by starting the inner process.
+ * @throws ServiceStateException if no process has been set up to launch.
+ * @throws Exception
+ */
+ @Override //AbstractService
+ protected void serviceStart() throws Exception {
+ if (process == null) {
+ throw new ServiceStateException(ERROR_PROCESS_NOT_SET);
+ }
+ //now spawn the process -expect updates via callbacks
+ process.start();
+ }
+
+ @Override //AbstractService
+ protected void serviceStop() throws Exception {
+ //tag as completed if not already; use the current exit code
+ completed(exitCode.get());
+ stopForkedProcess();
+ }
+
+ /**
+ * Stopped the forked process.
+ */
+ private void stopForkedProcess() {
+ if (process != null) {
+ process.stop();
+ }
+ }
+
+ /**
+ * Set the process log. This may be null for "do not log".
+ * @param processLog process log
+ */
+ public void setProcessLog(Logger processLog) {
+ this.processLog = processLog;
+ process.setProcessLog(processLog);
+ }
+
+ /**
+ * Set the timeout by which time a process must have finished
+ * -or -1 for forever.
+ * @param timeout timeout in milliseconds
+ */
+ public void setTimeout(int timeout, int code) {
+ this.executionTimeout = timeout;
+ this.timeoutCode = code;
+ }
+
+ /**
+ * Build the process to execute when the service is started.
+ * @param commandList list of commands is inserted on the front
+ * @param env environment variables above those generated by
+ * @throws IOException IO problems
+ */
+ public void build(Map env,
+ List commandList)
+ throws IOException {
+ Preconditions.checkState(process == null, "process already started");
+ process = new LongLivedProcess(getName(), processLog, commandList);
+ process.setLifecycleCallback(this);
+ //set the env variable mapping
+ process.putEnvMap(env);
+ }
+
+ @Override // notification from executed process
+ public synchronized void onProcessStarted(LongLivedProcess ps) {
+ LOG.debug("Process has started");
+ processStarted = true;
+ if (executionTimeout > 0) {
+ setExecutor(ServiceThreadFactory.singleThreadExecutor(getName(), true));
+ execute(this);
+ }
+ }
+
+ @Override // notification from executed process
+ public void onProcessExited(LongLivedProcess ps,
+ int uncorrected,
+ int code) {
+ try {
+ synchronized (this) {
+ completed(code);
+ //note whether or not the service had already stopped
+ LOG.debug("Process has exited with exit code {}", code);
+ if (code != 0) {
+ reportFailure(code, getName() + " failed with code " + code);
+ }
+ }
+ } finally {
+ stop();
+ }
+ }
+
+ /**
+ * Report a failure by building an {@link ExitUtil.ExitException}
+ * with the given exit code, then calling {@link #noteFailure(Exception)}
+ * to log it as this services failure exception.
+ * @param code exit code
+ * @param text error text
+ */
+ private void reportFailure(int code, String text) {
+ //error
+ ExitUtil.ExitException execEx = new ExitUtil.ExitException(code, text);
+ LOG.debug("Noting failure", execEx);
+ noteFailure(execEx);
+ }
+
+ /**
+ * handle timeout response by escalating it to a failure.
+ */
+ @Override
+ public void run() {
+ try {
+ synchronized (processTerminated) {
+ if (!processTerminated.get()) {
+ processTerminated.wait(executionTimeout);
+ }
+ }
+
+ } catch (InterruptedException e) {
+ LOG.info("Interrupted");
+ //assume signalled; exit
+ }
+ //check the status; if the marker isn't true, bail
+ if (!processTerminated.getAndSet(true)) {
+ LOG.info("process timeout: reporting error code {}", timeoutCode);
+
+ //timeout
+ if (isInState(Service.STATE.STARTED)) {
+ //trigger a failure
+ stopForkedProcess();
+ }
+ reportFailure(timeoutCode,
+ getName() + ": timeout after " + executionTimeout
+ + " millis: exit code =" + timeoutCode);
+ }
+ }
+
+ /**
+ * Note the process as having completed.
+ * The exit code is stored, the process marked as terminated
+ * -and anything synchronized on processTerminated
+ * is notified
+ * @param code exit code
+ */
+ protected void completed(int code) {
+ LOG.debug("Completed with exit code {}", code);
+ synchronized (processTerminated) {
+ // set the exit first, to guarantee that it will always be
+ // valid when processTerminated holds.
+ // only do this if the value is currently 0
+ exitCode.compareAndSet(0, code);
+ processTerminated.set(true);
+ processTerminated.notify();
+ }
+ }
+
+ /**
+ * Is the process terminated?
+ *
+ * This is false until the process is started and then completes.
+ * @return true if the process has been executed to completion.
+ */
+ public boolean isProcessTerminated() {
+ return processTerminated.get();
+ }
+
+ /**
+ * Has the process started?
+ * @return true if the process started.
+ */
+ public synchronized boolean didProcessStart() {
+ return processStarted;
+ }
+
+ /**
+ * Is a process running: between started and terminated.
+ * @return true if the process is up.
+ */
+ public synchronized boolean isProcessRunning() {
+ return processStarted && !isProcessTerminated();
+ }
+
+
+ /**
+ * Get the process exit code.
+ *
+ * If the process has not yet completed, this will be zero.
+ * @return an exit code in the range -127 to +128
+ */
+ public int getExitCode() {
+ return exitCode.get();
+ }
+
+ /**
+ * Get the recent output from the process, or an empty if not defined.
+ * @return a possibly empty list
+ */
+ public List getRecentOutput() {
+ return process != null
+ ? process.getRecentOutput()
+ : new LinkedList();
+ }
+
+ /**
+ * Get the recent output from the process, or an empty list.
+ *
+ * @param finalOutput flag to indicate
+ * "wait for the final output of the process"
+ * @param duration the duration, in ms,
+ * to wait for recent output to become non-empty
+ * @return a possibly empty list
+ */
+ public List getRecentOutput(boolean finalOutput, int duration) {
+ if (process == null) {
+ return new LinkedList<>();
+ }
+ return process.getRecentOutput(finalOutput, duration);
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(super.toString());
+ if (process != null) {
+ sb.append(", ").append(process.toString());
+ }
+ sb.append(", processStarted=").append(processStarted);
+ sb.append(", processTerminated=").append(processTerminated);
+ sb.append(", exitCode=").append(exitCode);
+ return sb.toString();
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowRpcService.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowRpcService.java
new file mode 100644
index 0000000..a2c3fb2
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowRpcService.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.service.workflow;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.service.AbstractService;
+
+import java.net.InetSocketAddress;
+
+/**
+ * A YARN service that maps the start/stop lifecycle of an RPC server
+ * to the YARN service lifecycle.
+ */
+public class WorkflowRpcService extends AbstractService {
+
+ /**
+ * RPC server.
+ *
+ * Invariant: this is non-null.
+ */
+ private final Server server;
+
+ /**
+ * Construct an instance.
+ * @param name service name
+ * @param server service to stop. It must not be null
+ * @throws IllegalArgumentException if server is null.
+ */
+ public WorkflowRpcService(String name, Server server) {
+ super(name);
+ Preconditions.checkArgument(server != null, "Null server");
+ this.server = server;
+ }
+
+ /**
+ * Get the server.
+ * @return the server
+ */
+ public Server getServer() {
+ return server;
+ }
+
+ /**
+ * Get the socket address of this server.
+ * @return the address this server is listening on
+ */
+ public InetSocketAddress getConnectAddress() {
+ return NetUtils.getConnectAddress(server);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ super.serviceStart();
+ server.start();
+ }
+
+ /**
+ * Stop the service.
+ *
+ * @throws Exception any failure.
+ */
+ @Override
+ protected void serviceStop() throws Exception {
+ server.stop();
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowSequenceService.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowSequenceService.java
new file mode 100644
index 0000000..065b46e
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/WorkflowSequenceService.java
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.service.workflow;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.service.ServiceParent;
+import org.apache.hadoop.service.ServiceStateChangeListener;
+import org.apache.hadoop.service.ServiceStateException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * This resembles the YARN CompositeService, except that it
+ * starts one service after another.
+ *
+ * Workflow
+ *
+ * - When the
WorkflowSequenceService instance is
+ * initialized, it only initializes itself.
+ *
+ * - When the
WorkflowSequenceService instance is
+ * started, it initializes then starts the first of its children.
+ * If there are no children, it immediately stops.
+ *
+ * - When the active child stops without failing, and the parent has not
+ * stopped, then the next service is initialized and started. If there is no
+ * remaining child, the parent service stops.
+ *
+ * - If the active child did fail, the parent service notes the exception
+ * and stops -effectively propagating up the failure.
+ *
+ *
+ *
+ * New service instances MAY be added to a running instance -but no guarantees
+ * can be made as to whether or not they will be run.
+ */
+
+public class WorkflowSequenceService extends AbstractService implements
+ ServiceParent, ServiceStateChangeListener {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(WorkflowSequenceService.class);
+
+ /**
+ * list of services.
+ */
+ private final List serviceList = new ArrayList<>();
+
+ /**
+ * The currently active service.
+ * Volatile -may change & so should be read into a
+ * local variable before working with it.
+ */
+ private volatile Service activeService;
+
+ /**
+ * The previous service to the one (if any) that is running.
+ * null if none have finished yet; this implicitly
+ * holds before any service has been started.
+ */
+ private volatile Service previousService;
+
+ /**
+ * Construct an instance.
+ * @param name service name
+ */
+ public WorkflowSequenceService(String name) {
+ super(name);
+ }
+
+ /**
+ * Construct an instance with the default name.
+ */
+ public WorkflowSequenceService() {
+ this("WorkflowSequenceService");
+ }
+
+ /**
+ * Create a service sequence with the given list of services.
+ * @param name service name
+ * @param children initial sequence
+ */
+ public WorkflowSequenceService(String name, Service... children) {
+ super(name);
+ for (Service service : children) {
+ addService(service);
+ }
+ }
+
+ /**
+ * Create a service sequence with the given list of services.
+ * @param name service name
+ * @param children initial sequence
+ */
+ public WorkflowSequenceService(String name, List children) {
+ super(name);
+ for (Service service : children) {
+ addService(service);
+ }
+ }
+
+ /**
+ * Get the current service -which may be null.
+ * @return service running
+ */
+ public Service getActiveService() {
+ return activeService;
+ }
+
+ /**
+ * Get the previously active service.
+ * @return the service last run, or null if there is none.
+ */
+ public Service getPreviousService() {
+ return previousService;
+ }
+
+ /**
+ * start the first service.
+ *
+ * If there are none to start, stop() this service.
+ * @throws Exception
+ */
+ @Override
+ protected void serviceStart() throws Exception {
+ if (!startNextService()) {
+ //nothing to start -so stop
+ stop();
+ }
+ }
+
+ /**
+ * Stop the service.
+ *
+ * This stops any currently running child service.
+ * @throws Exception on any failure
+ */
+ @Override
+ protected void serviceStop() throws Exception {
+ //stop current service.
+ //this triggers a callback that is caught and ignored
+ Service current = activeService;
+ previousService = current;
+ activeService = null;
+ if (current != null) {
+ current.stop();
+ }
+ }
+
+ /**
+ * Start the next service in the list.
+ *
+ * Return false if there are no more services to run, or this
+ * service has stopped
+ * @return true if a service was started
+ * @throws RuntimeException from any init or start failure
+ * @throws ServiceStateException if this call is made before
+ * the service is started
+ */
+ public synchronized boolean startNextService() {
+ LOG.debug("Starting next child service");
+ if (isInState(STATE.STOPPED)) {
+ //downgrade to a failed
+ LOG.debug("Not starting next service -{} is stopped", this);
+ return false;
+ }
+ if (!isInState(STATE.STARTED)) {
+ //reject attempts to start a service too early
+ throw new ServiceStateException(
+ "Cannot start a child service when not started");
+ }
+ if (serviceList.isEmpty()) {
+ //nothing left to run
+ LOG.debug("No services left to start");
+ return false;
+ }
+ if (activeService != null && activeService.getFailureCause() != null) {
+ //did the last service fail? Is this caused by some premature callback?
+ LOG.debug("Not starting next service due to a failure of {}",
+ activeService);
+ return false;
+ }
+
+ //bear in mind that init & start can fail, which
+ //can trigger re-entrant calls into the state change listener.
+ //by setting the current service to null
+ //the start-next-service logic is skipped.
+ //now, what does that mean w.r.t exit states?
+
+ activeService = null;
+ Service head = serviceList.remove(0);
+ LOG.debug("Starting {}", head);
+ try {
+ head.init(getConfig());
+ head.registerServiceListener(this);
+ head.start();
+ } catch (RuntimeException e) {
+ noteFailure(e);
+ throw e;
+ }
+ //at this point the service must have explicitly started & not failed,
+ //else an exception would have been raised
+ activeService = head;
+
+ return true;
+ }
+
+ /**
+ * State change event relays service stop events to
+ * {@link #onServiceCompleted(Service)}.
+ *
+ * Subclasses can extend that with extra logic
+ * @param service the service that has changed.
+ */
+ @Override
+ public void stateChanged(Service service) {
+ // only react to the state change when it is the current service
+ // and it has entered the STOPPED state
+ if (service == activeService && service.isInState(STATE.STOPPED)) {
+ onServiceCompleted(service);
+ }
+ }
+
+ /**
+ * Handler for service completion: base class starts the next service.
+ * @param service service that has completed
+ */
+ protected synchronized void onServiceCompleted(Service service) {
+ LOG.info("Running service stopped: {}", service);
+ previousService = activeService;
+ //start the next service if we are not stopped ourselves
+ if (isInState(STATE.STARTED)) {
+
+ //did the service fail? if so: propagate
+ Throwable failureCause = service.getFailureCause();
+ if (failureCause != null) {
+ Exception e = (failureCause instanceof Exception)
+ ? (Exception) failureCause
+ : new Exception(failureCause);
+ noteFailure(e);
+ stop();
+ }
+
+ //start the next service
+ boolean started;
+ try {
+ started = startNextService();
+ } catch (Exception e) {
+ //something went wrong here
+ noteFailure(e);
+ started = false;
+ }
+ if (!started) {
+ //no start because list is empty
+ //stop and expect the notification to go upstream
+ stop();
+ }
+ } else {
+ //not started, so just note that the current service
+ //has gone away
+ activeService = null;
+ }
+ }
+
+ /**
+ * Add the passed {@link Service} to the list of services managed by this.
+ * {@link WorkflowSequenceService}
+ * @param service the {@link Service} to be added
+ */
+ @Override
+ public synchronized void addService(Service service) {
+ Preconditions.checkArgument(service != null, "null service argument");
+ LOG.debug("Adding service {} ", service.getName());
+ synchronized (serviceList) {
+ serviceList.add(service);
+ }
+ }
+
+ /**
+ * Get an unmodifiable list of services.
+ * @return a list of child services at the time of invocation -
+ * added services will not be picked up.
+ */
+ @Override //Parent
+ public synchronized List getServices() {
+ return Collections.unmodifiableList(serviceList);
+ }
+
+ @Override // Object
+ public synchronized String toString() {
+ return super.toString() + "; current service " + activeService
+ + "; queued service count=" + serviceList.size();
+ }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/package-info.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/package-info.java
new file mode 100644
index 0000000..c418857
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/workflow/package-info.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+
+
+This package contains classes which can be aggregated to build up
+complex workflows of services: sequences of operations, callbacks
+and composite services with a shared lifespan.
+
+
+Core concepts:
+
+
+The Workflow Services are set of Hadoop YARN services, all implementing
+the {@link org.apache.hadoop.service.Service} API.
+They are designed to be aggregated, to be composed to produce larger
+composite services which then perform ordered operations, notify other services
+when work has completed, and to propagate failure up the service hierarchy.
+
+
+Service instances may have a limited lifespan, and may self-terminate when
+they consider it appropriate.
+
+
+Workflow Services that have children implement the
+{@link org.apache.hadoop.service.ServiceParent}
+class, which provides (thread-safe) access to the children -allowing new
+children to be added, and existing children to be enumerated.
+They implement policies
+on how to react to the termination of children -so can sequence operations
+which terminate themselves when complete.
+
+
+Workflow Services may be subclassed to extend their behavior, or to use them
+in specific applications. Just as the standard
+{@link org.apache.hadoop.service.CompositeService}
+is often subclassed to aggregate child services, the
+{@link org.apache.hadoop.service.workflow.WorkflowCompositeService}
+can be used instead -adding the feature that failing services trigger automatic
+parent shutdown. If that is the desired operational mode of a class,
+swapping the composite service implementation may be sufficient to adopt it.
+
+
+
How do the workflow services differ from the YARN Composite Service?
+
+There was originally one YARN service for managing children, the
+{@link org.apache.hadoop.service.CompositeService}.
+
+The {@link org.apache.hadoop.service.workflow.WorkflowCompositeService}
+shares the same model of "child services, all inited and started together".
+Where it differs is that if any child service stops -either due to a failure
+or to an action which invokes that service's
+{@link org.apache.hadoop.service.Service#stop()} method -the parent stops too.
+
+In contrast, the original {@link org.apache.hadoop.service.CompositeService}
+class starts its children in its
+{@link org.apache.hadoop.service.Service#start()}
+method, but does not listen or react to any child service halting.
+
+
+As a result, changes in child
+state are not automatically detected or propagated, other than failures in
+the actual init() and start() methods.
+
+
+If a child service runs until completed, it will not be stopped until
+instructed to do so, and if it is only the parent service that attempts to
+stop the child, then this difference is unimportant.
+
+
+However, for any service that depends upon all its child services running -
+and if those child services are written so as to stop when they fail, using
+the WorkflowCompositeService as a base class will enable the
+parent service to be automatically notified of a child stopping.
+
+
+The {@link org.apache.hadoop.service.workflow.WorkflowSequenceService}
+resembles the composite service in API, but its workflow is different. It
+initializes and starts its children one-by-one, only starting the second after
+the first one succeeds, the third after the second, etc. If any service in
+the sequence fails, the parent WorkflowSequenceService stops,
+reporting the same exception.
+
+
+The {@link org.apache.hadoop.service.workflow.WorkflowForkedProcessService}:
+Executes a process when started, and binds to the life of that process. When the
+process terminates, so does the service -and vice versa. This service enables
+external processes to be executed as part of a sequence of operations -or,
+using the {@link org.apache.hadoop.service.workflow.WorkflowCompositeService}
+in parallel with other services, terminating the process when the other services
+stop -and vice versa.
+
+
+The {@link org.apache.hadoop.service.workflow.WorkflowCallbackService}
+executes a {@link java.util.concurrent.Callable} callback a specified delay
+after the service is started, then potentially terminates itself.
+This is useful for callbacks when a workflow reaches a specific point
+-or simply for executing arbitrary code in the workflow.
+
+
+
+Other Workflow Services
+
+
+There are some minor services that have proven useful within aggregate
+workflows, and simply in applications which are built from composite YARN
+services.
+
+
+ -
+ {@link org.apache.hadoop.service.workflow.WorkflowRpcService }:
+ Maintains a reference to an RPC {@link org.apache.hadoop.ipc.Server} instance.
+ When the service is started, so is the RPC server. Similarly, when the service
+ is stopped, so is the RPC server instance.
+
+
+ -
+ {@link org.apache.hadoop.service.workflow.WorkflowClosingService}: Closes
+ an instance of {@link java.io.Closeable} when the service is stopped. This
+ is purely a housekeeping class.
+
+
+
+
+ Lower-level classes
+
+
+ -
+ {@link org.apache.hadoop.service.workflow.ServiceTerminatingRunnable }:
+ A {@link java.lang.Runnable} which runs the runnable supplied in its
+ constructor then signals its owning service to stop once that runnable
+ is completed.
+ Any exception raised in the run is stored.
+
+
+ -
+ {@link org.apache.hadoop.service.workflow.AbstractWorkflowExecutorService}:
+ A base class for services that wish to have a
+ {@link java.util.concurrent.ExecutorService}
+ with a lifespan mapped to that of a service. When the service is stopped, the
+ {@link java.util.concurrent.ExecutorService#shutdownNow()} method is called to
+ attempt to shut down all running tasks.
+
+
+ -
+ {@link org.apache.hadoop.service.workflow.ServiceThreadFactory}:
+ This is a simple {@link java.util.concurrent.ThreadFactory} which generates
+ meaningful thread names. It can be used as a parameter to constructors of
+ {@link java.util.concurrent.ExecutorService} instances, to ensure that
+ log information can tie back text to the related services
+
+
+ */
+
+package org.apache.hadoop.service.workflow;
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/EndOfServiceWaiter.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/EndOfServiceWaiter.java
new file mode 100644
index 0000000..8416faa
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/EndOfServiceWaiter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.service.workflow;
+
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.service.ServiceStateChangeListener;
+import org.junit.Assert;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A {@link ServiceStateChangeListener} that waits for a service to stop.
+ */
+public class EndOfServiceWaiter implements ServiceStateChangeListener {
+
+ private final AtomicBoolean finished = new AtomicBoolean(false);
+
+ public EndOfServiceWaiter(Service svc) {
+ svc.registerServiceListener(this);
+ }
+
+ /**
+ * Wait for a service to stop. Raises an assertion if the
+ * service does not stop in the time period.
+ * @param timeout time to wait in millis before failing.
+ * @throws InterruptedException
+ */
+ public synchronized void waitForServiceToStop(long timeout) throws
+ InterruptedException {
+ if (!finished.get()) {
+ wait(timeout);
+ }
+ Assert.assertTrue("Service did not finish in time period",
+ finished.get());
+ }
+
+ @Override
+ public synchronized void stateChanged(Service service) {
+ if (service.isInState(Service.STATE.STOPPED)) {
+ finished.set(true);
+ notify();
+ }
+ }
+
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/MockService.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/MockService.java
new file mode 100644
index 0000000..823340c
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/MockService.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.service.workflow;
+
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.service.ServiceStateException;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Mock service which can be set to fail at different points
+ */
+public class MockService extends AbstractService {
+ private final boolean fail;
+ private final int lifespan;
+ private final ExecutorService executorService =
+ Executors.newSingleThreadExecutor();
+
+ /**
+ * Create with an infinite lifespan and no failures
+ */
+ MockService() {
+ this("mock", false, -1);
+ }
+
+ /**
+ * Create with a name, optional failure option and a lifespan in millis
+ * @param name
+ * @param fail
+ * @param lifespan
+ */
+ MockService(String name, boolean fail, int lifespan) {
+ super(name);
+ this.fail = fail;
+ this.lifespan = lifespan;
+ }
+
+ /**
+ * Start the service. This will start an asynchronous thread (in an executor service)
+ * which will sleep for the given time period.
+ * @throws Exception on any failure
+ */
+ @Override
+ protected void serviceStart() throws Exception {
+ //act on the lifespan here
+ if (lifespan > 0) {
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(lifespan);
+ } catch (InterruptedException ignored) {
+
+ }
+ finish();
+ }
+ });
+ } else {
+ if (lifespan == 0) {
+ finish();
+ } else {
+ //continue until told not to
+ }
+ }
+ }
+
+ /**
+ * Finish the service
+ */
+ private void finish() {
+ if (fail) {
+ ServiceStateException e =
+ new ServiceStateException(getName() + " failed");
+
+ noteFailure(e);
+ stop();
+ throw e;
+ } else {
+ stop();
+ }
+ }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/ParentWorkflowTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/ParentWorkflowTestBase.java
new file mode 100644
index 0000000..499db22
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/ParentWorkflowTestBase.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.service.workflow;
+
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.service.ServiceParent;
+
+/**
+ * Extends {@link WorkflowServiceTestBase} with parent-specific operations
+ * and logic to build up and run the parent service
+ */
+public abstract class ParentWorkflowTestBase extends WorkflowServiceTestBase {
+
+ /**
+ * Wait a second for the service parent to stop
+ * @param parent the service to wait for
+ */
+ protected void waitForParentToStop(ServiceParent parent) {
+ waitForParentToStop(parent, 1000);
+ }
+
+ /**
+ * Wait for the service parent to stop
+ * @param parent the service to wait for
+ * @param timeout time in milliseconds
+ */
+ protected void waitForParentToStop(ServiceParent parent, int timeout) {
+ boolean stop = parent.waitForServiceToStop(timeout);
+ if (!stop) {
+ logState(parent);
+ fail("Service failed to stop : after " + timeout + " millis " + parent);
+ }
+ }
+
+ /**
+ * Subclasses are required to implement this and return an instance of a
+ * ServiceParent
+ * @param services a possibly empty list of services
+ * @return an inited -but not started- service parent instance
+ */
+ protected abstract ServiceParent buildService(Service... services);
+
+ /**
+ * Use {@link #buildService(Service...)} to create service and then start it
+ * @param services
+ * @return
+ */
+ protected ServiceParent startService(Service... services) {
+ ServiceParent parent = buildService(services);
+ //expect service to start and stay started
+ parent.start();
+ return parent;
+ }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/ProcessCommandFactory.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/ProcessCommandFactory.java
new file mode 100644
index 0000000..5e5dc19
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/ProcessCommandFactory.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.service.workflow;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A source of commands, with the goal being to allow for adding different
+ * implementations for different platforms
+ */
+public class ProcessCommandFactory {
+
+ protected ProcessCommandFactory() {
+ }
+
+ /**
+ * The command to list a directory
+ * @param dir directory
+ * @return commands
+ */
+ public List ls(File dir) {
+ List commands = new ArrayList<>(5);
+ commands.add("ls");
+ commands.add("-1");
+ commands.add(dir.getAbsolutePath());
+ return commands;
+ }
+
+ /**
+ * Echo some text to stdout
+ * @param text text
+ * @return commands
+ */
+ public List echo(String text) {
+ List commands = new ArrayList<>(5);
+ commands.add("echo");
+ commands.add(text);
+ return commands;
+ }
+
+ /**
+ * print env variables
+ * @return commands
+ */
+ public List env() {
+ List commands = new ArrayList<>(1);
+ commands.add("env");
+ return commands;
+ }
+
+ /**
+ * execute a command that returns with an error code that will
+ * be converted into a number
+ * @return commands
+ */
+ public List exitFalse() {
+ List commands = new ArrayList<>(2);
+ commands.add("false");
+ return commands;
+ }
+
+ /**
+ * Create a process command factory for this OS
+ * @return
+ */
+ public static ProcessCommandFactory createProcessCommandFactory() {
+ return new ProcessCommandFactory();
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/SimpleRunnable.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/SimpleRunnable.java
new file mode 100644
index 0000000..d948c68
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/SimpleRunnable.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.service.workflow;
+
+/**
+ * Test runnable that can be made to exit, or throw an exception
+ * during its run
+ */
+class SimpleRunnable implements Runnable {
+ boolean throwException = false;
+
+ /**
+ * Create an instance
+ */
+ SimpleRunnable() {
+ }
+
+ /**
+ * Create an instance
+ * @param throwException throw an exception in the run operation
+ */
+ SimpleRunnable(boolean throwException) {
+ this.throwException = throwException;
+ }
+
+ @Override
+ public synchronized void run() {
+ try {
+ if (throwException) {
+ throw new RuntimeException("SimpleRunnable");
+ }
+ } finally {
+ this.notify();
+ }
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestLongLivedProcess.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestLongLivedProcess.java
new file mode 100644
index 0000000..a323ca5
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestLongLivedProcess.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.service.workflow;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.List;
+
+/**
+ * Test the {@link LongLivedProcess} class
+ */
+public class TestLongLivedProcess extends WorkflowServiceTestBase implements
+ LongLivedProcessLifecycleEvent {
+ private static final Logger
+ LOG = LoggerFactory.getLogger(TestLongLivedProcess.class);
+
+ private LongLivedProcess process;
+ private File testDir = new File("target");
+ private ProcessCommandFactory commandFactory;
+ private volatile boolean started, stopped;
+
+ @Before
+ public void setupProcesses() {
+ commandFactory = ProcessCommandFactory.createProcessCommandFactory();
+ }
+
+ @After
+ public void stopProcesses() {
+ if (process != null) {
+ process.stop();
+ }
+ }
+
+ @Test
+ public void testLs() throws Throwable {
+
+ initProcess(commandFactory.ls(testDir));
+ process.start();
+ //in-thread wait
+ process.run();
+
+ //here stopped
+ assertTrue("process start callback not received", started);
+ assertTrue("process stop callback not received", stopped);
+ assertFalse(process.isRunning());
+ assertEquals(0, process.getExitCode().intValue());
+
+ assertStringInOutput("test-classes", getFinalOutput());
+ }
+
+ @Test
+ public void testExitCodes() throws Throwable {
+
+ initProcess(commandFactory.exitFalse());
+ process.start();
+ //in-thread wait
+ process.run();
+
+ //here stopped
+
+ assertFalse(process.isRunning());
+ int exitCode = process.getExitCode();
+ assertTrue(exitCode != 0);
+ int corrected = process.getExitCodeSignCorrected();
+
+ assertEquals(1, corrected);
+ }
+
+ @Test
+ public void testEcho() throws Throwable {
+
+ String echoText = "hello, world";
+ initProcess(commandFactory.echo(echoText));
+ process.start();
+ //in-thread wait
+ process.run();
+
+ //here stopped
+ assertTrue("process stop callback not received", stopped);
+ assertEquals(0, process.getExitCode().intValue());
+ assertStringInOutput(echoText, getFinalOutput());
+ }
+
+ @Test
+ public void testSetenv() throws Throwable {
+
+ String var = "TEST_RUN";
+ String val = "TEST-RUN-ENV-VALUE";
+ initProcess(commandFactory.env());
+ process.setEnv(var, val);
+ process.start();
+ //in-thread wait
+ process.run();
+
+ //here stopped
+ assertTrue("process stop callback not received", stopped);
+ assertEquals(0, process.getExitCode().intValue());
+ assertStringInOutput(val, getFinalOutput());
+ }
+
+  /**
+   * Get the final output of the process.
+   * @return the most recent output lines
+   */
+ private List getFinalOutput() {
+ return process.getRecentOutput();
+ }
+
+ private LongLivedProcess initProcess(List commands) {
+ process = new LongLivedProcess(name.getMethodName(), LOG, commands);
+ process.setLifecycleCallback(this);
+ return process;
+ }
+
+ /**
+ * Handler for callback events on the process
+ */
+
+ @Override
+ public void onProcessStarted(LongLivedProcess process) {
+ started = true;
+ }
+
+ /**
+ * Handler for callback events on the process
+ */
+ @Override
+ public void onProcessExited(LongLivedProcess process,
+ int exitCode,
+ int signCorrectedCode) {
+ stopped = true;
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowClosingService.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowClosingService.java
new file mode 100644
index 0000000..630e352
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowClosingService.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.service.workflow;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Test the {@link WorkflowClosingService}
+ */
+public class TestWorkflowClosingService extends WorkflowServiceTestBase {
+
+ @Test
+ public void testSimpleClose() throws Throwable {
+ WorkflowClosingService svc = instance(false);
+ OpenClose openClose = svc.getCloseable();
+ assertFalse(openClose.closed);
+ svc.stop();
+ assertTrue(openClose.closed);
+ }
+
+ @Test
+ public void testNullClose() throws Throwable {
+ WorkflowClosingService
+ svc = new WorkflowClosingService<>(null);
+ svc.init(new Configuration());
+ svc.start();
+ assertNull(svc.getCloseable());
+ svc.stop();
+ }
+
+ @Test
+ public void testFailingClose() throws Throwable {
+ WorkflowClosingService svc = instance(false);
+ OpenClose openClose = svc.getCloseable();
+ openClose.raiseExceptionOnClose = true;
+ svc.stop();
+ assertTrue(openClose.closed);
+ Throwable cause = svc.getFailureCause();
+ assertNotNull(cause);
+
+ //retry should be a no-op
+ svc.close();
+ }
+
+ @Test
+ public void testDoubleClose() throws Throwable {
+ WorkflowClosingService svc = instance(false);
+ OpenClose openClose = svc.getCloseable();
+ openClose.raiseExceptionOnClose = true;
+ svc.stop();
+ assertTrue(openClose.closed);
+ Throwable cause = svc.getFailureCause();
+ assertNotNull(cause);
+ openClose.closed = false;
+ svc.stop();
+ assertEquals(cause, svc.getFailureCause());
+ }
+
+ /**
+ * This does not recurse forever, as the service has already entered the
+   * STOPPED state before the inner close tries to stop it — that operation
+ * is a no-op
+ * @throws Throwable
+ */
+ @Test
+ public void testCloseSelf() throws Throwable {
+ WorkflowClosingService svc =
+ new WorkflowClosingService<>(null);
+ svc.setCloseable(svc);
+ svc.stop();
+ }
+
+
+ private WorkflowClosingService instance(boolean raiseExceptionOnClose) {
+ WorkflowClosingService
+ svc = new WorkflowClosingService(new OpenClose(
+ raiseExceptionOnClose));
+ svc.init(new Configuration());
+ svc.start();
+ return svc;
+ }
+
+ private static class OpenClose implements Closeable {
+ public boolean closed = false;
+ public boolean raiseExceptionOnClose;
+
+ private OpenClose(boolean raiseExceptionOnClose) {
+ this.raiseExceptionOnClose = raiseExceptionOnClose;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
+ closed = true;
+ if (raiseExceptionOnClose) {
+ throw new IOException("OpenClose");
+ }
+ }
+ }
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowCompositeService.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowCompositeService.java
new file mode 100644
index 0000000..352f6d9
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowCompositeService.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.service.workflow;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.service.ServiceParent;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test the {@link org.apache.hadoop.service.workflow.WorkflowCompositeService}
+ */
+public class TestWorkflowCompositeService extends ParentWorkflowTestBase {
+ private static final Logger
+ log = LoggerFactory.getLogger(TestWorkflowCompositeService.class);
+
+ @Test
+ public void testSingleChild() throws Throwable {
+ Service parent = startService(new MockService());
+ parent.stop();
+ }
+
+ @Test
+ public void testSingleChildTerminating() throws Throwable {
+ ServiceParent parent =
+ startService(new MockService("1", false, 100));
+ waitForParentToStop(parent);
+ }
+
+ @Test
+ public void testSingleChildFailing() throws Throwable {
+ ServiceParent parent =
+ startService(new MockService("1", true, 100));
+ waitForParentToStop(parent);
+ assert parent.getFailureCause() != null;
+ }
+
+ @Test
+ public void testTwoChildren() throws Throwable {
+ MockService one = new MockService("one", false, 100);
+ MockService two = new MockService("two", false, 100);
+ ServiceParent parent = startService(one, two);
+ waitForParentToStop(parent);
+ assertStopped(one);
+ assertStopped(two);
+ }
+
+ @Test
+ public void testCallableChild() throws Throwable {
+
+ MockService one = new MockService("one", false, 100);
+ CallableHandler handler = new CallableHandler("hello");
+ WorkflowCallbackService ens =
+ new WorkflowCallbackService("handler", handler, 100, true);
+ MockService two = new MockService("two", false, 100);
+ ServiceParent parent = startService(one, ens, two);
+ waitForParentToStop(parent);
+ assertStopped(one);
+ assertStopped(ens);
+ assertStopped(two);
+ assertTrue(handler.notified);
+ String s = ens.getScheduledFuture().get();
+ assertEquals("hello", s);
+ }
+
+ @Test
+ public void testNestedComposite() throws Throwable {
+ MockService one = new MockService("one", false, 100);
+ MockService two = new MockService("two", false, 100);
+ ServiceParent parent = buildService(one, two);
+ ServiceParent outer = startService(parent);
+ assertTrue(outer.waitForServiceToStop(1000));
+ assertStopped(one);
+ assertStopped(two);
+ }
+
+ @Test
+ public void testFailingComposite() throws Throwable {
+ MockService one = new MockService("one", true, 10);
+ MockService two = new MockService("two", false, 1000);
+ ServiceParent parent = startService(one, two);
+ waitForParentToStop(parent);
+ assertStopped(one);
+ assertStopped(two);
+ assertNotNull(one.getFailureCause());
+ assertNotNull(parent.getFailureCause());
+ assertEquals(one.getFailureCause(), parent.getFailureCause());
+ }
+
+ @Override
+ public ServiceParent buildService(Service... services) {
+ ServiceParent parent =
+ new WorkflowCompositeService("test", services);
+ parent.init(new Configuration());
+ return parent;
+ }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowExecutorService.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowExecutorService.java
new file mode 100644
index 0000000..5b3fcea
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowExecutorService.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.service.workflow;
+
+import org.junit.Test;
+
+/**
+ * Test the {@link AbstractWorkflowExecutorService}
+ */
+public class TestWorkflowExecutorService extends WorkflowServiceTestBase {
+
+ @Test
+ public void testAsyncRun() throws Throwable {
+
+ ExecutorSvc svc = run(new ExecutorSvc());
+ ServiceTerminatingRunnable runnable =
+ new ServiceTerminatingRunnable(svc, new SimpleRunnable());
+
+ // synchronous in-thread execution
+ svc.execute(runnable);
+ Thread.sleep(1000);
+ assertStopped(svc);
+ }
+
+ @Test
+ public void testFailureRun() throws Throwable {
+
+ ExecutorSvc svc = run(new ExecutorSvc());
+ ServiceTerminatingRunnable runnable =
+ new ServiceTerminatingRunnable(svc, new SimpleRunnable(true));
+
+ // synchronous in-thread execution
+ svc.execute(runnable);
+ Thread.sleep(1000);
+ assertStopped(svc);
+ assertNotNull(runnable.getException());
+ }
+
+ private static class ExecutorSvc extends AbstractWorkflowExecutorService {
+ private ExecutorSvc() {
+ super("ExecutorService",
+ ServiceThreadFactory.singleThreadExecutor("test", true));
+ }
+
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowForkedProcessService.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowForkedProcessService.java
new file mode 100644
index 0000000..59d4721
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowForkedProcessService.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.service.workflow;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.ServiceOperations;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test the {@link WorkflowForkedProcessService}
+ */
+public class TestWorkflowForkedProcessService extends WorkflowServiceTestBase {
+
+ private WorkflowForkedProcessService process;
+ private File testDir = new File("target");
+ private ProcessCommandFactory commandFactory;
+ private Map env = new HashMap<>();
+
+ @Before
+ public void setupProcesses() {
+ commandFactory = ProcessCommandFactory.createProcessCommandFactory();
+ }
+
+ @After
+ public void stopProcesses() {
+ ServiceOperations.stop(process);
+ }
+
+ @Test
+ public void testLs() throws Throwable {
+ skipOnWindows();
+ initProcess(commandFactory.ls(testDir));
+ assertExecCompletes(0);
+ // assert that the service did not fail
+ assertNull(process.getFailureCause());
+ }
+
+ @Test
+ public void testExitCodes() throws Throwable {
+ skipOnWindows();
+ initProcess(commandFactory.exitFalse());
+ assertExecCompletes(1);
+    // assert that the exit code was escalated to a service failure
+ assertNotNull("process failure cause is null",
+ process.getFailureCause());
+ }
+
+ @Test
+ public void testEcho() throws Throwable {
+ skipOnWindows();
+ String echoText = "hello, world";
+ initProcess(commandFactory.echo(echoText));
+ assertExecCompletes(0);
+ assertStringInOutput(echoText, getFinalOutput());
+ }
+
+ protected void assertExecCompletes(int expected) throws InterruptedException {
+ exec();
+ assertFalse("process is still running", process.isProcessRunning());
+ int exitCode = process.getExitCode();
+    assertEquals("wrong process exit code: " + process.toString(),
+ expected, exitCode);
+ }
+
+ @Test
+ public void testSetenv() throws Throwable {
+ String var = "TEST_RUN";
+ String val = "TEST-RUN-ENV-VALUE";
+ env.put(var, val);
+ initProcess(commandFactory.env());
+ assertExecCompletes(0);
+ assertStringInOutput(val, getFinalOutput());
+ }
+
+  /**
+   * Get the final output of the process.
+   * @return the most recent output lines
+   */
+ private List getFinalOutput() {
+ return process.getRecentOutput();
+ }
+
+ private WorkflowForkedProcessService initProcess(List commands)
+ throws IOException {
+ process = new WorkflowForkedProcessService(name.getMethodName(), env,
+ commands);
+ process.init(new Configuration());
+ return process;
+ }
+
+ public void exec() throws InterruptedException {
+ assertNotNull(process);
+ EndOfServiceWaiter waiter = new EndOfServiceWaiter(process);
+ process.start();
+ waiter.waitForServiceToStop(5000);
+ }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowRpcService.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowRpcService.java
new file mode 100644
index 0000000..f00a62f
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowRpcService.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.service.workflow;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * Test the {@link WorkflowRpcService}
+ */
+public class TestWorkflowRpcService extends WorkflowServiceTestBase {
+
+ @Test
+ public void testCreateMockRPCService() throws Throwable {
+ MockRPC rpc = new MockRPC();
+ rpc.start();
+ assertTrue(rpc.started);
+ rpc.getListenerAddress();
+ rpc.stop();
+ assertTrue(rpc.stopped);
+ }
+
+ @Test
+ public void testLifecycle() throws Throwable {
+ MockRPC rpc = new MockRPC();
+ WorkflowRpcService svc = new WorkflowRpcService("test", rpc);
+ run(svc);
+ assertTrue(rpc.started);
+ svc.getConnectAddress();
+ svc.stop();
+ assertTrue(rpc.stopped);
+ }
+
+ @Test
+ public void testStartFailure() throws Throwable {
+ MockRPC rpc = new MockRPC();
+ rpc.failOnStart = true;
+ WorkflowRpcService svc = new WorkflowRpcService("test", rpc);
+ svc.init(new Configuration());
+ try {
+ svc.start();
+ fail("expected an exception");
+ } catch (RuntimeException e) {
+ assertEquals("failOnStart", e.getMessage());
+ }
+ svc.stop();
+ assertTrue(rpc.stopped);
+ }
+
+ /**
+ * Mock RPC server; can be set to fail on startup
+ */
+ private static class MockRPC extends Server {
+
+ public boolean stopped;
+ public boolean started;
+ /**
+ * Flag to indicate that the server should fail when started
+ */
+ public boolean failOnStart;
+
+ private MockRPC() throws IOException {
+ super("localhost", 0, null, 1, new Configuration());
+ }
+
+ @Override
+ public synchronized void start() {
+ if (failOnStart) {
+ throw new RuntimeException("failOnStart");
+ }
+ started = true;
+ super.start();
+ }
+
+ @Override
+ public synchronized void stop() {
+ stopped = true;
+ super.stop();
+ }
+
+ @Override
+ public synchronized InetSocketAddress getListenerAddress() {
+ return super.getListenerAddress();
+ }
+
+ @Override
+ public Writable call(RPC.RpcKind rpcKind,
+ String protocol,
+ Writable param,
+ long receiveTime) throws Exception {
+ return null;
+ }
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowSequenceService.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowSequenceService.java
new file mode 100644
index 0000000..4fa21eb
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowSequenceService.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.service.workflow;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.service.ServiceParent;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test the {@link WorkflowSequenceService}
+ */
+public class TestWorkflowSequenceService extends ParentWorkflowTestBase {
+ private static final Logger
+ log = LoggerFactory.getLogger(TestWorkflowSequenceService.class);
+
+ @Test
+ public void testSingleSequence() throws Throwable {
+ ServiceParent parent = startService(new MockService());
+ parent.stop();
+ }
+
+ @Test
+ public void testEmptySequence() throws Throwable {
+ ServiceParent parent = startService();
+ waitForParentToStop(parent);
+ }
+
+ @Test
+ public void testSequence() throws Throwable {
+ MockService one = new MockService("one", false, 100);
+ MockService two = new MockService("two", false, 100);
+ ServiceParent parent = startService(one, two);
+ waitForParentToStop(parent);
+ assertStopped(one);
+ assertStopped(two);
+ assert ((WorkflowSequenceService) parent).getPreviousService().equals(two);
+ }
+
+ @Test
+ public void testCallableChild() throws Throwable {
+
+ MockService one = new MockService("one", false, 100);
+ CallableHandler handler = new CallableHandler("hello");
+ WorkflowCallbackService ens =
+ new WorkflowCallbackService("handler", handler, 100, true);
+ MockService two = new MockService("two", false, 100);
+ ServiceParent parent = startService(one, ens, two);
+ waitForParentToStop(parent);
+ assertStopped(one);
+ assertStopped(ens);
+ assertStopped(two);
+ assertTrue(handler.notified);
+ String s = ens.getScheduledFuture().get();
+ assertEquals("hello", s);
+ }
+
+
+ @Test
+ public void testFailingSequence() throws Throwable {
+ MockService one = new MockService("one", true, 100);
+ MockService two = new MockService("two", false, 100);
+ WorkflowSequenceService parent =
+ (WorkflowSequenceService) startService(one, two);
+ waitForParentToStop(parent);
+ assertStopped(one);
+ assertInState(two, Service.STATE.NOTINITED);
+ assertEquals(one, parent.getPreviousService());
+ }
+
+ @Test
+ public void testFailInStartNext() throws Throwable {
+ MockService one = new MockService("one", false, 100);
+ MockService two = new MockService("two", true, 0);
+ MockService three = new MockService("3", false, 0);
+ ServiceParent parent = startService(one, two, three);
+ waitForParentToStop(parent);
+ assertStopped(one);
+ assertStopped(two);
+ Throwable failureCause = two.getFailureCause();
+ assertNotNull(failureCause);
+ Throwable parentFailureCause = parent.getFailureCause();
+ assertNotNull(parentFailureCause);
+ assertEquals(parentFailureCause, failureCause);
+ assertInState(three, Service.STATE.NOTINITED);
+ }
+
+ @Test
+ public void testSequenceInSequence() throws Throwable {
+ MockService one = new MockService("one", false, 100);
+ MockService two = new MockService("two", false, 100);
+ ServiceParent parent = buildService(one, two);
+ ServiceParent outer = startService(parent);
+ waitForParentToStop(parent);
+ assertStopped(one);
+ assertStopped(two);
+ }
+
+ @Test
+ public void testVarargsConstructor() throws Throwable {
+ MockService one = new MockService("one", false, 100);
+ MockService two = new MockService("two", false, 100);
+ ServiceParent parent = new WorkflowSequenceService("test", one, two);
+ parent.init(new Configuration());
+ parent.start();
+ waitForParentToStop(parent);
+ assertStopped(one);
+ assertStopped(two);
+ }
+
+ @Test
+ public void testAddChild() throws Throwable {
+ MockService one = new MockService("one", false, 5000);
+ MockService two = new MockService("two", false, 100);
+ ServiceParent parent = startService(one, two);
+ CallableHandler handler = new CallableHandler("hello");
+ WorkflowCallbackService ens =
+ new WorkflowCallbackService("handler", handler, 100, true);
+ parent.addService(ens);
+ waitForParentToStop(parent, 10000);
+ assertStopped(one);
+ assertStopped(two);
+ assertStopped(ens);
+ assertStopped(two);
+ assertEquals("hello", ens.getScheduledFuture().get());
+ }
+
+ public WorkflowSequenceService buildService(Service... services) {
+ WorkflowSequenceService parent =
+ new WorkflowSequenceService("test", services);
+ parent.init(new Configuration());
+ return parent;
+ }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowServiceTerminatingRunnable.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowServiceTerminatingRunnable.java
new file mode 100644
index 0000000..a66ef53
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/TestWorkflowServiceTerminatingRunnable.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.service.workflow;
+
+import org.junit.Test;
+
+/**
+ * Test the {@link ServiceTerminatingRunnable}
+ */
+public class TestWorkflowServiceTerminatingRunnable extends WorkflowServiceTestBase {
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testNoservice() throws Throwable {
+ new ServiceTerminatingRunnable(null, new SimpleRunnable());
+ }
+
+ @Test
+ public void testBasicRun() throws Throwable {
+
+ WorkflowCompositeService svc = run(new WorkflowCompositeService());
+ ServiceTerminatingRunnable runnable = new ServiceTerminatingRunnable(svc,
+ new SimpleRunnable());
+
+ // synchronous in-thread execution
+ runnable.run();
+ assertStopped(svc);
+ }
+
+ @Test
+ public void testFailureRun() throws Throwable {
+
+ WorkflowCompositeService svc = run(new WorkflowCompositeService());
+ ServiceTerminatingRunnable runnable =
+ new ServiceTerminatingRunnable(svc, new SimpleRunnable(true));
+
+ // synchronous in-thread execution
+ runnable.run();
+ assertStopped(svc);
+ assertNotNull(runnable.getException());
+ }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/WorkflowServiceTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/WorkflowServiceTestBase.java
new file mode 100644
index 0000000..3f0fa32
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/workflow/WorkflowServiceTestBase.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.service.workflow;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.service.ServiceParent;
+import org.apache.hadoop.util.Shell;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+
+/**
+ * Test base for workflow service tests.
+ */
+public abstract class WorkflowServiceTestBase extends Assert {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(WorkflowServiceTestBase.class);
+
+  /**
+   * Set the timeout for every test.
+   */
+  @Rule
+  public Timeout testTimeout = new Timeout(15000);
+
+  @Rule
+  public TestName name = new TestName();
+
+  @Before
+  public void nameThread() {
+    Thread.currentThread().setName("JUnit");
+  }
+
+  /**
+   * Assert that a service is in a specific state.
+   * @param service service to probe
+   * @param expected expected state
+   */
+  protected void assertInState(Service service, Service.STATE expected) {
+    Service.STATE actual = service.getServiceState();
+    if (actual != expected) {
+      fail("Service " + service.getName() + " in state " + actual
+          + " -expected " + expected);
+    }
+  }
+
+  /**
+   * Assert that a service has stopped.
+   * @param service service to check
+   */
+  protected void assertStopped(Service service) {
+    assertInState(service, Service.STATE.STOPPED);
+  }
+
+  /**
+   * Log the state of a service parent, and that of all its children.
+   * @param parent the parent service whose children are logged
+   */
+  protected void logState(ServiceParent parent) {
+    logService(parent);
+    for (Service s : parent.getServices()) {
+      logService(s);
+    }
+  }
+
+  /**
+   * Log details about a service, including any failure cause.
+   * @param service service to log
+   */
+  protected void logService(Service service) {
+    LOG.info(service.toString());
+    Throwable failureCause = service.getFailureCause();
+    if (failureCause != null) {
+      // trailing throwable argument triggers SLF4J stack trace logging
+      LOG.info("Failed in state {} with {}", service.getFailureState(),
+          failureCause, failureCause);
+    }
+  }
+
+  /**
+   * Init and start a service.
+   * @param svc the service
+   * @param <S> service type, preserved so callers keep the subclass reference
+   * @return the service, now started
+   */
+  protected <S extends Service> S run(S svc) {
+    svc.init(new Configuration());
+    svc.start();
+    return svc;
+  }
+
+  /**
+   * Handler for callable events: records that it was invoked and
+   * returns the preset result string.
+   */
+  public static class CallableHandler implements Callable<String> {
+    public volatile boolean notified = false;
+    public final String result;
+
+    public CallableHandler(String result) {
+      this.result = result;
+    }
+
+    @Override
+    public String call() throws Exception {
+      LOG.info("CallableHandler::call");
+      notified = true;
+      return result;
+    }
+  }
+
+  /**
+   * Assert that a string is in an output list. Fails fast if the output
+   * list is empty.
+   * @param text text to scan for
+   * @param output list of output lines
+   */
+  public void assertStringInOutput(String text, List<String> output) {
+    assertTrue("Empty output list", !output.isEmpty());
+    boolean found = false;
+    StringBuilder builder = new StringBuilder();
+    for (String s : output) {
+      builder.append(s).append('\n');
+      if (s.contains(text)) {
+        found = true;
+        break;
+      }
+    }
+
+    if (!found) {
+      String message =
+          "Text \"" + text + "\" not found in " + output.size() + " lines\n";
+      fail(message + builder);
+    }
+  }
+
+  /**
+   * Skip a test on Windows.
+   */
+  public static void skipOnWindows() {
+    if (Shell.WINDOWS) {
+      skip("Not supported on windows");
+    }
+  }
+
+  /**
+   * Skip a test, logging the reason.
+   * @param message reason for the skip
+   */
+  public static void skip(String message) {
+    LOG.warn("Skipping test: {}", message);
+    Assume.assumeTrue(message, false);
+  }
+}